""" 代码价格管理 """ import json import threading import time from code_attribute.gpcode_manager import CodePrePriceManager from db.redis_manager_delegate import RedisUtils from utils import tool, middle_api_protocol from db import redis_manager_delegate as redis_manager from log_module.log import logger_trade_queue_price_info class Buy1PriceManager: __db = 1 __redisManager = redis_manager.RedisManager(1) __latest_data = {} __current_buy_1_price = {} __buy1_price_limit_up_info_cache = {} # 买1的金额 __latest_buy1_money_dict = {} # 最近3分钟内的买1金额 __latest_3m_buy1_money_list_dict = {} __open_limit_up_lowest_price_cache = {} # 均价 __average_rate_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(Buy1PriceManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def __load_datas(cls): redis_ = cls.__get_redis() try: keys = RedisUtils.keys(redis_, "buy1_price_limit_up_info-*") for key in keys: code = key.split("-")[-1] val = RedisUtils.get(redis_, key) val = json.loads(val) tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_limit_up_info_cache, code, val) keys = RedisUtils.keys(redis_, "buy1_price-*") for key in keys: code = key.split("-")[-1] val = RedisUtils.get(redis_, key) val = round(float(val), 2) tool.CodeDataCacheUtil.set_cache(cls.__current_buy_1_price, code, val) keys = RedisUtils.keys(redis_, "open_limit_up_lowest_price-*") for key in keys: code = key.split("-")[-1] val = RedisUtils.get(redis_, key) val = round(float(val), 2) tool.CodeDataCacheUtil.set_cache(cls.__open_limit_up_lowest_price_cache, code, val) except Exception as e: pass finally: RedisUtils.realse(redis_) # --------------------------------上板与炸板时间记录-------------------------------- def __save_buy1_price_limit_up_info(self, code, limit_up_time, open_limit_up_time): if limit_up_time and open_limit_up_time and tool.trade_time_sub(limit_up_time, open_limit_up_time) > 0: # 炸板时间不可能比上板时间还早 return tool.CodeDataCacheUtil.set_cache(self.__buy1_price_limit_up_info_cache, code, (limit_up_time, open_limit_up_time)) RedisUtils.setex_async(self.__db, f"buy1_price_limit_up_info-{code}", tool.get_expire(), json.dumps((limit_up_time, open_limit_up_time))) def __get_buy1_price_limit_up_info_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy1_price_limit_up_info_cache, code) if cache_result[0]: return cache_result[1] return None, None def __save_buy1_price(self, code, buy_1_price): cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code) if cache_result[0] and abs(cache_result[1] - float(buy_1_price)) < 0.001: return tool.CodeDataCacheUtil.set_cache(self.__current_buy_1_price, code, buy_1_price) RedisUtils.setex_async(self.__db, f"buy1_price-{code}", tool.get_expire(), buy_1_price) # ------------------------炸板后的最低价------------------------------ def __save_open_limit_up_lowest_price(self, code, price): tool.CodeDataCacheUtil.set_cache(self.__open_limit_up_lowest_price_cache, code, round(float(price), 2)) RedisUtils.setex_async(self.__db, f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}") def __get_open_limit_up_lowest_price_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__open_limit_up_lowest_price_cache, code) if cache_result[0]: return cache_result[1] return None def set_open_limit_up_lowest_price(self, code, price): # 判断是否炸板 old_price = self.__get_open_limit_up_lowest_price_cache(code) if not old_price or float(old_price) > float(price): self.__save_open_limit_up_lowest_price(code, price) def get_open_limit_up_lowest_price(self, code): price = self.__get_open_limit_up_lowest_price_cache(code) return price def get_buy1_price(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code) if cache_result[0]: return cache_result[1] return None def get_average_rate(self, code): return self.__average_rate_dict.get(code) # 处理 def process(self, code, buy_1_price, buy_1_volume, time_str, limit_up_price, sell_1_price, sell_1_volumn, average_rate=None): self.__average_rate_dict[code] = average_rate data_str = f"{buy_1_price},{buy_1_volume},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}" if self.__latest_data.get(code) == data_str: return # 记录日志 logger_trade_queue_price_info.info( f"code={code} data: time_str-{time_str}, buy_1_price-{buy_1_price},limit_up_price-{limit_up_price},sell_1_price-{sell_1_price},sell_1_volumn-{sell_1_volumn},average_rate-{average_rate}") # 买1价格不能小于1块 if float(buy_1_price) < 1.0: return ## 记录最近的买1金额 if code not in self.__latest_3m_buy1_money_list_dict: self.__latest_3m_buy1_money_list_dict[code] = [] self.__latest_3m_buy1_money_list_dict[code].append((time_str, int(buy_1_price * buy_1_volume))) if len(self.__latest_3m_buy1_money_list_dict[code]) > 80: self.__latest_3m_buy1_money_list_dict[code] = self.__latest_3m_buy1_money_list_dict[code][-80:] self.__latest_data[code] = data_str # 保存买1价格 self.__save_buy1_price(code, buy_1_price) is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.001 old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_limit_up_info_cache(code) # 判断真正涨停 if is_limit_up and not old_limit_up_time and int(sell_1_volumn) <= 0: # 卖1消失,买1为涨停价则表示涨停 self.__save_buy1_price_limit_up_info(code, time_str, None) elif old_limit_up_time and not is_limit_up and not old_open_limit_up_time: # 有涨停时间,当前没有涨停,之前没有打开涨停 self.__save_buy1_price_limit_up_info(code, old_limit_up_time, time_str) if old_limit_up_time and not is_limit_up: limit_up_time, open_limit_up_time = self.__get_buy1_price_limit_up_info_cache(code) if limit_up_time and open_limit_up_time: # 获取计算当前的涨幅 # pre_close_price = round(float(limit_up_price) / 1.1, 2) # if (float(buy_1_price) - pre_close_price) / pre_close_price < 0.05: # # 炸开且涨幅小于5% # l2_trade_util.forbidden_trade(code, "涨停炸开后现价涨幅小于5%之后不买") # async_log_util.info(logger_trade_queue_price_info, f"涨停炸开后现价涨幅小于5%之后不买:code-{code}") # 之前涨停过且现在尚未涨停 self.set_open_limit_up_lowest_price(code, buy_1_price) # 是否可以下单 def is_can_buy(self, code): old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_limit_up_info_cache(code) if old_limit_up_time and old_open_limit_up_time: return True return False # 获取涨停信息 # 返回涨停时间与炸板时间 def get_limit_up_info(self, code): old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_limit_up_info_cache(code) return old_limit_up_time, old_open_limit_up_time # 设置涨停时间 def set_limit_up_time(self, code, time_str): limit_up_time, open_limit_up_time = self.get_limit_up_info(code) if not limit_up_time: self.__save_buy1_price_limit_up_info(code, time_str, None) # 设置买1金额 def set_latest_buy1_money(self, code, buy_1_price, buy_1_volume): self.__latest_buy1_money_dict[code] = buy_1_price * buy_1_volume # 获取最近的买1金额 def get_latest_buy1_money(self, code): return self.__latest_buy1_money_dict.get(code) def get_latest_3m_buy1_money_list(self, code): return self.__latest_3m_buy1_money_list_dict.get(code) class CurrentPriceManager: """ 现价管理 """ # 代码涨幅数据 __current_rate_dict = {} # 最近上传时间 __latest_upload_time = 0 @classmethod def set_current_price(cls, code, price): """ 设置现价 @param code: @param price: @return: """ pre_close_price = CodePrePriceManager.get_price_pre_cache(code) if pre_close_price: rate = round((price - pre_close_price) * 100 / pre_close_price, 2) cls.__current_rate_dict[code] = rate # 判断是否要上传 if time.time() - cls.__latest_upload_time >= 3: # 间隔3s上传 cls.__latest_upload_time = time.time() threading.Thread(target=lambda: middle_api_protocol.request( middle_api_protocol.load_l2_subscript_codes_rate(cls.__current_rate_dict)), daemon=True).start() if __name__ == "__main__": print(Buy1PriceManager().get_limit_up_info("002777"))