# Current limit-up data
import constant
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from third_data.third_blocks_manager import BlockMapManager
from utils import tool
from utils.kpl_data_db_util import KPLLimitUpDataUtil

from utils.tool import singleton

# Used to compute the board count for "radical buy at open":
# {"code": (board count, {blocks})}
open_limit_up_code_dict_for_radical_buy = None


@tool.singleton
class ContainsLimitupCodesBlocksManager:
    """
    Maps each block to the set of limit-up codes that belong to it.

    The mapping is stored on the class and is only ever added to; nothing in
    this class removes entries.
    """
    # {block: {code, ...}}
    __block_codes_dict = {}

    def __init__(self):
        self.__load_data()

    def __load_data(self):
        """
        Seed the mapping from KPLLimitUpDataUtil.get_latest_block_infos_by_day().
        """
        records = KPLLimitUpDataUtil.get_latest_block_infos_by_day()
        if records:
            # NOTE(review): index 3 is used as the code here, while
            # set_current_limit_up_datas uses index 0 - presumably the two
            # sources have different row layouts; confirm against the callers.
            for code in {r[3] for r in records}:
                self.__add_code(code)

    def get_block_codes(self, block):
        """
        Return the set of codes recorded for a block.
        @param block: block name
        @return: the internal set of codes, or None when the block is unknown
        """
        return self.__block_codes_dict.get(block)

    def __add_code(self, code):
        """
        Register the code under each of its radical-buy blocks.
        @param code:
        @return:
        """
        blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not blocks:
            return
        for block in blocks:
            self.__block_codes_dict.setdefault(block, set()).add(code)

    def set_current_limit_up_datas(self, datas):
        """
        Register the codes of the current limit-up data.
        @param datas: iterable of rows whose element 0 is the code
        @return:
        """
        if datas:
            for row in datas:
                self.__add_code(row[0])


@singleton
class LimitUpCodesBlockRecordManager:
    """
    Block (limit-up reason) management for historical limit-up codes.

    All caches live on the class, so they are shared by every instance.
    """
    # Limit-up reasons: {code: {reason, ...}}
    __limit_up_reasons_dict = {}
    # Recommended limit-up reasons: {code: {reason, ...}}
    __limit_up_recommend_reasons_dict = {}
    # Reasons used for radical buy: {code: {block, ...}}
    __radical_buy_reasons_dict = {}
    # Raw data behind the radical-buy reasons:
    # {code: [(block, occurrence count, latest time), ...]}
    __radical_buy_reasons_origin_data_dict = {}
    __instance = None
    # Reference day for the look-back windows; overwritten by __init__.
    __day = tool.get_now_date_str()

    def __init__(self, day=None):
        """
        @param day: reference day for the look-back windows; defaults to
                    tool.get_now_date_str() evaluated at construction time
        """
        if day is None:
            # Resolve the default here rather than in the signature, where it
            # would be frozen at import time.
            day = tool.get_now_date_str()
        # __load_data is a classmethod and reads cls.__day, so the value must
        # be stored on the class; an instance attribute would be ignored.
        type(self).__day = day
        self.__load_data()

    @classmethod
    def __load_data(cls):
        """
        Rebuild the class-level caches from KPLLimitUpDataUtil.

        Rows are read as r[0]=code, r[1]=time, r[2]=limit-up reason/block and
        r[3]=recommended blocks joined by "、".
        """
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(
            min_day=tool.date_sub(cls.__day, 180), max_day=cls.__day)
        limit_up_reasons_dict = {}
        limit_up_recommend_block_dict = {}
        for r in kpl_results:
            code = r[0]
            reasons = limit_up_reasons_dict.setdefault(code, [])
            recommends = limit_up_recommend_block_dict.setdefault(code, [])
            # Only the first two rows per code contribute; later rows are
            # skipped for both the reasons and the recommended blocks.
            if len(reasons) >= 2:
                continue
            reasons.append(r[2])
            if r[3]:
                recommends.extend(r[3].split("、"))
        for code in limit_up_reasons_dict:
            cls.__limit_up_reasons_dict[code] = set(limit_up_reasons_dict[code])
        for code in limit_up_recommend_block_dict:
            cls.__limit_up_recommend_reasons_dict[code] = set(limit_up_recommend_block_dict[code])

        # Load the code blocks used for matching sweep-in (radical) buys
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(
            min_day=tool.date_sub(cls.__day, 365), max_day=cls.__day)
        kpl_block_dict = {}
        for r in kpl_results:
            kpl_block_dict.setdefault(r[0], []).append((r[2], r[1]))  # (block, time)
        for code, block_infos in kpl_block_dict.items():
            # Newest first, so the first time seen for a block is its latest.
            block_infos.sort(key=lambda x: x[1], reverse=True)
            stats = {}  # {"block": [occurrence count, latest time]}
            for block, time_ in block_infos:
                if block not in stats:
                    stats[block] = [0, time_]
                stats[block][0] += 1
            ranked = [(k, stats[k][0], stats[k][1]) for k in stats]
            # Sort by limit-up count, then by latest limit-up time
            ranked.sort(key=lambda x: (x[1], x[2]), reverse=True)
            cls.__radical_buy_reasons_origin_data_dict[code] = ranked
            # The top block always counts; the runner-up only if it occurred
            # at least twice.
            blocks = {ranked[0][0]}
            if len(ranked) > 1 and ranked[1][1] >= 2:
                blocks.add(ranked[1][0])
            blocks -= constant.KPL_INVALID_BLOCKS
            cls.__radical_buy_reasons_dict[code] = blocks

    def get_limit_up_reasons(self, code):
        """
        Get the limit-up reasons.
        @param code:
        @return: a new set of reasons (empty when the code is unknown)
        """
        if code in self.__limit_up_reasons_dict:
            return set(self.__limit_up_reasons_dict[code])
        return set()

    def get_limit_up_reasons_with_recommend(self, code):
        """
        Get the limit-up reasons together with the recommended reasons.
        @param code:
        @return: a new set holding the union of both (possibly empty)
        """
        blocks = set()
        if code in self.__limit_up_reasons_dict:
            blocks |= self.__limit_up_reasons_dict[code]
        if code in self.__limit_up_recommend_reasons_dict:
            blocks |= self.__limit_up_recommend_reasons_dict[code]
        return blocks

    def get_radical_buy_blocks_origin_data(self, code):
        """
        Get the raw data behind the radical-buy blocks.
        @param code:
        @return: list of (block, occurrence count, latest time), or None
        """
        return self.__radical_buy_reasons_origin_data_dict.get(code)

    def get_radical_buy_blocks(self, code):
        """
        Get the blocks used for the radical-buy decision.
        @param code:
        @return: a copy of the block set, or None when the code is unknown
        """
        if code in self.__radical_buy_reasons_dict:
            return set(self.__radical_buy_reasons_dict.get(code))
        return None


class TodayLimitUpReasonChangeManager:
    """
    Today's limit-up reason changes, cached on the class and written to redis.
    """
    # Previous limit-up reasons: {code: {reason, ...}}
    __today_change_reasons_dict = {}
    # Shared instance created by __new__
    __instance = None
    __db = 1
    __redisManager = redis_manager.RedisManager(1)

    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            # object.__new__ rejects extra arguments when __new__ is
            # overridden, so they must not be forwarded.
            cls.__instance = super().__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_data(cls):
        """
        Load every "kpl_limit_up_reason_his-<code>" key from redis into the cache.
        """
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_limit_up_reason_his-*")
        if keys:
            for key in keys:
                code = key.split("-")[1]
                reasons = RedisUtils.smembers(cls.__get_redis(), key)
                cls.__today_change_reasons_dict[code] = reasons

    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        """
        Record that the limit-up reason of a code changed today.

        Only from_reason is stored; to_reason is not used by this method.
        @param code:
        @param from_reason: the reason before the change
        @param to_reason: the reason after the change (unused)
        @return:
        """
        if code not in self.__today_change_reasons_dict:
            self.__today_change_reasons_dict[code] = set()
        self.__today_change_reasons_dict[code].add(from_reason)
        RedisUtils.sadd_async(self.__db, f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire_async(self.__db, f"kpl_limit_up_reason_his-{code}", tool.get_expire())

    def get_changed_reasons(self, code):
        """
        Get the earlier reasons recorded for a code today.
        @param code:
        @return: the cached collection (not a copy), or an empty set
        """
        if code in self.__today_change_reasons_dict:
            return self.__today_change_reasons_dict[code]
        return set()


class LimitUpDataConstant:
    """
    Holder for the current and historical limit-up data.
    """
    # {code: blocks returned by BlockMapManager().filter_blocks}
    __history_code_blocks_dict = {}
    # {code: history row}
    __history_code_data_dict = {}
    history_limit_up_datas = []
    current_limit_up_datas = []

    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        cls.current_limit_up_datas = current_limit_up_datas

    @classmethod
    def set_history_limit_up_datas(cls, history_limit_up_datas_):
        """
        Store the historical limit-up rows and index them by code.

        Rows are read as d[2]=limit-up reason, d[3]=code and d[5]=a value
        parsed with int() as the first limit-up time.
        @param history_limit_up_datas_: rows; None leaves everything unchanged
        @return:
        """
        if history_limit_up_datas_ is None:
            return
        cls.history_limit_up_datas = history_limit_up_datas_
        for d in cls.history_limit_up_datas:
            # Reference reasons: current limit-up reason + reasons before
            # today's change + sweep-in reference reasons
            code = d[3]
            # Current limit-up reason
            # blocks = {d[2]}
            blocks = set()
            # Sweep-in reference reasons
            history_reasons = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if history_reasons:
                blocks |= history_reasons
            # today_changed_reasons = TodayLimitUpReasonChangeManager().get_changed_reasons(code)
            # if today_changed_reasons:
            #     blocks |= today_changed_reasons
            # Only "open 1" may include the recommended reasons
            # if d[6] and kpl_block_util.open_limit_up_time_range[0] <= int(d[5]) < \
            #         kpl_block_util.open_limit_up_time_range[1]:
            #     blocks |= set(d[6].split("、"))
            blocks -= constant.KPL_INVALID_BLOCKS
            cls.__history_code_blocks_dict[code] = BlockMapManager().filter_blocks(blocks)
            cls.__history_code_data_dict[code] = d

    @classmethod
    def get_blocks_with_history(cls, code):
        """
        Get the blocks derived from the historical limit-up data.
        @param code:
        @return: the stored blocks, or None when the code is unknown
        """
        return cls.__history_code_blocks_dict.get(code)

    @classmethod
    def get_limit_up_reason_with_history(cls, code):
        """
        Get the limit-up reason (d[2]) of the stored history row.
        @param code:
        @return: the reason, or None when there is no row for the code
        """
        d = cls.__history_code_data_dict.get(code)
        if d:
            return d[2]
        return None

    @classmethod
    def get_first_limit_up_time(cls, code):
        """
        Get int(d[5]) of the stored history row.
        @param code:
        @return: the value as int, or None when there is no row for the code
        """
        if code in cls.__history_code_data_dict:
            return int(cls.__history_code_data_dict[code][5])
        return None


if __name__ == "__main__":
    ContainsLimitupCodesBlocksManager()