""" 代码行业关键词管理 """ # 涨停代码关键词板块管理 import json import constant import global_data_loader import global_util import log import tool from db import redis_manager from log import logger_kpl_limit_up from trade import trade_manager # 实时开盘啦市场数据 class RealTimeKplMarketData: # 精选前5 top_5_reason_set = set() # 行业前5 top_5_industry_set = set() @classmethod def set_top_5_reasons(cls, datas): temp_set = set() base_count = 5 for i in range(0, len(datas)): if datas[i][1] in constant.KPL_INVALID_BLOCKS: base_count += 1 if i >= base_count: break if datas[i][3] > 5000 * 10000: temp_set.add(datas[i][1]) cls.top_5_reason_set = temp_set @classmethod def set_top_5_industry(cls, datas): temp_set = set() base_count = 5 for i in range(0, len(datas)): if datas[i][1] in constant.KPL_INVALID_BLOCKS: base_count += 1 if i >= base_count: break if datas[i][2] > 5000 * 10000: temp_set.add(datas[i][1]) cls.top_5_reason_set = temp_set # 获取能够买的行业关键字set @classmethod def get_can_buy_key_set(cls): temp_set = cls.top_5_reason_set | cls.top_5_industry_set return temp_set @classmethod def is_in_top(cls, keys): reasons = cls.get_can_buy_key_set() temp_set = keys & reasons if temp_set: return True, temp_set else: return False, None class LimitUpCodesPlateKeyManager: # 今日涨停原因 today_limit_up_reason_dict = {} today_total_limit_up_reason_dict = {} total_code_keys_dict = {} total_key_codes_dict = {} __redisManager = redis_manager.RedisManager(1) def __get_redis(self): return self.__redisManager.getRedis() # 获取今日涨停数据,格式:[(代码,涨停原因)] def set_today_limit_up(self, datas): temp_dict = {} if datas: for item in datas: temp_dict[item[0]] = item[1] self.today_limit_up_reason_dict = temp_dict if datas: for item in datas: self.__set_total_keys(item[0]) self.set_today_total_limit_up(datas) # 设置今日历史涨停数据 def set_today_total_limit_up(self, datas): for item in datas: code = item[0] self.today_total_limit_up_reason_dict[code] = item[1] # 今日涨停原因变化 def set_today_limit_up_reason_change(self, code, from_reason, to_reason): self.__get_redis().sadd(f"kpl_limit_up_reason_his-{code}", from_reason) self.__get_redis().expire(f"kpl_limit_up_reason_his-{code}", tool.get_expire()) self.__set_total_keys(code) def __set_total_keys(self, code): keys = set() keys_his = self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}") keys |= keys_his if code in self.today_limit_up_reason_dict: keys.add(self.today_limit_up_reason_dict.get(code)) self.total_code_keys_dict[code] = keys for k in keys: if k not in self.total_key_codes_dict: self.total_key_codes_dict[k] = set() self.total_key_codes_dict[k].add(code) logger_kpl_limit_up.info("{}板块关键词:{}", code, keys) def get_codes_by_key_without_mine(self, key, code): # 只比较今日涨停原因 codes_set = set() if key in self.total_key_codes_dict: codes_set |= self.total_key_codes_dict[key] codes_set.discard(code) return codes_set # 目标代码关键词管理 class TargetCodePlateKeyManager: __redisManager = redis_manager.RedisManager(1) # 历史涨停原因 __history_limit_up_reason_dict = {} # 二级行业 __second_industry_dict = {} # 板块 __blocks_dict = {} def __get_redis(self): return self.__redisManager.getRedis() def set_history_limit_up_reason(self, code, reasons): self.__history_limit_up_reason_dict[code] = set(reasons) self.__get_redis().setex(f"kpl_his_limit_up_reason-{code}", tool.get_expire(), json.dumps(list(reasons))) # 如果返回值不为None表示已经加载过历史原因了 def get_history_limit_up_reason(self, code): reasons = self.__history_limit_up_reason_dict.get(code) if reasons is None: # 从内存中加载 val = self.__get_redis().get(f"kpl_his_limit_up_reason-{code}") if val is not None: val = set(json.loads(val)) self.__history_limit_up_reason_dict[code] = val return self.__history_limit_up_reason_dict.get(code) else: return reasons def set_blocks(self, code, blocks): self.__blocks_dict[code] = set(blocks) self.__get_redis().setex(f"kpl_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) def get_blocks(self, code): reasons = self.__blocks_dict.get(code) if reasons is None: # 从内存中加载 val = self.__get_redis().get(f"kpl_blocks-{code}") if val is not None: val = set(json.loads(val)) self.__blocks_dict[code] = val return self.__blocks_dict.get(code) else: return reasons # 返回key集合(排除无效板块),今日涨停原因,今日历史涨停原因,历史涨停原因,二级,板块 def get_plate_keys(self, code): keys = set() k1 = set() if code in LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict: k1 = {LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict[code]} # 加载历史原因 k11 = self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}") k2 = set() if code in self.__history_limit_up_reason_dict: k2 = self.__history_limit_up_reason_dict[code] k3 = set() industry = global_util.code_industry_map.get(code) if industry: k3 = {industry} k4 = set() if code in self.__blocks_dict: k4 = self.__blocks_dict[code] for k in [k1, k11, k2, k3, k4]: keys |= k # 排除无效的涨停原因 keys = keys - set(constant.KPL_INVALID_BLOCKS) return keys, k1, k11, k2, k3, k4 class CodePlateKeyBuyManager: __TargetCodePlateKeyManager = TargetCodePlateKeyManager() __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager() # 是否可以下单 @classmethod def can_buy(cls, code): keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code) # 板块Key是否在市场前5key中 is_in, valid_keys = RealTimeKplMarketData.is_in_top(keys) if not valid_keys: return False, "板块未在市场流入前5" # 相同板块中是否已经有别的票涨停 is_back = False, '' for key in valid_keys: codes = cls.__LimitUpCodesPlateKeyManager.get_codes_by_key_without_mine(key, code) if codes and len(codes) > 0: is_back = True, key break if not is_back[0]: return False, f"板块中首个涨停:{valid_keys}" # 看板块中是否已经有已经下单的或者成交的代码 codes = trade_manager.get_codes_by_trade_states( {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER, trade_manager.TRADE_STATE_BUY_SUCCESS}) # 遍历已经成交/下单的代码,获取其涨停原因,然后和当前代码涨停原因做比较,有相同代码的不能买 for c in codes: keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c) # 实时涨停原因 for k_ in k1_: # 当前代码已经有挂的或者成交的 if k_ in valid_keys: return False, f"{k_}板块中的{c}已经下单/买入成功,同一板块中只能买1个票" return True, f"涨停原因:{is_back[1]}" if __name__ == "__main__": datas = log.load_kpl_reason_changes() for k in datas: LimitUpCodesPlateKeyManager().set_today_limit_up_reason_change(k[0], k[1], k[2])