Administrator
2023-08-08 8c7519b0dc79d32a216765a1b46e736d53e3d786
Buy1PriceManager单例+缓存改造
14个文件已修改
598 ■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 82 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/code_price_manager.py 191 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 59 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
@@ -201,10 +201,10 @@
             "limit_up": is_limit_up})
        if code in new_add_codes:
            if is_limit_up:
                place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(
                place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(
                    code)
                if place_order_count == 0:
                    trade_data_manager.PlaceOrderCountManager.place_order(code)
                    trade_data_manager.PlaceOrderCountManager().place_order(code)
    gpcode_first_screen_manager.process_ticks(prices)
l2/cancel_buy_strategy.py
@@ -156,7 +156,7 @@
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = process_index_old
        # 下单次数
        place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
        place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
@@ -477,7 +477,7 @@
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
        place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
@@ -1006,17 +1006,25 @@
# --------------------------------封单额变化撤------------------------
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    _db = 1
    _redisManager = redis_manager.RedisManager(1)
    _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(L2LimitUpMoneyStatisticUtil, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()
    # 设置l2的每一秒涨停封单额数据
    @classmethod
    def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
        old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
    def __set_l2_second_money_record(self, code, time, num, from_index, to_index):
        old_num, old_from, old_to = self.__get_l2_second_money_record(code, time)
        if old_num is None:
            old_num = num
            old_from = from_index
@@ -1027,38 +1035,34 @@
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
    @classmethod
    def __get_l2_second_money_record(cls, code, time):
    def __get_l2_second_money_record(self, code, time):
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = RedisUtils.get(cls.__get_redis(), key)
        return cls.__format_second_money_record_val(val)
        val = RedisUtils.get(self.__get_redis(), key)
        return self.__format_second_money_record_val(val)
    @classmethod
    def __format_second_money_record_val(cls, val):
    def __format_second_money_record_val(self, val):
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    @classmethod
    def __get_l2_second_money_record_keys(cls, code, time_regex):
    def __get_l2_second_money_record_keys(self, code, time_regex):
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = RedisUtils.keys(cls.__get_redis(), key)
        keys = RedisUtils.keys(self.__get_redis(), key)
        return keys
    # 设置l2最新的封单额数据
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
    def __set_l2_latest_money_record(self, code, index, num):
        key = "l2_limit_up_money-{}".format(code)
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((num, index)))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num, index)))
    # 返回数量,索引
    @classmethod
    def __get_l2_latest_money_record(cls, code):
    def __get_l2_latest_money_record(self, code):
        key = "l2_limit_up_money-{}".format(code)
        result = RedisUtils.get(cls.__get_redis(), key)
        result = RedisUtils.get(self.__get_redis(), key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
@@ -1067,8 +1071,7 @@
    # 矫正数据
    # 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标
    @classmethod
    def verify_num(cls, code, num, time_str):
    def verify_num(self, code, num, time_str):
        # 记录买1矫正日志
        logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
        time_ = time_str.replace(":", "")
@@ -1081,13 +1084,13 @@
                # 只处理9:30后的数据
                if int(temp_time.replace(":", "")) < int("093000"):
                    break
                keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
                keys_ = self.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
                if len(keys_) > 0:
                    keys.append(keys_[0])
                if len(keys) >= 1:
                    break
        else:
            keys_ = cls.__get_l2_second_money_record_keys(code, "*")
            keys_ = self.__get_l2_second_money_record_keys(code, "*")
            key_list = []
            for k in keys_:
                time__ = k.split("-")[-1]
@@ -1101,11 +1104,11 @@
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
            key = keys[0]
            val = RedisUtils.get(cls.__get_redis(), key)
            old_num, old_from, old_to = cls.__format_second_money_record_val(val)
            val = RedisUtils.get(self.__get_redis(), key)
            old_num, old_from, old_to = self.__format_second_money_record_val(val)
            end_index = old_to
            # 保存最近的数据
            cls.__set_l2_latest_money_record(code, end_index, num)
            self.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
        else:
            logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
@@ -1150,8 +1153,7 @@
        #     logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
    # 计算量,用于涨停封单量的计算
    @classmethod
    def __compute_num(cls, code, data, buy_single_data):
    def __compute_num(self, code, data, buy_single_data):
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            # 涨停买撤与卖
            return 0 - int(data["val"]["num"]) * data["re"]
@@ -1165,15 +1167,13 @@
            return int(data["val"]["num"]) * data["re"]
    @classmethod
    def clear(cls, code):
    def clear(self, code):
        key = "l2_limit_up_money-{}".format(code)
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete(self.__get_redis(), key)
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
    def process_data(self, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
                     with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
@@ -1201,17 +1201,17 @@
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            num = self.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                             time_dict_num_index[t_]["e"])
            self.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                              time_dict_num_index[t_]["e"])
        print("保存涨停封单额时间:", round(time.time() * 1000) - start_time)
        # 累计最新的金额
        total_num, index = cls.__get_l2_latest_money_record(code)
        total_num, index = self.__get_l2_latest_money_record(code)
        record_msg = f"同花顺买1信息 {total_num},{index}"
        if index == -1:
@@ -1237,7 +1237,7 @@
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 获取最大封单额
        max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code)
        max_buy1_volume = self._thsBuy1VolumnManager.get_max_buy1_volume(code)
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
@@ -1283,7 +1283,7 @@
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
                val = self.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # 在处理数据的范围内,就需要判断是否要撤单了
            if start_index <= i <= end_index:
l2/code_price_manager.py
@@ -10,100 +10,135 @@
class Buy1PriceManager:
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __latest_data = {}
    __current_buy_1_price = {}
    __buy1_price_info_cache = {}
    __open_limit_up_lowest_price_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(Buy1PriceManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    # 保存买1价格信息
    @classmethod
    def __save_buy1_price_info(cls, code, limit_up_time, open_limit_up_time):
        tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, (limit_up_time, open_limit_up_time))
        RedisUtils.setex(cls.__get_redis(), f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                         json.dumps((limit_up_time, open_limit_up_time)))
    def __load_datas(cls):
        redis_ = cls.__get_redis()
        try:
            keys = RedisUtils.keys(redis_, "buy1_price_limit_up_info-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, val)
    @classmethod
    def __get_buy1_price_info(cls, code):
        data = RedisUtils.get(cls.__get_redis(), f"buy1_price_limit_up_info-{code}")
            keys = RedisUtils.keys(redis_, "buy1_price-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = round(float(val), 2)
                tool.CodeDataCacheUtil.set_cache(cls.__current_buy_1_price, code, val)
            keys = RedisUtils.keys(redis_, "open_limit_up_lowest_price-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = round(float(val), 2)
                tool.CodeDataCacheUtil.set_cache(cls.__open_limit_up_lowest_price_cache, code, val)
        finally:
            RedisUtils.realse(redis_)
    # 保存买1价格信息
    def __save_buy1_price_info(self, code, limit_up_time, open_limit_up_time):
        tool.CodeDataCacheUtil.set_cache(self.__buy1_price_info_cache, code, (limit_up_time, open_limit_up_time))
        RedisUtils.setex_async(self.__db, f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                               json.dumps((limit_up_time, open_limit_up_time)))
    def __get_buy1_price_info(self, code):
        data = RedisUtils.get(self.__get_redis(), f"buy1_price_limit_up_info-{code}")
        if not data:
            return None, None
        data = json.loads(data)
        return data[0], data[1]
    @classmethod
    def __get_buy1_price_info_cache(cls, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(cls.__buy1_price_info_cache, code)
    def __get_buy1_price_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy1_price_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_buy1_price_info(code)
        tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, val)
        return val
        return None, None
    @classmethod
    def __save_buy1_price(cls, code, buy_1_price):
        # 不保存重复的数据
        if code in cls.__current_buy_1_price and cls.__current_buy_1_price[code] == buy_1_price:
    def __save_buy1_price(self, code, buy_1_price):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0] and abs(cache_result[1] - float(buy_1_price)) < 0.001:
            return
        cls.__current_buy_1_price[code] = buy_1_price
        RedisUtils.setex(cls.__get_redis(), f"buy1_price-{code}", tool.get_expire(), buy_1_price)
        tool.CodeDataCacheUtil.set_cache(self.__current_buy_1_price, code, buy_1_price)
        RedisUtils.setex_async(self.__db, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
    # datas:[(code, buy_1_price)]
    @classmethod
    def __save_buy1_prices(cls, datas):
        pipe = cls.__get_redis().pipeline()
    def __save_buy1_prices(self, datas):
        for d in datas:
            code = d[0]
            buy_1_price = d[1]
            # 不保存重复的数据
            if code in cls.__current_buy_1_price and cls.__current_buy_1_price[code] == buy_1_price:
                continue
            cls.__current_buy_1_price[code] = buy_1_price
            RedisUtils.setex(pipe, f"buy1_price-{code}", tool.get_expire(), buy_1_price)
        pipe.execute()
            self.__save_buy1_price(code, buy_1_price)
    @classmethod
    def __get_buy1_price(cls, code):
        return RedisUtils.get(cls.__get_redis(), f"buy1_price-{code}")
    def __get_buy1_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"buy1_price-{code}")
    def __get_buy1_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # 设置炸板后的最低价
    @classmethod
    def __save_open_limit_up_lowest_price(cls, code, price):
        RedisUtils.setex(cls.__get_redis(), f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")
    @classmethod
    def __get_open_limit_up_lowest_price(cls, code):
        return RedisUtils.get(cls.__get_redis(), f"open_limit_up_lowest_price-{code}")
    def __save_open_limit_up_lowest_price(self, code, price):
        tool.CodeDataCacheUtil.set_cache(self.__open_limit_up_lowest_price_cache, code, round(float(price), 2))
        RedisUtils.setex_async(self.__db, f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")
    @classmethod
    def set_open_limit_up_lowest_price(cls, code, price):
        old_price = cls.__get_open_limit_up_lowest_price(code)
    def __get_open_limit_up_lowest_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"open_limit_up_lowest_price-{code}")
    def __get_open_limit_up_lowest_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__open_limit_up_lowest_price_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    def set_open_limit_up_lowest_price(self, code, price):
        old_price = self.__get_open_limit_up_lowest_price_cache(code)
        if not old_price or float(old_price) - float(price) > 0.001:
            cls.__save_open_limit_up_lowest_price(code, price)
            self.__save_open_limit_up_lowest_price(code, price)
    @classmethod
    def get_buy1_price(cls, code):
        if code in cls.__current_buy_1_price:
            return cls.__current_buy_1_price.get(code)
        return cls.__get_buy1_price(code)
    def get_buy1_price(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    @classmethod
    def get_open_limit_up_lowest_price(cls, code):
        price = cls.__get_open_limit_up_lowest_price(code)
    def get_open_limit_up_lowest_price(self, code):
        price = self.__get_open_limit_up_lowest_price_cache(code)
        return price
    # 处理
    @classmethod
    def process(cls, code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn):
    def process(self, code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn):
        data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
        if cls.__latest_data.get(code) == data_str:
        if self.__latest_data.get(code) == data_str:
            return
        cls.__latest_data[code] = data_str
        self.__latest_data[code] = data_str
        # 保存买1价格
        cls.__save_buy1_price(code, buy_1_price)
        self.__save_buy1_price(code, buy_1_price)
        # 记录日志
        logger_trade_queue_price_info.info(
@@ -113,30 +148,30 @@
            return
        is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
        old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        if old_limit_up_time and old_open_limit_up_time:
            return
        if is_limit_up and old_limit_up_time is None and float(sell_1_price) < 0.1 and int(sell_1_volumn) <= 0:
            # 卖1消失,买1为涨停价则表示涨停
            cls.__save_buy1_price_info(code, time_str, None)
            self.__save_buy1_price_info(code, time_str, None)
        elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None:
            # 有涨停时间,当前没有涨停,之前没有打开涨停
            cls.__save_buy1_price_info(code, old_limit_up_time, time_str)
            self.__save_buy1_price_info(code, old_limit_up_time, time_str)
        if old_limit_up_time and not is_limit_up:
            # 之前涨停过且现在尚未涨停
            cls.set_open_limit_up_lowest_price(code, buy_1_price)
            self.set_open_limit_up_lowest_price(code, buy_1_price)
    # datas:[ (code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn)  ]
    @classmethod
    def processes(cls, datas):
    def processes(self, datas):
        temp_buy1_prices = []
        for d in datas:
            code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn = d
            data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
            if cls.__latest_data.get(code) == data_str:
            if self.__latest_data.get(code) == data_str:
                continue
            cls.__latest_data[code] = data_str
            self.__latest_data[code] = data_str
            # 保存买1价格
            temp_buy1_prices.append((code, buy_1_price))
@@ -148,44 +183,44 @@
                continue
            is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
            old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
            old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
            if old_limit_up_time and old_open_limit_up_time:
                continue
            if is_limit_up and old_limit_up_time is None and float(sell_1_price) < 0.1 and int(sell_1_volumn) <= 0:
                # 卖1消失,买1为涨停价则表示涨停
                cls.__save_buy1_price_info(code, time_str, None)
                self.__save_buy1_price_info(code, time_str, None)
            elif old_limit_up_time and not is_limit_up and old_open_limit_up_time is None:
                # 有涨停时间,当前没有涨停,之前没有打开涨停
                cls.__save_buy1_price_info(code, old_limit_up_time, time_str)
                self.__save_buy1_price_info(code, old_limit_up_time, time_str)
            if old_limit_up_time and not is_limit_up:
                # 之前涨停过且现在尚未涨停
                cls.set_open_limit_up_lowest_price(code, buy_1_price)
                self.set_open_limit_up_lowest_price(code, buy_1_price)
        if temp_buy1_prices:
            cls.__save_buy1_prices(temp_buy1_prices)
            self.__save_buy1_prices(temp_buy1_prices)
    # 是否可以下单
    @classmethod
    def is_can_buy(cls, code):
        old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
    def is_can_buy(self, code):
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        if old_limit_up_time and old_open_limit_up_time:
            return True
        return False
    # 获取涨停信息
    # 返回涨停时间与炸板时间
    @classmethod
    def get_limit_up_info(cls, code):
        old_limit_up_time, old_open_limit_up_time = cls.__get_buy1_price_info_cache(code)
    def get_limit_up_info(self, code):
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        return old_limit_up_time, old_open_limit_up_time
    # 设置涨停时间
    @classmethod
    def set_limit_up_time(cls, code, time_str):
        limit_up_time, open_limit_up_time = cls.get_limit_up_info(code)
    def set_limit_up_time(self, code, time_str):
        limit_up_time, open_limit_up_time = self.get_limit_up_info(code)
        if limit_up_time is None:
            cls.__save_buy1_price_info(code, time_str, None)
            self.__save_buy1_price_info(code, time_str, None)
if __name__ == "__main__":
    print(Buy1PriceManager.get_limit_up_info("002777"))
    print(Buy1PriceManager().get_limit_up_info("002777"))
l2/l2_data_manager.py
@@ -6,7 +6,6 @@
from db import redis_manager
from db.redis_manager import RedisUtils
from utils import tool
from log_module.log import logger_l2_trade_buy
from utils.tool import CodeDataCacheUtil
_db = 1
@@ -33,45 +32,70 @@
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
class TradePointManager:
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __buy_compute_index_info_cache = {}
    __buy_cancel_single_pos_cache = {}
    __instance = None
    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradePointManager, cls).__new__(cls, *args, **kwargs)
            cls.load_data()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def load_data(cls):
        redis_ = cls.__get_redis()
        keys = RedisUtils.keys(redis_, "buy_compute_index_info-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            val = json.loads(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, val)
        keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))
    # 删除买入点数据
    @staticmethod
    def delete_buy_point(code):
        CodeDataCacheUtil.clear_cache(TradePointManager.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(_db, "buy_compute_index_info-{}".format(code))
    def delete_buy_point(self, code):
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    @staticmethod
    def get_buy_compute_start_data(code):
    def get_buy_compute_start_data(self, code):
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = RedisUtils.get(TradePointManager.__get_redis(), _key)
        _data_json = RedisUtils.get(self.__get_redis(), _key)
        if _data_json is None:
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]
    @staticmethod
    def get_buy_compute_start_data_cache(code):
        cache_result = CodeDataCacheUtil.get_cache(TradePointManager.__buy_compute_index_info_cache, code)
    def get_buy_compute_start_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = TradePointManager.get_buy_compute_start_data(code)
        CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, val)
        return val
        return None, None, None, 0, 0, [], 0
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets,
    def set_buy_compute_start_data(self, code, buy_single_index, buy_exec_index, compute_index, nums, count,
                                   max_num_sets,
                                   volume_rate):
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
@@ -81,80 +105,41 @@
                     volume_rate)
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = TradePointManager.get_buy_compute_start_data_cache(
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = self.get_buy_compute_start_data_cache(
                code)
            data_ = (_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
        CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(
            _db, _key, expire,
            json.dumps(data_))
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(self.__db, _key, expire, json.dumps(data_))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_single_pos(code):
        info = RedisUtils.get(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
    def get_buy_cancel_single_pos(self, code):
        info = RedisUtils.get(self.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
            return int(info)
    def get_buy_cancel_single_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_cancel_single_pos_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # 设置买撤点信息
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
    def set_buy_cancel_single_pos(self, code, index):
        tool.CodeDataCacheUtil.set_cache(self.__buy_cancel_single_pos_cache, code, index)
        expire = tool.get_expire()
        RedisUtils.setex(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code), expire, index)
        RedisUtils.setex_async(self.__db, "buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        RedisUtils.delete_async(_db, "buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        expire = tool.get_expire()
        RedisUtils.setex(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        info = RedisUtils.get(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
            info = json.loads(info)
            return info[0], info[1]
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        RedisUtils.delete_async(_db, "compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        expire = tool.get_expire()
        RedisUtils.setex(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code), expire,
                         json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        info = RedisUtils.get(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        RedisUtils.delete_async(_db, "count_info_for_cancel_buy-{}".format(code))
    def delete_buy_cancel_point(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_cancel_single_pos_cache, code)
        RedisUtils.delete_async(self.__db, "buy_cancel_single_pos-{}".format(code))
# 清除l2数据
@@ -203,8 +188,8 @@
# 是否在l2固定监控代码中
def is_in_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    return RedisUtils.sismember( _redisManager.getRedis(), key, code)
    return RedisUtils.sismember(_redisManager.getRedis(), key, code)
if __name__ == "__main__":
    TradePointManager.get_buy_compute_start_data_cache("603912")
    TradePointManager().get_buy_compute_start_data_cache("603912")
l2/l2_data_manager_new.py
@@ -441,9 +441,9 @@
        def buy_1_cancel():
            _start_time = round(t.time() * 1000)
            # 撤单计算,只看买1
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index,
                                                                               end_index,
                                                                               buy_single_index, buy_exec_index)
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil().process_data(code, start_index,
                                                                                 end_index,
                                                                                 buy_single_index, buy_exec_index)
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
@@ -520,8 +520,8 @@
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil().process(code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
                                                                               end_index,
                                                                               buy_exec_index)
                return cancel_data, cancel_msg
            except Exception as e:
                logging.exception(e)
@@ -687,15 +687,15 @@
        # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code)
        # if not is_limited_up:
        #     gpcode_manager.FirstCodeManager().add_limited_up_record([code])
        #     place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(
        #     place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(
        #         code)
        #     if place_order_count == 0:
        #         trade_data_manager.PlaceOrderCountManager.place_order(code)
        #         trade_data_manager.PlaceOrderCountManager().place_order(code)
        #     return False, True, "首板代码,且尚未涨停过"
        try:
            # 买1价格必须为涨停价才能买
            # buy1_price = cls.buy1PriceManager.get_price(code)
            # buy1_price = cls.buy1PriceManager().get_price(code)
            # if buy1_price is None:
            #     return False, "买1价尚未获取到"
            # limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -852,7 +852,7 @@
                zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb >= 200 * 100000000:
                buy1_price = code_price_manager.Buy1PriceManager.get_buy1_price(code)
                buy1_price = code_price_manager.Buy1PriceManager().get_buy1_price(code)
                if buy1_price is None:
                    return False, True, f"尚未获取到买1价"
                dif = float(limit_up_price) - float(buy1_price)
@@ -860,13 +860,13 @@
                if dif > 0.10001:
                    return False, True, f"自由流通200亿以上,买1剩余档数大于10档,买一({buy1_price})涨停({limit_up_price})"
        open_limit_up_lowest_price = code_price_manager.Buy1PriceManager.get_open_limit_up_lowest_price(code)
        open_limit_up_lowest_price = code_price_manager.Buy1PriceManager().get_open_limit_up_lowest_price(code)
        price_pre_close = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if open_limit_up_lowest_price and (
                float(open_limit_up_lowest_price) - price_pre_close) / price_pre_close < 0.05:
            return False, True, f"炸板后最低价跌至5%以下"
        limit_up_info = code_price_manager.Buy1PriceManager.get_limit_up_info(code)
        limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code)
        if limit_up_info[0] is None and False:
            total_data = local_today_datas.get(code)
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
@@ -1099,14 +1099,17 @@
            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
                                                           buy_nums, buy_count, max_num_set_new,
                                                           cls.volume_rate_info[code][0])
            f2 = dask.delayed(limit_up_time_manager.LimitUpTimeManager().save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f2 = dask.delayed(limit_up_time_manager.LimitUpTimeManager().save_limit_up_time)(code,
                                                                                             total_datas[compute_index][
                                                                                                 "val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
                                                                        compute_index,
                                                                        buy_single_index,
                                                                        buy_exec_index, False)
            dask.compute(f1, f2, f3, f4, f5)
            f4 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
            # 暂时不需要
            # f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
            #                                                             compute_index,
            #                                                             buy_single_index,
            #                                                             buy_exec_index, False)
            dask.compute(f1, f2, f3, f4)
            # 已被并行处理
            # # 记录买入信号位置
@@ -1173,7 +1176,7 @@
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data_cache(
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate
@@ -1181,7 +1184,7 @@
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
                                volume_rate):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
        TradePointManager().set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
                                                     max_num_set, volume_rate)
    # 计算下单起始信号
@@ -1269,7 +1272,7 @@
        # 目标手数
        threshold_num = round(threshold_money / (limit_up_price * 100))
        # place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
        # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        # 目标订单数量
        threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count()
@@ -1296,7 +1299,7 @@
            trigger_buy = False
            # 必须为连续2秒内的数据
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time:
                TradePointManager.delete_buy_point(code)
                TradePointManager().delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
                    return None, buy_nums, buy_count, None, max_buy_num_set
output/code_info_output.py
@@ -324,7 +324,7 @@
                        data['val']['num'] * float(data['val']['price']) * 100 / 10000, 1)}
        # 买入信号
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data_cache(
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
            code)
        if buy_single_index is None:
@@ -471,7 +471,7 @@
        code_name = gpcode_manager.get_code_name(code)
        # 获取现价,判断是否涨停
        current_price_info = global_util.cuurent_prices.get(code)
        limit_up_info = code_price_manager.Buy1PriceManager.get_limit_up_info(code)
        limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code)
        is_limit_up = True
        open_limit_up = limit_up_info[0] and limit_up_info[1]
        if current_price_info is not None and not current_price_info[1]:
@@ -497,7 +497,7 @@
        return f"{item['val']['time']}#{item['val']['num']}手#{round(item['val']['num'] * float(item['val']['price']) * 100 / 10000, 1)}万"
    # 获取炸板信息
    limit_up_info = code_price_manager.Buy1PriceManager.get_limit_up_info(code)
    limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code)
    break_time = limit_up_info[1]
    records = []
    try:
server.py
@@ -18,7 +18,7 @@
import inited_data
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, LCancelBigNumComputer
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer
import l2.l2_data_util
from output import code_info_output
@@ -389,7 +389,7 @@
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price,
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price,
                                                                        sell_one_price, sell_one_volumn)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
@@ -408,7 +408,7 @@
                                        decimal.Decimal("0.00"))
                                    # 获取执行位时间
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data(
                                        code)
                                    if True:
                                        # 只有下单过后才获取交易进度
@@ -545,9 +545,9 @@
                                                                                               price)
                            # if need_cancel:
                            #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                            # if need_sync:
                            #     # 同步数据
                            #     L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
test/l2_trade_test.py
@@ -30,10 +30,10 @@
    for k in keys:
        RedisUtils.delete(redis_l2, k.format(code), auto_free=False)
    RedisUtils.realse(redis_l2)
    l2.l2_data_manager.TradePointManager.delete_buy_point(code)
    l2.l2_data_manager.TradePointManager().delete_buy_point(code)
    big_money_num_manager.reset(code)
    RedisUtils.delete( redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    trade_data_manager.PlaceOrderCountManager.clear_place_order_count(code)
    trade_data_manager.PlaceOrderCountManager().clear_place_order_count(code)
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False)
    for k in keys:
@@ -66,7 +66,7 @@
                        decimal.Decimal("0.00"))
                    # 获取执行位时间
                    exec_time = None
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data(
                        code)
                    if buy_exec_index:
                        try:
@@ -148,8 +148,7 @@
                time_s = tool.get_time_as_second(time_) - i - 1
                volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
                if volumn is not None:
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                                  tool.time_seconds_format(time_s))
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn), tool.time_seconds_format(time_s))
                    break
            # 设置委买队列
            for i in range(0, len(buy_queues)):
@@ -200,8 +199,8 @@
#     L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, buy_exec_index, buy_single_index,
#                                              buy_exec_index, False)
#
#     l2_data_manager.TradePointManager.get_buy_compute_start_data = mock.Mock(return_value=(426, 479, 479, 0, 100))
#     buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data(
#     l2_data_manager.TradePointManager().get_buy_compute_start_data = mock.Mock(return_value=(426, 479, 479, 0, 100))
#     buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager().get_buy_compute_start_data(
#         code)
#     processor.unreal_buy_dict[code] = mock.Mock(return_value=(479, 167234623))
#
third_data/data_server.py
@@ -499,7 +499,7 @@
                    code = d[0]
                    if code.find("00") == 0 or code.find("60") == 0:
                        limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2]))
                        code_price_manager.Buy1PriceManager.set_limit_up_time(code, limit_up_time)
                        code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time)
                self.__kplDataManager.save_data(type_, result_list)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list)
        elif type_ == KPLDataType.OPEN_LIMIT_UP.value:
trade/huaxin/trade_server.py
@@ -197,7 +197,7 @@
                                        break
                                # 获取执行位时间
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data_cache(
                                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                                    code)
                                if True:
                                    if buy_progress_index is not None:
@@ -255,7 +255,7 @@
                            if limit_up_price is not None:
                                # 处理买1,卖1信息
                                code_price_manager.Buy1PriceManager.process(code, buy_1_price, time_str, limit_up_price,
                                code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, limit_up_price,
                                                                            sell_1_price, sell_1_volume // 100)
                                pre_close_price = round(float(limit_up_price) / 1.1, 2)
                                # 如果涨幅大于8%就读取板块
trade/l2_trade_factor.py
@@ -52,7 +52,7 @@
            return 100
        # 暂时不需要次此中策略
        # # 判断有没有炸开
        # if code_price_manager.Buy1PriceManager.is_can_buy(self.code):
        # if code_price_manager.Buy1PriceManager().is_can_buy(self.code):
        #     # 回封
        #     if self.score_index == 0:
        #         return 0
trade/trade_data_manager.py
@@ -370,38 +370,65 @@
# 涨停次数管理
class PlaceOrderCountManager:
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __place_order_count_cache={}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(PlaceOrderCountManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __incre_place_order_count(cls, code):
        key = "place_order_count-{}".format(code)
        RedisUtils.incrby(cls.__get_redis(), key, 1)
        RedisUtils.expire(cls.__get_redis(), key, tool.get_expire())
    def __load_datas(cls):
        redis_ = cls.__get_redis()
        try:
            keys = RedisUtils.keys(redis_, "place_order_count-*")
            for k in keys:
                code = k.split("-")[-1]
                count = RedisUtils.get(redis_, k)
                cls.__place_order_count_cache[code] = int(count)
        finally:
            RedisUtils.realse(redis_)
    @classmethod
    def __get_place_order_count(cls, code):
    def __incre_place_order_count(self, code):
        if code not in self.__place_order_count_cache:
            self.__place_order_count_cache[code] = 0
        self.__place_order_count_cache[code] += 1
        key = "place_order_count-{}".format(code)
        count = RedisUtils.get(cls.__get_redis(), key)
        RedisUtils.incrby_async(self.__db, key, 1)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())
    def __get_place_order_count(self, code):
        key = "place_order_count-{}".format(code)
        count = RedisUtils.get(self.__get_redis(), key)
        if count is not None:
            return int(count)
        return 0
    @classmethod
    def place_order(cls, code):
        cls.__incre_place_order_count(code)
    def __get_place_order_count_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__place_order_count_cache,code)
        if cache_result[0]:
            return cache_result[1]
        return 0
    @classmethod
    def get_place_order_count(cls, code):
        return cls.__get_place_order_count(code)
    def place_order(self, code):
        self.__incre_place_order_count(code)
    @classmethod
    def clear_place_order_count(cls, code):
    def get_place_order_count(self, code):
        return self.__get_place_order_count_cache(code)
    def clear_place_order_count(self, code):
        self.__place_order_count_cache[code] = 0
        key = "place_order_count-{}".format(code)
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete_async(self.__db, key)
if __name__ == "__main__":
trade/trade_manager.py
@@ -580,9 +580,8 @@
                CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                # 删除买撤记录的临时信息
                kp_client_msg_manager.add_msg(code, "买入成交")
                l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                l2_data_manager.TradePointManager.delete_buy_point(code)
                l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
                l2_data_manager.TradePointManager().delete_buy_point(code)
                # 移除交易窗口分配
                if trade_gui is not None:
                    trade_gui.THSBuyWinManagerNew.cancel_distribute_win_for_code(code)
trade/trade_result_manager.py
@@ -18,24 +18,22 @@
def virtual_buy_success(code):
    # 增加下单计算
    trade_data_manager.PlaceOrderCountManager.place_order(code)
    trade_data_manager.PlaceOrderCountManager().place_order(code)
    # 删除之前的板上卖信息
    L2LimitUpSellStatisticUtil().delete(code)
# 虚拟撤成功
def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    f1 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code)
    f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
    f1 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code)
    f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
    # 安全笔数计算
    f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
                                                                     total_datas[-1]["index"])
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6, f7, f8)
    dask.compute(f1, f2, f5, f6, f7, f8)
# 真实买成功
@@ -71,7 +69,7 @@
            logging.exception(e)
            logger_l2_error.exception(e)
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data_cache(
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
        code)
    f1 = clear_max_buy1_volume(code)
@@ -79,7 +77,7 @@
    f3 = h_cancel(code, buy_single_index, buy_exec_index)
    f4 = l_cancel(code)
    dask.compute(f1, f2, f3, f4)
    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
# 真实撤成功
@@ -88,24 +86,20 @@
    f1 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
                                                                     total_datas[-1]["index"])
    # 取消买入标识
    f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
    f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
    f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6, f7, f8)
    dask.compute(f1, f2, f3, f6, f7, f8)
if __name__ == "__main__":
    code = "600246"
    f2 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
    f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
    f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code)
    dask.compute(f2, f3, f4, f5, f6, f7, f8)
    dask.compute(f2, f3, f6, f7, f8)