"""First-board (first limit-up) stock screening.

State is kept in Redis under two kinds of keys:

* ``first_no_screen_codes`` - set of first-board codes that have not been
  screened yet (the "target" codes).
* ``first_code_data-<code>`` - JSON-encoded limit-up history of one code; the
  layout is described above ``__save_first_code_data``.
"""

import json

from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager
from third_data import block_info

__redisManager = redis_manager.RedisManager(0)


def __get_redis():
    """Return a Redis connection obtained from the module-level manager."""
    return __redisManager.getRedis()


# In-process cache of per-code data, accessed through tool.CodeDataCacheUtil.
__first_code_data_cache = {}


# Per-code first-board data is an indexable sequence with this layout:
#   0: first limit-up time
#   1: latest limit-up time
#   2: first time the limit-up broke open
#   3: latest time the limit-up broke open
#   4: whether the code is currently at limit-up
def __save_first_code_data(code, data):
    """Store ``data`` for ``code`` in the local cache and in Redis.

    The Redis entry is written as JSON under ``first_code_data-<code>`` with
    the expiry supplied by ``tool.get_expire()``.
    """
    tool.CodeDataCacheUtil.set_cache(__first_code_data_cache, code, data)
    RedisUtils.setex(__get_redis(), f"first_code_data-{code}", tool.get_expire(), json.dumps(data))


def __get_first_code_data(code):
    """Load the data of ``code`` from Redis; return None when no entry exists."""
    val = RedisUtils.get(__get_redis(), f"first_code_data-{code}")
    if val is None:
        return None
    return json.loads(val)


def __get_first_code_data_cache(code):
    """Return the data of ``code``, preferring the in-process cache.

    On a cache miss the value is read from Redis and stored in the cache,
    including a ``None`` result.
    """
    cache_result = tool.CodeDataCacheUtil.get_cache(__first_code_data_cache, code)
    # cache_result is indexed as (hit flag, cached value).
    if cache_result[0]:
        return cache_result[1]
    val = __get_first_code_data(code)
    tool.CodeDataCacheUtil.set_cache(__first_code_data_cache, code, val)
    return val


def __add_first_no_screen_codes(codes):
    """Add ``codes`` to the unscreened first-board set and refresh its expiry."""
    redis = __redisManager.getRedis()
    try:
        if codes:
            # auto_free=False: the same connection is reused for every call
            # and released once in the finally clause.
            for code in codes:
                RedisUtils.sadd(redis, "first_no_screen_codes", code, auto_free=False)
            RedisUtils.expire(redis, "first_no_screen_codes", tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)


def clear_first_no_screen_codes():
    """Delete the whole unscreened first-board set from Redis."""
    RedisUtils.delete(__redisManager.getRedis(), "first_no_screen_codes")


def __remove_first_no_screen_codes(codes):
    """Remove ``codes`` from the unscreened first-board set."""
    redis = __redisManager.getRedis()
    try:
        if codes:
            for code in codes:
                RedisUtils.srem(redis, "first_no_screen_codes", code, auto_free=False)
    finally:
        RedisUtils.realse(redis)


def __get_first_no_screen_codes():
    """Return the members of the unscreened first-board set (empty set if none)."""
    codes = RedisUtils.smembers(__get_redis(), "first_no_screen_codes")
    if not codes:
        return set()
    return codes


def process_ticks(prices):
    """Update the stored limit-up history from a batch of ticks.

    Args:
        prices: iterable of dicts; the keys "code", "time" and "limit_up" are
            read from each item.

    Codes that have no stored first-board data are skipped. For every other
    code the record is updated on a state transition and written back.
    """
    for price in prices:
        code = price["code"]
        time_ = price["time"]
        data = __get_first_code_data_cache(code)
        if data is None:
            continue
        limit_up = price["limit_up"]
        last_limit_up = data[4]
        if not last_limit_up and limit_up:
            # Transition to limit-up. Fill the first limit-up time if it has
            # never been recorded, mirroring how the first break-open time is
            # handled below.
            if not data[0]:
                data[0] = time_
            data[1] = time_
        elif last_limit_up and not limit_up:
            # Transition away from limit-up (the board broke open).
            if not data[2]:
                data[2] = time_
            data[3] = time_
        data[4] = limit_up
        __save_first_code_data(code, data)


def set_target_no_screen_codes(codes):
    """Replace the unscreened first-board set with ``codes``.

    Only the difference against the currently stored set is written.

    Args:
        codes: iterable of codes; ``None`` is treated as empty.

    Returns:
        The set of codes that were newly added (empty when nothing changed
        or when codes were only removed).
    """
    old_codes = set(get_target_no_screen_codes() or ())
    codes_set = set(codes or ())
    if old_codes == codes_set:
        return set()
    del_codes = old_codes - codes_set
    add_codes = codes_set - old_codes
    if del_codes:
        __remove_first_no_screen_codes(del_codes)
    if add_codes:
        # Add to the unscreened first-board set.
        __add_first_no_screen_codes(add_codes)
    return add_codes


def get_target_no_screen_codes():
    """Return the target first-board codes that have not been screened yet."""
    return __get_first_no_screen_codes()


def is_need_add_to_first(code):
    """Decide whether ``code`` should be added to the first board.

    Not implemented: the function currently does nothing and returns None.
    """
    # Planned rules (from the original notes):
    # - codes that are forbidden from trading must not be added;
    # - codes whose gain from the 15-trading-day low to the current price is
    #   >= 50% must be excluded.
    pass


def need_remove_from_l2_watch(code, now_rate, is_limit_up, limit_up_time, latest_open_time):
    """Decide whether ``code`` should be removed from its L2 watch slot.

    Args:
        code: stock code.
        now_rate: current gain; compared against 6 (percent, per the
            reason messages).
        is_limit_up: whether the code is at limit-up right now.
        limit_up_time: first limit-up time.
        latest_open_time: latest time the limit-up broke open, or None.

    Returns:
        A ``(remove, reason)`` tuple; ``remove`` is True when the code should
        be dropped from the watch list.
    """
    # NOTE(review): the original indentation of this function was lost. The
    # nesting of the `now_rate` check and the sector lookup inside the `else`
    # branch is reconstructed - confirm against version control.
    if is_limit_up:
        # Currently at limit-up.
        # tool.trade_time_sub presumably returns the gap in trading seconds
        # (the 60 * 60 comparison below pairs with a "60 minutes" message).
        if tool.trade_time_sub(tool.get_now_time_str(), "11:30:00") > 0:
            # After 11:30: drop codes that hit limit-up by 10:30 and have not
            # broken open since.
            if latest_open_time is None and tool.trade_time_sub(limit_up_time, "10:30:00") <= 0:
                return True, "早上10点30前就涨停了,且11点30前未炸板"
    else:
        # Currently not at limit-up.
        if latest_open_time is not None:
            open_time_seconds = tool.trade_time_sub(tool.get_now_time_str(), latest_open_time)
            if open_time_seconds > 60 * 60:
                return True, "炸板后,60分钟内都未回封"
        if now_rate <= 6:
            return True, "炸板后,涨幅小于6%"
        # NOTE(review): unfinished rule for codes that belong to exactly one
        # sector; the lookups are kept as in the original but have no effect
        # on the result.
        blocks = block_info.get_code_blocks(code)
        if blocks and len(blocks) == 1:
            codes = block_info.get_block_codes(blocks[0])
            if codes:
                pass
    # NOTE(review): this reason text describes a removal condition although
    # the result is "keep"; left unchanged because callers may log it.
    return False, "首板炸开后,涨幅≤6%"