Administrator
2023-08-08 c20c3c10635ce78db4a86ce9c0bb1d02e90f525d
l2/cancel_buy_strategy.py
@@ -32,64 +32,78 @@
    __sCancelParamsManager = l2_trade_factor.SCancelParamsManager
    __s_big_num_cancel_compute_data_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(SecondCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    # 保存结束位置
    @classmethod
    def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code,
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "s_big_num_cancel_compute_data-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # Save the end position: store (process_index, buy_num, cancel_num) for `code` in the
    # in-memory cache, then pass the same triple as JSON to RedisUtils.setex_async under
    # "s_big_num_cancel_compute_data-<code>" with the expiry from tool.get_expire().
    # NOTE(review): the two setex_async lines below are the `cls` and `self` variants of
    # one call.
    def __save_compute_data(self, code, process_index, buy_num, cancel_num):
        CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code,
                                    (process_index, buy_num, cancel_num))
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((process_index, buy_num, cancel_num)))
    # Read the saved (process_index, buy_num, cancel_num) triple for `code` from Redis.
    # Returns (-1, 0, 0) when the key has no value.
    @classmethod
    def __get_compute_data(cls, code):
    def __get_compute_data(self, code):
        key = "s_big_num_cancel_compute_data-{}".format(code)
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            # nothing stored for this code
            return -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2]
    # Cache-first read of the compute data for `code`: return the cached value when
    # get_cache reports a hit (element 0 truthy), otherwise read it through
    # __get_compute_data, store the result in the cache and return it.
    @classmethod
    def __get_compute_data_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__s_big_num_cancel_compute_data_cache, code)
    def __get_compute_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__s_big_num_cancel_compute_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val)
        val = self.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, val)
        return val
    # Drop the compute data of `code`: clear its in-memory cache entry and pass the
    # key "s_big_num_cancel_compute_data-<code>" to RedisUtils.delete_async.
    @classmethod
    def __clear_data(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__s_big_num_cancel_compute_data_cache, code)
    def __clear_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code)
        ks = ["s_big_num_cancel_compute_data-{}".format(code)]
        for key in ks:
            RedisUtils.delete_async(cls.__db, key)
            RedisUtils.delete_async(self.__db, key)
    # Clear the compute data of every code that has a "s_big_num_cancel_compute_data-*"
    # key in Redis; the code is the key with that prefix removed.
    @classmethod
    def clear_data(cls):
    def clear_data(self):
        ks = ["s_big_num_cancel_compute_data-*"]
        for key in ks:
            keys = RedisUtils.keys(cls.__get_redis(), key)
            keys = RedisUtils.keys(self.__get_redis(), key)
            for k in keys:
                code = k.replace("s_big_num_cancel_compute_data-", "")
                cls.__clear_data(code)
                self.__clear_data(code)
    # Compute the net big-order amount: look up fire_count for `volume_rate_index` and
    # delegate to compute_left_big_num with constant.S_CANCEL_MIN_MONEY as min_money_w.
    @classmethod
    def __compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, volume_rate_index):
    def __compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, volume_rate_index):
        # Number of "ignition" big orders (value of get_max_exclude_count)
        fire_count = cls.__sCancelParamsManager.get_max_exclude_count(volume_rate_index)
        return cls.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count,
                                        constant.S_CANCEL_MIN_MONEY)
        fire_count = self.__sCancelParamsManager.get_max_exclude_count(volume_rate_index)
        return self.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count,
                                         constant.S_CANCEL_MIN_MONEY)
    # 计算未撤的总手数
    @classmethod
    def compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w):
    def compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w):
        # 获取大单的最小手数
        left_big_num = 0
        for i in range(start_index, end_index + 1):
@@ -128,8 +142,7 @@
                            left_big_num -= val["num"] * data["re"]
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code,
    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code,
                    buy_volume_rate_index,
                    volume_rate_index,
                    need_cancel=True):
@@ -150,7 +163,7 @@
                    break
        # 获取处理进度
        process_index_old, buy_num, cancel_num = cls.__get_compute_data_cache(code)
        process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
@@ -160,8 +173,8 @@
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                      total_data, place_order_count)
            left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                       total_data, place_order_count)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            start_index = end_index + 1
@@ -180,8 +193,8 @@
            #         if buy_index is not None and a_start_index <= buy_index <= a_end_index:
            #             # 在买入信号之后
            #             cls.__save_cancel_data(code, i)
        range_seconds = cls.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
        cancel_rate_threshold = cls.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
        range_seconds = self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
        cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
@@ -240,13 +253,12 @@
                                buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold, range_seconds)
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
            self.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
    # Clears all stored compute data for `code` via __clear_data.
    # NOTE(review): the original comment read "order placed successfully" although the
    # method is named cancel_success — confirm which event is meant to trigger it.
    @classmethod
    def cancel_success(cls, code):
        cls.__clear_data(code)
    def cancel_success(self, code):
        self.__clear_data(code)
# --------------------------------H撤-------------------------------
@@ -263,172 +275,204 @@
    __cancel_traded_progress_cache = {}
    __cancel_compute_data_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    # 保存成交位置到执行位置的揽括范围数据
    @classmethod
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs_exec-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_watch_canceled_indexs-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.smembers(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_traded_progress-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_compute_data-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # Save the covered-range (watch) data between the traded position and the exec
    # position: cache (list(datas), process_index, finish) for `code` and pass the same
    # triple as JSON to RedisUtils.setex_async under "h_cancel_watch_indexs-<code>".
    # NOTE(review): the two setex_async lines below are the `cls` and `self` variants of
    # one call.
    def __save_watch_index_set(self, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, finish)))
    # Read the saved (datas, process_index, finish) watch data for `code` from Redis key
    # "h_cancel_watch_indexs-<code>". Returns (None, -1, False) when nothing is stored.
    # (The previous comment here said "save trade progress", which did not match.)
    @classmethod
    def __get_watch_index_set(cls, code):
    def __get_watch_index_set(self, code):
        key = f"h_cancel_watch_indexs-{code}"
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, False
        val = json.loads(val)
        return val[0], val[1], val[2]
    # Cache-first read of the watch data for `code`: return the cached value when
    # get_cache reports a hit (element 0 truthy), otherwise read it through
    # __get_watch_index_set, store the result in the cache and return it.
    @classmethod
    def __get_watch_index_set_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_cache, code)
    def __get_watch_index_set_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_index_set(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)
        val = self.__get_watch_index_set(code)
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, val)
        return val
    # Save the guard (watch) data located after the exec position: cache
    # (list(datas), process_index, total_count, big_num_count, finished) for `code` and
    # pass the same tuple as JSON to RedisUtils.setex_async under
    # "h_cancel_watch_indexs_exec-<code>" with the expiry from tool.get_expire().
    @classmethod
    def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code,
    def __save_watch_index_set_after_exec(self, code, datas, process_index, total_count, big_num_count, finished):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code,
                                    (list(datas), process_index, total_count, big_num_count, finished))
        key = f"h_cancel_watch_indexs_exec-{code}"
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # Read the saved (datas, process_index, total_count, big_num_count, finished) tuple
    # for `code` from Redis key "h_cancel_watch_indexs_exec-<code>".
    # Returns ([], -1, 0, 0, False) when nothing is stored.
    # (The previous comment here said "save trade progress", which did not match.)
    @classmethod
    def __get_watch_index_set_after_exec(cls, code):
    def __get_watch_index_set_after_exec(self, code):
        key = f"h_cancel_watch_indexs_exec-{code}"
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return [], -1, 0, 0, False
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]
    # Cache-first read of the after-exec watch data for `code`: return the cached value
    # when get_cache reports a hit (element 0 truthy), otherwise read it through
    # __get_watch_index_set_after_exec, store the result in the cache and return it.
    @classmethod
    def __get_watch_index_set_after_exec_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_exec_cache, code)
    def __get_watch_index_set_after_exec_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_exec_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_index_set_after_exec(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val)
        val = self.__get_watch_index_set_after_exec(code)
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code, val)
        return val
    # Record watched positions that have been cancelled: add every index in `indexes`
    # to the per-code set in the in-memory cache (created on first use) and pass it to
    # RedisUtils.sadd_async for key "h_cancel_watch_canceled_indexs-<code>", then pass
    # the key to RedisUtils.expire_async with the expiry from tool.get_expire().
    @classmethod
    def __add_watch_canceled_indexes(cls, code, indexes):
        if code not in cls.__cancel_watch_canceled_indexs_cache:
            cls.__cancel_watch_canceled_indexs_cache[code] = set()
    def __add_watch_canceled_indexes(self, code, indexes):
        if code not in self.__cancel_watch_canceled_indexs_cache:
            self.__cancel_watch_canceled_indexs_cache[code] = set()
        key = f"h_cancel_watch_canceled_indexs-{code}"
        for index in indexes:
            cls.__cancel_watch_canceled_indexs_cache[code].add(index)
            RedisUtils.sadd_async(cls.__db, key, index)
        RedisUtils.expire_async(cls.__db, key, tool.get_expire())
            self.__cancel_watch_canceled_indexs_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, key, index)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())
    # Return the result of RedisUtils.smembers for key
    # "h_cancel_watch_canceled_indexs-<code>" (the cancelled watch indexes of `code`).
    @classmethod
    def __get_watch_canceled_index(cls, code):
    def __get_watch_canceled_index(self, code):
        key = f"h_cancel_watch_canceled_indexs-{code}"
        return RedisUtils.smembers(cls.__get_redis(), key)
        return RedisUtils.smembers(self.__get_redis(), key)
    # Cache-first read of the cancelled watch indexes for `code`: return the cached value
    # when get_cache reports a hit (element 0 truthy), otherwise read it through
    # __get_watch_canceled_index, store the result in the cache and return it.
    @classmethod
    def __get_watch_canceled_index_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_canceled_indexs_cache, code)
    def __get_watch_canceled_index_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_canceled_indexs_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_canceled_index(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val)
        val = self.__get_watch_canceled_index(code)
        CodeDataCacheUtil.set_cache(self.__cancel_watch_canceled_indexs_cache, code, val)
        return val
    # Forget the cancelled watch indexes of `code`: clear the in-memory cache entry and
    # call RedisUtils.delete on key "h_cancel_watch_canceled_indexs-<code>".
    @classmethod
    def __del_watch_canceled_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
    def __del_watch_canceled_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code)
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete(self.__get_redis(), key)
    # Save the trade progress: cache (origin_process_index, latest_process_index) for
    # `code` and pass the same pair as JSON to RedisUtils.setex_async under
    # "h_cancel_traded_progress-<code>" with the expiry from tool.get_expire().
    @classmethod
    def __save_traded_progress(cls, code, origin_process_index, latest_process_index):
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code,
    def __save_traded_progress(self, code, origin_process_index, latest_process_index):
        CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((origin_process_index, latest_process_index)))
    # Read the saved (origin_process_index, latest_process_index) pair for `code` from
    # Redis key "h_cancel_traded_progress-<code>". Returns (None, None) when nothing is
    # stored.
    @classmethod
    def __get_traded_progress(cls, code):
    def __get_traded_progress(self, code):
        key = "h_cancel_traded_progress-{}".format(code)
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    # Cache-first read of the trade progress for `code`: return the cached value when
    # get_cache reports a hit (element 0 truthy), otherwise read it through
    # __get_traded_progress, store the result in the cache and return it.
    @classmethod
    def __get_traded_progress_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_traded_progress_cache, code)
    def __get_traded_progress_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_traded_progress_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_traded_progress(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val)
        val = self.__get_traded_progress(code)
        CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code, val)
        return val
    # Save the settlement (compute) position: cache (process_index, cancel_num) for
    # `code` and pass the same pair as JSON to RedisUtils.setex_async under
    # "h_cancel_compute_data-<code>" with the expiry from tool.get_expire().
    @classmethod
    def __save_compute_data(cls, code, process_index, cancel_num):
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code,
    def __save_compute_data(self, code, process_index, cancel_num):
        CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code,
                                    (process_index, cancel_num))
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    # Read the saved (process_index, cancel_num) pair for `code` from Redis key
    # "h_cancel_compute_data-<code>". Returns (-1, 0) when nothing is stored.
    @classmethod
    def __get_compute_data(cls, code):
    def __get_compute_data(self, code):
        key = "h_cancel_compute_data-{}".format(code)
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return -1, 0
        val = json.loads(val)
        return val[0], val[1]
    # Cache-first read of the compute data for `code`: return the cached value when
    # get_cache reports a hit (element 0 truthy), otherwise read it through
    # __get_compute_data, store the result in the cache and return it.
    @classmethod
    def __get_compute_data_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_compute_data_cache, code)
    def __get_compute_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_compute_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val)
        val = self.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code, val)
        return val
    # Forget the compute data of `code`: clear the in-memory cache entry and call
    # RedisUtils.delete on key "h_cancel_compute_data-<code>".
    @classmethod
    def __del_compute_data(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code)
    def __del_compute_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete(self.__get_redis(), key)
    # Remove every piece of H-cancel state kept for `code`: clear the five in-memory
    # caches, then call RedisUtils.delete on each of the five matching Redis keys.
    @classmethod
    def __clear_data(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_traded_progress_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_exec_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code)
    def __clear_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_traded_progress_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_exec_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        # Redis keys that mirror the caches cleared above
        ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
              f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}",
              f"h_cancel_watch_canceled_indexs-{code}"]
        for key in ks:
            RedisUtils.delete(cls.__get_redis(), key)
            RedisUtils.delete(self.__get_redis(), key)
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    local_today_num_operate_map,
                    buy_volume_index, volume_index,
                    is_first_code):
@@ -436,14 +480,14 @@
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1:
            # 开始计算需要监控的单
            cls.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data,
                                                    local_today_num_operate_map, buy_volume_index)
            self.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data,
                                                     local_today_num_operate_map, buy_volume_index)
        # 守护30s以外的数据
        if time_space <= constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        # 获取成交进度
        origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code)
        origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code)
        if latest_progress_index is None:
            latest_progress_index = -1
        # 监听的数据
@@ -454,7 +498,7 @@
        total_nums = 1
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set_cache(code)[0]
            watch_indexs = self.__get_watch_index_set_cache(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
@@ -464,7 +508,7 @@
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code)
        datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
@@ -473,17 +517,17 @@
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        processed_index, cancel_num = cls.__get_compute_data_cache(code)
        processed_index, cancel_num = self.__get_compute_data_cache(code)
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index)
        cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
        has_watch_canceled = False
        # 获取之前已经撤单的数据
        old_canceld_indexs = cls.__get_watch_canceled_index_cache(code)
        old_canceld_indexs = self.__get_watch_canceled_index_cache(code)
        # 重新计算撤单
        cancel_num = 0
        if old_canceld_indexs:
@@ -521,9 +565,9 @@
                                                len(watch_indexs_dict.keys()))
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
                            self.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
                            return True, data
            cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
            self.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
            rate__ = round(cancel_num / total_nums, 4)
            if rate__ > cancel_rate_threshold:
@@ -543,9 +587,9 @@
            logger_l2_h_cancel.info(
                f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # H撤已撤订单
            logger_l2_h_cancel.info(f"code-{code} H撤已撤订单:{cls.__get_watch_canceled_index_cache(code)}")
            logger_l2_h_cancel.info(f"code-{code} H撤已撤订单:{self.__get_watch_canceled_index_cache(code)}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
            self.__save_compute_data(code, process_index, cancel_num)
            # 有观测数据撤单
            if has_watch_canceled:
                now_rate = round(cancel_num / total_nums, 4)
@@ -557,33 +601,30 @@
        return False, None
    # Order placed successfully: clear all H-cancel state stored for `code`.
    # The remaining parameters are not used in the lines visible here.
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
    def place_order_success(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        self.__clear_data(code)
    # Set the trade progress for `code`: always forward `index` to the trade buy queue;
    # then, once enough time has passed since the exec position, persist the progress and
    # (the first time only) compute the watch range between the traded and exec positions.
    @classmethod
    def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        cls.__tradeBuyQueue.set_traded_index(code, index)
    def set_trade_progress(self, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        self.__tradeBuyQueue.set_traded_index(code, index)
        # Nothing more to do when there is no valid exec index, or when the gap between
        # data_time and the exec time is below constant.S_CANCEL_EXPIRE_TIME - 1
        # (the original comment gives this as 29)
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time,
                                                                               total_data[buy_exec_index]["val"][
                                                                                   "time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
            return
        # Save the trade progress
        origin_index, latest_index = cls.__get_traded_progress_cache(code)
        origin_index, latest_index = self.__get_traded_progress_cache(code)
        if origin_index is None:
            # First progress seen for this code: origin and latest are both `index`
            cls.__save_traded_progress(code, index, index)
            self.__save_traded_progress(code, index, index)
            # Compute the covered (watch) range
            cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                           local_today_num_operate_map)
            self.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                            local_today_num_operate_map)
        else:
            # Keep the original origin, only advance the latest position
            cls.__save_traded_progress(code, origin_index, index)
            self.__save_traded_progress(code, origin_index, index)
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:" + str(total_data[-1]["index"]))
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map,
    def __get_limit_up_buy_no_canceled_count(self, code, index, total_data, local_today_num_operate_map,
                                             MAX_EXPIRE_CANCEL_TIME=None):
        data = None
        try:
@@ -619,13 +660,12 @@
        # 计算排名前N的大单
    # 过时数据
    @classmethod
    def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count):
    def __compute_top_n_num(self, code, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                        local_today_num_operate_map)
            not_cancel_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                         local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # 针按照手数排序
@@ -637,14 +677,13 @@
        return watch_set
    # 从成交位置到执行位置
    @classmethod
    def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data,
    def __compute_watch_indexs_between_traded_exec(self, code, progress_index, buy_exec_index, total_data,
                                                   local_today_num_operate_map):
        total_count = 0
        watch_set = set()
        big_num_count = 0
        for i in range(progress_index, buy_exec_index):
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
@@ -659,15 +698,14 @@
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控成交位到执行位:{final_watch_list}")
        cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        self.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 计算执行位置之后的需要监听的数据
    @classmethod
    def __compute_watch_indexs_after_single(cls, code, buy_single_index, buy_exec_index, total_data,
    def __compute_watch_indexs_after_single(self, code, buy_single_index, buy_exec_index, total_data,
                                            local_today_num_operate_map, buy_volume_index):
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec_cache(
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = self.__get_watch_index_set_after_exec_cache(
            code)
        if watch_list and finish:
            # 已经计算完了不需要再进行计算
@@ -683,17 +721,17 @@
        big_num_count = big_num_count_old
        total_count = total_count_old
        # H撤单
        MIN_H_COUNT = cls.__hCancelParamsManager.get_max_watch_count(buy_volume_index)
        MIN_H_COUNT = self.__hCancelParamsManager.get_max_watch_count(buy_volume_index)
        # 从买入信号位3条数据开始计算
        for i in range(buy_single_index + 3, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                  local_today_num_operate_map,
                                                                  tool.trade_time_add_second(
                                                                      total_data[buy_exec_index]["val"]["time"],
                                                                      constant.S_CANCEL_EXPIRE_TIME))
            left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                   local_today_num_operate_map,
                                                                   tool.trade_time_add_second(
                                                                       total_data[buy_exec_index]["val"]["time"],
                                                                       constant.S_CANCEL_EXPIRE_TIME))
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
@@ -720,22 +758,21 @@
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控执行位相邻单:{final_watch_list} 目标计算数量:{MIN_H_COUNT}")
        # 保存计算范围
        cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                              finished)
        self.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                               finished)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 获取H撤监听的数据索引范围
    # 返回监听范围与已撤单索引
    @classmethod
    def get_watch_index_dict(cls, code):
        origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code)
    def get_watch_index_dict(self, code):
        origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code)
        # 监听的数据
        watch_indexs_dict = {}
        total_nums = 0
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set_cache(code)[0]
            watch_indexs = self.__get_watch_index_set_cache(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
@@ -744,12 +781,12 @@
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code)
        datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                watch_indexs_dict[index] = indexs
        return watch_indexs_dict, cls.__get_watch_canceled_index_cache(code)
        return watch_indexs_dict, self.__get_watch_canceled_index_cache(code)
# ---------------------------------D撤-------------------------------
@@ -760,57 +797,71 @@
    # Redis connection manager used by __get_redis; built with argument 0
    # (presumably the Redis db index — TODO confirm against RedisManager).
    __redis_manager = redis_manager.RedisManager(0)
    # In-memory cache of the real order index per code; filled from Redis by
    # __load_datas and read/written through CodeDataCacheUtil.
    __cancel_real_order_index_cache = {}
    # The singleton instance; assigned on first construction in __new__.
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "d_cancel_real_order_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __set_real_order_index(cls, code, index):
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(cls.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    def __set_real_order_index(self, code, index):
        CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(self.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    @classmethod
    def __del_real_order_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(cls.__db, f"d_cancel_real_order_index-{code}")
    def __del_real_order_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(self.__db, f"d_cancel_real_order_index-{code}")
    @classmethod
    def __get_real_order_index(cls, code):
        val = RedisUtils.get(cls.__db, f"d_cancel_real_order_index-{code}")
    def __get_real_order_index(self, code):
        val = RedisUtils.get(self.__db, f"d_cancel_real_order_index-{code}")
        if val:
            return int(val)
        return None
    @classmethod
    def __get_real_order_index_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_real_order_index_cache, code)
    def __get_real_order_index_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_real_order_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_real_order_index(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, val)
        val = self.__get_real_order_index(code)
        CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, val)
        return val
    @classmethod
    def clear(cls, code=None):
    def clear(self, code=None):
        if code:
            cls.__del_real_order_index(code)
            self.__del_real_order_index(code)
        else:
            keys = RedisUtils.keys(cls.__get_redis(), "d_cancel_real_order_index-*")
            keys = RedisUtils.keys(self.__get_redis(), "d_cancel_real_order_index-*")
            if keys:
                for k in keys:
                    code = k.replace("d_cancel_real_order_index-", "")
                    cls.__del_real_order_index(code)
                    self.__del_real_order_index(code)
    # 设置成交位
    @classmethod
    def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
    def set_trade_progress(self, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
                           limit_up_price):
        # 离下单执行位2分钟内的有效
        if tool.trade_time_sub(total_data[-1]['val']['time'],
                               total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
            return False, "超过D撤守护时间"
        real_order_index = cls.__get_real_order_index_cache(code)
        real_order_index = self.__get_real_order_index_cache(code)
        if not real_order_index:
            return False, "尚未获取到真实下单位置"
@@ -837,18 +888,15 @@
        return False, ""
    # 设置真实的下单位置
    @classmethod
    def set_real_order_index(cls, code, index):
        cls.__set_real_order_index(code, index)
    def set_real_order_index(self, code, index):
        self.__set_real_order_index(code, index)
        logger_l2_d_cancel.info(f"{code}下单位置设置:{index}")
    @classmethod
    def place_order_success(cls, code):
        cls.clear(code)
    def place_order_success(self, code):
        self.clear(code)
    @classmethod
    def cancel_success(cls, code):
        cls.clear(code)
    def cancel_success(self, code):
        self.clear(code)
# ---------------------------------L撤-------------------------------
@@ -859,64 +907,78 @@
    # Per-code record of a trade progress index; set_trade_progress looks the
    # code up here and compares the stored value with the incoming index.
    __last_trade_progress_dict = {}
    # In-memory cache of the watched indexes per code (a set per code),
    # mirrored to the Redis set "l_cancel_watch_index-<code>".
    __cancel_watch_index_cache = {}
    # The singleton instance; assigned on first construction in __new__.
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "l_cancel_watch_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __add_watch_indexes(cls, code, indexes):
    def __add_watch_indexes(self, code, indexes):
        if not indexes:
            return
        if code not in cls.__cancel_watch_index_cache:
            cls.__cancel_watch_index_cache[code] = set()
        if code not in self.__cancel_watch_index_cache:
            self.__cancel_watch_index_cache[code] = set()
        for index in indexes:
            cls.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(cls.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(cls.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
            self.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(self.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
    @classmethod
    def __del_watch_indexes(cls, code, indexes):
    def __del_watch_indexes(self, code, indexes):
        if not indexes:
            return
        for index in indexes:
            if code in cls.__cancel_watch_index_cache:
                cls.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(cls.__db, f"l_cancel_watch_index-{code}", index)
            if code in self.__cancel_watch_index_cache:
                self.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index)
    @classmethod
    def __get_watch_indexes(cls, code):
        return RedisUtils.smembers(cls.__get_redis(), f"l_cancel_watch_index-{code}")
    def __get_watch_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}")
    @classmethod
    def __get_watch_indexes_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_index_cache, code)
    def __get_watch_indexes_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_indexes(code)
        cls.__cancel_watch_index_cache[code] = val
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, val)
        val = self.__get_watch_indexes(code)
        self.__cancel_watch_index_cache[code] = val
        CodeDataCacheUtil.set_cache(self.__cancel_watch_index_cache, code, val)
        return val
    @classmethod
    def del_watch_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(cls.__db, f"l_cancel_watch_index-{code}")
    def del_watch_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
    @classmethod
    def clear(cls, code=None):
    def clear(self, code=None):
        if code:
            cls.del_watch_index(code)
            self.del_watch_index(code)
        else:
            keys = RedisUtils.keys(cls.__get_redis(), f"l_cancel_watch_index-*")
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*")
            for k in keys:
                code = k.replace("l_cancel_watch_index-", "")
                cls.del_watch_index(code)
                self.del_watch_index(code)
    # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
    @classmethod
    def set_trade_progress(cls, code, index, total_data):
        old_watch_indexes = cls.__get_watch_indexes_cache(code)
        if cls.__last_trade_progress_dict.get(code) == index and len(
    def set_trade_progress(self, code, index, total_data):
        old_watch_indexes = self.__get_watch_indexes_cache(code)
        if self.__last_trade_progress_dict.get(code) == index and len(
                old_watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT:
            # 成交进度尚未发生变化且已经监听到了足够的数据
            return
@@ -947,18 +1009,17 @@
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
        cls.__add_watch_indexes(code, add_indexes)
        cls.__del_watch_indexes(code, delete_indexes)
        self.__add_watch_indexes(code, add_indexes)
        self.__del_watch_indexes(code, delete_indexes)
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
                    is_first_code):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        # 守护S撤以外的数据
        if time_space <= constant.S_CANCEL_EXPIRE_TIME or int(tool.get_now_time_str().replace(":", "")) > int("145000"):
            return False, None
        watch_indexes = cls.__get_watch_indexes_cache(code)
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
            return False, None
        watch_indexes = set([int(i) for i in watch_indexes])
@@ -994,13 +1055,11 @@
        return False, None
    @classmethod
    def place_order_success(cls, code):
        cls.clear(code)
    def place_order_success(self, code):
        self.clear(code)
    @classmethod
    def cancel_success(cls, code):
        cls.clear(code)
    def cancel_success(self, code):
        self.clear(code)
# --------------------------------封单额变化撤------------------------
@@ -1032,9 +1091,7 @@
        else:
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
    def __get_l2_second_money_record(self, code, time):