"""
|
数据工具类
|
提供各种K线/涨停形态、技术指标和价格模式的计算方法
|
"""
|
import constant
|
from utils import tool
|
|
|
class KTickLineAnalyzer:
|
"""
|
TICK级K线分析工具类
|
"""
|
|
@classmethod
|
def calculate_upper_limit_price(cls, stock_code, pre_close):
|
"""
|
计算价格上限
|
@param stock_code: 证券代码
|
@param pre_close: 前一日收盘价
|
@return: 价格上限
|
"""
|
return round(tool.get_limit_up_rate(stock_code) * pre_close, 2)
|
|
@classmethod
|
def calculate_lower_limit_price(cls, stock_code, pre_close):
|
"""
|
计算价格下限
|
@param stock_code: 证券代码
|
@param pre_close: 前一日收盘价
|
@return: 价格下限
|
"""
|
# A股ST股票涨跌幅为5%,其他为10%
|
return round(tool.get_limit_down_rate(stock_code) * pre_close, 2)
|
|
@classmethod
|
def get_yesterday_close(cls, k_data):
|
"""
|
获取昨日收盘价
|
@param k_data: K线数据列表
|
@return: 昨日收盘价
|
"""
|
return k_data[0]['close']
|
|
@classmethod
|
def get_yesterday_high(cls, k_data):
|
"""
|
获取昨日最高价
|
@param k_data: K线数据列表
|
@return: 昨日最高价
|
"""
|
return k_data[0]['high']
|
|
@classmethod
|
def get_yesterday_amount(cls, k_data):
|
"""
|
获取昨日成交额
|
@param k_data: K线数据列表
|
@return: 昨日最高价
|
"""
|
return k_data[0]['amount']
|
|
@classmethod
|
def get_yesterday_low_price(cls, k_data):
|
"""
|
获取昨日最低价
|
@param k_data: K线数据列表
|
@return: 昨日最高价
|
"""
|
return k_data[0]['low']
|
|
@classmethod
|
def get_yesterday_open_price(cls, k_data):
|
"""
|
获取昨日开盘价
|
@param k_data: K线数据列表
|
@return: 昨日最高价
|
"""
|
return k_data[0]['open']
|
|
@classmethod
|
def get_recent_days_high(cls, k_data, days):
|
"""
|
获取近几个交易日的最高价
|
@param k_data: K线数据列表
|
@param days: 交易日数量
|
@return: 最高价
|
"""
|
return max([d['high'] for d in k_data[:days]])
|
|
@classmethod
|
def is_yesterday_limit_up(cls, k_data):
|
"""
|
判断昨日是否涨停
|
@param k_data: K线数据列表
|
@return: True/False
|
"""
|
return k_data[0]['close'] >= cls.calculate_upper_limit_price(k_data[0]["sec_id"], k_data[0]["pre_close"])
|
|
@classmethod
|
def is_yesterday_limit_down(cls, k_data):
|
"""
|
判断昨日是否跌停
|
@param k_data: K线数据列表
|
@return: True/False
|
"""
|
return k_data[0]['close'] <= cls.calculate_lower_limit_price(k_data[0]["sec_id"], k_data[0]["pre_close"])
|
|
@classmethod
|
def is_yesterday_exploded(cls, k_data):
|
"""
|
判断昨日是否炸板
|
@param k_data: K线数据列表
|
@return: True/False
|
"""
|
return k_data[0]['high'] >= cls.calculate_upper_limit_price(k_data[0]["sec_id"], k_data[0]["pre_close"]) > \
|
k_data[0]['close']
|
|
@classmethod
|
def get_yesterday_volume(cls, k_data):
|
"""
|
获取昨日成交量
|
@param k_data: K线数据列表
|
@return: 成交量
|
"""
|
return k_data[0]['volume']
|
|
@classmethod
|
def get_recent_days_max_volume(cls, k_data, days):
|
"""
|
获取近几个交易日的最高量及对应K线数据
|
@param k_data: K线数据列表
|
@param days: 交易日数量
|
@return: (最高量, 对应K线数据)
|
"""
|
max_volume_day = max(k_data[:days], key=lambda d: d['volume'])
|
return max_volume_day['volume'], max_volume_day
|
|
@classmethod
|
def get_recent_limit_up_count(cls, k_data, days):
|
"""
|
获取近几个交易日的涨停次数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 涨停次数
|
"""
|
return sum(
|
1 for d in k_data[:days] if d['close'] >= cls.calculate_upper_limit_price(d["sec_id"], d["pre_close"]))
|
|
@classmethod
|
def get_recent_exploded_count(cls, k_data, days):
|
"""
|
获取近几个交易日的炸板次数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 炸板次数
|
"""
|
return sum(1 for d in k_data[:days] if
|
d['high'] >= cls.calculate_upper_limit_price(d["sec_id"], d["pre_close"]) > d['close'])
|
|
@classmethod
|
def get_recent_limit_down_count(cls, k_data, days):
|
"""
|
获取近几个交易日的跌停次数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 跌停次数
|
"""
|
return sum(
|
1 for d in k_data[:days] if d['close'] <= cls.calculate_lower_limit_price(d["sec_id"], d["pre_close"]))
|
|
@classmethod
|
def get_recent_days_double_volume_date(cls, k_data, days):
|
"""
|
获取最近几个交易日的倍量日期
|
@param k_data: K线数据列表
|
@param days: 交易日数量
|
@return: 倍量的日期
|
"""
|
k_datas: list = k_data[:days]
|
k_datas.reverse()
|
for i in range(1, len(k_datas)):
|
latest_volume = k_datas[i - 1]["volume"]
|
if k_datas[i]["volume"] > 2 * latest_volume:
|
return k_datas[i]["bob"]
|
return None
|
|
@classmethod
|
def get_first_limit_up_avg_premium(cls, k_data, days):
|
"""
|
获取近几个交易日的首板涨停平均溢价率
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 平均溢价率
|
"""
|
premiums = []
|
for i in range(days):
|
if i + 1 >= len(k_data):
|
continue
|
if i < 1:
|
continue
|
# 判断当日涨停且前日非涨停
|
if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] < cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])):
|
# 计算溢价率 = (当日最高价 - 前日收盘价)/前日收盘价
|
premium = (k_data[i - 1]['high'] - k_data[i]['close']) / k_data[i]['close']
|
premiums.append(premium)
|
return sum(premiums) / len(premiums) if premiums else 0
|
|
@classmethod
|
def get_first_exploded_avg_premium(cls, k_data, days):
|
"""
|
获取近几个交易日的首板炸板平均溢价率
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 平均溢价率
|
"""
|
premiums = []
|
for i in range(days):
|
if i + 1 >= len(k_data):
|
continue
|
if i < 1:
|
continue
|
# 判断当日炸板且前日非涨停
|
if (k_data[i]['high'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"]) >
|
k_data[i]['close']) and \
|
(k_data[i + 1]['close'] < cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])):
|
# 计算溢价率 = (当日最高价 - 前日收盘价)/前日收盘价
|
premium = (k_data[i + 1]['high'] - k_data[i]['close']) / k_data[i]['close']
|
premiums.append(premium)
|
return sum(premiums) / len(premiums) if premiums else 0
|
|
@classmethod
|
def get_first_limit_up_days(cls, k_data, days):
|
"""
|
获取近几个交易日的首板天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 首板天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 1 >= len(k_data):
|
continue
|
# 判断当日涨停且前日非涨停
|
if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] < cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])):
|
count += 1
|
return count
|
|
@classmethod
|
def get_second_limit_up_days(cls, k_data, days):
|
"""
|
获取近几个交易日的二板天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 二板天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 2 >= len(k_data):
|
continue
|
# 判断连续两日涨停且第三日非涨停
|
if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] >= cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])) and \
|
(k_data[i + 2]['close'] < cls.calculate_upper_limit_price(k_data[i + 2]["sec_id"],
|
k_data[i + 2]["pre_close"])):
|
count += 1
|
return count
|
|
@classmethod
|
def __is_limit_up(cls, code, close, pre_close):
|
"""
|
是否涨停
|
@param code:
|
@param close:
|
@param pre_close:
|
@return:
|
"""
|
return abs(close - cls.calculate_upper_limit_price(code,
|
pre_close)) < 0.01
|
|
@classmethod
|
def get_third_limit_up_days(cls, k_data, days):
|
"""
|
获取近几个交易日的三板天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 三板天数
|
"""
|
count = 0
|
k_data = k_data[:days]
|
k_data = k_data[::-1]
|
for i in range(days):
|
if i + 3 >= len(k_data):
|
continue
|
# 判断连续三日涨停且第四日非涨停
|
if cls.__is_limit_up(k_data[i]["sec_id"], k_data[i]['close'], k_data[i]["pre_close"]):
|
if cls.__is_limit_up(k_data[i+1]["sec_id"], k_data[i+1]['close'], k_data[i+1]["pre_close"]):
|
if cls.__is_limit_up(k_data[i+2]["sec_id"], k_data[i+2]['close'], k_data[i+2]["pre_close"]):
|
if not cls.__is_limit_up(k_data[i+3]["sec_id"], k_data[i+3]['close'], k_data[i+3]["pre_close"]):
|
count += 1
|
return count
|
|
@classmethod
|
def get_fourth_or_more_limit_up_days(cls, k_data, days):
|
"""
|
获取近几个交易日的四板及以上天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 四板及以上天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 3 >= len(k_data):
|
continue
|
# 判断连续三日涨停且第四日涨停
|
if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] >= cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])) and \
|
(k_data[i + 2]['close'] >= cls.calculate_upper_limit_price(k_data[i + 2]["sec_id"],
|
k_data[i + 2]["pre_close"])) and \
|
(k_data[i + 3]['close'] >= cls.calculate_upper_limit_price(k_data[i + 3]["sec_id"],
|
k_data[i + 3]["pre_close"])):
|
count += 1
|
return count
|
|
@classmethod
|
def get_first_limit_down_days(cls, k_data, days):
|
"""
|
获取近几个交易日的首跌停天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 首跌停天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 1 >= len(k_data):
|
continue
|
# 判断当日跌停且前日非跌停
|
if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] > cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])):
|
count += 1
|
return count
|
|
@classmethod
|
def get_second_limit_down_days(cls, k_data, days):
|
"""
|
获取近几个交易日的二跌停天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 二跌停天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 2 >= len(k_data):
|
continue
|
# 判断连续两日跌停且第三日非跌停
|
if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] <= cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])) and \
|
(k_data[i + 2]['close'] > cls.calculate_lower_limit_price(k_data[i + 2]["sec_id"],
|
k_data[i + 2]["pre_close"])):
|
count += 1
|
return count
|
|
@classmethod
|
def get_third_limit_down_days(cls, k_data, days):
|
"""
|
获取近几个交易日的三跌停天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 三跌停天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 3 >= len(k_data):
|
continue
|
# 判断连续三日跌停且第四日非跌停
|
if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] <= cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])) and \
|
(k_data[i + 2]['close'] <= cls.calculate_lower_limit_price(k_data[i + 2]["sec_id"],
|
k_data[i + 2]["pre_close"])) and \
|
(k_data[i + 3]['close'] > cls.calculate_lower_limit_price(k_data[i + 3]["sec_id"],
|
k_data[i + 3]["pre_close"])):
|
count += 1
|
return count
|
|
@classmethod
|
def get_fourth_or_more_limit_down_days(cls, k_data, days):
|
"""
|
获取近几个交易日的四跌停及以上天数
|
@param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
|
@param days: 交易日数量
|
@return: 四跌停及以上天数
|
"""
|
count = 0
|
for i in range(days):
|
if i + 3 >= len(k_data):
|
continue
|
# 判断连续三日跌停且第四日跌停
|
if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
|
(k_data[i + 1]['close'] <= cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"],
|
k_data[i + 1]["pre_close"])) and \
|
(k_data[i + 2]['close'] <= cls.calculate_lower_limit_price(k_data[i + 2]["sec_id"],
|
k_data[i + 2]["pre_close"])) and \
|
(k_data[i + 3]['close'] <= cls.calculate_lower_limit_price(k_data[i + 3]["sec_id"],
|
k_data[i + 3]["pre_close"])):
|
count += 1
|
return count
|
|
|
class K60SLineAnalyzer:
|
"""
|
60sk线分析
|
"""
|
|
@classmethod
|
def get_close_price_of_max_volume(cls, k_data):
|
"""
|
获取60sk线中最高量对应的收盘价
|
@param k_data: 60秒K线数据列表
|
@return: 最高成交量对应的收盘价,60sK线数据
|
"""
|
max_volume_item = max(k_data, key=lambda x: x['volume'])
|
return max_volume_item['close'], max_volume_item
|
|
|
class KPLLimitUpDataAnalyzer:
|
"""
|
开盘啦涨停数据分析
|
"""
|
|
@classmethod
|
def get_most_common_reasons(cls, limit_up_data, min_day, max_day):
|
"""
|
获取某段日期开盘啦出现最多原因出现的天数,排除特殊涨停原因,多个原因有一样的涨停天数,返回所有
|
@param limit_up_data: 近150个交易日某个代码的涨停数据,格式:[(代码,日期,涨停原因)]
|
@param min_day: 最小日期
|
@param max_day: 最大日期
|
@return: [(原因, 出现的天数)]
|
"""
|
reason_counts = {}
|
special_reasons = constant.KPL_INVALID_BLOCKS
|
if limit_up_data:
|
for _, date, reason, is_open, _blocks in limit_up_data:
|
if is_open:
|
continue
|
if min_day <= date <= max_day and reason not in special_reasons:
|
reason_counts[reason] = reason_counts.get(reason, 0) + 1
|
if not reason_counts:
|
return []
|
max_count = max(reason_counts.values())
|
return [(reason, count) for reason, count in reason_counts.items() if count == max_count]
|
|
@classmethod
|
def get_limit_up_reasons(cls, limit_up_data_list, min_day, max_day, include_recommend_reason=False):
|
"""
|
获取最近一段时间的涨停原因
|
@param include_recommend_reason: 是否包含推荐原因
|
@param max_day:
|
@param limit_up_data_list:
|
@param min_day:
|
@return:
|
"""
|
special_reasons = constant.KPL_INVALID_BLOCKS
|
day_block_codes = {}
|
if limit_up_data_list:
|
for _, date, reason, is_open, _blocks in limit_up_data_list:
|
if reason in special_reasons:
|
continue
|
if date > max_day or date < min_day:
|
continue
|
# 每天的板块涨停数量
|
if date not in day_block_codes:
|
day_block_codes[date] = {}
|
reasons = {reason}
|
if include_recommend_reason and _blocks:
|
reasons |= set(_blocks.split("、"))
|
for r in reasons:
|
if r not in day_block_codes[date]:
|
# {日期:{板块:[{真正涨停集合}, {炸板集合}]}}
|
day_block_codes[date][r] = [set(), set()]
|
if not is_open:
|
# 真正涨停
|
day_block_codes[date][r][0].add(_)
|
else:
|
# 炸板
|
day_block_codes[date][r][1].add(_)
|
blocks = set()
|
for date in day_block_codes:
|
for reason in day_block_codes[date]:
|
if len(day_block_codes[date][reason][0]) >= 2 or len(day_block_codes[date][reason][0]) >= 4:
|
# 最后涨停数>=2 炸板数>=4
|
blocks.add(reason)
|
return blocks
|
return set()
|
|
@classmethod
|
def get_continuous_limit_up_reasons(cls, limit_up_data_list, days_list):
|
"""
|
连续老题材:days_list交易日都在走的题材
|
@param limit_up_data_list:
|
@param days_list: ["2025-01-01"]
|
@return:
|
"""
|
special_reasons = constant.KPL_INVALID_BLOCKS
|
day_block_codes = {}
|
if limit_up_data_list:
|
for _, date, reason, is_open, _blocks in limit_up_data_list:
|
if reason in special_reasons:
|
continue
|
if date not in days_list:
|
continue
|
# 每天的板块涨停数量
|
if date not in day_block_codes:
|
day_block_codes[date] = {}
|
reasons = {reason}
|
for r in reasons:
|
if r not in day_block_codes[date]:
|
# {日期:{板块:[{真正涨停集合}, {炸板集合}]}}
|
day_block_codes[date][r] = [set(), set()]
|
if not is_open:
|
# 真正涨停
|
day_block_codes[date][r][0].add(_)
|
else:
|
# 炸板
|
day_block_codes[date][r][1].add(_)
|
block_days = {}
|
for date in day_block_codes:
|
for reason in day_block_codes[date]:
|
if len(day_block_codes[date][reason][0]) >= 2 or len(day_block_codes[date][reason][0]) >= 4:
|
# 最后涨停数>=2 炸板数>=4
|
if reason not in block_days:
|
block_days[reason] = set()
|
block_days[reason].add(date)
|
return set([b for b in block_days if len(block_days[b]) == len(days_list)])
|
return set()
|