Administrator
2023-08-08 8c7519b0dc79d32a216765a1b46e736d53e3d786
l2/code_price_manager.py
@@ -10,100 +10,135 @@
class Buy1PriceManager:
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __latest_data = {}
    __current_buy_1_price = {}
    __buy1_price_info_cache = {}
    __open_limit_up_lowest_price_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(Buy1PriceManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    # 保存买1价格信息
    @classmethod
    def __save_buy1_price_info(cls, code, limit_up_time, open_limit_up_time):
        tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, (limit_up_time, open_limit_up_time))
        RedisUtils.setex(cls.__get_redis(), f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                         json.dumps((limit_up_time, open_limit_up_time)))
    def __load_datas(cls):
        redis_ = cls.__get_redis()
        try:
            keys = RedisUtils.keys(redis_, "buy1_price_limit_up_info-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, val)
    @classmethod
    def __get_buy1_price_info(cls, code):
        data = RedisUtils.get(cls.__get_redis(), f"buy1_price_limit_up_info-{code}")
            keys = RedisUtils.keys(redis_, "buy1_price-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = round(float(val), 2)
                tool.CodeDataCacheUtil.set_cache(cls.__current_buy_1_price, code, val)
            keys = RedisUtils.keys(redis_, "open_limit_up_lowest_price-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = round(float(val), 2)
                tool.CodeDataCacheUtil.set_cache(cls.__open_limit_up_lowest_price_cache, code, val)
        finally:
            RedisUtils.realse(redis_)
    # 保存买1价格信息
    def __save_buy1_price_info(self, code, limit_up_time, open_limit_up_time):
        tool.CodeDataCacheUtil.set_cache(self.__buy1_price_info_cache, code, (limit_up_time, open_limit_up_time))
        RedisUtils.setex_async(self.__db, f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                               json.dumps((limit_up_time, open_limit_up_time)))
    def __get_buy1_price_info(self, code):
        data = RedisUtils.get(self.__get_redis(), f"buy1_price_limit_up_info-{code}")
        if not data:
            return None, None
        data = json.loads(data)
        return data[0], data[1]
    @classmethod
    def __get_buy1_price_info_cache(cls, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(cls.__buy1_price_info_cache, code)
    def __get_buy1_price_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy1_price_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_buy1_price_info(code)
        tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, val)
        return val
        return None, None
    @classmethod
    def __save_buy1_price(cls, code, buy_1_price):
        # 不保存重复的数据
        if code in cls.__current_buy_1_price and cls.__current_buy_1_price[code] == buy_1_price:
    def __save_buy1_price(self, code, buy_1_price):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0] and abs(cache_result[1] - float(buy_1_price)) < 0.001:
            return
        cls.__current_buy_1_price[code] = buy_1_price
        RedisUtils.setex(cls.__get_redis(), f"buy1_price-{code}", tool.get_expire(), buy_1_price)
        tool.CodeDataCacheUtil.set_cache(self.__current_buy_1_price, code, buy_1_price)
        RedisUtils.setex_async(self.__db, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
    # datas:[(code, buy_1_price)]
    @classmethod
    def __save_buy1_prices(cls, datas):
        pipe = cls.__get_redis().pipeline()
    def __save_buy1_prices(self, datas):
        for d in datas:
            code = d[0]
            buy_1_price = d[1]
            # 不保存重复的数据
            if code in cls.__current_buy_1_price and cls.__current_buy_1_price[code] == buy_1_price:
                continue
            cls.__current_buy_1_price[code] = buy_1_price
            RedisUtils.setex(pipe, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
        pipe.execute()
            self.__save_buy1_price(code, buy_1_price)
    @classmethod
    def __get_buy1_price(cls, code):
        return RedisUtils.get(cls.__get_redis(), f"buy1_price-{code}")
    def __get_buy1_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"buy1_price-{code}")
    def __get_buy1_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # 设置炸板后的最低价
    @classmethod
    def __save_open_limit_up_lowest_price(cls, code, price):
        RedisUtils.setex(cls.__get_redis(), f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")
    @classmethod
    def __get_open_limit_up_lowest_price(cls, code):
        return RedisUtils.get(cls.__get_redis(), f"open_limit_up_lowest_price-{code}")
    def __save_open_limit_up_lowest_price(self, code, price):
        tool.CodeDataCacheUtil.set_cache(self.__open_limit_up_lowest_price_cache, code, round(float(price), 2))
        RedisUtils.setex_async(self.__db, f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")
    @classmethod
    def set_open_limit_up_lowest_price(cls, code, price):
        old_price = cls.__get_open_limit_up_lowest_price(code)
    def __get_open_limit_up_lowest_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"open_limit_up_lowest_price-{code}")
    def __get_open_limit_up_lowest_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__open_limit_up_lowest_price_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    def set_open_limit_up_lowest_price(self, code, price):
        old_price = self.__get_open_limit_up_lowest_price_cache(code)
        if not old_price or float(old_price) - float(price) > 0.001:
            cls.__save_open_limit_up_lowest_price(code, price)
            self.__save_open_limit_up_lowest_price(code, price)
    @classmethod
    def get_buy1_price(cls, code):
        if code in cls.__current_buy_1_price:
            return cls.__current_buy_1_price.get(code)
        return cls.__get_buy1_price(code)
    def get_buy1_price(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    @classmethod
    def get_open_limit_up_lowest_price(cls, code):
        price = cls.__get_open_limit_up_lowest_price(code)
    def get_open_limit_up_lowest_price(self, code):
        price = self.__get_open_limit_up_lowest_price_cache(code)
        return price
    # 处理
    @classmethod
    def process(cls, code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn):
    def process(self, code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn):
        data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
        if cls.__latest_data.get(code) == data_str:
        if self.__latest_data.get(code) == data_str:
            return
        cls.__latest_data[code] = data_str
        self.__latest_data[code] = data_str
        # 保存买1价格
        cls.__save_buy1_price(code, buy_1_price)
        self.__save_buy1_price(code, buy_1_price)
        # 记录日志
        logger_trade_queue_price_info.info(
@@ -113,30 +148,30 @@
            return
        is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
        old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        if old_limit_up_time and old_open_limit_up_time:
            return
        if is_limit_up and old_limit_up_time is None and float(sell_1_price) < 0.1 and int(sell_1_volumn) <= 0:
            # 卖1消失,买1为涨停价则表示涨停
            cls.__save_buy1_price_info(code, time_str, None)
            self.__save_buy1_price_info(code, time_str, None)
        elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None:
            # 有涨停时间,当前没有涨停,之前没有打开涨停
            cls.__save_buy1_price_info(code, old_limit_up_time, time_str)
            self.__save_buy1_price_info(code, old_limit_up_time, time_str)
        if old_limit_up_time and not is_limit_up:
            # 之前涨停过且现在尚未涨停
            cls.set_open_limit_up_lowest_price(code, buy_1_price)
            self.set_open_limit_up_lowest_price(code, buy_1_price)
    # datas:[ (code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn)  ]
    @classmethod
    def processes(cls, datas):
    def processes(self, datas):
        temp_buy1_prices = []
        for d in datas:
            code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn = d
            data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
            if cls.__latest_data.get(code) == data_str:
            if self.__latest_data.get(code) == data_str:
                continue
            cls.__latest_data[code] = data_str
            self.__latest_data[code] = data_str
            # 保存买1价格
            temp_buy1_prices.append((code, buy_1_price))
@@ -148,44 +183,44 @@
                continue
            is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
            old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
            old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
            if old_limit_up_time and old_open_limit_up_time:
                continue
            if is_limit_up and old_limit_up_time is None and float(sell_1_price) < 0.1 and int(sell_1_volumn) <= 0:
                # 卖1消失,买1为涨停价则表示涨停
                cls.__save_buy1_price_info(code, time_str, None)
                self.__save_buy1_price_info(code, time_str, None)
            elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None:
                # 有涨停时间,当前没有涨停,之前没有打开涨停
                cls.__save_buy1_price_info(code, old_limit_up_time, time_str)
                self.__save_buy1_price_info(code, old_limit_up_time, time_str)
            if old_limit_up_time and not is_limit_up:
                # 之前涨停过且现在尚未涨停
                cls.set_open_limit_up_lowest_price(code, buy_1_price)
                self.set_open_limit_up_lowest_price(code, buy_1_price)
        if temp_buy1_prices:
            cls.__save_buy1_prices(temp_buy1_prices)
            self.__save_buy1_prices(temp_buy1_prices)
    # 是否可以下单
    @classmethod
    def is_can_buy(cls, code):
        old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
    def is_can_buy(self, code):
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        if old_limit_up_time and old_open_limit_up_time:
            return True
        return False
    # 获取涨停信息
    # 返回涨停时间与炸板时间
    @classmethod
    def get_limit_up_info(cls, code):
        old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
    def get_limit_up_info(self, code):
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        return old_limit_up_time, old_open_limit_up_time
    # 设置涨停时间
    @classmethod
    def set_limit_up_time(cls, code, time_str):
        limit_up_time, open_limit_up_time = cls.get_limit_up_info(code)
    def set_limit_up_time(self, code, time_str):
        limit_up_time, open_limit_up_time = self.get_limit_up_info(code)
        if limit_up_time is None:
            cls.__save_buy1_price_info(code, time_str, None)
            self.__save_buy1_price_info(code, time_str, None)
if __name__ == "__main__":
    # get_limit_up_info is an instance method, so it is called on an instance.
    print(Buy1PriceManager().get_limit_up_info("002777"))