""" 数据工具类 提供各种K线/涨停形态、技术指标和价格模式的计算方法 """ import constant from utils import tool class KTickLineAnalyzer: """ TICK级K线分析工具类 """ @classmethod def calculate_upper_limit_price(cls, stock_code, pre_close): """ 计算价格上限 @param stock_code: 证券代码 @param pre_close: 前一日收盘价 @return: 价格上限 """ return round(tool.get_limit_up_rate(stock_code) * pre_close, 2) @classmethod def calculate_lower_limit_price(cls, stock_code, pre_close): """ 计算价格下限 @param stock_code: 证券代码 @param pre_close: 前一日收盘价 @return: 价格下限 """ # A股ST股票涨跌幅为5%,其他为10% return round(tool.get_limit_down_rate(stock_code) * pre_close, 2) @classmethod def get_yesterday_close(cls, k_data): """ 获取昨日收盘价 @param k_data: K线数据列表 @return: 昨日收盘价 """ return k_data[0]['close'] @classmethod def get_yesterday_high(cls, k_data): """ 获取昨日最高价 @param k_data: K线数据列表 @return: 昨日最高价 """ return k_data[0]['high'] @classmethod def get_yesterday_amount(cls, k_data): """ 获取昨日成交额 @param k_data: K线数据列表 @return: 昨日最高价 """ return k_data[0]['amount'] @classmethod def get_yesterday_low_price(cls, k_data): """ 获取昨日最低价 @param k_data: K线数据列表 @return: 昨日最高价 """ return k_data[0]['low'] @classmethod def get_yesterday_open_price(cls, k_data): """ 获取昨日开盘价 @param k_data: K线数据列表 @return: 昨日最高价 """ return k_data[0]['open'] @classmethod def get_recent_days_high(cls, k_data, days): """ 获取近几个交易日的最高价 @param k_data: K线数据列表 @param days: 交易日数量 @return: 最高价 """ return max([d['high'] for d in k_data[:days]]) @classmethod def is_yesterday_limit_up(cls, k_data): """ 判断昨日是否涨停 @param k_data: K线数据列表 @return: True/False """ return k_data[0]['close'] >= cls.calculate_upper_limit_price(k_data[0]["sec_id"], k_data[0]["pre_close"]) @classmethod def is_yesterday_limit_down(cls, k_data): """ 判断昨日是否跌停 @param k_data: K线数据列表 @return: True/False """ return k_data[0]['close'] <= cls.calculate_lower_limit_price(k_data[0]["sec_id"], k_data[0]["pre_close"]) @classmethod def is_yesterday_exploded(cls, k_data): """ 判断昨日是否炸板 @param k_data: K线数据列表 @return: True/False """ return k_data[0]['high'] >= cls.calculate_upper_limit_price(k_data[0]["sec_id"], k_data[0]["pre_close"]) > \ k_data[0]['close'] @classmethod def get_yesterday_volume(cls, k_data): """ 获取昨日成交量 @param k_data: K线数据列表 @return: 成交量 """ return k_data[0]['volume'] @classmethod def get_recent_days_max_volume(cls, k_data, days): """ 获取近几个交易日的最高量及对应K线数据 @param k_data: K线数据列表 @param days: 交易日数量 @return: (最高量, 对应K线数据) """ max_volume_day = max(k_data[:days], key=lambda d: d['volume']) return max_volume_day['volume'], max_volume_day @classmethod def get_recent_limit_up_count(cls, k_data, days): """ 获取近几个交易日的涨停次数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 涨停次数 """ return sum( 1 for d in k_data[:days] if d['close'] >= cls.calculate_upper_limit_price(d["sec_id"], d["pre_close"])) @classmethod def get_recent_exploded_count(cls, k_data, days): """ 获取近几个交易日的炸板次数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 炸板次数 """ return sum(1 for d in k_data[:days] if d['high'] >= cls.calculate_upper_limit_price(d["sec_id"], d["pre_close"]) > d['close']) @classmethod def get_recent_limit_down_count(cls, k_data, days): """ 获取近几个交易日的跌停次数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 跌停次数 """ return sum( 1 for d in k_data[:days] if d['close'] <= cls.calculate_lower_limit_price(d["sec_id"], d["pre_close"])) @classmethod def get_recent_days_double_volume_date(cls, k_data, days): """ 获取最近几个交易日的倍量日期 @param k_data: K线数据列表 @param days: 交易日数量 @return: 倍量的日期 """ k_datas: list = k_data[:days] k_datas.reverse() for i in range(1, len(k_datas)): latest_volume = k_datas[i - 1]["volume"] if k_datas[i]["volume"] > 2 * latest_volume: return k_datas[i]["bob"] return None @classmethod def get_first_limit_up_avg_premium(cls, k_data, days): """ 获取近几个交易日的首板涨停平均溢价率 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 平均溢价率 """ premiums = [] for i in range(days): if i + 1 >= len(k_data): continue if i < 1: continue # 判断当日涨停且前日非涨停 if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] < cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])): # 计算溢价率 = (当日最高价 - 前日收盘价)/前日收盘价 premium = (k_data[i - 1]['high'] - k_data[i]['close']) / k_data[i]['close'] premiums.append(premium) return sum(premiums) / len(premiums) if premiums else 0 @classmethod def get_first_exploded_avg_premium(cls, k_data, days): """ 获取近几个交易日的首板炸板平均溢价率 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 平均溢价率 """ premiums = [] for i in range(days): if i + 1 >= len(k_data): continue if i < 1: continue # 判断当日炸板且前日非涨停 if (k_data[i]['high'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"]) > k_data[i]['close']) and \ (k_data[i + 1]['close'] < cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])): # 计算溢价率 = (当日最高价 - 前日收盘价)/前日收盘价 premium = (k_data[i + 1]['high'] - k_data[i]['close']) / k_data[i]['close'] premiums.append(premium) return sum(premiums) / len(premiums) if premiums else 0 @classmethod def get_first_limit_up_days(cls, k_data, days): """ 获取近几个交易日的首板天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 首板天数 """ count = 0 for i in range(days): if i + 1 >= len(k_data): continue # 判断当日涨停且前日非涨停 if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] < cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])): count += 1 return count @classmethod def get_second_limit_up_days(cls, k_data, days): """ 获取近几个交易日的二板天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 二板天数 """ count = 0 for i in range(days): if i + 2 >= len(k_data): continue # 判断连续两日涨停且第三日非涨停 if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] >= cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])) and \ (k_data[i + 2]['close'] < cls.calculate_upper_limit_price(k_data[i + 2]["sec_id"], k_data[i + 2]["pre_close"])): count += 1 return count @classmethod def __is_limit_up(cls, code, close, pre_close): """ 是否涨停 @param code: @param close: @param pre_close: @return: """ return abs(close - cls.calculate_upper_limit_price(code, pre_close)) < 0.01 @classmethod def get_third_limit_up_days(cls, k_data, days): """ 获取近几个交易日的三板天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 三板天数 """ count = 0 k_data = k_data[:days] k_data = k_data[::-1] for i in range(days): if i + 3 >= len(k_data): continue # 判断连续三日涨停且第四日非涨停 day_count = 3 for n in range(day_count + 1): if n < day_count: if not cls.__is_limit_up(k_data[i + n]["sec_id"], k_data[i + n]['close'], k_data[i + n]["pre_close"]): # 非涨停 break else: if not cls.__is_limit_up(k_data[i + n]["sec_id"], k_data[i + n]['close'], k_data[i + n]["pre_close"]): count += 1 break return count @classmethod def get_fourth_or_more_limit_up_days(cls, k_data, days): """ 获取近几个交易日的四板及以上天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 四板及以上天数 """ count = 0 for i in range(days): if i + 3 >= len(k_data): continue # 判断连续三日涨停且第四日涨停 if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] >= cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])) and \ (k_data[i + 2]['close'] >= cls.calculate_upper_limit_price(k_data[i + 2]["sec_id"], k_data[i + 2]["pre_close"])) and \ (k_data[i + 3]['close'] >= cls.calculate_upper_limit_price(k_data[i + 3]["sec_id"], k_data[i + 3]["pre_close"])): count += 1 return count @classmethod def get_first_limit_down_days(cls, k_data, days): """ 获取近几个交易日的首跌停天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 首跌停天数 """ count = 0 for i in range(days): if i + 1 >= len(k_data): continue # 判断当日跌停且前日非跌停 if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] > cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])): count += 1 return count @classmethod def get_second_limit_down_days(cls, k_data, days): """ 获取近几个交易日的二跌停天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 二跌停天数 """ count = 0 for i in range(days): if i + 2 >= len(k_data): continue # 判断连续两日跌停且第三日非跌停 if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] <= cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])) and \ (k_data[i + 2]['close'] > cls.calculate_lower_limit_price(k_data[i + 2]["sec_id"], k_data[i + 2]["pre_close"])): count += 1 return count @classmethod def get_third_limit_down_days(cls, k_data, days): """ 获取近几个交易日的三跌停天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 三跌停天数 """ count = 0 for i in range(days): if i + 3 >= len(k_data): continue # 判断连续三日跌停且第四日非跌停 if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] <= cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])) and \ (k_data[i + 2]['close'] <= cls.calculate_lower_limit_price(k_data[i + 2]["sec_id"], k_data[i + 2]["pre_close"])) and \ (k_data[i + 3]['close'] > cls.calculate_lower_limit_price(k_data[i + 3]["sec_id"], k_data[i + 3]["pre_close"])): count += 1 return count @classmethod def get_fourth_or_more_limit_down_days(cls, k_data, days): """ 获取近几个交易日的四跌停及以上天数 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @param days: 交易日数量 @return: 四跌停及以上天数 """ count = 0 for i in range(days): if i + 3 >= len(k_data): continue # 判断连续三日跌停且第四日跌停 if (k_data[i]['close'] <= cls.calculate_lower_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \ (k_data[i + 1]['close'] <= cls.calculate_lower_limit_price(k_data[i + 1]["sec_id"], k_data[i + 1]["pre_close"])) and \ (k_data[i + 2]['close'] <= cls.calculate_lower_limit_price(k_data[i + 2]["sec_id"], k_data[i + 2]["pre_close"])) and \ (k_data[i + 3]['close'] <= cls.calculate_lower_limit_price(k_data[i + 3]["sec_id"], k_data[i + 3]["pre_close"])): count += 1 return count @classmethod def is_too_high_and_not_relase_volume(cls, code, k_data): """ 长得太高且没放量:30个交易日内,出现过最低价(最高价之前的交易日)到最高价之间的涨幅≥35%的票,且今日距离最高价那日无涨停/无炸板 @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序) @return: 四跌停及以上天数 """ k_data = k_data[:30] # 获取最高价信息 max_high_price_data = max(k_data, key=lambda x: x["high"]) min_close_price_data = min([d for d in k_data if d['bob'] < max_high_price_data['bob']], key=lambda x: x["close"]) if (max_high_price_data['high'] - min_close_price_data['close'])/min_close_price_data['close'] < 0.35: # 涨幅小于35% return False k_data = [d for d in k_data if d['bob'] > max_high_price_data['bob']] # 判断是否涨停过 if len([d for d in k_data if cls.__is_limit_up(code, d["high"], d["pre_close"])]) >0: # 最高价之后有过涨停 return False return True class K60SLineAnalyzer: """ 60sk线分析 """ @classmethod def get_close_price_of_max_volume(cls, k_data): """ 获取60sk线中最高量对应的收盘价 @param k_data: 60秒K线数据列表 @return: 最高成交量对应的收盘价,60sK线数据 """ max_volume_item = max(k_data, key=lambda x: x['volume']) return max_volume_item['close'], max_volume_item class KPLLimitUpDataAnalyzer: """ 开盘啦涨停数据分析 """ @classmethod def get_most_common_reasons(cls, limit_up_data, min_day, max_day): """ 获取某段日期开盘啦出现最多原因出现的天数,排除特殊涨停原因,多个原因有一样的涨停天数,返回所有 @param limit_up_data: 近150个交易日某个代码的涨停数据,格式:[(代码,日期,涨停原因)] @param min_day: 最小日期 @param max_day: 最大日期 @return: [(原因, 出现的天数)] """ reason_counts = {} special_reasons = constant.KPL_INVALID_BLOCKS if limit_up_data: for _, date, reason, is_open, _blocks in limit_up_data: if is_open: continue if min_day <= date <= max_day and reason not in special_reasons: reason_counts[reason] = reason_counts.get(reason, 0) + 1 if not reason_counts: return [] max_count = max(reason_counts.values()) return [(reason, count) for reason, count in reason_counts.items() if count == max_count] @classmethod def get_limit_up_reasons(cls, limit_up_data_list, min_day, max_day, include_recommend_reason=False): """ 获取最近一段时间的涨停原因 @param include_recommend_reason: 是否包含推荐原因 @param max_day: @param limit_up_data_list: @param min_day: @return: """ special_reasons = constant.KPL_INVALID_BLOCKS day_block_codes = {} if limit_up_data_list: for _, date, reason, is_open, _blocks in limit_up_data_list: if reason in special_reasons: continue if date > max_day or date < min_day: continue # 每天的板块涨停数量 if date not in day_block_codes: day_block_codes[date] = {} reasons = {reason} if include_recommend_reason and _blocks: reasons |= set(_blocks.split("、")) for r in reasons: if r not in day_block_codes[date]: # {日期:{板块:[{真正涨停集合}, {炸板集合}]}} day_block_codes[date][r] = [set(), set()] if not is_open: # 真正涨停 day_block_codes[date][r][0].add(_) else: # 炸板 day_block_codes[date][r][1].add(_) blocks = set() for date in day_block_codes: for reason in day_block_codes[date]: if len(day_block_codes[date][reason][0]) >= 2 or len(day_block_codes[date][reason][0]) >= 4: # 最后涨停数>=2 炸板数>=4 blocks.add(reason) return blocks return set() @classmethod def get_continuous_limit_up_reasons(cls, limit_up_data_list, days_list): """ 连续老题材:days_list交易日都在走的题材 @param limit_up_data_list: @param days_list: ["2025-01-01"] @return: """ special_reasons = constant.KPL_INVALID_BLOCKS day_block_codes = {} if limit_up_data_list: for _, date, reason, is_open, _blocks in limit_up_data_list: if reason in special_reasons: continue if date not in days_list: continue # 每天的板块涨停数量 if date not in day_block_codes: day_block_codes[date] = {} reasons = {reason} for r in reasons: if r not in day_block_codes[date]: # {日期:{板块:[{真正涨停集合}, {炸板集合}]}} day_block_codes[date][r] = [set(), set()] if not is_open: # 真正涨停 day_block_codes[date][r][0].add(_) else: # 炸板 day_block_codes[date][r][1].add(_) block_days = {} for date in day_block_codes: for reason in day_block_codes[date]: if len(day_block_codes[date][reason][0]) >= 2 or len(day_block_codes[date][reason][0]) >= 4: # 最后涨停数>=2 炸板数>=4 if reason not in block_days: block_days[reason] = set() block_days[reason].add(date) return set([b for b in block_days if len(block_days[b]) == len(days_list)]) return set()