""" 代码行业关键词管理 """ # 涨停代码关键词板块管理 import json import constant from db.redis_manager import RedisUtils from third_data import kpl_block_util from utils import global_util, tool from log_module import log from db import redis_manager from log_module.log import logger_kpl_limit_up, logger_kpl_block_can_buy, logger_kpl_debug from third_data.kpl_util import KPLPlatManager from trade import trade_manager # 代码精选板块管理 class KPLCodeJXBlockManager: __redisManager = redis_manager.RedisManager(3) __code_blocks = {} def __get_redis(self): return self.__redisManager.getRedis() def save_jx_blocks(self, code, blocks): if blocks is None: return # 保存前2条数据 RedisUtils.setex(self.__get_redis(), f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(blocks)) self.__code_blocks[code] = blocks # 获取精选板块 def get_jx_blocks(self, code): if code in self.__code_blocks: return self.__code_blocks[code] val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}") if val is None: return None else: val = json.loads(val) self.__code_blocks[code] = val return self.__code_blocks[code] # 开盘啦禁止交易板块管理 class KPLPlateForbiddenManager: __redisManager = redis_manager.RedisManager(3) def __get_redis(self): return self.__redisManager.getRedis() def save_plate(self, plate): RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate) RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire()) def list_all(self): return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates") class LimitUpCodesPlateKeyManager: # 今日涨停原因 today_limit_up_reason_dict = {} today_total_limit_up_reason_dict = {} total_code_keys_dict = {} total_key_codes_dict = {} __redisManager = redis_manager.RedisManager(1) def __get_redis(self): return self.__redisManager.getRedis() # 获取今日涨停数据,格式:[(代码,涨停原因)] def set_today_limit_up(self, datas): temp_dict = {} if datas: for item in datas: temp_dict[item[0]] = item[1] self.today_limit_up_reason_dict = temp_dict if datas: for item in datas: self.__set_total_keys(item[0]) self.set_today_total_limit_up(datas) # 设置今日历史涨停数据 def set_today_total_limit_up(self, datas): for item in datas: code = item[0] self.today_total_limit_up_reason_dict[code] = item[1] # 今日涨停原因变化 def set_today_limit_up_reason_change(self, code, from_reason, to_reason): RedisUtils.sadd(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", from_reason) RedisUtils.expire(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", tool.get_expire()) self.__set_total_keys(code) # 设置代码的今日涨停原因 def __set_total_keys(self, code): keys = set() # keys_his = self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}") # keys |= keys_his if code in self.today_limit_up_reason_dict: if self.today_limit_up_reason_dict.get(code) not in constant.KPL_INVALID_BLOCKS: keys.add(self.today_limit_up_reason_dict.get(code)) self.total_code_keys_dict[code] = keys for k in keys: if k not in self.total_key_codes_dict: self.total_key_codes_dict[k] = set() self.total_key_codes_dict[k].add(code) logger_kpl_limit_up.info("{}板块关键词:{}", code, keys) # 根据传入的关键词与涨停代码信息匹配身位 def get_codes_by_key_without_mine(self, key, code): # 只比较今日涨停原因 codes_set = set() if key in self.total_key_codes_dict: codes_set |= self.total_key_codes_dict[key] codes_set.discard(code) return codes_set # 涨停原因匹配关键字(和涨停列表中的涨停原因做对比),返回:{关键词:代码集合} def match_limit_up_reason_keys(self, code, keys): fresult = {} for k in keys: if k in self.total_key_codes_dict: codes = set(self.total_key_codes_dict[k]) codes.discard(code) if codes: fresult[k] = codes return fresult # 实时开盘啦市场数据 class RealTimeKplMarketData: # 精选前5 top_5_reason_list = [] # 行业前5 top_5_industry_list = [] # top_5_key_dict = {} total_reason_dict = {} total_industry_dict = {} __KPLPlateForbiddenManager = KPLPlateForbiddenManager() __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager() __KPLPlatManager = KPLPlatManager() @classmethod def set_top_5_reasons(cls, datas): temp_list = [] for d in datas: cls.total_reason_dict[d[1]] = d # 排序 for i in range(0, len(datas)): if datas[i][1] not in constant.KPL_INVALID_BLOCKS: # (名称,净流入金额,排名) temp_list.append((datas[i][1], datas[i][3], len(temp_list))) # 只获取前10个 if len(temp_list) > 10: break if datas[i][3] < 3 * 10000 * 10000: break for temp in temp_list: names = cls.__KPLPlatManager.get_same_plat_names_by_id(temp[0]) for name in names: if name == temp[1]: continue temp_list.append((name, temp[1], temp[2])) cls.top_5_reason_list = temp_list cls.__reset_top_5_dict() @classmethod def set_top_5_industry(cls, datas): for d in datas: cls.total_industry_dict[d[1]] = d temp_list = [] for i in range(0, len(datas)): if datas[i][1] in constant.KPL_INVALID_BLOCKS: continue temp_list.append((datas[i][1], datas[i][2], len(temp_list))) if len(temp_list) > 10: break if datas[i][2] < 3 * 10000 * 10000: break cls.top_5_industry_list = temp_list cls.__reset_top_5_dict() @classmethod def __reset_top_5_dict(cls): temp_dict = {} for t in cls.top_5_industry_list: temp_dict[t[0]] = t for t in cls.top_5_reason_list: temp_dict[t[0]] = t cls.top_5_key_dict = temp_dict # 获取能够买的行业关键字set @classmethod def get_can_buy_key_set(cls): temp_set = cls.top_5_key_dict.keys() return temp_set # 通过关键字判断能买的代码数量 @classmethod def get_can_buy_codes_count(cls, code, key): # 判断行业涨停票数量,除开自己必须大于1个 temp_codes = LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key) if temp_codes is None: temp_codes = set() else: temp_codes = set(temp_codes) temp_codes.discard(code) if len(temp_codes) < 1: # 后排才能挂单 return 0, "身位不为后排" forbidden_plates = cls.__KPLPlateForbiddenManager.list_all() if key in forbidden_plates: return 0, "不买该板块" # 10:30以前可以挂2个单 if int(tool.get_now_time_str().replace(':', '')) < int("100000"): return 2, "10:00以前可以挂2个单" # 10:30以后 if key not in cls.top_5_key_dict: return 0, "净流入没在前5" if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000: return 2, "净流入在前5且大于3亿" else: return 1, "净流入在前5" @classmethod def is_in_top(cls, keys): reasons = cls.get_can_buy_key_set() log.logger_kpl_debug.debug("市场流入前5:{}", reasons) forbidden_plates = cls.__KPLPlateForbiddenManager.list_all() reasons = reasons - forbidden_plates temp_set = keys & reasons log.logger_kpl_debug.debug("市场流入前5匹配结果:{}", temp_set) if temp_set: return True, temp_set else: return False, None # 代码历史涨停原因与板块管理 class CodesHisReasonAndBlocksManager: __redisManager = redis_manager.RedisManager(1) # 历史涨停原因 __history_limit_up_reason_dict = {} # 板块 __blocks_dict = {} def __get_redis(self): return self.__redisManager.getRedis() def set_history_limit_up_reason(self, code, reasons): self.__history_limit_up_reason_dict[code] = set(reasons) RedisUtils.setex(self.__get_redis(), f"kpl_his_limit_up_reason-{code}", tool.get_expire(), json.dumps(list(reasons))) logger_kpl_debug.debug(f"设置历史涨停原因:{code}-{reasons}") # 如果返回值不为None表示已经加载过历史原因了 def get_history_limit_up_reason(self, code): reasons = self.__history_limit_up_reason_dict.get(code) if reasons is None: # 从内存中加载 val = RedisUtils.get(self.__get_redis(), f"kpl_his_limit_up_reason-{code}") if val is not None: val = set(json.loads(val)) self.__history_limit_up_reason_dict[code] = val if code in self.__history_limit_up_reason_dict: return self.__history_limit_up_reason_dict.get(code) else: return None else: return reasons def set_blocks(self, code, blocks): self.__blocks_dict[code] = set(blocks) RedisUtils.setex(self.__get_redis(), f"kpl_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) def get_blocks(self, code): reasons = self.__blocks_dict.get(code) if reasons is None: # 从内存中加载 val = RedisUtils.get(self.__get_redis(), f"kpl_blocks-{code}") if val is not None: val = set(json.loads(val)) self.__blocks_dict[code] = val if code in self.__blocks_dict: return self.__blocks_dict.get(code) else: return None else: return reasons def get_total_keys(self, code): reasons = self.get_history_limit_up_reason(code) if reasons is None: reasons = set() blocks = self.get_blocks(code) if blocks is None: blocks = set() return reasons | blocks # 目标代码板块关键词管理 class TargetCodePlateKeyManager: __redisManager = redis_manager.RedisManager(1) __CodesPlateKeysManager = CodesHisReasonAndBlocksManager() def __get_redis(self): return self.__redisManager.getRedis() # 返回key集合(排除无效板块),今日涨停原因,今日历史涨停原因,历史涨停原因,二级,板块 def get_plate_keys(self, code): keys = set() k1 = set() if code in LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict: k1 = {LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict[code]} # 加载今日历史原因 k11 = RedisUtils.smembers(self.__get_redis(), f"kpl_limit_up_reason_his-{code}") k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason(code) if k2 is None: k2 = set() k3 = set() industry = global_util.code_industry_map.get(code) if industry: k3 = {industry} k4 = self.__CodesPlateKeysManager.get_blocks(code) if k4 is None: k4 = set() for k in [k1, k11, k2, k3, k4]: keys |= k # 排除无效的涨停原因 keys = keys - set(constant.KPL_INVALID_BLOCKS) return keys, k1, k11, k2, k3, k4 class CodePlateKeyBuyManager: # 无板块 BLOCK_TYPE_NONE = -1 # 一般板块 BLOCK_TYPE_COMMON = 0 # 强势板块 BLOCK_TYPE_STRONG = 1 # 猛拉板块 BLOCK_TYPE_SOON_LIMIT_UP = 2 # 潜伏板块 BLOCK_TYPE_START_UP = 3 __TargetCodePlateKeyManager = TargetCodePlateKeyManager() __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager() __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager() __KPLCodeJXBlockManager = KPLCodeJXBlockManager() # 获取可以买的板块 # current_limit_up_datas: 今日实时涨停 # latest_2_day_limit_up_datas:最近2天的实时涨停(不含今日) # limit_up_record_datas:今日历史涨停 # yesterday_current_limit_up_codes : 昨日涨停代码 # before_blocks_dict:历史涨停原因 @classmethod def get_can_buy_block(cls, code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict): # 加载涨停代码的目标板块 def load_code_block(): for d in limit_up_record_datas: if d[2] in constant.KPL_INVALID_BLOCKS and d[3] in before_blocks_dict: code_limit_up_reason_dict[d[3]] = list(before_blocks_dict.get(d[3]))[0] else: code_limit_up_reason_dict[d[3]] = d[2] return code_limit_up_reason_dict now_time = int(tool.get_now_time_str().replace(":", "")) times = [100000, 103000, 110000, 133000, 150000] time_index = 0 for i in range(len(times)): if now_time < times[i]: time_index = i break # 获取目标代码板块 keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code) log.logger_kpl_debug.info("{}关键词:今日-{},今日历史-{},历史-{},二级行业-{},代码板块-{}", code, k1, k11, k2, k3, k4) keys = set() if k1: for k in k1: if k not in constant.KPL_INVALID_BLOCKS: keys.add(k) if not keys: for k in k2: if k not in constant.KPL_INVALID_BLOCKS: keys.add(k) if not keys: # 获取 jx_blocks = cls.__KPLCodeJXBlockManager.get_jx_blocks(code) if jx_blocks: keys |= set([k[1] for k in jx_blocks[:2]]) log.logger_kpl_debug.info("{}最终关键词:{}", code, keys) # 涨停列表中匹配关键词,返回(板块:代码集合),代码集合中已经排除自身 if not keys: return cls.BLOCK_TYPE_NONE, None, "尚未找到涨停原因" code_limit_up_reason_dict = {} load_code_block() msg_list = [] can_buy_blocks = [] for block in keys: is_top_8_record, top_8_record = kpl_block_util.is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, 20) is_top_4_current, top_4_current = kpl_block_util.is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, 10) is_top_4 = is_top_8_record and is_top_4_current msg_list.append(f"\n实时top10: {top_4_current}(涨停数量:{len(current_limit_up_datas)})") msg_list.append(f"历史top20: {top_8_record}") # 获取主板实时身位 current_shsz_rank = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reason_dict, shsz=True) record_shsz_rank = kpl_block_util.get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict, shsz=True) # 获取主板历史身位 if is_top_4: pen_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_limit_up_reason_dict) if pen_limit_up_codes: # 主板开1 if current_shsz_rank < len(pen_limit_up_codes) + 1 and record_shsz_rank < len( pen_limit_up_codes) + 1: # 属于龙1,龙2 can_buy_blocks.append((block, f"{block}:top4涨停板块,主板开1({pen_limit_up_codes}),属于主板前龙{len(pen_limit_up_codes) + 1}(实时身位-{current_shsz_rank})")) continue else: msg_list.append( f"板块-{block}: top4涨停板块,主板开1({pen_limit_up_codes}),不为主板前龙{len(pen_limit_up_codes) + 1}(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})") continue else: if current_shsz_rank == 0 and record_shsz_rank < 2: can_buy_blocks.append((block, f"{block}:top4涨停板块,非主板开1,属于龙1")) continue else: msg_list.append( f"板块-{block}: top4涨停板块,非主板开1,不为主板龙1(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})") continue else: # 是否满足行业精选流入要求 is_in_top_input = RealTimeKplMarketData.is_in_top(set([block]))[0] if not is_in_top_input: msg_list.append( f"板块-{block}: 非top4涨停板块,不满足精选/行业流入要求") continue else: # 是否为主板龙1(实时龙1,历史龙2以内) if current_shsz_rank == 0 and record_shsz_rank < 2: can_buy_blocks.append((block, f"{block}:不是top4涨停板块,满足精选/行业流入要求,满足主板龙1")) continue else: msg_list.append( f"板块-{block}: 不是top4涨停板块,满足精选/行业流入要求,不为主板龙1(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})") continue if len(can_buy_blocks) == len(keys): blocks = "/".join([x[0] for x in can_buy_blocks]) blocks_msg = "\n".join([x[1] for x in can_buy_blocks]) return blocks, blocks_msg return None, "\n".join(msg_list) # 是否可以下单 # 返回:是否可以下单,消息,板块类型 @classmethod def can_buy(cls, code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict): if constant.TEST: return True, "", cls.BLOCK_TYPE_NONE block, block_msg = cls.get_can_buy_block(code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict) if block is None: return False, block_msg # ---------------------------------判断目标代码的板块-------------------start------------ # 判断匹配出的涨停原因,判断是否有已经下单的票 # reason_need_buy_dict = {} # for k in match_limit_up_result: # codes = match_limit_up_result[k] # final_codes_keys = [keys] # for code_ in codes: # temp_key_set = set() # temp_key_set |= cls.__CodesHisReasonAndBlocksManager.get_total_keys(code_) # temp = cls.__LimitUpCodesPlateKeyManager.total_code_keys_dict.get(code_) # if temp: # temp_key_set |= temp # # 二级 # industry = global_util.code_industry_map.get(code_) # if industry: # temp_key_set.add(industry) # # final_codes_keys.append(temp_key_set) # # 求共同的关键词 # intersection = set(final_codes_keys[0]) # for s in final_codes_keys: # intersection &= s # log.logger_kpl_debug.info("{}的板块求交集:{}-{}", code, k, intersection) # # # 求公共的板块是否在流入前5中 # is_in, valid_keys = RealTimeKplMarketData.is_in_top(intersection) # if is_in: # reason_need_buy_dict[k] = (is_in, valid_keys) # ---------------------------------判断目标代码的板块-------------------end------------ # 获取板块可以下单的个数 # can_buy_codes_count_dict = {} # # for key__ in match_limit_up_result: # can_buy_count, msg = RealTimeKplMarketData.get_can_buy_codes_count(code, key__) # can_buy_codes_count_dict[key__] = can_buy_count # has_available_key = False # for key in can_buy_codes_count_dict: # if can_buy_codes_count_dict[key] > 0: # has_available_key = True # break # if not has_available_key: # return False, f"匹配到的【{','.join(match_limit_up_result.keys())}】没在精选/行业可以买入的板块中" # ---------------------------------加载已经下单/成交的代码信息------------start------------- # match_reasons = match_limit_up_result.keys() # 判断匹配到的原因是否已经有下单/买入成功的代码 codes_delegate = set(trade_manager.get_codes_by_trade_states( {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER})) codes_success = set(trade_manager.get_codes_by_trade_states( {trade_manager.TRADE_STATE_BUY_SUCCESS})) codes = codes_delegate | codes_success # 统计成交代码的板块 trade_codes_blocks_dict = {} # 已经成交的板块 trade_success_blocks_count = {} for c in codes: keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c) # 实时涨停原因 trade_codes_blocks_dict[c] = k1_ # 统计板块中的代码 trade_block_codes_dict = {} for c in trade_codes_blocks_dict: for b in trade_codes_blocks_dict[c]: if c in codes_success: if b not in trade_success_blocks_count: trade_success_blocks_count[b] = set() trade_success_blocks_count[b].add(c) if b not in trade_block_codes_dict: trade_block_codes_dict[b] = set() trade_block_codes_dict[b].add(c) # ---------------------------------加载已经下单/成交的代码信息------------end------------- msg_list = [] for key in [block]: # 板块中已经有成交的就不下单了 if key in trade_success_blocks_count: success_codes_count = len(trade_success_blocks_count[key]) if success_codes_count >= 2: msg_list.append(f"【{key}】中已经有{success_codes_count}个成交代码") log.logger_kpl_debug.debug(f"{code}:板块({key})已经有成交【{trade_success_blocks_count[key]}】") continue # 10:30以后买1个 if int(tool.get_now_time_str().replace(":", "")) > int("103000") and success_codes_count >= 1: msg_list.append(f"【{key}】中已经有{success_codes_count}个成交代码") log.logger_kpl_debug.debug(f"{code}:板块({key})已经有成交【{trade_success_blocks_count[key]}】") continue return True, block_msg # 板块可以下单数量 # if trade_block_codes_dict.get(key) is None or len(trade_block_codes_dict.get(key)) < \ # can_buy_codes_count_dict[key]: # order_count = len(trade_block_codes_dict.get(key)) if key in trade_block_codes_dict else 0 # logger_kpl_block_can_buy.info( # f"code={code}:【{key}】可以下单,现有数量:{order_count} 最大数量:{can_buy_codes_count_dict[key]}") # return True, f"可以下单,板块:【{key}】,板块中已经下单的数量:{order_count}" # else: # order_count = len(trade_block_codes_dict.get(key)) # msg_list.append(f"【{key}】中下单代码数量{order_count}/允许下单数量{can_buy_codes_count_dict[key]}") return False, ",".join(msg_list) if __name__ == "__main__": pass