# 涨停时间管理器
|
"""
|
今日最高价管理
|
"""
|
import json
|
|
from code_attribute import gpcode_manager
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from log_module import async_log_util
|
from log_module.log import logger_debug, logger_limit_up_record
|
from utils import global_util, tool
|
|
|
class MaxPriceInfoManager:
    """Process-wide singleton tracking today's highest price and limit-up periods per code.

    All caches are class-level dictionaries, so every caller shares the same
    state. The highest-price cache is mirrored to Redis (db 4) under the key
    ``max_price_info-<code>`` and re-loaded when the singleton is first created.
    The limit-up caches live in memory only.
    """

    # Highest price seen today: {code: (price, time_str, sell1_info)}
    # Entries loaded from Redis are JSON-decoded, so they are lists rather than tuples.
    __max_price_info_cache = {}
    # Closed limit-up periods: {code: [(limit-up time, break time)]}
    __limit_up_records_cache = {}
    # Limit-up period currently in progress: {code: limit-up start time}
    __limit_up_latest_info_cache = {}

    __db = 4
    _redisManager = redis_manager.RedisManager(4)
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it and loading cached data on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments when __new__ is overridden,
            # so the constructor arguments must not be forwarded to it.
            cls.__instance = super().__new__(cls)
            cls.load_max_price_info()
        return cls.__instance

    @classmethod
    def load_max_price_info(cls):
        """Populate the highest-price cache from every ``max_price_info-*`` key in Redis."""
        # NOTE(review): auto_free=False is passed and nothing here releases the
        # connection afterwards - confirm whether RedisUtils expects an explicit release.
        redis = cls._redisManager.getRedis()
        keys = RedisUtils.keys(redis, "max_price_info-*", auto_free=False)
        for key in keys:
            code = key.replace("max_price_info-", "")
            val = RedisUtils.get(redis, key, auto_free=False)
            if val:
                cls.__max_price_info_cache[code] = json.loads(val)

    def __is_limit_up(self, code, price, sell1_info):
        """Tell whether ``price`` is the limit-up price of ``code`` with nothing on sell-1.

        @param code: security code
        @param price: current price
        @param sell1_info: sell-1 info: (sell-1 price, sell-1 volume)
        @return: True only when a limit-up price is known, the sell-1 volume is
                 not positive and ``price`` is within 0.001 of the limit-up price
        """
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        if not limit_up_price:
            return False
        # Any volume still offered at sell-1 means the price is not locked at limit up.
        if sell1_info[1] > 0:
            return False
        # Tolerance-based comparison of the two prices.
        if abs(price - limit_up_price) >= 0.001:
            return False
        return True

    def set_price_info(self, code, price, time_str, sell1_info):
        """Record a price tick: update today's highest price and the limit-up periods.

        @param code: security code
        @param price: current price
        @param time_str: time of the tick
        @param sell1_info: sell-1 info: (sell-1 price, sell-1 volume)
        @return: None
        """
        price_info = (price, time_str, sell1_info)
        old_price_info = self.__max_price_info_cache.get(code)
        if not old_price_info or old_price_info[0] < price:
            # New intraday high: refresh the memory cache and persist it asynchronously.
            tool.CodeDataCacheUtil.set_cache(self.__max_price_info_cache, code, price_info)
            RedisUtils.setex_async(
                self.__db, "max_price_info-{}".format(code), tool.get_expire(), json.dumps(price_info))

        # Track how long the code stays at limit up; this runs on every tick,
        # not only on new highs.
        if self.__is_limit_up(code, price, sell1_info):
            # Remember only the first tick of the current limit-up period.
            if code not in self.__limit_up_latest_info_cache:
                self.__limit_up_latest_info_cache[code] = time_str
        elif code in self.__limit_up_latest_info_cache:
            # The limit up just broke: close the open period as (start, break time).
            limit_up_info = (self.__limit_up_latest_info_cache.pop(code), time_str)
            # The record list is created only when there is something to store.
            self.__limit_up_records_cache.setdefault(code, []).append(limit_up_info)
            async_log_util.info(logger_limit_up_record, f"{code}-{limit_up_info}")

    def get_max_limit_up_time(self, code):
        """Return the longest limit-up duration of ``code`` seen so far.

        Both the period still in progress (measured up to the current time) and
        all closed periods are considered. Durations are whatever
        ``tool.trade_time_sub`` returns (presumably seconds of trading time -
        TODO confirm).

        @param code: security code
        @return: the longest duration; 0 when there is no limit-up period or
                 when any error occurs (the error is logged, not raised)
        """
        try:
            max_space_time = 0
            if code in self.__limit_up_latest_info_cache:
                max_space_time = tool.trade_time_sub(
                    tool.get_now_time_str(), self.__limit_up_latest_info_cache[code])
            records = self.__limit_up_records_cache.get(code)
            if records:
                # Each duration is computed once; the longest closed period wins.
                max_space_time_his = max(tool.trade_time_sub(end, start) for start, end in records)
                if max_space_time_his > max_space_time:
                    max_space_time = max_space_time_his
            return max_space_time
        except Exception as e:
            # Deliberate best-effort: callers get 0 instead of an exception.
            async_log_util.error(logger_debug, f"获取最大涨停时间出错:{code}-{str(e)}")
        return 0

    def get_price_info_cache(self, code):
        """Return the cached highest-price entry for ``code``, or None when there is none.

        @param code: security code
        @return: (price, time_str, sell1_info), as a list if it was loaded from Redis
        """
        return self.__max_price_info_cache.get(code)
|
|
|
if __name__ == "__main__":
    # Ad-hoc sample data for manual runs: a list of (code-like string, time string, flag) tuples.
    # Nothing in the visible code consumes it.
    # NOTE(review): the name `list` shadows the builtin for the rest of this script scope.
    list = [("1234578", "09:00:03", None), ("12345", "09:00:01", True), ("123456", "09:00:00", True),
            ("123457", "09:00:04", False)]
|