# Current limit-up data
import constant
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_debug
from third_data import kpl_util
from third_data.third_blocks_manager import BlockMapManager
from trade import trade_record_log_util
from utils import tool, global_util
from utils.kpl_data_db_util import KPLLimitUpDataUtil
from utils.tool import singleton

# Used to compute the board count for the radical-buy "open 1" case:
# {"code": (board count, {blocks})}
open_limit_up_code_dict_for_radical_buy = None


@tool.singleton
class ContainsLimitupCodesBlocksManager:
    """
    Blocks (themes) that contain limit-up codes.

    Maintains a mapping from block name to the set of codes registered under
    it, using the radical-buy blocks of each code as the block list.
    """
    # {block: {code, ...}}; class-level, so shared by every instance
    __block_codes_dict = {}

    def __init__(self):
        self.__load_data()

    def __load_data(self):
        """
        Seed the mapping from the latest per-day block records.
        """
        records = KPLLimitUpDataUtil.get_latest_block_infos_by_day()
        if records:
            # r[3] is used as the code column of these records
            for code in {r[3] for r in records}:
                self.__add_code(code)

    def get_block_codes(self, block):
        """
        Get the codes registered under a block.
        @param block: block name
        @return: set of codes, or None when the block is unknown
        """
        return self.__block_codes_dict.get(block)

    def __add_code(self, code):
        """
        Register a code under each of its radical-buy blocks.
        @param code:
        @return:
        """
        blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if blocks:
            for b in blocks:
                self.__block_codes_dict.setdefault(b, set()).add(code)

    def set_current_limit_up_datas(self, datas):
        """
        Feed the current limit-up data; the first field of each item is the code.
        @param datas: iterable of limit-up records, may be empty or None
        @return:
        """
        if datas:
            for x in datas:
                self.__add_code(x[0])


@singleton
class LimitUpCodesBlockRecordManager:
    """
    Block (theme) management for historical limit-up codes.
    """
    # Limit-up reasons: {code: {reason, ...}}
    __limit_up_reasons_dict = {}
    # Recommended limit-up reasons: {code: {reason, ...}}
    __limit_up_recommend_reasons_dict = {}
    # Radical-buy reasons: {code: {block, ...}}
    __radical_buy_reasons_dict = {}
    # Raw ranking data: {code: [(block, count, latest day), ...]}
    __radical_buy_reasons_origin_data_dict = {}
    # New concepts of today
    __new_blocks = set()
    # New themes per code: {code: {block, ...}}
    __code_new_blocks = {}
    __instance = None
    __day = tool.get_now_date_str()

    def __init__(self, day=None):
        """
        @param day: reference day for the history queries; defaults to today,
            resolved at call time rather than at import time
        """
        if day is None:
            day = tool.get_now_date_str()
        self.__day = day
        # Pass the day explicitly: the loader is a classmethod and would
        # otherwise only see the class-level default.
        self.__load_data(day)

    @classmethod
    def __load_data(cls, day=None):
        """
        Load limit-up reasons (180-day window) and radical-buy blocks
        (365-day window) ending at ``day``.
        @param day: upper bound of both query windows; falls back to the
            class-level day when omitted
        @return:
        """
        if day is None:
            day = cls.__day
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(min_day=tool.date_sub(day, 180),
                                                                max_day=day) or []
        limit_up_reasons_dict = {}
        limit_up_recommend_block_dict = {}
        # NOTE(review): the original code built a list of the 60 most recent
        # trading days here but never used it to filter the records, so every
        # record of the 180-day window is considered. Behaviour is kept as-is;
        # confirm whether a 60-trading-day filter on r[1] was intended.
        for r in kpl_results:
            code = r[0]
            reasons = limit_up_reasons_dict.setdefault(code, [])
            recommend_blocks = limit_up_recommend_block_dict.setdefault(code, [])
            # Keep at most the first two records per code
            if len(reasons) >= 2:
                continue
            reasons.append(r[2])
            if r[3]:
                recommend_blocks.extend(r[3].split("、"))
        for code, reasons in limit_up_reasons_dict.items():
            cls.__limit_up_reasons_dict[code] = set(reasons)
        for code, recommend_blocks in limit_up_recommend_block_dict.items():
            cls.__limit_up_recommend_reasons_dict[code] = set(recommend_blocks)

        # Load the code blocks used for radical-buy matching
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(min_day=tool.date_sub(day, 365),
                                                                max_day=day) or []
        # {"code": [(block, day), (block, day)]}
        kpl_block_dict = {}
        for r in kpl_results:
            # Records flagged with r[4] == 1 (limit-up broken that day) do not
            # contribute a reason
            if r[4] == 1:
                continue
            kpl_block_dict.setdefault(r[0], []).append((r[2], r[1]))  # (block, day)
        for code, block_infos in kpl_block_dict.items():
            cls.__radical_buy_reasons_dict[code] = cls.__compute_limit_up_reasons(code, block_infos)

    @classmethod
    def __compute_limit_up_reasons(cls, code, block_infos):
        """
        Compute the limit-up reasons of a code: the block with the most
        occurrences plus the most recent block, minus the invalid blocks.
        Also records the ranked raw data for the code.
        @param code:
        @param block_infos: [(block, day)]
        @return: set of filtered block names (empty when every block is invalid)
        """
        # Newest first; a sorted copy keeps the caller's list untouched
        ordered_infos = sorted(block_infos, key=lambda x: x[1], reverse=True)
        # {"block": [occurrence count, latest day]}
        temp_dict = {}
        for block, day in ordered_infos:
            if block in constant.KPL_INVALID_BLOCKS:
                continue
            if block not in temp_dict:
                # First hit is the latest day because of the ordering above
                temp_dict[block] = [0, day]
            temp_dict[block][0] += 1
        if not temp_dict:
            return set()
        temp_list = [(k, v[0], v[1]) for k, v in temp_dict.items()]
        # Sort by occurrence count, then by latest day
        temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True)
        cls.__radical_buy_reasons_origin_data_dict[code] = temp_list
        # Most frequent block and most recent block
        blocks = {temp_list[0][0], ordered_infos[0][0]}
        # Drop generic words such as "concept"
        blocks -= constant.KPL_INVALID_BLOCKS
        return {kpl_util.filter_block(x) for x in blocks}

    def get_limit_up_reasons(self, code):
        """
        Get the limit-up reasons.
        @param code:
        @return: a new set (empty when the code is unknown)
        """
        return set(self.__limit_up_reasons_dict.get(code, ()))

    def get_limit_up_reasons_with_recommend(self, code):
        """
        Get the limit-up reasons together with the recommended reasons.
        @param code:
        @return: a new set
        """
        blocks = set()
        blocks |= self.__limit_up_reasons_dict.get(code, set())
        blocks |= self.__limit_up_recommend_reasons_dict.get(code, set())
        return blocks

    def get_radical_buy_blocks_origin_data(self, code):
        """
        Get the raw ranking data behind the radical-buy blocks.
        @param code:
        @return: [(block, count, latest day)] or None
        """
        return self.__radical_buy_reasons_origin_data_dict.get(code)

    def get_radical_buy_blocks(self, code):
        """
        Get the blocks used for the radical-buy decision.
        @param code:
        @return: a copy of the block set, or None when the code is unknown
        """
        if code in self.__radical_buy_reasons_dict:
            return set(self.__radical_buy_reasons_dict.get(code))
        return None

    def add_new_blocks(self, code, block):
        """
        Add a new-theme block for a code.
        @param code:
        @param block:
        @return: whether the new theme was added
        """
        # The free-float value must be known and at least 10e8 (1e9).
        # NOTE(review): the original comment said 5 billion while the code
        # checks 10e8 - confirm the intended threshold.
        zyltgb = global_util.zyltgb_map.get(code)
        if not zyltgb or zyltgb < 10e8:
            return False
        if block in constant.KPL_INVALID_BLOCKS:
            return False
        old_blocks = self.__radical_buy_reasons_dict.get(code)
        if old_blocks is None:
            old_blocks = set()
        if block in old_blocks:
            return False
        self.__new_blocks.add(block)
        self.__code_new_blocks.setdefault(code, set()).add(block)
        old_blocks.add(block)
        trade_record_log_util.add_temp_special_codes(code, f"新题材辨识度:{block}")
        async_log_util.info(logger_debug, f"今日新增概念:{code} - {block}")
        self.__radical_buy_reasons_dict[code] = old_blocks
        return True

    def get_new_blocks(self):
        """
        Get the new concepts.
        @return: the internal set of new blocks
        """
        return self.__new_blocks

    def is_new_block(self, block):
        """
        Whether the block is a new theme.
        @param block:
        @return:
        """
        return bool(self.__new_blocks and block in self.__new_blocks)

    def has_new_block(self, code):
        """
        Whether the code has any new theme.
        @param code:
        @return:
        """
        return bool(self.__code_new_blocks.get(code))


class TodayLimitUpReasonChangeManager:
    """
    Changes of today's limit-up reasons.
    """
    # {code: {previous reason, ...}}
    __today_change_reasons_dict = {}
    __instance = None
    __db = 1
    __redisManager = redis_manager.RedisManager(__db)

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            # object.__new__ rejects extra arguments, so none are forwarded
            cls.__instance = super(TodayLimitUpReasonChangeManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_data(cls):
        """
        Restore the per-code reason history from redis.
        """
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_limit_up_reason_his-*")
        if keys:
            for key in keys:
                code = key.split("-")[1]
                reasons = RedisUtils.smembers(cls.__get_redis(), key)
                # Always store a set so later .add() calls work
                cls.__today_change_reasons_dict[code] = set(reasons or ())

    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        """
        Record that the limit-up reason of a code changed today.
        Only the previous reason is stored; ``to_reason`` is not used.
        @param code:
        @param from_reason: the reason before the change
        @param to_reason: the reason after the change
        @return:
        """
        self.__today_change_reasons_dict.setdefault(code, set()).add(from_reason)
        RedisUtils.sadd_async(self.__db, f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire_async(self.__db, f"kpl_limit_up_reason_his-{code}", tool.get_expire())

    def get_changed_reasons(self, code):
        """
        Get the reasons a code had before today's changes.
        @param code:
        @return: set of previous reasons (empty when none recorded)
        """
        if code in self.__today_change_reasons_dict:
            return self.__today_change_reasons_dict[code]
        return set()


class LimitUpDataConstant:
    """
    Data derived from the current limit-up data.
    """
    # {code: filtered blocks}
    __history_code_blocks_dict = {}
    # {code: history limit-up record}
    __history_code_data_dict = {}
    history_limit_up_datas = []
    current_limit_up_datas = []
    # {block: {code, ...}}
    __current_limit_up_block_codes = {}
    __history_limit_up_block_codes = {}

    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        """
        Store the current limit-up data and rebuild the block -> codes index.
        @param current_limit_up_datas: records whose first field is the code;
            None is treated as empty for the index
        @return:
        """
        cls.current_limit_up_datas = current_limit_up_datas
        # Count the codes currently at limit-up per block
        block_codes = {}
        for d in current_limit_up_datas or []:
            code = d[0]
            blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if blocks:
                for b in blocks:
                    block_codes.setdefault(b, set()).add(code)
        cls.__current_limit_up_block_codes = block_codes

    @classmethod
    def set_history_limit_up_datas(cls, history_limit_up_datas_):
        """
        Store today's history limit-up data and update the per-code and
        per-block indexes. Existing index entries are kept, not cleared.
        @param history_limit_up_datas_: records where d[3] is the code;
            None is ignored
        @return:
        """
        if history_limit_up_datas_ is None:
            return
        cls.history_limit_up_datas = history_limit_up_datas_
        for d in cls.history_limit_up_datas:
            code = d[3]
            # Only the radical-buy reference reasons are used. The current
            # reason (d[2]), today's changed reasons and the recommended
            # reasons (d[6]) were disabled in the original code and are
            # deliberately not merged in.
            blocks = set()
            history_reasons = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if history_reasons:
                blocks |= history_reasons
            blocks -= constant.KPL_INVALID_BLOCKS
            for b in blocks:
                cls.__history_limit_up_block_codes.setdefault(b, set()).add(code)
            cls.__history_code_blocks_dict[code] = BlockMapManager().filter_blocks(blocks)
            cls.__history_code_data_dict[code] = d

    @classmethod
    def get_blocks_with_history(cls, code):
        """
        Get the blocks of a code based on the history limit-up data.
        @param code:
        @return: filtered blocks, or None when the code is unknown
        """
        return cls.__history_code_blocks_dict.get(code)

    @classmethod
    def get_limit_up_reason_with_history(cls, code):
        """
        Get the limit-up reason (d[2]) from the history record of a code.
        @param code:
        @return: the reason, or None when there is no record
        """
        d = cls.__history_code_data_dict.get(code)
        if d:
            return d[2]
        return None

    @classmethod
    def get_first_limit_up_time(cls, code):
        """
        Get field d[5] of the history record as an int.
        @param code:
        @return: int value, or None when there is no record
        """
        if code in cls.__history_code_data_dict:
            return int(cls.__history_code_data_dict[code][5])
        return None

    @classmethod
    def get_current_limit_up_block_codes(cls, block):
        """
        Get the codes currently at limit-up in a block.
        @param block:
        @return: set of codes or None
        """
        return cls.__current_limit_up_block_codes.get(block)

    @classmethod
    def get_history_limit_up_block_codes(cls, block):
        """
        Get the codes that hit limit-up in a block according to the history data.
        @param block:
        @return: set of codes or None
        """
        return cls.__history_limit_up_block_codes.get(block)

    @classmethod
    def get_history_limit_up_codes(cls):
        """
        Get the codes present in today's history limit-up data.
        @return: a new set of codes (empty when nothing is loaded)
        """
        return set(cls.__history_code_data_dict)


if __name__ == "__main__":
    LimitUpCodesBlockRecordManager()