# Limit-up time manager
"""
Management of today's highest price
"""
import json

from code_attribute import gpcode_manager
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_debug, logger_limit_up_record
from utils import global_util, tool


class MaxPriceInfoManager:
    """
    Singleton that tracks, per stock code, the highest price seen and the
    periods during which the code stayed at its limit-up price.

    All caches are class-level dictionaries, so they are shared by every
    reference to the singleton.
    """
    # Highest price info: {code: (price, time_str, sell1_info)}
    # Entries loaded back from redis are JSON-decoded, so they are lists rather than tuples.
    __max_price_info_cache = {}
    # Historical limit-up data: {code: [(limit-up time, break-open time)]}
    __limit_up_records_cache = {}
    # Latest limit-up data: {code: limit-up time}
    __limit_up_latest_info_cache = {}
    # Redis database index used for both reads and asynchronous writes
    __db = 4
    _redisManager = redis_manager.RedisManager(__db)
    __instance = None

    def __new__(cls, *args, **kwargs):
        """
        Return the shared instance, creating it and loading the persisted
        price info on first use.

        Extra arguments are accepted for backward compatibility but ignored:
        forwarding them to object.__new__ raises TypeError.
        """
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
            cls.load_max_price_info()
        return cls.__instance

    @classmethod
    def load_max_price_info(cls):
        """
        Fill the highest-price cache from every "max_price_info-*" key in redis.
        Keys whose value is empty are skipped.
        """
        # NOTE(review): every call passes auto_free=False and no explicit release of the
        # connection is visible here - confirm whether the connection must be freed.
        redis = cls._redisManager.getRedis()
        keys = RedisUtils.keys(redis, "max_price_info-*", auto_free=False)
        for key in keys:
            code = key.replace("max_price_info-", "")
            val = RedisUtils.get(redis, key, auto_free=False)
            if val:
                cls.__max_price_info_cache[code] = json.loads(val)

    def __is_limit_up(self, code, price, sell1_info):
        """
        Tell whether the code is currently sealed at its limit-up price.
        @param code:
        @param price: current price
        @param sell1_info: sell-1 info: (sell-1 price, sell-1 volume)
        @return: True only when a limit-up price is known, the sell-1 volume is not
                 positive and the price is within 0.001 of the limit-up price
        """
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        if not limit_up_price:
            return False
        # Any remaining sell-1 volume means the price is not sealed
        if sell1_info[1] > 0:
            return False
        # Tolerance-based comparison because prices are floats
        return abs(price - limit_up_price) < 0.001

    def set_price_info(self, code, price, time_str, sell1_info):
        """
        Set the price info
        @param sell1_info: sell-1 info: (sell-1 price, sell-1 volume)
        @param code:
        @param price:
        @param time_str:
        @return:
        """
        price_info = (price, time_str, sell1_info)
        old_price_info = self.__max_price_info_cache.get(code)
        # Only a strictly higher price replaces the stored one
        if not old_price_info or old_price_info[0] < price:
            tool.CodeDataCacheUtil.set_cache(self.__max_price_info_cache, code, price_info)
            RedisUtils.setex_async(
                self.__db, "max_price_info-{}".format(code), tool.get_expire(), json.dumps(price_info))

        # Track how long the limit-up lasts
        if self.__is_limit_up(code, price, sell1_info):
            # Keep the time of the first limit-up tick of the current period
            self.__limit_up_latest_info_cache.setdefault(code, time_str)
            return

        # Not at limit-up: make sure the history list exists, then close any open period
        records = self.__limit_up_records_cache.setdefault(code, [])
        if code in self.__limit_up_latest_info_cache:
            limit_up_info = (self.__limit_up_latest_info_cache.pop(code), time_str)
            records.append(limit_up_info)
            async_log_util.info(logger_limit_up_record, f"{code}-{limit_up_info}")

    def get_max_limit_up_time(self, code):
        """
        Get the longest limit-up duration
        @param code:
        @return: the largest tool.trade_time_sub() result among the still-open limit-up
                 period (measured up to now) and all finished periods; 0 when there is
                 none or when an error occurs
        """
        try:
            max_space_time = 0
            # Limit-up period that is still open
            if code in self.__limit_up_latest_info_cache:
                max_space_time = tool.trade_time_sub(tool.get_now_time_str(),
                                                     self.__limit_up_latest_info_cache[code])
            # Finished periods
            records = self.__limit_up_records_cache.get(code)
            if records:
                max_space_time_his = max(tool.trade_time_sub(end, start) for start, end in records)
                if max_space_time_his > max_space_time:
                    max_space_time = max_space_time_his
            return max_space_time
        except Exception as e:
            # Best effort: log the failure and report no duration
            async_log_util.error(logger_debug, f"获取最大涨停时间出错:{code}-{str(e)}")
            return 0

    def get_price_info_cache(self, code):
        """
        Get the cached highest-price info of the code
        @param code:
        @return: the cached price info, or None when the code has no entry
        """
        return self.__max_price_info_cache.get(code)


if __name__ == "__main__":
    # Ad-hoc sample data; named so that it does not shadow the builtin `list`
    sample_records = [("1234578", "09:00:03", None), ("12345", "09:00:01", True), ("123456", "09:00:00", True),
                      ("123457", "09:00:04", False)]