"""
Price management for stock codes: buy-1 price caches and price change rates.
"""
import json
import time

from db.redis_manager import RedisUtils
from utils import tool, history_k_data_util
from db import redis_manager as redis_manager


class Buy1PriceManager:
    """Singleton holding per-code buy-1 price data in memory, mirrored to Redis.

    Reads served by the public methods come from the in-memory caches only;
    writes update the cache and are pushed to Redis db ``__db`` through
    ``RedisUtils.setex_async``. The caches are filled from Redis once, when
    the singleton is first created.
    """
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __latest_data = {}
    # code -> latest buy-1 price
    __current_buy_1_price = {}
    # code -> (limit_up_time, open_limit_up_time)
    __buy1_price_info_cache = {}
    # code -> lowest price seen after the limit-up was opened
    __open_limit_up_lowest_price_cache = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it and loading caches on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so args/kwargs are
            # accepted for compatibility but deliberately not forwarded.
            cls.__instance = super().__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis handle from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Populate the three in-memory caches from the matching Redis keys."""
        redis_ = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_, "buy1_price_limit_up_info-*"):
                val = RedisUtils.get(redis_, key)
                if not val:
                    # The key may have expired between the listing and the read.
                    continue
                code = key.split("-")[-1]
                tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, json.loads(val))

            for key in RedisUtils.keys(redis_, "buy1_price-*"):
                val = RedisUtils.get(redis_, key)
                if not val:
                    continue
                code = key.split("-")[-1]
                tool.CodeDataCacheUtil.set_cache(cls.__current_buy_1_price, code, round(float(val), 2))

            for key in RedisUtils.keys(redis_, "open_limit_up_lowest_price-*"):
                val = RedisUtils.get(redis_, key)
                if not val:
                    continue
                code = key.split("-")[-1]
                tool.CodeDataCacheUtil.set_cache(cls.__open_limit_up_lowest_price_cache, code,
                                                 round(float(val), 2))
        finally:
            RedisUtils.realse(redis_)

    # Save the buy-1 price info (limit-up time and limit-up-opened time)
    def __save_buy1_price_info(self, code, limit_up_time, open_limit_up_time):
        tool.CodeDataCacheUtil.set_cache(self.__buy1_price_info_cache, code, (limit_up_time, open_limit_up_time))
        RedisUtils.setex_async(self.__db, f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                               json.dumps((limit_up_time, open_limit_up_time)))

    def __get_buy1_price_info(self, code):
        """Read (limit_up_time, open_limit_up_time) from Redis; (None, None) if absent."""
        redis_ = self.__get_redis()
        try:
            data = RedisUtils.get(redis_, f"buy1_price_limit_up_info-{code}")
        finally:
            # Release the handle the same way __load_datas does.
            RedisUtils.realse(redis_)
        if not data:
            return None, None
        data = json.loads(data)
        return data[0], data[1]

    def __get_buy1_price_info_cache(self, code):
        """Read (limit_up_time, open_limit_up_time) from the cache; (None, None) if absent."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy1_price_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None

    def __save_buy1_price(self, code, buy_1_price):
        """Store the buy-1 price unless it matches the cached one within 0.001."""
        # Cache a float so the comparison below never mixes str and float on
        # a later call; this also matches what __load_datas stores.
        price = float(buy_1_price)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0] and abs(cache_result[1] - price) < 0.001:
            return
        tool.CodeDataCacheUtil.set_cache(self.__current_buy_1_price, code, price)
        RedisUtils.setex_async(self.__db, f"buy1_price-{code}", tool.get_expire(), buy_1_price)

    # datas:[(code, buy_1_price)]
    def __save_buy1_prices(self, datas):
        for code, buy_1_price in ((d[0], d[1]) for d in datas):
            # Duplicate values are filtered out inside __save_buy1_price
            self.__save_buy1_price(code, buy_1_price)

    def __get_buy1_price(self, code):
        """Read the raw buy-1 price value from Redis."""
        redis_ = self.__get_redis()
        try:
            return RedisUtils.get(redis_, f"buy1_price-{code}")
        finally:
            RedisUtils.realse(redis_)

    def __get_buy1_price_cache(self, code):
        """Read the buy-1 price from the cache; None if absent."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    # Set the lowest price after the limit-up was opened
    def __save_open_limit_up_lowest_price(self, code, price):
        tool.CodeDataCacheUtil.set_cache(self.__open_limit_up_lowest_price_cache, code, round(float(price), 2))
        RedisUtils.setex_async(self.__db, f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")

    def __get_open_limit_up_lowest_price(self, code):
        """Read the raw lowest-price-after-open value from Redis."""
        redis_ = self.__get_redis()
        try:
            return RedisUtils.get(redis_, f"open_limit_up_lowest_price-{code}")
        finally:
            RedisUtils.realse(redis_)

    def __get_open_limit_up_lowest_price_cache(self, code):
        """Read the lowest-price-after-open from the cache; None if absent."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__open_limit_up_lowest_price_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def set_open_limit_up_lowest_price(self, code, price):
        """Record ``price`` if nothing is cached yet or it is lower by more than 0.001."""
        old_price = self.__get_open_limit_up_lowest_price_cache(code)
        if not old_price or float(old_price) - float(price) > 0.001:
            self.__save_open_limit_up_lowest_price(code, price)

    def get_buy1_price(self, code):
        """Return the cached buy-1 price for ``code``, or None if not cached."""
        return self.__get_buy1_price_cache(code)

    def get_open_limit_up_lowest_price(self, code):
        """Return the cached lowest price after the limit-up opened, or None."""
        return self.__get_open_limit_up_lowest_price_cache(code)

    # Whether an order can be placed
    def is_can_buy(self, code):
        """Return True only when both a limit-up time and an opened time are cached."""
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        return bool(old_limit_up_time and old_open_limit_up_time)

    # Get the limit-up info
    # Returns the limit-up time and the limit-up-opened time
    def get_limit_up_info(self, code):
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        return old_limit_up_time, old_open_limit_up_time

    # Set the limit-up time
    def set_limit_up_time(self, code, time_str):
        """Record ``time_str`` as the limit-up time only if none is cached yet."""
        limit_up_time, _ = self.get_limit_up_info(code)
        if limit_up_time is None:
            self.__save_buy1_price_info(code, time_str, None)


class CodesLimitRateManager:
    """
    Price change rate management
    """
    # code -> previous close price, rounded to 2 decimals
    __pre_close_dict = {}
    # code -> (current price, time.time() when it was fetched)
    __current_price_dict = {}

    @classmethod
    def __load_pre_close_prices(cls, codes):
        """
        Fetch the previous close price for every code not already cached
        :param codes: iterable of codes
        :return: None
        """
        fcodes = {code for code in codes if code not in cls.__pre_close_dict}
        if not fcodes:
            return
        results = history_k_data_util.HistoryKDatasUtils.get_gp_latest_info(list(fcodes),
                                                                            fields="sec_id, pre_close")
        for result in results:
            cls.__pre_close_dict[result["sec_id"]] = round(result["pre_close"], 2)

    @classmethod
    def get_price_rates(cls, codes):
        """
        Get the price change rate of each code, in percent
        :param codes: iterable of codes
        :return: dict of code -> rate rounded to 2 decimals; codes without a
                 previous close or a current price are left out
        """
        cls.__load_pre_close_prices(codes)
        # Fetch the current price for codes that are missing or older than 5 seconds
        now_time = time.time()
        price_codes = {
            code for code in codes
            if code not in cls.__current_price_dict or now_time - cls.__current_price_dict[code][1] > 5
        }
        if price_codes:
            results = history_k_data_util.HistoryKDatasUtils.get_now_price(price_codes)
            for r in results:
                cls.__current_price_dict[r[0]] = (r[1], time.time())
        rate_dict = {}
        for code in codes:
            pre_close = cls.__pre_close_dict.get(code)
            now_price_info = cls.__current_price_dict.get(code)
            # A falsy pre_close (missing or 0) is skipped, which also avoids dividing by zero
            if pre_close and now_price_info:
                rate_dict[code] = round((now_price_info[0] - pre_close) * 100 / pre_close, 2)
        return rate_dict


if __name__ == "__main__":
    print(Buy1PriceManager().get_limit_up_info("002777"))