""" 代码行业关键词管理 """ # 涨停代码关键词板块管理 import copy import datetime import itertools import json import time import constant from code_attribute import gpcode_manager from db.redis_manager_delegate import RedisUtils from third_data import kpl_block_util, kpl_api, kpl_util from settings.trade_setting import MarketSituationManager from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, ContainsLimitupCodesBlocksManager from third_data.third_blocks_manager import BlockMapManager from utils import global_util, tool, buy_condition_util from log_module import async_log_util from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in, logger_debug, \ logger_kpl_latest_gaobiao from third_data.kpl_util import KPLPlatManager from trade import l2_trade_util, trade_constant # 代码精选板块管理 from utils.kpl_data_db_util import KPLLimitUpDataUtil class KPLCodeJXBlockManager: __db = 3 __redisManager = redis_manager.RedisManager(3) __code_blocks = {} # 备用 __code_by_blocks = {} # 激进买的代码板块 __code_blocks_for_radical_buy = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls, *args, **kwargs) try: cls.__load_data() except Exception as e: logger_debug.exception(e) return cls.__instance @classmethod def __load_data(cls): keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_by-*") if keys: for k in keys: val = RedisUtils.get(cls.__get_redis(), k) val = json.loads(val) cls.__code_by_blocks[k.split("-")[1]] = (val, time.time()) keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks-*") if keys: for k in keys: val = RedisUtils.get(cls.__get_redis(), k) val = json.loads(val) cls.__code_blocks[k.split("-")[1]] = (val, time.time()) keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_radical-*") if keys: for k in keys: val = RedisUtils.get(cls.__get_redis(), k) val = json.loads(val) cls.__code_blocks_for_radical_buy[k.split("-")[1]] = (val, time.time()) @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() def save_jx_blocks(self, code, blocks: list, current_limit_up_blocks: set, by=False): if not blocks: return final_blocks = copy.deepcopy(blocks) if len(blocks) > 2: final_blocks.clear() for b in blocks: if b not in constant.KPL_INVALID_BLOCKS: final_blocks.append(b) if len(final_blocks) < 2: final_blocks = blocks # 保存前2条数据 if by: RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_by-{code}", tool.get_expire(), json.dumps(final_blocks)) self.__code_by_blocks[code] = (final_blocks, time.time()) else: RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(final_blocks)) self.__code_blocks[code] = (final_blocks, time.time()) def save_jx_blocks_for_radical_buy(self, code, blocks: list): if not blocks: return RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_radical-{code}", tool.get_expire(), json.dumps(blocks)) self.__code_blocks_for_radical_buy[code] = (blocks, time.time()) # 获取精选板块(激进买) def get_jx_blocks_radical(self, code): blocks_info = self.__code_blocks_for_radical_buy.get(code) if blocks_info: return set(blocks_info[0]) return None def get_jx_blocks_cache(self, code, by=False): if by: return self.__code_by_blocks.get(code) else: return self.__code_blocks.get(code) # 从网络上加载精选板块, 当前涨停的板块 def load_jx_blocks(self, code, buy_1_price, limit_up_price, current_limit_up_blocks): try: # logger_kpl_block_can_buy.info(f"准备更新精选板块:{code}-{buy_1_price}-{limit_up_price}") if limit_up_price and buy_1_price: # 处理买1,卖1信息 pre_close_price = round(float(limit_up_price) / tool.get_limit_up_rate(code), 2) # 如果涨幅大于7%就读取板块 price_rate = (buy_1_price - pre_close_price) / pre_close_price if price_rate > 0.07: jx_blocks_info = self.get_jx_blocks_cache(code) if not jx_blocks_info: start_time = time.time() blocks = kpl_api.getCodeBlocks(code) async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块-{blocks} 耗时:{int(time.time() - start_time)}s") self.save_jx_blocks(code, blocks, current_limit_up_blocks) # 跟随精选板块一起更新 self.load_jx_blocks_radical(code) else: # 还没涨停的需要更新精选板块 更新精选板块 if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001: # 非涨停状态 UPDATE_TIME_SPACE = 5 * 60 time_diff = tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") if time_diff < 0: UPDATE_TIME_SPACE = 60 * 60 else: UPDATE_TIME_SPACE = int(time_diff / 30) + 60 if UPDATE_TIME_SPACE > 5 * 60: UPDATE_TIME_SPACE = 5 * 60 if time.time() - jx_blocks_info[1] > UPDATE_TIME_SPACE: start_time = time.time() # 距离上次更新时间过去了5分钟 blocks = kpl_api.getCodeBlocks(code) async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(更新)-{blocks} 耗时:{int(time.time() - start_time)}s") self.save_jx_blocks(code, blocks, current_limit_up_blocks) # 跟随精选板块一起更新 self.load_jx_blocks_radical(code) elif price_rate > 0.03: # 添加备用板块 if not self.get_jx_blocks_cache(code, by=True): start_time = time.time() blocks = kpl_api.getCodeBlocks(code) self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True) async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(备用)-{blocks} 耗时:{int(time.time() - start_time)}s") # 跟随精选板块一起更新 self.load_jx_blocks_radical(code) if price_rate > 0.03: if not self.__code_blocks_for_radical_buy.get(code): self.load_jx_blocks_radical(code) except Exception as e: logger_kpl_block_can_buy.error(f"{code} 获取板块出错") logger_kpl_block_can_buy.exception(e) def load_jx_blocks_radical(self, code): start_time = time.time() blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False) blocks = set([b[1] for b in blocks]) # fblocks = BlockMapManager().filter_blocks(blocks) async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到板块(激进买) 过滤前-{blocks} 耗时:{int(time.time() - start_time)}s") self.save_jx_blocks_for_radical_buy(code, list(blocks)) # 禁止下单的板块 class ForbiddenBlockManager: __db = 3 __redisManager = redis_manager.RedisManager(3) __instance = None __forbidden_blocks = set() def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(ForbiddenBlockManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() # 加载数据 @classmethod def __load_data(cls): blocks = cls.__get_redis().smembers("forbidden_blocks") if blocks: for b in blocks: cls.__forbidden_blocks.add(b) def add(self, block): self.__forbidden_blocks.add(block) RedisUtils.sadd_async(self.__db, "forbidden_blocks", block) RedisUtils.expire_async(self.__db, "forbidden_blocks", tool.get_expire()) def remove(self, block): if block in self.__forbidden_blocks: self.__forbidden_blocks.remove(block) RedisUtils.srem_async(self.__db, "forbidden_blocks", block) def get_blocks(self): return copy.deepcopy(self.__forbidden_blocks) def is_in(self, block): return block in self.__forbidden_blocks # 开盘啦禁止交易板块管理 class KPLPlateForbiddenManager: """ 不能买的板块管理 """ __redis_manager = redis_manager.RedisManager(3) __kpl_forbidden_plates_cache = set() # 已经删除了的板块 __deleted_kpl_forbidden_plates_cache = set() # 监控的高标板块代码字典:{"板块":{"代码1","代码2"}} __watch_block_high_codes = {} # 高标代码 __watch_high_codes = set() __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(KPLPlateForbiddenManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: cls.__kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates") cls.__deleted_kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "deleted_kpl_forbidden_plates") finally: RedisUtils.realse(__redis) cls.__load_latest_gb() @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def save_plate(self, plate): self.__kpl_forbidden_plates_cache.add(plate) RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate) RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire()) self.__deleted_kpl_forbidden_plates_cache.discard(plate) RedisUtils.srem(self.__get_redis(), "deleted_kpl_forbidden_plates", plate) RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire()) def delete_plate(self, plate): self.__kpl_forbidden_plates_cache.discard(plate) RedisUtils.srem(self.__get_redis(), "kpl_forbidden_plates", plate) RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire()) self.__deleted_kpl_forbidden_plates_cache.add(plate) RedisUtils.sadd(self.__get_redis(), "deleted_kpl_forbidden_plates", plate) RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire()) def list_all(self): return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates") def list_all_cache(self): return self.__kpl_forbidden_plates_cache def list_all_deleted_cache(self): return self.__deleted_kpl_forbidden_plates_cache def is_in_cache(self, plate): if self.__kpl_forbidden_plates_cache and plate in self.__kpl_forbidden_plates_cache: return True return False @classmethod def __load_latest_gb(cls): """ 加载最近的市场高标 @return: """ # 获取最近10个交易日涨停的涨停数据 dates = HistoryKDatasUtils.get_latest_trading_date_cache(10) if not dates: return min_date = dates[-1] sql = f"SELECT r.`_code`, r.`_hot_block_name`, r.`_day`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{min_date}'" mysqldb = mysql_data.Mysqldb() results = mysqldb.select_all(sql) code_days_map = {} # 每炸板 f_code_days_map = {} for r in results: if r[0] not in code_days_map: code_days_map[r[0]] = set() code_days_map[r[0]].add(r[2]) if not r[3]: if r[0] not in f_code_days_map: f_code_days_map[r[0]] = set() f_code_days_map[r[0]].add(r[2]) # 过滤涨停次数>=3次的数据 target_codes = set() for code in code_days_map: if f_code_days_map.get(code) and (len(f_code_days_map.get(code)) >= 4 or ( tool.is_ge_code(code) and len(f_code_days_map.get(code)) >= 2)): # 且有3天属于连续涨停 day_list = list(code_days_map[code]) day_list.sort(reverse=True) step = 3 has_continue = False for i in range(0, len(day_list) - step + 1): item_list = day_list[i:i + step] # 是否属于连续涨停 is_sub = False for j in range(0, len(dates) - step): if f"{dates[j:j + step]}" == f"{item_list}": is_sub = True break if is_sub: has_continue = True break if not has_continue: continue has_big_deal = False # 最近10个交易日的成交额要大于10亿 volumes_data = HistoryKDataManager().get_history_bars(code, dates[0]) if volumes_data: for d in volumes_data[:10]: if d["amount"] > 10e8: has_big_deal = True break if not has_big_deal: continue target_codes.add(code) # 代码对应的板块 code_blocks = {} for r in results: if r[0] not in target_codes: continue if r[0] not in code_blocks: code_blocks[r[0]] = set() code_blocks[r[0]].add(kpl_util.filter_block(r[1])) # 所有板块对应的代码集合 block_codes = {} for code in code_blocks: for b in code_blocks[code]: if b in constant.KPL_INVALID_BLOCKS: continue if b not in block_codes: block_codes[b] = set() block_codes[b].add(code) print(block_codes) cls.__watch_block_high_codes = block_codes logger_kpl_latest_gaobiao.info(f"{block_codes}") cls.__watch_high_codes.clear() for b in block_codes: cls.__watch_high_codes |= block_codes[b] for k in block_codes: print(k, [(x, gpcode_manager.get_code_name(x)) for x in block_codes[k]]) def get_watch_high_codes(self): return self.__watch_high_codes def get_watch_high_codes_by_block(self, b): return self.__watch_block_high_codes.get(b) def compute(self, code_rate_dict: dict): """ 根据比例计算需要拉黑的代码 @param code_rate_dict: 涨幅百分数 @return: """ try: if self.__watch_block_high_codes: forbidden_blocks = set() for b in self.__watch_block_high_codes: total_rate = 0 for code in self.__watch_block_high_codes[b]: if code in code_rate_dict: total_rate += code_rate_dict.get(code) average_rate = total_rate / len(self.__watch_block_high_codes[b]) if average_rate < 1: forbidden_blocks.add(b) # async_log_util.info(logger_debug, f"板块平均涨幅 {b}-{average_rate}") self.__kpl_forbidden_plates_cache = forbidden_blocks async_log_util.info(logger_debug, f"拉黑板块:{forbidden_blocks}") except Exception as e: logger_debug.exception(e) class LimitUpCodesPlateKeyManager: # 今日涨停原因 today_limit_up_reason_dict = {} __today_total_limit_up_reason_dict = {} total_code_keys_dict = {} total_key_codes_dict = {} __redisManager = redis_manager.RedisManager(1) def __get_redis(self): return self.__redisManager.getRedis() # 获取今日涨停数据,格式:[(代码,涨停原因,精选板块列表)] def set_today_limit_up(self, datas): temp_dict = {} if datas: for item in datas: temp_dict[item[0]] = item[1] self.today_limit_up_reason_dict = temp_dict if datas: for item in datas: self.__set_total_keys(item[0]) self.set_today_total_limit_up(datas) # 设置今日历史涨停数据 # 格式:(代码,涨停原因,精选板块列表) @classmethod def set_today_total_limit_up(cls, datas): for item in datas: code = item[0] # 设置涨停代码的板块及原因 cls.__today_total_limit_up_reason_dict[code] = (item[1], item[2]) @classmethod def get_today_limit_up_reason(cls, code): return cls.__today_total_limit_up_reason_dict.get(code) # 设置代码的今日涨停原因 def __set_total_keys(self, code): keys = set() # keys_his = self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}") # keys |= keys_his if code in self.today_limit_up_reason_dict: if self.today_limit_up_reason_dict.get(code) not in constant.KPL_INVALID_BLOCKS: keys.add(self.today_limit_up_reason_dict.get(code)) self.total_code_keys_dict[code] = keys for k in keys: if k not in self.total_key_codes_dict: self.total_key_codes_dict[k] = set() self.total_key_codes_dict[k].add(code) # logger_kpl_limit_up.info("{}板块关键词:{}", code, keys) # 根据传入的关键词与涨停代码信息匹配身位 def get_codes_by_key_without_mine(self, key, code): # 只比较今日涨停原因 codes_set = set() if key in self.total_key_codes_dict: codes_set |= self.total_key_codes_dict[key] codes_set.discard(code) return codes_set # 涨停原因匹配关键字(和涨停列表中的涨停原因做对比),返回:{关键词:代码集合} def match_limit_up_reason_keys(self, code, keys): fresult = {} for k in keys: if k in self.total_key_codes_dict: codes = set(self.total_key_codes_dict[k]) codes.discard(code) if codes: fresult[k] = codes return fresult # 实时开盘啦市场数据 class RealTimeKplMarketData: # 流入缓存 [ID, 板块名称, 板块涨幅, 流入金额] top_in_list_cache = [] # 流出缓存 top_out_list_cache = [] # 行业前5 top_5_industry_list = [] # top_5_key_dict = {} total_reason_dict = {} total_industry_dict = {} __KPLPlateForbiddenManager = KPLPlateForbiddenManager() __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager() __KPLPlatManager = KPLPlatManager() # 精选流入前几 __top_jx_blocks = [] # 精选流出前几 __top_jx_out_blocks = [] # 精选板块流入金额 __jx_blocks_in_money_dict = {} # 市场行情热度,默认为60 __market_strong = 60 @classmethod def get_jingxuan_in_block_threshold_count(cls): """ 获取买精选流入前几 @return: """ score = 60 if cls.__market_strong is not None: score = int(cls.__market_strong) for info in constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG: if info[0] <= score < info[1]: return info[2] return 10 @classmethod def set_market_jingxuan_blocks(cls, datas): """ 设置精选流入数据 @param datas:[(板块编号,板块名称,涨幅, 板块流入金额)] @return: """ # 流入阈值 # THRESHOLD_MONEY = 50 * (tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") // 60) + 1000 # THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000) # THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000) # THRESHOLD_MONEY = THRESHOLD_MONEY * 10000 THRESHOLD_MONEY = 0 # 最大数量 # MAX_COUNT = cls.get_jingxuan_in_block_threshold_count() cls.top_in_list_cache = datas blocks = set() count = 0 fblock_money = {} for data in datas: cls.__jx_blocks_in_money_dict[data[1]] = data[3] if data[1] in constant.KPL_INVALID_BLOCKS: continue if data[3] < THRESHOLD_MONEY: continue # 过滤出来为同一个板块就只算1个数量 fb = BlockMapManager().filter_blocks({data[1]}) if blocks & fb: continue for b in fb: fblock_money[b] = data[3] blocks |= fb # 如果该原因没有涨停票要往后移一位 has_code = False for b in fb: if ContainsLimitupCodesBlocksManager().get_block_codes(b): has_code = True break if has_code: count += 1 if count == 10: strong = cls.get_market_strong() if strong is None: strong = 60 if data[3] > 3e7: # 大于3千万 THRESHOLD_MONEY = int((1 - strong / 200) * data[3]) else: THRESHOLD_MONEY = data[3] # if count >= MAX_COUNT: # break # 记录精选流出日志 async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:50]} 板块:{blocks}") blocks = list(blocks) blocks.sort(key=lambda x: fblock_money.get(x), reverse=True) cls.__top_jx_blocks = blocks @classmethod def set_market_jingxuan_out_blocks(cls, datas): """ 设置精选流出数据 @param datas: @return: """ cls.top_out_list_cache = datas count = 0 blocks = set() for data in datas: cls.__jx_blocks_in_money_dict[data[1]] = data[3] if data[1] in constant.KPL_INVALID_BLOCKS: continue if data[3] > -5e7: # 过滤5千万以上的 break # 过滤出来为同一个板块就只算1个数量 fb = BlockMapManager().filter_blocks({data[1]}) if blocks & fb: continue blocks |= fb count += 1 if count >= 10: break # 记录精选流出日志 async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}") cls.__top_jx_out_blocks = list(blocks) @classmethod def set_market_strong(cls, strong): """ 设置市场行情强度 @param strong: @return: """ cls.__market_strong = strong @classmethod def is_ignore_block_in_money(cls): if cls.__market_strong and cls.__market_strong >= constant.IGNORE_BLOCK_IN_MONEY_MARKET_STRONG: return True return False @classmethod def get_market_strong(cls): return cls.__market_strong @classmethod def get_top_market_jingxuan_blocks(cls): return cls.__top_jx_blocks @classmethod def get_top_market_jingxuan_out_blocks(cls): return cls.__top_jx_out_blocks @classmethod def get_block_info_at_block_in(cls, b): """ 获取板块的净流入情况 @param b: @return: (板块名称,身位,流入金额) """ for i in range(0, len(cls.top_in_list_cache)): if cls.top_in_list_cache[i][1] == b: return b, i, cls.top_in_list_cache[i][3] return b, -1, 0 @classmethod def set_top_5_industry(cls, datas): for d in datas: cls.total_industry_dict[d[1]] = d temp_list = [] for i in range(0, len(datas)): if datas[i][1] in constant.KPL_INVALID_BLOCKS: continue temp_list.append((datas[i][1], datas[i][2], len(temp_list))) if len(temp_list) > 10: break if datas[i][2] < 3 * 10000 * 10000: break cls.top_5_industry_list = temp_list cls.__reset_top_5_dict() @classmethod def __reset_top_5_dict(cls): temp_dict = {} for t in cls.top_5_industry_list: temp_dict[t[0]] = t for t in cls.top_5_reason_list: temp_dict[t[0]] = t cls.top_5_key_dict = temp_dict # 获取能够买的行业关键字set @classmethod def get_can_buy_key_set(cls): temp_set = cls.top_5_key_dict.keys() return temp_set @classmethod def is_in_top(cls, keys): reasons = cls.get_can_buy_key_set() forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache() reasons = reasons - forbidden_plates temp_set = keys & reasons if temp_set: return True, temp_set else: return False, None @classmethod def get_jx_block_in_money(cls, block): return cls.__jx_blocks_in_money_dict.get(block) # 代码历史涨停原因与板块管理 class CodesHisReasonAndBlocksManager: __redisManager = redis_manager.RedisManager(1) # 历史涨停原因 __history_limit_up_reason_dict = {} # 板块 __blocks_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CodesHisReasonAndBlocksManager, cls).__new__(cls, *args, **kwargs) return cls.__instance def __get_redis(self): return self.__redisManager.getRedis() def set_history_limit_up_reason(self, code, reasons): self.__history_limit_up_reason_dict[code] = set(reasons) RedisUtils.setex(self.__get_redis(), f"kpl_his_limit_up_reason-{code}", tool.get_expire(), json.dumps(list(reasons))) # 如果返回值不为None表示已经加载过历史原因了 def get_history_limit_up_reason(self, code): reasons = self.__history_limit_up_reason_dict.get(code) if reasons is None: # 从内存中加载 val = RedisUtils.get(self.__get_redis(), f"kpl_his_limit_up_reason-{code}") if val is not None: val = set(json.loads(val)) self.__history_limit_up_reason_dict[code] = val if code in self.__history_limit_up_reason_dict: return self.__history_limit_up_reason_dict.get(code) else: return None else: return reasons def get_history_limit_up_reason_cache(self, code): reasons = self.__history_limit_up_reason_dict.get(code) return reasons def set_blocks(self, code, blocks): self.__blocks_dict[code] = set(blocks) RedisUtils.setex(self.__get_redis(), f"kpl_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) def get_blocks(self, code): reasons = self.__blocks_dict.get(code) if reasons is None: # 从内存中加载 val = RedisUtils.get(self.__get_redis(), f"kpl_blocks-{code}") if val is not None: val = set(json.loads(val)) self.__blocks_dict[code] = val if code in self.__blocks_dict: return self.__blocks_dict.get(code) else: return None else: return reasons def get_total_keys(self, code): reasons = self.get_history_limit_up_reason(code) if reasons is None: reasons = set() blocks = self.get_blocks(code) if blocks is None: blocks = set() return reasons | blocks __history_blocks_dict_cache = {} def get_history_blocks(self, code): """ 获取180天的历史涨停原因 @param code: @return: """ if code in self.__history_blocks_dict_cache: return self.__history_blocks_dict_cache.get(code) try: kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(code=code) # 取最近2条数据 if kpl_results and len(kpl_results) > 2: kpl_results = kpl_results[-2:] keys = set() if kpl_results: keys |= set([x[2] for x in kpl_results]) for r in kpl_results: if r[3]: keys |= set(r[3].split("、")) self.__history_blocks_dict_cache[code] = keys return keys except: pass return set() def get_history_blocks_cache(self, code): """ 获取180天的历史涨停原因缓存 @param code: @return: """ return self.__history_blocks_dict_cache.get(code) # 目标代码板块关键词管理 class TargetCodePlateKeyManager: __redisManager = redis_manager.RedisManager(1) __CodesPlateKeysManager = CodesHisReasonAndBlocksManager() __KPLCodeJXBlockManager = KPLCodeJXBlockManager() def __get_redis(self): return self.__redisManager.getRedis() # 返回key集合(排除无效板块),今日涨停原因,今日历史涨停原因,历史涨停原因,二级,精选板块 def get_plate_keys(self, code, contains_today=True): """ 获取代码的板块: (180天的涨停原因+推荐原因)+今日涨停原因+今日涨停推荐原因+今日推荐原因 @param code: @return: (板块关键词集合,今日涨停原因+涨停推荐原因,今日历史涨停原因,历史涨停原因,精选板块) """ keys = set() k1 = set() limit_up_reason_info = LimitUpCodesPlateKeyManager.get_today_limit_up_reason(code) if limit_up_reason_info: k1 = {limit_up_reason_info[0]} | set(limit_up_reason_info[1]) # 加载今日历史原因,暂时不需要历史原因了 k11 = set() # RedisUtils.smembers(self.__get_redis(), f"kpl_limit_up_reason_his-{code}") k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason_cache(code) if k2 is None: k2 = set() k3 = self.__CodesPlateKeysManager.get_history_blocks(code) if k3: keys |= k3 # industry = global_util.code_industry_map.get(code) # if industry: # k3 = {industry} k4 = set() jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code) if not jingxuan_block_info: jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True) if jingxuan_block_info: jingxuan_blocks = jingxuan_block_info[0] k4 |= set(jingxuan_blocks) # set([x[1] for x in jingxuan_blocks]) if k1 and contains_today: # 涨停过 keys |= k1 # 获取不到涨停原因 if contains_today: keys |= k4 keys = keys - set(constant.KPL_INVALID_BLOCKS) return keys, k1, k11, k2, k3, k4 def get_plate_keys_for_radical_buy(self, code): """ 激进买入的板块 @param code: @return: """ class CodePlateKeyBuyManager: # 无板块 BLOCK_TYPE_NONE = -1 # 一般板块 BLOCK_TYPE_COMMON = 0 # 强势板块 BLOCK_TYPE_STRONG = 1 # 猛拉板块 BLOCK_TYPE_SOON_LIMIT_UP = 2 # 潜伏板块 BLOCK_TYPE_START_UP = 3 __TargetCodePlateKeyManager = TargetCodePlateKeyManager() __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager() __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager() __can_buy_compute_result_dict = {} # 是否需要积极买 @classmethod def __is_need_active_buy(cls, code, block, current_rank, open_limit_up_count): """ 板块是否需要积极买入 规则:根据身位判断是否需要积极买,根据时间划分 @param code: 代码 @param block: 板块名称 @param current_rank: 目前在板块中的身位,从0开始 @param open_limit_up_count: 开1的数量 @return: """ real_current_rank = max(current_rank - open_limit_up_count, 0) TIME_STR_RANGES = ["10:00:00", "10:30:00", "11:00:00", "13:00:00", "13:30:00", "14:00:00", "14:30:00", "15:00:00"] TIME_INT_RANGES = [int(x.replace(':', '')) for x in TIME_STR_RANGES] MAX_RANKS = [3, 3, 2, 2, 1, 0, 0, 0] now_time_str = tool.get_now_time_str().replace(':', '') for i in range(len(TIME_INT_RANGES)): if int(now_time_str) <= TIME_INT_RANGES[i]: if MAX_RANKS[i] > real_current_rank: return True break return False # 返回内容(是否可买, 是否为独苗, 描述信息, 是否为强势主线, 是否需要积极买) @classmethod def __is_block_can_buy(cls, code, block, current_limit_up_datas, code_limit_up_reasons_dict, yesterday_current_limit_up_codes, limit_up_record_datas, current_limit_up_block_codes_dict, high_level_code_blocks=None, high_level_block_codes=None): # 独苗判断 if high_level_code_blocks is None: high_level_code_blocks = {} if high_level_block_codes is None: high_level_block_codes = {} block_codes = current_limit_up_block_codes_dict.get(block) if block_codes is None: block_codes = set() if not block_codes: # 高位板泛化板块中无板块 if not high_level_block_codes.get(block): return False, True, f"【{block}】:板块无涨停", False, False elif len(block_codes) == 1 and code in block_codes: if not high_level_block_codes.get(block): return False, True, f"{block}:板块只有当前代码涨停", False, False # 可以买的最大排名 # open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, # code_limit_up_reason_dict) current_open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas) # ---------------------------判断强势主线------------------------- is_strong_block = False for d in current_limit_up_datas: bs = kpl_util.get_current_limit_up_reasons(d) if block not in bs: general_blocks = high_level_code_blocks.get(d[0]) if not general_blocks or block not in general_blocks: # 没在泛化板块中 continue count = kpl_util.get_high_level_count(d[4]) if count >= 3: if d[4].find("连板") > 0: is_strong_block = True break elif d[0] in yesterday_current_limit_up_codes and len(block_codes) >= 2: # 几天几板,且最近2连板 # 看是否有首板后排 is_strong_block = True break if not is_strong_block: temp_block_codes = set(copy.deepcopy(block_codes)) temp_block_codes.discard(code) if len(temp_block_codes) >= 3: is_strong_block = True max_rank = 2 # 强势板块买老四 if is_strong_block: max_rank = 3 # 需要排除的老大的代码 exclude_first_codes = set() # HighIncreaseCodeManager().list_all() # 获取主板开1的代码 # 剔除高位板 if current_open_limit_up_codes and yesterday_current_limit_up_codes: current_open_limit_up_codes -= yesterday_current_limit_up_codes # 获取代码的初次涨停时间 first_limit_up_time = time.time() # if limit_up_record_datas: for r in limit_up_record_datas: if r[3] == code: first_limit_up_time = int(r[5]) # 获取主板实时身位,剔除高位板 current_shsz_rank, front_current_shsz_rank_codes = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict, yesterday_current_limit_up_codes, exclude_first_codes, len( current_open_limit_up_codes), shsz=True, limit_up_time=first_limit_up_time) # 计算是否需要积极买入 is_active_buy = cls.__is_need_active_buy(code, block, current_shsz_rank, len(current_open_limit_up_codes)) # record_shsz_rank, record_shsz_rank_codes = kpl_block_util.get_code_record_rank(code, block, # limit_up_record_datas, # code_limit_up_reason_dict, # yesterday_current_limit_up_codes, # shsz=True) if int(tool.get_now_time_str().replace(":", "")) <= int("094000") and is_strong_block: # 强势主线加强势10分钟 return True, False, f"【{block}】:强势主线+强势10分钟", is_strong_block, is_active_buy if current_shsz_rank < len(current_open_limit_up_codes) + max_rank: return True, False, f"【{block}】前排代码:{current_shsz_rank}", is_strong_block, is_active_buy else: # k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code) # if k_format and k_format[8][0]: # # 具有辨识度 # return True, False, f"【{block}】具有辨识度", is_strong_block # 看自由流通市值是否小于20亿 if is_strong_block and current_shsz_rank < len(current_open_limit_up_codes) + max_rank + 1: zyltgb_as_yi = round(global_util.zyltgb_map.get(code) / 100000000, 2) if code in global_util.zyltgb_map else None situation = MarketSituationManager().get_situation_cache() zylt_threshold_as_yi = buy_condition_util.get_zyltgb_threshold(situation) if zyltgb_as_yi and zylt_threshold_as_yi[2] <= zyltgb_as_yi <= zylt_threshold_as_yi[3]: return True, False, f"【{block}】强势板块 自由流通市值({zyltgb_as_yi})大于{zylt_threshold_as_yi[2]}亿 小于{zylt_threshold_as_yi[3]}亿", is_strong_block, is_active_buy return False, False, f"【{block}】前排代码:{front_current_shsz_rank_codes} 超过{len(current_open_limit_up_codes) + max_rank}个", is_strong_block, is_active_buy # 过时的代码 # if open_limit_up_codes: # # 主板开1 # if current_shsz_rank < len(open_limit_up_codes) + 1 and record_shsz_rank < len(open_limit_up_codes) + 2: # # 属于龙1,龙2 # return True, f"{tool.get_now_time_str()} {block}:top10涨停板块,主板开1({open_limit_up_codes}),属于主板前龙{len(open_limit_up_codes) + 1}(实时身位-{current_shsz_rank}:{front_current_shsz_rank_codes}/{len(current_limit_up_datas)})" # else: # if record_shsz_rank >= len(open_limit_up_codes) + 1: # cls.__remove_from_l2(code, f"{code}根据身位禁止买入:【{block}】历史身位{record_shsz_rank}") # return False, f"板块-{block}: top4涨停板块,主板开1({open_limit_up_codes}),不为主板前龙{len(open_limit_up_codes) + 1}(实时身位-{current_shsz_rank}:{front_current_shsz_rank_codes},历史身位-{record_shsz_rank})" # else: # if current_shsz_rank == 0 and record_shsz_rank < 2: # return True, f"{tool.get_now_time_str()} {block}:top4涨停板块,非主板开1,属于龙1,实时涨停列表数量({len(current_limit_up_datas)})" # else: # if record_shsz_rank >= 2: # cls.__remove_from_l2(code, f"{code}根据身位禁止买入:【{block}】历史身位{record_shsz_rank}") # # return False, f"板块-{block}: top4涨停板块,非主板开1,不为主板龙1(实时身位-{current_shsz_rank}:{front_current_shsz_rank_codes},历史身位-{record_shsz_rank})" @classmethod def __is_block_can_buy_new(cls, code, block, current_limit_up_datas, code_limit_up_reasons_dict, yesterday_current_limit_up_codes, limit_up_record_datas, current_limit_up_block_codes_dict, high_level_code_blocks=None, high_level_block_codes=None): """ 该票的板块是否可以买 @param code: @param block: @param current_limit_up_datas: @param code_limit_up_reasons_dict: @param yesterday_current_limit_up_codes: @param limit_up_record_datas: @param current_limit_up_block_codes_dict: @param high_level_code_blocks: @param high_level_block_codes: @return: """ # 独苗判断 if high_level_code_blocks is None: high_level_code_blocks = {} if high_level_block_codes is None: high_level_block_codes = {} block_codes = current_limit_up_block_codes_dict.get(block) if block_codes is None: block_codes = set() # 历史涨停代码 block_codes_records = set() if limit_up_record_datas: for k in limit_up_record_datas: if block in code_limit_up_reasons_dict.get(k[3]): block_codes_records.add(k[3]) if not block_codes: # 高位板泛化板块中无板块 if not high_level_block_codes.get(block): return False, True, f"【{block}】:板块无涨停", False, False, 0, 0, 0 elif len(block_codes) == 1 and code in block_codes: if not high_level_block_codes.get(block): return False, True, f"{block}:板块只有当前代码涨停", False, False, 0, 0, 0 # 可以买的最大排名 # open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, # code_limit_up_reason_dict) current_open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas) is_strong_block = False # 最多买老几 RANKS = [6, 5, 4, 4, 3, 3, 2] RANK_TIMES = ["10:00:00", "10:30:00", "11:00:00", "11:30:00", "13:30:00", "14:00:00", "15:00:00"] now_time_str = tool.get_now_time_str() max_rank = 2 for i in range(len(RANK_TIMES)): if tool.trade_time_sub(now_time_str, RANK_TIMES[i]) <= 0: max_rank = RANKS[i] break # 需要排除的老大的代码 exclude_first_codes = set() # 获取主板开1的代码 # 剔除高位板 if current_open_limit_up_codes and yesterday_current_limit_up_codes: current_open_limit_up_codes -= yesterday_current_limit_up_codes # 获取代码的初次涨停时间 first_limit_up_time = time.time() # if limit_up_record_datas: for r in limit_up_record_datas: if r[3] == code: first_limit_up_time = int(r[5]) # 获取主板实时身位,剔除高位板 current_shsz_rank, front_current_shsz_rank_codes = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict, yesterday_current_limit_up_codes, exclude_first_codes, len( current_open_limit_up_codes), shsz=True, limit_up_time=first_limit_up_time) # 计算是否需要积极买入 is_active_buy = cls.__is_need_active_buy(code, block, current_shsz_rank, len(current_open_limit_up_codes)) if current_shsz_rank < len(current_open_limit_up_codes) + max_rank: return True, len(block_codes | { code}) <= 1, f"【{block}】前排代码:{current_shsz_rank}", is_strong_block, is_active_buy, current_shsz_rank, len( block_codes), len(block_codes_records) else: return False, len(block_codes | { code}) <= 1, f"【{block}】前排代码:{front_current_shsz_rank_codes} 超过{len(current_open_limit_up_codes) + max_rank}个", is_strong_block, is_active_buy, current_shsz_rank, len( block_codes), len(block_codes_records) # 获取可以买的板块 # current_limit_up_datas: 今日实时涨停 # latest_2_day_limit_up_datas:最近2天的实时涨停(不含今日) # limit_up_record_datas:今日历史涨停 # yesterday_current_limit_up_codes : 昨日涨停代码 # before_blocks_dict:历史涨停原因 # 返回板块的计算结果[(板块名称,是否可买,是否是独苗,信息)] @classmethod def get_can_buy_block(cls, code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict, current_limit_up_block_codes_dict, high_level_general_code_blocks, high_level_general_block_codes): # 加载涨停代码的目标板块 def load_code_block(): if limit_up_record_datas: # 获取今日9:30以前的时间 time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) for d in limit_up_record_datas: if d[2] in constant.KPL_INVALID_BLOCKS and d[3] in before_blocks_dict: code_limit_up_reasons_dict[d[3]] = {list(before_blocks_dict.get(d[3]))[0]} else: code_limit_up_reasons_dict[d[3]] = {d[2]} # 开1才能包含推荐原因 if d[6] and int(d[5]) < timestamp: code_limit_up_reasons_dict[d[3]] |= set(d[6].split("、")) return code_limit_up_reasons_dict if current_limit_up_datas is None: current_limit_up_datas = [] # 获取目标代码板块 # keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code) keys = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not keys: keys = set() keys = BlockMapManager().filter_blocks(keys) if keys: keys -= constant.KPL_INVALID_BLOCKS # log.logger_kpl_debug.info("{}最终关键词:{}", code, keys) # 涨停列表中匹配关键词,返回(板块:代码集合),代码集合中已经排除自身 fresults = [] if not keys: return fresults, set() code_limit_up_reasons_dict = {} load_code_block() for block in keys: can_buy, unique, msg, is_strong, is_active_buy, current_rank, block_limit_up_count, block_limit_up_record_count = cls.__is_block_can_buy_new( code, block, current_limit_up_datas, code_limit_up_reasons_dict, yesterday_current_limit_up_codes, limit_up_record_datas, current_limit_up_block_codes_dict, high_level_code_blocks=high_level_general_code_blocks, high_level_block_codes=high_level_general_block_codes) fresults.append((block, can_buy, unique, msg, is_strong, is_active_buy, current_rank, block_limit_up_count, block_limit_up_record_count)) return fresults, keys # 是否可以下单 # 返回:可以买的板块,是否独苗,消息 # 可买的板块, 是否独苗, 消息, 可买的强势板块, 关键词, 积极买的板块 @classmethod def can_buy(cls, code): if constant.TEST: return [("测试", 0, 1, 1)], True, cls.BLOCK_TYPE_NONE, [], set(), ["化工"] # if True: # # 测试 # return True, "不判断板块身位" return cls.__can_buy_compute_result_dict.get(code) # 返回:(可以买的板块列表, 是否是独苗, 消息简介,可买的强势主线, 积极买入板块列表) @classmethod def __compute_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict, current_limit_up_block_codes_dict, high_level_general_code_blocks, codes_delegate, codes_success): # 根据代码泛化板块获取泛化板块的代码集合 high_level_general_block_codes = {} for c in high_level_general_code_blocks: blocks = high_level_general_code_blocks[c] for b in blocks: if b not in high_level_general_block_codes: high_level_general_block_codes[b] = set() high_level_general_block_codes[b].add(c) blocks_compute_results, keys = cls.get_can_buy_block(code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict, current_limit_up_block_codes_dict, high_level_general_code_blocks, high_level_general_block_codes) if not blocks_compute_results: return False, True, f"没有找到板块", [], keys, [] codes = codes_delegate | codes_success # 统计成交代码的板块 trade_codes_blocks_dict = {} # 已经成交的板块 trade_success_blocks_count = {} trade_delegate_blocks_count = {} for c in codes: keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c) # 实时涨停原因 + 推荐原因 if not k1_: trade_codes_blocks_dict[c] = k4_ else: trade_codes_blocks_dict[c] = k1_ # 统计板块中的代码 trade_block_codes_dict = {} for c in trade_codes_blocks_dict: for b in trade_codes_blocks_dict[c]: if c in codes_success: if b not in trade_success_blocks_count: trade_success_blocks_count[b] = set() trade_success_blocks_count[b].add(c) if c in codes_delegate: if b not in trade_delegate_blocks_count: trade_delegate_blocks_count[b] = set() trade_delegate_blocks_count[b].add(c) if b not in trade_block_codes_dict: trade_block_codes_dict[b] = set() trade_block_codes_dict[b].add(c) # ---------------------------------加载已经下单/成交的代码信息------------end------------- # can_buy_blocks = [] can_buy_strong_blocks = [] unique_count = 0 msg_list = [] active_buy_blocks = [] for r in blocks_compute_results: # r的数据结构(板块,是否可以买,是否独苗,消息,是否是强势板块, 积极买入信息) if r[2]: # 独苗 unique_count += 1 if r[1]: # 强势主线最多同时挂3只票,最多成交2只票 MAX_DELEGATE_COUNT = 3 if r[4] else 2 MAX_DEAL_COUNT = 2 if r[4] else 1 # if r[0] in trade_success_blocks_count and len(trade_success_blocks_count[r[0]]) >= MAX_DEAL_COUNT: # msg_list.append(f"【{r[0]}】有成交代码:{trade_success_blocks_count[r[0]]}") # continue # if r[0] in trade_delegate_blocks_count and len(trade_delegate_blocks_count[r[0]]) >= MAX_DELEGATE_COUNT: # msg_list.append(f"【{r[0]}】已挂单:{trade_delegate_blocks_count[r[0]]}") # continue if len(r) > 8: can_buy_blocks.append((r[0], r[6], r[7], r[8])) else: # (板块名称,身位,板块涨停数量) can_buy_blocks.append((r[0], 0, 1, 1)) if r[4]: can_buy_strong_blocks.append(r[0]) if r[3]: msg_list.append(r[3]) if r[5]: active_buy_blocks.append(r[0]) msg_list.append(f"【{r[0]}】积极买入({r[5]})") else: if r[3]: msg_list.append(r[3]) # 所有板块都是独苗 if unique_count == len(blocks_compute_results): return can_buy_blocks, True, ",".join(msg_list), can_buy_strong_blocks, keys, active_buy_blocks return can_buy_blocks, False, ",".join(msg_list), can_buy_strong_blocks, keys, active_buy_blocks # 更新代码板块判断是否可以买的结果 # high_level_general_code_blocks 高位泛化板块 @classmethod def update_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas, latest_current_limit_up_records, before_blocks_dict, current_limit_up_block_codes_dict, delegate_codes, deal_codes): yesterday_current_limit_up_codes = set() yesterday_current_limit_up_records_dict = {} yesterday_current_limit_up_records = latest_current_limit_up_records[0][1] if yesterday_current_limit_up_records: for r in yesterday_current_limit_up_records: yesterday_current_limit_up_codes.add(r[0]) yesterday_current_limit_up_records_dict[r[0]] = r high_level_general_code_blocks = {} # 是否是3板及以上的高位板 for r in current_limit_up_datas: count = kpl_util.get_high_level_count(r[4]) if count >= 3 and r[0] in yesterday_current_limit_up_codes: latest_datas = latest_current_limit_up_records[:count - 1] # 是高位板 # 当日精选 blocks = set(r[6].split("、")) for d in latest_datas: for dd in d[1]: if dd[0] == r[0]: blocks.add(dd[5]) break f_blocks = [] for b in blocks: if b: f_blocks.append(b) high_level_general_code_blocks[r[0]] = f_blocks can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks = cls.__compute_can_buy_blocks(code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict, current_limit_up_block_codes_dict, high_level_general_code_blocks, delegate_codes, deal_codes) # 保存板块计算结果 cls.__can_buy_compute_result_dict[code] = ( can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks) if __name__ == "__main__": KPLPlateForbiddenManager().compute()