"""
Keyword / plate (block) management for stock codes.
"""

# Plate keyword management for limit-up codes
import copy
import json
import time

import constant
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api
from utils import global_util, tool
from log_module import log, async_log_util
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_block_can_buy
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util


# Manager of the curated ("JX") plates of a code
class KPLCodeJXBlockManager:
    """Singleton cache (memory + Redis db 3) of the curated plates of each code.

    Every in-memory entry is a tuple ``(blocks, update_timestamp)``.
    """
    __db = 3
    __redisManager = redis_manager.RedisManager(3)
    # code -> (blocks, update timestamp)
    __code_blocks = {}
    # Backup plates: code -> (blocks, update timestamp)
    __code_by_blocks = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            # object.__new__ rejects extra arguments, so they are not forwarded.
            cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls)
        return cls.__instance

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def save_jx_blocks(self, code, blocks: list, current_limit_up_blocks: set, by=False):
        """Store the curated plates of ``code`` in Redis (async) and in memory.

        :param code: stock code
        :param blocks: plates to store; nothing happens when empty
        :param current_limit_up_blocks: not used by this method
        :param by: True to store into the backup slot instead of the main one
        """
        if not blocks:
            return
        final_blocks = copy.deepcopy(blocks)
        if len(blocks) > 2:
            # With more than 2 plates, drop the invalid ones
            final_blocks = [b for b in blocks if b not in constant.KPL_INVALID_BLOCKS]
        if len(final_blocks) < 2:
            # Too few plates left: fall back to the unfiltered input
            # (copied so that the cache never aliases the caller's list)
            final_blocks = copy.deepcopy(blocks)
        if by:
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_by-{code}", tool.get_expire(),
                                   json.dumps(final_blocks))
            self.__code_by_blocks[code] = (final_blocks, time.time())
        else:
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(),
                                   json.dumps(final_blocks))
            self.__code_blocks[code] = (final_blocks, time.time())

    # Get the curated plates
    def get_jx_blocks(self, code, by=False):
        """Return ``(blocks, timestamp)`` for ``code``, loading from Redis on a cache miss.

        Redis only stores the plate list, so an entry loaded from Redis is
        stamped with the load time to keep the cache entry shape consistent
        with ``save_jx_blocks``.

        :return: ``(blocks, timestamp)`` or None when nothing is stored
        """
        cache = self.__code_by_blocks if by else self.__code_blocks
        if code in cache:
            return cache[code]
        redis_key = f"kpl_jx_blocks_by-{code}" if by else f"kpl_jx_blocks-{code}"
        val = RedisUtils.get(self.__get_redis(), redis_key)
        if val is None:
            return None
        cache[code] = (json.loads(val), time.time())
        return cache[code]

    def get_jx_blocks_cache(self, code, by=False):
        """Return the in-memory entry ``(blocks, timestamp)`` for ``code`` or None."""
        if by:
            return self.__code_by_blocks.get(code)
        return self.__code_blocks.get(code)

    # Load the curated plates from the network
    def load_jx_blocks(self, code, buy_1_price, limit_up_price, current_limit_up_blocks):
        """Fetch the curated plates of ``code`` from the KPL api when needed.

        The previous close is derived as ``limit_up_price / 1.1``.
        - rise > 7%: fetch when not cached; when cached and the code is not
          at its limit-up price, refresh once the entry is older than the
          update interval.
        - rise > 3%: fetch the backup plates when not cached.
        Any exception is logged and swallowed.
        """
        try:
            if not limit_up_price or not buy_1_price:
                return
            buy_1 = float(buy_1_price)
            pre_close_price = round(float(limit_up_price) / 1.1, 2)
            price_rate = (buy_1 - pre_close_price) / pre_close_price
            if price_rate > 0.07:
                jx_blocks_info = self.get_jx_blocks_cache(code)
                if not jx_blocks_info:
                    blocks = kpl_api.getCodeBlocks(code)
                    self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                    async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块-{blocks}")
                elif abs(buy_1 - float(limit_up_price)) >= 0.001:
                    # Not at limit-up: the curated plates may need a refresh
                    time_diff = tool.trade_time_sub(tool.get_now_time_str(), "09:30:00")
                    if time_diff < 0:
                        update_time_space = 60 * 60
                    else:
                        # Interval grows with the time since 09:30:00, capped at 5 minutes
                        update_time_space = min(int(time_diff / 30) + 60, 5 * 60)
                    if time.time() - jx_blocks_info[1] > update_time_space:
                        blocks = kpl_api.getCodeBlocks(code)
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                        async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(更新)-{blocks}")
            elif price_rate > 0.03:
                # Add backup plates
                if not self.get_jx_blocks_cache(code, by=True):
                    blocks = kpl_api.getCodeBlocks(code)
                    self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                    async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(备用)-{blocks}")
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)


# Manager of the KPL plates that are forbidden to trade
class KPLPlateForbiddenManager:
    """Singleton holding the set of forbidden plates (memory + Redis db 3)."""
    __redisManager = redis_manager.RedisManager(3)
    __kpl_forbidden_plates_cache = set()
    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLPlateForbiddenManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Fill the class-level cache from the Redis set ``kpl_forbidden_plates``."""
        redis_conn = cls.__get_redis()
        try:
            plates = RedisUtils.smembers(redis_conn, "kpl_forbidden_plates")
            if plates:
                # Must be assigned on the class: a bare name here would only
                # create a local variable and leave the cache empty.
                cls.__kpl_forbidden_plates_cache = set(plates)
        finally:
            RedisUtils.realse(redis_conn)

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    def save_plate(self, plate):
        """Add ``plate`` to the cache and to the Redis set, refreshing its expiry."""
        self.__kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())

    def list_all(self):
        """Return the forbidden plates as read from Redis."""
        return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates")

    def list_all_cache(self):
        """Return the in-memory set of forbidden plates."""
        return self.__kpl_forbidden_plates_cache


class LimitUpCodesPlateKeyManager:
    """Keeps today's limit-up reasons and the keyword <-> code indexes."""
    # Today's limit-up reasons: code -> reason
    # NOTE(review): set_today_limit_up rebinds this on the instance, while the
    # other dicts below are mutated in place on the class - confirm intended.
    today_limit_up_reason_dict = {}
    # Every limit-up reason seen today: code -> reason
    today_total_limit_up_reason_dict = {}
    # code -> set of keywords
    total_code_keys_dict = {}
    # keyword -> set of codes
    total_key_codes_dict = {}
    __redisManager = redis_manager.RedisManager(1)

    def __get_redis(self):
        return self.__redisManager.getRedis()

    # Set today's limit-up data, format: [(code, limit-up reason)]
    def set_today_limit_up(self, datas):
        """Replace today's reason dict with ``datas`` and rebuild the keyword indexes.

        :param datas: iterable of ``(code, reason, ...)``; None is treated as empty
        """
        datas = datas or []
        self.today_limit_up_reason_dict = {item[0]: item[1] for item in datas}
        for item in datas:
            self.__set_total_keys(item[0])
        self.set_today_total_limit_up(datas)

    # Set today's historical limit-up data
    def set_today_total_limit_up(self, datas):
        """Record ``code -> reason`` for every item of ``datas`` (None is treated as empty)."""
        for item in datas or []:
            self.today_total_limit_up_reason_dict[item[0]] = item[1]

    # Today's limit-up reason changed
    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        """Store the previous reason in Redis and rebuild the keywords of ``code``.

        ``to_reason`` is not used by this method.
        """
        RedisUtils.sadd(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", tool.get_expire())
        self.__set_total_keys(code)

    # Set the keywords (today's limit-up reason) of a code
    def __set_total_keys(self, code):
        keys = set()
        if code in self.today_limit_up_reason_dict:
            reason = self.today_limit_up_reason_dict.get(code)
            if reason not in constant.KPL_INVALID_BLOCKS:
                keys.add(reason)
        self.total_code_keys_dict[code] = keys
        for k in keys:
            self.total_key_codes_dict.setdefault(k, set()).add(code)

    # Codes that share the keyword, excluding the given code
    def get_codes_by_key_without_mine(self, key, code):
        """Return a new set with the codes indexed under ``key``, without ``code``."""
        # Only today's limit-up reasons are compared
        codes_set = set(self.total_key_codes_dict.get(key, ()))
        codes_set.discard(code)
        return codes_set

    # Match keywords against the limit-up reasons, returns {keyword: set of codes}
    def match_limit_up_reason_keys(self, code, keys):
        """Return ``{key: codes}`` for each key having codes other than ``code``."""
        fresult = {}
        for k in keys:
            if k not in self.total_key_codes_dict:
                continue
            codes = set(self.total_key_codes_dict[k])
            codes.discard(code)
            if codes:
                fresult[k] = codes
        return fresult


# Real-time KPL market data
class RealTimeKplMarketData:
    """Holds the top curated plates / industries and answers "can buy" queries."""
    # Top curated plates: [(name, net inflow, rank)]
    top_5_reason_list = []
    # Top industries: [(name, net inflow, rank)]
    top_5_industry_list = []
    # name -> entry of the two lists above, rebuilt by __reset_top_5_dict
    top_5_key_dict = {}
    total_reason_dict = {}
    total_industry_dict = {}
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()

    @classmethod
    def set_top_5_reasons(cls, datas):
        """Rebuild the top curated-plate list from ``datas``.

        Uses ``d[1]`` as the plate key and ``d[3]`` as the amount. Scanning
        stops once more than 10 valid plates were collected or a collected
        plate has an amount below 3 * 10000 * 10000.
        """
        for d in datas:
            cls.total_reason_dict[d[1]] = d
        temp_list = []
        for d in datas:
            if d[1] in constant.KPL_INVALID_BLOCKS:
                continue
            # (name, net inflow, rank)
            temp_list.append((d[1], d[3], len(temp_list)))
            if len(temp_list) > 10:
                break
            if d[3] < 3 * 10000 * 10000:
                break
        # Iterate a snapshot: the list is extended inside the loop.
        for temp in list(temp_list):
            names = cls.__KPLPlatManager.get_same_plat_names_by_id(temp[0])
            for name in names:
                # NOTE(review): compares a name with temp[1] (the amount);
                # temp[0] may have been intended - confirm before changing.
                if name == temp[1]:
                    continue
                temp_list.append((name, temp[1], temp[2]))
        cls.top_5_reason_list = temp_list
        cls.__reset_top_5_dict()

    @classmethod
    def set_top_5_industry(cls, datas):
        """Rebuild the top industry list from ``datas``.

        Uses ``d[1]`` as the industry key and ``d[2]`` as the amount, with the
        same stop conditions as ``set_top_5_reasons``.
        """
        for d in datas:
            cls.total_industry_dict[d[1]] = d
        temp_list = []
        for d in datas:
            if d[1] in constant.KPL_INVALID_BLOCKS:
                continue
            temp_list.append((d[1], d[2], len(temp_list)))
            if len(temp_list) > 10:
                break
            if d[2] < 3 * 10000 * 10000:
                break
        cls.top_5_industry_list = temp_list
        cls.__reset_top_5_dict()

    @classmethod
    def __reset_top_5_dict(cls):
        # Curated plates are written last, so they win on a name clash
        temp_dict = {t[0]: t for t in cls.top_5_industry_list}
        temp_dict.update({t[0]: t for t in cls.top_5_reason_list})
        cls.top_5_key_dict = temp_dict

    # Get the set of keywords that can be bought
    @classmethod
    def get_can_buy_key_set(cls):
        """Return the names currently in the top lists as a new set."""
        return set(cls.top_5_key_dict)

    # Number of codes that can be bought for a keyword
    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        """Return ``(count, message)`` for buying ``code`` under keyword ``key``."""
        # There must be at least one other limit-up code under the keyword
        temp_codes = set(LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key) or ())
        temp_codes.discard(code)
        if len(temp_codes) < 1:
            return 0, "身位不为后排"

        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        if key in forbidden_plates:
            return 0, "不买该板块"

        # Before 10:00:00 two orders are allowed
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # From 10:00:00 on the keyword must be in the top lists
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        return 1, "净流入在前5"

    @classmethod
    def is_in_top(cls, keys):
        """Return ``(True, matched)`` when ``keys`` hits a non-forbidden top name, else ``(False, None)``."""
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        reasons = cls.get_can_buy_key_set() - forbidden_plates
        temp_set = keys & reasons
        if temp_set:
            return True, temp_set
        return False, None


# Manager of the historical limit-up reasons and plates of a code
class CodesHisReasonAndBlocksManager:
    """Memory + Redis (db 1) store of per-code historical reasons and plates."""
    __redisManager = redis_manager.RedisManager(1)
    # Historical limit-up reasons: code -> set
    __history_limit_up_reason_dict = {}
    # Plates: code -> set
    __blocks_dict = {}

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def set_history_limit_up_reason(self, code, reasons):
        """Store the historical limit-up reasons of ``code`` in memory and Redis."""
        self.__history_limit_up_reason_dict[code] = set(reasons)
        RedisUtils.setex(self.__get_redis(), f"kpl_his_limit_up_reason-{code}", tool.get_expire(),
                         json.dumps(list(reasons)))

    # A non-None return value means the historical reasons were already loaded
    def get_history_limit_up_reason(self, code):
        """Return the reasons of ``code`` as a set (memory first, then Redis) or None."""
        reasons = self.__history_limit_up_reason_dict.get(code)
        if reasons is not None:
            return reasons
        # Not in memory: load from Redis
        val = RedisUtils.get(self.__get_redis(), f"kpl_his_limit_up_reason-{code}")
        if val is None:
            return None
        reasons = set(json.loads(val))
        self.__history_limit_up_reason_dict[code] = reasons
        return reasons

    def get_history_limit_up_reason_cache(self, code):
        """Return the in-memory reasons of ``code`` or None."""
        return self.__history_limit_up_reason_dict.get(code)

    def set_blocks(self, code, blocks):
        """Store the plates of ``code`` in memory and Redis."""
        self.__blocks_dict[code] = set(blocks)
        RedisUtils.setex(self.__get_redis(), f"kpl_blocks-{code}", tool.get_expire(),
                         json.dumps(list(blocks)))

    def get_blocks(self, code):
        """Return the plates of ``code`` as a set (memory first, then Redis) or None."""
        blocks = self.__blocks_dict.get(code)
        if blocks is not None:
            return blocks
        # Not in memory: load from Redis
        val = RedisUtils.get(self.__get_redis(), f"kpl_blocks-{code}")
        if val is None:
            return None
        blocks = set(json.loads(val))
        self.__blocks_dict[code] = blocks
        return blocks

    def get_total_keys(self, code):
        """Return the union of the historical reasons and the plates of ``code``."""
        reasons = self.get_history_limit_up_reason(code)
        if reasons is None:
            reasons = set()
        blocks = self.get_blocks(code)
        if blocks is None:
            blocks = set()
        return reasons | blocks


# Manager of the plate keywords of a target code
class TargetCodePlateKeyManager:
    __redisManager = redis_manager.RedisManager(1)
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()

    def __get_redis(self):
        return self.__redisManager.getRedis()

    # Returns: keys (invalid plates removed), today's reason, today's historical
    # reasons, historical reasons, industry, curated plates
    def get_plate_keys(self, code):
        """Collect every plate keyword known for ``code`` from the in-memory caches.

        :return: ``(keys, k1, k11, k2, k3, k4)`` where ``keys`` is the union of
            the other five sets minus ``constant.KPL_INVALID_BLOCKS``
        """
        k1 = set()
        if code in LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict:
            k1 = {LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict[code]}
        # Today's historical reasons are not needed for now
        k11 = set()
        k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason_cache(code)
        if k2 is None:
            k2 = set()
        k3 = set()
        industry = global_util.code_industry_map.get(code)
        if industry:
            k3 = {industry}
        k4 = set()
        jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code)
        if not jingxuan_block_info:
            # Fall back to the backup curated plates
            jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True)
        if jingxuan_block_info:
            # The cached plates are a list, so `|=` cannot be used here
            k4.update(jingxuan_block_info[0])
        keys = set()
        for k in (k1, k11, k2, k3, k4):
            keys |= k
        # Remove the invalid limit-up reasons
        keys = keys - set(constant.KPL_INVALID_BLOCKS)
        return keys, k1, k11, k2, k3, k4


class CodePlateKeyBuyManager:
    """Decides, per code, which plates allow placing a buy order."""
    # No plate
    BLOCK_TYPE_NONE = -1
    # Common plate
    BLOCK_TYPE_COMMON = 0
    # Strong plate
    BLOCK_TYPE_STRONG = 1
    # Plate rising sharply
    BLOCK_TYPE_SOON_LIMIT_UP = 2
    # Plate starting up
    BLOCK_TYPE_START_UP = 3

    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()
    __CodesTradeStateManager = trade_manager.CodesTradeStateManager()
    # code -> (can_buy_blocks, unique, msg, can_buy_strong_blocks, keys)
    __can_buy_compute_result_dict = {}

    @staticmethod
    def __get_continuous_limit_up_count(desc):
        """Return N for a description of the form "N连板", otherwise 0."""
        if desc.find("连板") > 0:
            digits = desc.replace("连板", "")
            if digits.isdigit():
                return int(digits)
        return 0

    @classmethod
    def __remove_from_l2(cls, code, msg):
        """Forbid trading ``code`` unless an order was already placed for it."""
        # Codes that were already ordered are never removed
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(
                code) != trade_manager.TRADE_STATE_NOT_TRADE:
            return
        l2_trade_util.forbidden_trade(code, msg=msg)
        logger_kpl_block_can_buy.info(msg)

    # Returns (can buy, is the only code of the plate, message, is strong plate)
    @classmethod
    def __is_block_can_buy(cls, code, block, current_limit_up_datas, code_limit_up_reason_dict,
                           yesterday_current_limit_up_codes, limit_up_record_datas,
                           current_limit_up_block_codes_dict,
                           high_level_code_blocks=None, high_level_block_codes=None):
        """Judge whether ``code`` may be bought under ``block``.

        :return: ``(can_buy, unique, msg, is_strong_block)``
        """
        if high_level_code_blocks is None:
            high_level_code_blocks = {}
        if high_level_block_codes is None:
            high_level_block_codes = {}
        # Lone-code check; a plate covered by the high-level generalized
        # plates is not treated as lone.
        block_codes = current_limit_up_block_codes_dict.get(block)
        if not block_codes:
            if not high_level_block_codes.get(block):
                return False, True, f"{block}:板块无涨停", False
        elif len(block_codes) == 1 and code in block_codes:
            if not high_level_block_codes.get(block):
                return False, True, f"{block}:板块只有当前代码涨停", False

        current_open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes_current(
            code, block, current_limit_up_datas)

        # --------------------------- strong main line ---------------------------
        is_strong_block = False
        for d in current_limit_up_datas:
            if d[5] != block:
                general_blocks = high_level_code_blocks.get(d[0])
                if not general_blocks or block not in general_blocks:
                    # Not in the generalized plates either
                    continue
            if cls.__get_continuous_limit_up_count(d[4]) >= 3:
                is_strong_block = True
                break

        # Largest rank that may still be bought; one more for a strong plate
        max_rank = 3 if is_strong_block else 2

        # Codes of the leaders to exclude
        exclude_first_codes = set()

        # Remove yesterday's limit-up codes from the open-limit-up codes
        if current_open_limit_up_codes and yesterday_current_limit_up_codes:
            current_open_limit_up_codes -= yesterday_current_limit_up_codes

        # First limit-up time of the code; defaults to now when not recorded
        first_limit_up_time = time.time()
        for r in limit_up_record_datas or []:
            if r[3] == code:
                first_limit_up_time = int(r[5])

        # Real-time rank of the code
        current_shsz_rank, front_current_shsz_rank_codes = kpl_block_util.get_code_current_rank(
            code, block,
            current_limit_up_datas,
            code_limit_up_reason_dict,
            yesterday_current_limit_up_codes,
            exclude_first_codes,
            len(current_open_limit_up_codes),
            shsz=True,
            limit_up_time=first_limit_up_time)

        if int(tool.get_now_time_str().replace(":", "")) <= int("094000") and is_strong_block:
            # Strong main line during the first 10 minutes
            return True, False, f"【{block}】:强势主线+强势10分钟", is_strong_block

        if current_shsz_rank < len(current_open_limit_up_codes) + max_rank:
            return True, False, f"【{block}】前排代码:{current_shsz_rank}", is_strong_block
        return False, False, f"【{block}】前排代码:{front_current_shsz_rank_codes} 超过{len(current_open_limit_up_codes) + max_rank}个", is_strong_block

    # Get the plates that can be bought
    # current_limit_up_datas: today's real-time limit-up data
    # limit_up_record_datas: today's historical limit-up data
    # yesterday_current_limit_up_codes: yesterday's limit-up codes
    # before_blocks_dict: historical limit-up reasons
    # Returns the per-plate results [(plate, can buy, is lone code, message, is strong)]
    @classmethod
    def get_can_buy_block(cls, code, current_limit_up_datas, limit_up_record_datas,
                          yesterday_current_limit_up_codes, before_blocks_dict,
                          current_limit_up_block_codes_dict, high_level_general_code_blocks,
                          high_level_general_block_codes):
        """Evaluate every candidate plate of ``code``.

        :return: ``(results, keys)`` where ``results`` is a list of
            ``(block, can_buy, unique, msg, is_strong)`` and ``keys`` the set of
            evaluated plates; ``([], set())`` when the code has no plate
        """

        # Build code -> target plate for the limit-up codes
        def load_code_block():
            for d in limit_up_record_datas or []:
                before_blocks = before_blocks_dict.get(d[3]) if before_blocks_dict else None
                if d[2] in constant.KPL_INVALID_BLOCKS and before_blocks:
                    # Invalid reason today: use a historical one instead
                    code_limit_up_reason_dict[d[3]] = next(iter(before_blocks))
                else:
                    code_limit_up_reason_dict[d[3]] = d[2]
            return code_limit_up_reason_dict

        if current_limit_up_datas is None:
            current_limit_up_datas = []

        # Plates of the target code
        _, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        invalid_blocks = set(constant.KPL_INVALID_BLOCKS)
        keys = {k for k in k1 if k not in invalid_blocks} if k1 else set()
        if k4 and not keys:
            # The curated plates are only used when there is no limit-up reason
            keys |= k4
        keys = keys - invalid_blocks

        fresults = []
        if not keys:
            return fresults, set()
        code_limit_up_reason_dict = {}
        load_code_block()
        for block in keys:
            can_buy, unique, msg, is_strong = cls.__is_block_can_buy(
                code, block,
                current_limit_up_datas,
                code_limit_up_reason_dict,
                yesterday_current_limit_up_codes,
                limit_up_record_datas,
                current_limit_up_block_codes_dict,
                high_level_code_blocks=high_level_general_code_blocks,
                high_level_block_codes=high_level_general_block_codes)
            fresults.append((block, can_buy, unique, msg, is_strong))
        return fresults, keys

    # Whether an order can be placed
    @classmethod
    def can_buy(cls, code):
        """Return the result stored by ``update_can_buy_blocks`` for ``code`` (None when absent)."""
        if constant.TEST:
            return ["测试"], True, cls.BLOCK_TYPE_NONE, [], set()
        return cls.__can_buy_compute_result_dict.get(code)

    # Returns (plates that can be bought, all lone, message, strong plates that can be bought, keys)
    @classmethod
    def __compute_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                                 yesterday_current_limit_up_codes, before_blocks_dict,
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks):
        """Compute the buyable plates of ``code``, honouring per-plate order limits.

        A strong plate allows 3 delegated and 2 dealt codes, any other plate
        2 delegated and 1 dealt. When no plate was evaluated the first element
        of the result is ``False`` instead of a list.
        """
        # Invert code -> generalized plates into plate -> codes
        high_level_general_block_codes = {}
        for c, blocks in (high_level_general_code_blocks or {}).items():
            for b in blocks:
                high_level_general_block_codes.setdefault(b, set()).add(c)

        blocks_compute_results, keys = cls.get_can_buy_block(
            code, current_limit_up_datas,
            limit_up_record_datas,
            yesterday_current_limit_up_codes,
            before_blocks_dict,
            current_limit_up_block_codes_dict,
            high_level_general_code_blocks,
            high_level_general_block_codes)
        if not blocks_compute_results:
            return False, True, "没有找到板块", [], keys

        codes_delegate = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_manager.TRADE_STATE_BUY_SUCCESS}))

        # plate -> codes already dealt / delegated in that plate
        trade_success_blocks_count = {}
        trade_delegate_blocks_count = {}
        for c in codes_delegate | codes_success:
            _, k1_, _, _, _, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c)
            # Real-time limit-up reason, or the curated plates when there is none
            for b in (k1_ if k1_ else k4_):
                if c in codes_success:
                    trade_success_blocks_count.setdefault(b, set()).add(c)
                if c in codes_delegate:
                    trade_delegate_blocks_count.setdefault(b, set()).add(c)

        can_buy_blocks = []
        can_buy_strong_blocks = []
        unique_count = 0
        msg_list = []
        # r is (plate, can buy, is lone code, message, is strong plate)
        for r in blocks_compute_results:
            if r[2]:
                unique_count += 1
            if not r[1]:
                if r[3]:
                    msg_list.append(r[3])
                continue
            max_delegate_count = 3 if r[4] else 2
            max_deal_count = 2 if r[4] else 1
            if r[0] in trade_success_blocks_count and len(
                    trade_success_blocks_count[r[0]]) >= max_deal_count:
                msg_list.append(f"【{r[0]}】有成交代码:{trade_success_blocks_count[r[0]]}")
                continue
            if r[0] in trade_delegate_blocks_count and len(
                    trade_delegate_blocks_count[r[0]]) >= max_delegate_count:
                msg_list.append(f"【{r[0]}】已挂单:{trade_delegate_blocks_count[r[0]]}")
                continue
            can_buy_blocks.append(r[0])
            if r[4]:
                can_buy_strong_blocks.append(r[0])
            if r[3]:
                msg_list.append(r[3])

        # True only when every evaluated plate is a lone-code plate
        all_unique = unique_count == len(blocks_compute_results)
        return can_buy_blocks, all_unique, ",".join(msg_list), can_buy_strong_blocks, keys

    # Update the stored "can buy" result of a code
    @classmethod
    def update_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                              yesterday_current_limit_up_records, before_blocks_dict,
                              current_limit_up_block_codes_dict):
        """Recompute and store the buyable plates of ``code``.

        Codes with 3 or more continuous limit-ups contribute "generalized"
        plates: their ``r[6]`` split on "、" plus field 5 of their record from
        yesterday, when present.
        """
        yesterday_current_limit_up_codes = set()
        yesterday_current_limit_up_records_dict = {}
        for r in yesterday_current_limit_up_records or []:
            yesterday_current_limit_up_codes.add(r[0])
            yesterday_current_limit_up_records_dict[r[0]] = r

        # code -> generalized plates of the high-level codes
        high_level_general_code_blocks = {}
        for r in current_limit_up_datas or []:
            if cls.__get_continuous_limit_up_count(r[4]) < 3:
                continue
            blocks = set(r[6].split("、"))
            if r[0] in yesterday_current_limit_up_records_dict:
                # Limit-up reason of yesterday
                blocks.add(yesterday_current_limit_up_records_dict.get(r[0])[5])
            high_level_general_code_blocks[r[0]] = [b for b in blocks if b]

        can_buy_blocks, unique, msg, can_buy_strong_blocks, keys = cls.__compute_can_buy_blocks(
            code,
            current_limit_up_datas,
            limit_up_record_datas,
            yesterday_current_limit_up_codes,
            before_blocks_dict,
            current_limit_up_block_codes_dict,
            high_level_general_code_blocks)
        # Store the result
        cls.__can_buy_compute_result_dict[code] = (can_buy_blocks, unique, msg, can_buy_strong_blocks, keys)


if __name__ == "__main__":
    pass