"""First-board (first limit-up) stock screening."""

# Maintains the first-board target codes that have not been screened yet.
import json

from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager


class FirstCodeDataManager:
    """Singleton that stores per-code first-board data.

    Every record is written to Redis under the key ``first_code_data-<code>``
    and mirrored in a class-level in-memory cache. The cache is filled from
    Redis once, when the singleton instance is first created.
    """

    __instance = None
    __redisManager = redis_manager.RedisManager(0)
    __db = 0
    __first_code_data_cache = {}

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            # object.__new__ raises TypeError when given extra arguments, so
            # nothing besides the class itself is forwarded.
            cls.__instance = super(FirstCodeDataManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """Fill the in-memory cache from all ``first_code_data-*`` Redis keys."""
        keys = RedisUtils.keys(cls.__get_redis(), "first_code_data-*")
        for k in keys:
            val = RedisUtils.get(cls.__get_redis(), k)
            if not val:
                continue
            code = k.split("-")[-1]
            # Go through the same helper that save/get use, so a preloaded
            # entry has exactly the shape CodeDataCacheUtil.get_cache expects.
            tool.CodeDataCacheUtil.set_cache(cls.__first_code_data_cache, code, json.loads(val))

    @classmethod
    def __get_redis(cls):
        """Return a Redis handle from the class-level manager."""
        return cls.__redisManager.getRedis()

    def save_first_code_data(self, code, data):
        """Cache ``data`` for ``code`` and queue an asynchronous Redis write.

        ``data`` is an indexable record that must be JSON-serialisable. Per the
        original notes its slots are:

        0. first limit-up time
        1. latest limit-up time
        2. first break-open time
        3. latest break-open time
        4. whether the code is currently at limit up
        """
        tool.CodeDataCacheUtil.set_cache(self.__first_code_data_cache, code, data)
        RedisUtils.setex_async(self.__db, f"first_code_data-{code}", tool.get_expire(), json.dumps(data))

    def get_first_code_data(self, code):
        """Read the record for ``code`` from Redis; return None when absent."""
        val = RedisUtils.get(self.__get_redis(), f"first_code_data-{code}")
        if val is None:
            return None
        return json.loads(val)

    def get_first_code_data_cache(self, code):
        """Read the record for ``code`` from the in-memory cache only.

        Returns None when the cache helper reports no entry for ``code``.
        """
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__first_code_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None


class FirstNoScreenCodesManager:
    """Singleton that tracks the first-board codes not yet screened.

    The codes live in the Redis set ``first_no_screen_codes`` and are mirrored
    in a class-level in-memory set that is loaded when the singleton instance
    is first created.
    """

    __instance = None
    __redisManager = redis_manager.RedisManager(0)
    __db = 0
    __first_no_screen_codes_cache = set()

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            # object.__new__ raises TypeError when given extra arguments, so
            # nothing besides the class itself is forwarded.
            cls.__instance = super(FirstNoScreenCodesManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """Load the in-memory set from the ``first_no_screen_codes`` Redis set."""
        codes = RedisUtils.smembers(cls.__get_redis(), "first_no_screen_codes")
        # Always keep a real, privately owned set: the other methods call
        # add/discard/clear on it, which would fail on None or a non-set.
        cls.__first_no_screen_codes_cache = set(codes) if codes else set()

    @classmethod
    def __get_redis(cls):
        """Return a Redis handle from the class-level manager."""
        return cls.__redisManager.getRedis()

    def add_first_no_screen_codes(self, codes):
        """Add ``codes`` to the cache and queue the matching Redis updates.

        Does nothing when ``codes`` is empty or None.
        """
        if not codes:
            return
        for code in codes:
            self.__first_no_screen_codes_cache.add(code)
            RedisUtils.sadd_async(self.__db, "first_no_screen_codes", code, auto_free=False)
            RedisUtils.expire_async(self.__db, "first_no_screen_codes", tool.get_expire(), auto_free=False)

    def clear_first_no_screen_codes(self):
        """Empty the cache and delete the Redis set."""
        self.__first_no_screen_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), "first_no_screen_codes")

    def remove_first_no_screen_codes(self, codes):
        """Remove ``codes`` from the cache and queue the matching Redis removals.

        Does nothing when ``codes`` is empty or None.
        """
        if not codes:
            return
        for code in codes:
            self.__first_no_screen_codes_cache.discard(code)
            RedisUtils.srem_async(self.__db, "first_no_screen_codes", code, auto_free=False)

    def __get_first_no_screen_codes(self):
        """Read the codes straight from Redis; return an empty set when none."""
        codes = RedisUtils.smembers(self.__get_redis(), "first_no_screen_codes")
        if not codes:
            return set()
        return codes

    def get_first_no_screen_codes_cache(self):
        """Return the in-memory set itself (not a copy)."""
        return self.__first_no_screen_codes_cache


def process_ticks(prices):
    """Update the cached first-board data from a batch of tick records.

    Each item of ``prices`` is read through the keys ``"code"``, ``"time"``
    and ``"limit_up"``. Codes without a cached record are skipped. For the
    others the record is updated in place and saved again on every tick.
    """
    for price in prices:
        code = price["code"]
        tick_time = price["time"]
        manager = FirstCodeDataManager()
        record = manager.get_first_code_data_cache(code)
        if record is None:
            continue
        limit_up = price["limit_up"]
        was_limit_up = record[4]
        if limit_up and not was_limit_up:
            # Just reached limit up: refresh the latest limit-up time.
            # NOTE(review): slot 0 (first limit-up time) is never written
            # here, unlike slot 2 below - presumably it is set by whoever
            # creates the record; confirm.
            record[1] = tick_time
        elif was_limit_up and not limit_up:
            # The limit-up board just broke open: keep the first break-open
            # time once set, always refresh the latest one.
            if not record[2]:
                record[2] = tick_time
            record[3] = tick_time
        record[4] = limit_up
        manager.save_first_code_data(code, record)


def set_target_no_screen_codes(codes):
    """Replace the unscreened first-board target codes with ``codes``.

    Codes no longer present are removed and new ones are added.

    Returns:
        The set of newly added codes (empty when nothing was added).
    """
    old_codes = get_target_no_screen_codes()
    if old_codes is None:
        old_codes = set()
    codes_set = set(codes)
    if old_codes == codes_set:
        return set()
    # old_codes is the manager's live cache, so both differences are computed
    # before any add/remove call mutates it.
    del_codes = old_codes - codes_set
    add_codes = codes_set - old_codes
    if del_codes:
        FirstNoScreenCodesManager().remove_first_no_screen_codes(del_codes)
    if add_codes:
        # Register the new unscreened first-board codes.
        FirstNoScreenCodesManager().add_first_no_screen_codes(add_codes)
    return add_codes


def get_target_no_screen_codes():
    """Return the cached set of unscreened first-board target codes."""
    return FirstNoScreenCodesManager().get_first_no_screen_codes_cache()


def is_need_add_to_first(code):
    """Decide whether ``code`` should be added to the first board.

    Not implemented: the function currently does nothing and returns None.
    Rules listed in the original notes:

    * codes that are forbidden from trading must not be added;
    * codes whose current price is at least 50% above the lowest price of the
      last 15 trading days should be excluded.
    """
    pass


def need_remove_from_l2_watch(code, now_rate, is_limit_up, limit_up_time, latest_open_time):
    """Decide whether a code should be removed from its L2 monitoring slot.

    Args:
        code: the stock code (not used by the current rules).
        now_rate: current price change, compared against ``6``.
        is_limit_up: whether the code is currently at limit up.
        limit_up_time: time string of the first limit up.
        latest_open_time: time string of the most recent break-open, or None
            when the board has not broken open.

    Returns:
        A ``(remove, reason)`` tuple; ``remove`` is True when the code should
        be removed and ``reason`` is the message explaining the decision.
    """
    # NOTE(review): the comparisons below assume tool.trade_time_sub returns
    # the difference (first minus second) in seconds - confirm against tool.
    if is_limit_up:
        # Currently at limit up.
        if tool.trade_time_sub(tool.get_now_time_str(), "11:30:00") > 0:
            # After 11:30: drop codes that hit limit up by 10:30 and have not
            # broken open since.
            if latest_open_time is None and tool.trade_time_sub(limit_up_time, "10:30:00") <= 0:
                return True, "早上10点30前就涨停了,且11点30前未炸板"
    else:
        # Currently not at limit up.
        if latest_open_time is not None:
            open_time_seconds = tool.trade_time_sub(tool.get_now_time_str(), latest_open_time)
            if open_time_seconds > 60 * 60:
                return True, "炸板后,60分钟内都未回封"
            if now_rate <= 6:
                return True, "炸板后,涨幅小于6%"
    # NOTE(review): this message reads like a removal reason although the
    # result is "do not remove"; kept unchanged because callers may log it.
    return False, "首板炸开后,涨幅≤6%"