Administrator
2023-08-08 c20c3c10635ce78db4a86ce9c0bb1d02e90f525d
单例+缓存优化
13个文件已修改
759 ■■■■■ 已修改文件
constant.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 557 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/block_info.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -109,7 +109,7 @@
D_CANCEL_RATE = 0.5
# L撤
L_CANCEL_MAX_WATCH_COUNT = 5
L_CANCEL_MAX_WATCH_COUNT = 15
# 撤单比例
L_CANCEL_RATE = 0.6
# 最小金额
inited_data.py
@@ -88,9 +88,9 @@
        # 清空暂停交易代码
        gpcode_manager.PauseBuyCodesManager().clear()
        # 清除L撤数据
        LCancelBigNumComputer.clear()
        LCancelBigNumComputer().clear()
        # 清除D撤数据
        DCancelBigNumComputer.clear()
        DCancelBigNumComputer().clear()
# 每日初始化
l2/cancel_buy_strategy.py
@@ -32,64 +32,78 @@
    __sCancelParamsManager = l2_trade_factor.SCancelParamsManager
    __s_big_num_cancel_compute_data_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(SecondCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    # 保存结束位置
    @classmethod
    def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code,
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "s_big_num_cancel_compute_data-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 保存结束位置
    def __save_compute_data(self, code, process_index, buy_num, cancel_num):
        CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code,
                                    (process_index, buy_num, cancel_num))
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((process_index, buy_num, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
    def __get_compute_data(self, code):
        key = "s_big_num_cancel_compute_data-{}".format(code)
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2]
    @classmethod
    def __get_compute_data_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__s_big_num_cancel_compute_data_cache, code)
    def __get_compute_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__s_big_num_cancel_compute_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val)
        val = self.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, val)
        return val
    @classmethod
    def __clear_data(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__s_big_num_cancel_compute_data_cache, code)
    def __clear_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code)
        ks = ["s_big_num_cancel_compute_data-{}".format(code)]
        for key in ks:
            RedisUtils.delete_async(cls.__db, key)
            RedisUtils.delete_async(self.__db, key)
    @classmethod
    def clear_data(cls):
    def clear_data(self):
        ks = ["s_big_num_cancel_compute_data-*"]
        for key in ks:
            keys = RedisUtils.keys(cls.__get_redis(), key)
            keys = RedisUtils.keys(self.__get_redis(), key)
            for k in keys:
                code = k.replace("s_big_num_cancel_compute_data-", "")
                cls.__clear_data(code)
                self.__clear_data(code)
    # 计算净大单
    @classmethod
    def __compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, volume_rate_index):
    def __compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, volume_rate_index):
        # 点火大单数量
        fire_count = cls.__sCancelParamsManager.get_max_exclude_count(volume_rate_index)
        return cls.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count,
                                        constant.S_CANCEL_MIN_MONEY)
        fire_count = self.__sCancelParamsManager.get_max_exclude_count(volume_rate_index)
        return self.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count,
                                         constant.S_CANCEL_MIN_MONEY)
    # 计算未撤的总手数
    @classmethod
    def compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w):
    def compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w):
        # 获取大单的最小手数
        left_big_num = 0
        for i in range(start_index, end_index + 1):
@@ -128,8 +142,7 @@
                            left_big_num -= val["num"] * data["re"]
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code,
    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code,
                    buy_volume_rate_index,
                    volume_rate_index,
                    need_cancel=True):
@@ -150,7 +163,7 @@
                    break
        # 获取处理进度
        process_index_old, buy_num, cancel_num = cls.__get_compute_data_cache(code)
        process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
@@ -160,8 +173,8 @@
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                      total_data, place_order_count)
            left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                       total_data, place_order_count)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            start_index = end_index + 1
@@ -180,8 +193,8 @@
            #         if buy_index is not None and a_start_index <= buy_index <= a_end_index:
            #             # 在买入信号之后
            #             cls.__save_cancel_data(code, i)
        range_seconds = cls.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
        cancel_rate_threshold = cls.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
        range_seconds = self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
        cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
@@ -240,13 +253,12 @@
                                buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold, range_seconds)
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
            self.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
    # Cancel succeeded: clear the cached S-cancel data for this code
    @classmethod
    def cancel_success(cls, code):
        cls.__clear_data(code)
    def cancel_success(self, code):
        self.__clear_data(code)
# --------------------------------H撤-------------------------------
@@ -263,172 +275,204 @@
    __cancel_traded_progress_cache = {}
    __cancel_compute_data_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    # 保存成交位置到执行位置的揽括范围数据
    @classmethod
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs_exec-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_watch_canceled_indexs-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.smembers(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_traded_progress-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_compute_data-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 保存成交位置到执行位置的揽括范围数据
    def __save_watch_index_set(self, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, finish)))
    # Read the watch-index data (traded position to exec position) from Redis
    @classmethod
    def __get_watch_index_set(cls, code):
    def __get_watch_index_set(self, code):
        key = f"h_cancel_watch_indexs-{code}"
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, False
        val = json.loads(val)
        return val[0], val[1], val[2]
    @classmethod
    def __get_watch_index_set_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_cache, code)
    def __get_watch_index_set_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_index_set(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)
        val = self.__get_watch_index_set(code)
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, val)
        return val
    # 保存执行位置后面的守护数据
    @classmethod
    def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code,
    def __save_watch_index_set_after_exec(self, code, datas, process_index, total_count, big_num_count, finished):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code,
                                    (list(datas), process_index, total_count, big_num_count, finished))
        key = f"h_cancel_watch_indexs_exec-{code}"
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # Read the watch-index data recorded after the exec position from Redis
    @classmethod
    def __get_watch_index_set_after_exec(cls, code):
    def __get_watch_index_set_after_exec(self, code):
        key = f"h_cancel_watch_indexs_exec-{code}"
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return [], -1, 0, 0, False
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]
    @classmethod
    def __get_watch_index_set_after_exec_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_indexs_exec_cache, code)
    def __get_watch_index_set_after_exec_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_exec_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_index_set_after_exec(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val)
        val = self.__get_watch_index_set_after_exec(code)
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code, val)
        return val
    # 保存已经撤单的监听位置
    @classmethod
    def __add_watch_canceled_indexes(cls, code, indexes):
        if code not in cls.__cancel_watch_canceled_indexs_cache:
            cls.__cancel_watch_canceled_indexs_cache[code] = set()
    def __add_watch_canceled_indexes(self, code, indexes):
        if code not in self.__cancel_watch_canceled_indexs_cache:
            self.__cancel_watch_canceled_indexs_cache[code] = set()
        key = f"h_cancel_watch_canceled_indexs-{code}"
        for index in indexes:
            cls.__cancel_watch_canceled_indexs_cache[code].add(index)
            RedisUtils.sadd_async(cls.__db, key, index)
        RedisUtils.expire_async(cls.__db, key, tool.get_expire())
            self.__cancel_watch_canceled_indexs_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, key, index)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())
    @classmethod
    def __get_watch_canceled_index(cls, code):
    def __get_watch_canceled_index(self, code):
        key = f"h_cancel_watch_canceled_indexs-{code}"
        return RedisUtils.smembers(cls.__get_redis(), key)
        return RedisUtils.smembers(self.__get_redis(), key)
    @classmethod
    def __get_watch_canceled_index_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_canceled_indexs_cache, code)
    def __get_watch_canceled_index_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_canceled_indexs_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_canceled_index(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val)
        val = self.__get_watch_canceled_index(code)
        CodeDataCacheUtil.set_cache(self.__cancel_watch_canceled_indexs_cache, code, val)
        return val
    @classmethod
    def __del_watch_canceled_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
    def __del_watch_canceled_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code)
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete(self.__get_redis(), key)
    # 保存成交进度
    @classmethod
    def __save_traded_progress(cls, code, origin_process_index, latest_process_index):
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code,
    def __save_traded_progress(self, code, origin_process_index, latest_process_index):
        CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((origin_process_index, latest_process_index)))
    @classmethod
    def __get_traded_progress(cls, code):
    def __get_traded_progress(self, code):
        key = "h_cancel_traded_progress-{}".format(code)
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    @classmethod
    def __get_traded_progress_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_traded_progress_cache, code)
    def __get_traded_progress_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_traded_progress_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_traded_progress(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val)
        val = self.__get_traded_progress(code)
        CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code, val)
        return val
    # 保存结算位置
    @classmethod
    def __save_compute_data(cls, code, process_index, cancel_num):
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code,
    def __save_compute_data(self, code, process_index, cancel_num):
        CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code,
                                    (process_index, cancel_num))
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
    def __get_compute_data(self, code):
        key = "h_cancel_compute_data-{}".format(code)
        val = RedisUtils.get(cls.__get_redis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return -1, 0
        val = json.loads(val)
        return val[0], val[1]
    @classmethod
    def __get_compute_data_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_compute_data_cache, code)
    def __get_compute_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_compute_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val)
        val = self.__get_compute_data(code)
        CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code, val)
        return val
    @classmethod
    def __del_compute_data(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code)
    def __del_compute_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete(self.__get_redis(), key)
    @classmethod
    def __clear_data(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_traded_progress_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_indexs_exec_cache, code)
        CodeDataCacheUtil.clear_cache(cls.__cancel_compute_data_cache, code)
    def __clear_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_traded_progress_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_exec_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
              f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}",
              f"h_cancel_watch_canceled_indexs-{code}"]
        for key in ks:
            RedisUtils.delete(cls.__get_redis(), key)
            RedisUtils.delete(self.__get_redis(), key)
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    local_today_num_operate_map,
                    buy_volume_index, volume_index,
                    is_first_code):
@@ -436,14 +480,14 @@
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1:
            # 开始计算需要监控的单
            cls.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data,
                                                    local_today_num_operate_map, buy_volume_index)
            self.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data,
                                                     local_today_num_operate_map, buy_volume_index)
        # 守护30s以外的数据
        if time_space <= constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        # 获取成交进度
        origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code)
        origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code)
        if latest_progress_index is None:
            latest_progress_index = -1
        # 监听的数据
@@ -454,7 +498,7 @@
        total_nums = 1
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set_cache(code)[0]
            watch_indexs = self.__get_watch_index_set_cache(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
@@ -464,7 +508,7 @@
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code)
        datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
@@ -473,17 +517,17 @@
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        processed_index, cancel_num = cls.__get_compute_data_cache(code)
        processed_index, cancel_num = self.__get_compute_data_cache(code)
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index)
        cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
        has_watch_canceled = False
        # 获取之前已经撤单的数据
        old_canceld_indexs = cls.__get_watch_canceled_index_cache(code)
        old_canceld_indexs = self.__get_watch_canceled_index_cache(code)
        # 重新计算撤单
        cancel_num = 0
        if old_canceld_indexs:
@@ -521,9 +565,9 @@
                                                len(watch_indexs_dict.keys()))
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
                            self.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
                            return True, data
            cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
            self.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
            rate__ = round(cancel_num / total_nums, 4)
            if rate__ > cancel_rate_threshold:
@@ -543,9 +587,9 @@
            logger_l2_h_cancel.info(
                f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # H撤已撤订单
            logger_l2_h_cancel.info(f"code-{code} H撤已撤订单:{cls.__get_watch_canceled_index_cache(code)}")
            logger_l2_h_cancel.info(f"code-{code} H撤已撤订单:{self.__get_watch_canceled_index_cache(code)}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
            self.__save_compute_data(code, process_index, cancel_num)
            # 有观测数据撤单
            if has_watch_canceled:
                now_rate = round(cancel_num / total_nums, 4)
@@ -557,33 +601,30 @@
        return False, None
    # 下单成功
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
    def place_order_success(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        self.__clear_data(code)
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        cls.__tradeBuyQueue.set_traded_index(code, index)
    def set_trade_progress(self, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        self.__tradeBuyQueue.set_traded_index(code, index)
        # 如果获取时间与执行时间小于29则不需要处理
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time,
                                                                               total_data[buy_exec_index]["val"][
                                                                                   "time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
            return
        # 保存成交进度
        origin_index, latest_index = cls.__get_traded_progress_cache(code)
        origin_index, latest_index = self.__get_traded_progress_cache(code)
        if origin_index is None:
            cls.__save_traded_progress(code, index, index)
            self.__save_traded_progress(code, index, index)
            # 计算揽括范围
            cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                           local_today_num_operate_map)
            self.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                            local_today_num_operate_map)
        else:
            cls.__save_traded_progress(code, origin_index, index)
            self.__save_traded_progress(code, origin_index, index)
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:" + str(total_data[-1]["index"]))
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map,
    def __get_limit_up_buy_no_canceled_count(self, code, index, total_data, local_today_num_operate_map,
                                             MAX_EXPIRE_CANCEL_TIME=None):
        data = None
        try:
@@ -619,13 +660,12 @@
        # 计算排名前N的大单
    # 过时数据
    @classmethod
    def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count):
    def __compute_top_n_num(self, code, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                        local_today_num_operate_map)
            not_cancel_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                         local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # Sort by number of lots (手数)
@@ -637,14 +677,13 @@
        return watch_set
    # 从成交位置到执行位置
    @classmethod
    def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data,
    def __compute_watch_indexs_between_traded_exec(self, code, progress_index, buy_exec_index, total_data,
                                                   local_today_num_operate_map):
        total_count = 0
        watch_set = set()
        big_num_count = 0
        for i in range(progress_index, buy_exec_index):
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
@@ -659,15 +698,14 @@
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控成交位到执行位:{final_watch_list}")
        cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        self.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 计算执行位置之后的需要监听的数据
    @classmethod
    def __compute_watch_indexs_after_single(cls, code, buy_single_index, buy_exec_index, total_data,
    def __compute_watch_indexs_after_single(self, code, buy_single_index, buy_exec_index, total_data,
                                            local_today_num_operate_map, buy_volume_index):
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec_cache(
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = self.__get_watch_index_set_after_exec_cache(
            code)
        if watch_list and finish:
            # 已经计算完了不需要再进行计算
@@ -683,17 +721,17 @@
        big_num_count = big_num_count_old
        total_count = total_count_old
        # H撤单
        MIN_H_COUNT = cls.__hCancelParamsManager.get_max_watch_count(buy_volume_index)
        MIN_H_COUNT = self.__hCancelParamsManager.get_max_watch_count(buy_volume_index)
        # 从买入信号位3条数据开始计算
        for i in range(buy_single_index + 3, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                  local_today_num_operate_map,
                                                                  tool.trade_time_add_second(
                                                                      total_data[buy_exec_index]["val"]["time"],
                                                                      constant.S_CANCEL_EXPIRE_TIME))
            left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                   local_today_num_operate_map,
                                                                   tool.trade_time_add_second(
                                                                       total_data[buy_exec_index]["val"]["time"],
                                                                       constant.S_CANCEL_EXPIRE_TIME))
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
@@ -720,22 +758,21 @@
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控执行位相邻单:{final_watch_list} 目标计算数量:{MIN_H_COUNT}")
        # 保存计算范围
        cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                              finished)
        self.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                               finished)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 获取H撤监听的数据索引范围
    # 返回监听范围与已撤单索引
    @classmethod
    def get_watch_index_dict(cls, code):
        origin_progress_index, latest_progress_index = cls.__get_traded_progress_cache(code)
    def get_watch_index_dict(self, code):
        origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code)
        # 监听的数据
        watch_indexs_dict = {}
        total_nums = 0
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set_cache(code)[0]
            watch_indexs = self.__get_watch_index_set_cache(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
@@ -744,12 +781,12 @@
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec_cache(code)
        datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                watch_indexs_dict[index] = indexs
        return watch_indexs_dict, cls.__get_watch_canceled_index_cache(code)
        return watch_indexs_dict, self.__get_watch_canceled_index_cache(code)
# ---------------------------------D撤-------------------------------
@@ -760,57 +797,71 @@
    __redis_manager = redis_manager.RedisManager(0)
    __cancel_real_order_index_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "d_cancel_real_order_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __set_real_order_index(cls, code, index):
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(cls.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    def __set_real_order_index(self, code, index):
        CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(self.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    @classmethod
    def __del_real_order_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(cls.__db, f"d_cancel_real_order_index-{code}")
    def __del_real_order_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(self.__db, f"d_cancel_real_order_index-{code}")
    @classmethod
    def __get_real_order_index(cls, code):
        val = RedisUtils.get(cls.__db, f"d_cancel_real_order_index-{code}")
    def __get_real_order_index(self, code):
        val = RedisUtils.get(self.__db, f"d_cancel_real_order_index-{code}")
        if val:
            return int(val)
        return None
    @classmethod
    def __get_real_order_index_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_real_order_index_cache, code)
    def __get_real_order_index_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_real_order_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_real_order_index(code)
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, val)
        val = self.__get_real_order_index(code)
        CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, val)
        return val
    @classmethod
    def clear(cls, code=None):
    def clear(self, code=None):
        if code:
            cls.__del_real_order_index(code)
            self.__del_real_order_index(code)
        else:
            keys = RedisUtils.keys(cls.__get_redis(), "d_cancel_real_order_index-*")
            keys = RedisUtils.keys(self.__get_redis(), "d_cancel_real_order_index-*")
            if keys:
                for k in keys:
                    code = k.replace("d_cancel_real_order_index-", "")
                    cls.__del_real_order_index(code)
                    self.__del_real_order_index(code)
    # 设置成交位
    @classmethod
    def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
    def set_trade_progress(self, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
                           limit_up_price):
        # 离下单执行位2分钟内的有效
        if tool.trade_time_sub(total_data[-1]['val']['time'],
                               total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
            return False, "超过D撤守护时间"
        real_order_index = cls.__get_real_order_index_cache(code)
        real_order_index = self.__get_real_order_index_cache(code)
        if not real_order_index:
            return False, "尚未获取到真实下单位置"
@@ -837,18 +888,15 @@
        return False, ""
    # 设置真实的下单位置
    @classmethod
    def set_real_order_index(cls, code, index):
        cls.__set_real_order_index(code, index)
    def set_real_order_index(self, code, index):
        self.__set_real_order_index(code, index)
        logger_l2_d_cancel.info(f"{code}下单位置设置:{index}")
    @classmethod
    def place_order_success(cls, code):
        cls.clear(code)
    def place_order_success(self, code):
        self.clear(code)
    @classmethod
    def cancel_success(cls, code):
        cls.clear(code)
    def cancel_success(self, code):
        self.clear(code)
# ---------------------------------L撤-------------------------------
@@ -859,64 +907,78 @@
    __last_trade_progress_dict = {}
    __cancel_watch_index_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "l_cancel_watch_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __add_watch_indexes(cls, code, indexes):
    def __add_watch_indexes(self, code, indexes):
        if not indexes:
            return
        if code not in cls.__cancel_watch_index_cache:
            cls.__cancel_watch_index_cache[code] = set()
        if code not in self.__cancel_watch_index_cache:
            self.__cancel_watch_index_cache[code] = set()
        for index in indexes:
            cls.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(cls.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(cls.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
            self.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(self.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
    @classmethod
    def __del_watch_indexes(cls, code, indexes):
    def __del_watch_indexes(self, code, indexes):
        if not indexes:
            return
        for index in indexes:
            if code in cls.__cancel_watch_index_cache:
                cls.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(cls.__db, f"l_cancel_watch_index-{code}", index)
            if code in self.__cancel_watch_index_cache:
                self.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index)
    @classmethod
    def __get_watch_indexes(cls, code):
        return RedisUtils.smembers(cls.__get_redis(), f"l_cancel_watch_index-{code}")
    def __get_watch_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}")
    @classmethod
    def __get_watch_indexes_cache(cls, code):
        cache_result = CodeDataCacheUtil.get_cache(cls.__cancel_watch_index_cache, code)
    def __get_watch_indexes_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = cls.__get_watch_indexes(code)
        cls.__cancel_watch_index_cache[code] = val
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, val)
        val = self.__get_watch_indexes(code)
        self.__cancel_watch_index_cache[code] = val
        CodeDataCacheUtil.set_cache(self.__cancel_watch_index_cache, code, val)
        return val
    @classmethod
    def del_watch_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(cls.__db, f"l_cancel_watch_index-{code}")
    def del_watch_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
    @classmethod
    def clear(cls, code=None):
    def clear(self, code=None):
        if code:
            cls.del_watch_index(code)
            self.del_watch_index(code)
        else:
            keys = RedisUtils.keys(cls.__get_redis(), f"l_cancel_watch_index-*")
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*")
            for k in keys:
                code = k.replace("l_cancel_watch_index-", "")
                cls.del_watch_index(code)
                self.del_watch_index(code)
    # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
    @classmethod
    def set_trade_progress(cls, code, index, total_data):
        old_watch_indexes = cls.__get_watch_indexes_cache(code)
        if cls.__last_trade_progress_dict.get(code) == index and len(
    def set_trade_progress(self, code, index, total_data):
        old_watch_indexes = self.__get_watch_indexes_cache(code)
        if self.__last_trade_progress_dict.get(code) == index and len(
                old_watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT:
            # 成交进度尚未发生变化且已经监听到了足够的数据
            return
@@ -947,18 +1009,17 @@
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
        cls.__add_watch_indexes(code, add_indexes)
        cls.__del_watch_indexes(code, delete_indexes)
        self.__add_watch_indexes(code, add_indexes)
        self.__del_watch_indexes(code, delete_indexes)
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
                    is_first_code):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        # 守护S撤以外的数据
        if time_space <= constant.S_CANCEL_EXPIRE_TIME or int(tool.get_now_time_str().replace(":", "")) > int("145000"):
            return False, None
        watch_indexes = cls.__get_watch_indexes_cache(code)
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
            return False, None
        watch_indexes = set([int(i) for i in watch_indexes])
@@ -994,13 +1055,11 @@
        return False, None
    @classmethod
    def place_order_success(cls, code):
        cls.clear(code)
    def place_order_success(self, code):
        self.clear(code)
    @classmethod
    def cancel_success(cls, code):
        cls.clear(code)
    def cancel_success(self, code):
        self.clear(code)
# --------------------------------封单额变化撤------------------------
@@ -1032,9 +1091,7 @@
        else:
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
    def __get_l2_second_money_record(self, code, time):
l2/huaxin/huaxin_delegate_postion_manager.py
@@ -3,20 +3,24 @@
"""
import time
from log_module.log import hx_logger_trade_debug
from log_module.log import hx_logger_trade_debug, logger_real_place_order_position
_place_order_info_dict = {}
# 下单
def place_order(code, price, volume, exec_index):
    """Log a placed order and remember it, keyed by code.

    The stored record is ``(price, volume, exec_index, timestamp)``, where the
    timestamp is ``time.time()`` taken after the log call. A later order for
    the same code overwrites the earlier record.
    """
    logger_real_place_order_position.info("下单:code-{} price-{} volume-{} exec-index-{}", code, price, volume,
                                          exec_index)
    placed_at = time.time()
    order_info = (price, volume, exec_index, placed_at)
    _place_order_info_dict[code] = order_info
# 获取下单信息
def get_order_info(code):
    info = _place_order_info_dict.get(code)
    logger_real_place_order_position.info("get_order_info:data-{}", info)
    if info and time.time() - info[3] > 3:
        logger_real_place_order_position.info("get_order_info 间隔3s以上:code-{}", code)
        # 间隔3s以上就无效了
        info = None
        _place_order_info_dict.pop(code)
@@ -43,5 +47,6 @@
            continue
        # 获取到了下单位置
        hx_logger_trade_debug.info(f"真实下单位置:{code}-{d['index']}")
        logger_real_place_order_position.info(f"真实下单位置:{code}-{d['index']}")
        return d["index"]
    return None
l2/l2_data_manager_new.py
@@ -214,6 +214,7 @@
    __l2PlaceOrderParamsManagerDict = {}
    __last_buy_single_dict = {}
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __latest_process_unique_keys = {}
    # 获取代码评分
    @classmethod
@@ -290,7 +291,7 @@
            place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas)
            if place_order_index:
                logger_l2_process.info("code:{} 获取到下单真实位置:{}", code, place_order_index)
                DCancelBigNumComputer.set_real_order_index(code, place_order_index)
                DCancelBigNumComputer().set_real_order_index(code, place_order_index)
            __start_time = round(t.time() * 1000)
            if len(datas) > 0:
                cls.process_add_datas(code, datas, 0, __start_time)
@@ -431,6 +432,13 @@
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, is_first_code, new_add=True):
        # 增加推出机制
        unique_key = f"{start_index}-{end_index}"
        if cls.__latest_process_unique_keys.get(code) == unique_key:
            logger_l2_error.error(f"重复处理数据:code-{code} start_index-{start_index} end_index-{end_index}")
            return
        cls.__latest_process_unique_keys[code] = unique_key
        # 计算安全笔数
        @dask.delayed
        def compute_safe_count():
@@ -475,13 +483,13 @@
            _start_time = round(t.time() * 1000)
            # S撤单计算,看秒级大单撤单
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      code_volumn_manager.get_volume_rate_index(
                                                                                          buy_volume_rate),
                                                                                      cls.volume_rate_info[code][1],
                                                                                      is_first_code)
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index,
                                                                                        buy_exec_index, start_index,
                                                                                        end_index, total_data,
                                                                                        code_volumn_manager.get_volume_rate_index(
                                                                                            buy_volume_rate),
                                                                                        cls.volume_rate_info[code][1],
                                                                                        is_first_code)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
@@ -496,7 +504,7 @@
        def h_cancel():
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_single_index,
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer().need_cancel(code, buy_single_index,
                                                                                    buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    local_today_num_operate_map.get(
@@ -518,7 +526,7 @@
        def l_cancel():
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = LCancelBigNumComputer.need_cancel(code,
                b_need_cancel, b_cancel_data = LCancelBigNumComputer().need_cancel(code,
                                                                                 buy_exec_index, start_index,
                                                                                 end_index, total_data,
                                                                                 local_today_num_operate_map.get(
@@ -840,8 +848,8 @@
            trade_price = current_price_process_manager.get_trade_price(code)
            if trade_price is None:
                return False, True, f"尚未获取到当前成交价"
            if float(limit_up_price) - float(trade_price) > 0.02001:
                return False, False, f"当前成交价({trade_price})尚未在2档及以内"
            if float(limit_up_price) - float(trade_price) > 0.04001:
                return False, False, f"当前成交价({trade_price})尚未在4档及以内"
            # 判断成交进度是否距离我们的位置很近
            total_data = local_today_datas.get(code)
@@ -892,13 +900,13 @@
            # 之前没有涨停过
            # 统计买入信号位到当前位置没有撤的大单金额
            min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000
            left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer.compute_left_big_num(code,
                                                                                                  buy_single_index,
                                                                                                  buy_exec_index,
                                                                                                  total_data[-1][
                                                                                                      "index"],
                                                                                                  total_data,
                                                                                                  0, min_money_w)
            left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer().compute_left_big_num(code,
                                                                                                    buy_single_index,
                                                                                                    buy_exec_index,
                                                                                                    total_data[-1][
                                                                                                        "index"],
                                                                                                    total_data,
                                                                                                    0, min_money_w)
            if left_big_num > 0:
                # 重新获取分数与分数索引
                limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
@@ -1150,13 +1158,13 @@
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, is_first_code,
                                                                                  cls.volume_rate_info[code][1],
                                                                                  cls.volume_rate_info[code][1],
                                                                                  True)
                need_cancel, cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index,
                                                                                    compute_index,
                                                                                    buy_single_index, compute_index,
                                                                                    total_datas, is_first_code,
                                                                                    cls.volume_rate_info[code][1],
                                                                                    cls.volume_rate_info[code][1],
                                                                                    True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                l2_log.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
@@ -1168,10 +1176,10 @@
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            else:
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, is_first_code,
                                                       cls.volume_rate_info[code][1],
                                                       cls.volume_rate_info[code][1], False)
                SecondCancelBigNumComputer().need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                         compute_index, total_datas, is_first_code,
                                                         cls.volume_rate_info[code][1],
                                                         cls.volume_rate_info[code][1], False)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
log_module/log.py
@@ -82,6 +82,10 @@
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy_progress",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_real_place_order_position"),
                   filter=lambda record: record["extra"].get("name") == "l2_real_place_order_position",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "juejin_tick"),
                   filter=lambda record: record["extra"].get("name") == "juejin_tick",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -232,6 +236,8 @@
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_trade_buy_progress = __mylogger.get_logger("l2_trade_buy_progress")
logger_real_place_order_position = __mylogger.get_logger("l2_real_place_order_position")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
output/code_info_output.py
@@ -365,7 +365,7 @@
        # H撤监听范围
        if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
            hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code)
            # 根据日志读取实时的计算数据
            h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code)
            if hcancel_datas_dict:
server.py
@@ -390,7 +390,7 @@
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price,
                                                                        sell_one_price, sell_one_volumn)
                                                                          sell_one_price, sell_one_volumn)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
@@ -425,15 +425,16 @@
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                        buy_progress_index,
                                                                                        l2.l2_data_util.local_today_datas.get(
                                                                                            code),
                                                                                        l2.l2_data_util.local_today_num_operate_map.get(
                                                                                            code))
                                            LCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                     l2.l2_data_util.local_today_datas.get(
                                                                                         code))
                                            HourCancelBigNumComputer().set_trade_progress(code, buy_time,
                                                                                          buy_exec_index,
                                                                                          buy_progress_index,
                                                                                          l2.l2_data_util.local_today_datas.get(
                                                                                              code),
                                                                                          l2.l2_data_util.local_today_num_operate_map.get(
                                                                                              code))
                                            LCancelBigNumComputer().set_trade_progress(code, buy_progress_index,
                                                                                       l2.l2_data_util.local_today_datas.get(
                                                                                           code))
                                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                                           buy_progress_index,
test/l2_trade_test.py
@@ -32,7 +32,7 @@
    RedisUtils.realse(redis_l2)
    l2.l2_data_manager.TradePointManager().delete_buy_point(code)
    big_money_num_manager.reset(code)
    RedisUtils.delete( redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    RedisUtils.delete(redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    trade_data_manager.PlaceOrderCountManager().clear_place_order_count(code)
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False)
@@ -46,7 +46,6 @@
    BuyL2SafeCountManager().clear_data(code)
    transaction_progress.TradeBuyQueue().set_traded_index(code, 0)
class VirtualTrade(unittest.TestCase):
@@ -76,12 +75,13 @@
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_,
                                                                              buy_queue_result_list, exec_time)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index,
                                                                                           buy_progress_index,
                                                                                           l2.l2_data_util.local_today_datas.get(
                                                                                               code),
                                                                                           l2.l2_data_util.local_today_num_operate_map.get(
                                                                                               code))
                        l2.cancel_buy_strategy.HourCancelBigNumComputer().set_trade_progress(code, time_,
                                                                                             buy_exec_index,
                                                                                             buy_progress_index,
                                                                                             l2.l2_data_util.local_today_datas.get(
                                                                                                 code),
                                                                                             l2.l2_data_util.local_today_num_operate_map.get(
                                                                                                 code))
                        log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.dumps(buy_queue_result_list))
@@ -137,7 +137,8 @@
        RealTimeKplMarketData().set_top_5_reasons(jingxuan_ranks)
        RealTimeKplMarketData().set_top_5_industry(industry_ranks)
        LimitUpCodesPlateKeyManager().set_today_limit_up(KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()))
        LimitUpCodesPlateKeyManager().set_today_limit_up(
            KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()))
        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
@@ -148,7 +149,8 @@
                time_s = tool.get_time_as_second(time_) - i - 1
                volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
                if volumn is not None:
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn), tool.time_seconds_format(time_s))
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn),
                                                                                    tool.time_seconds_format(time_s))
                    break
            # 设置委买队列
            for i in range(0, len(buy_queues)):
@@ -176,11 +178,11 @@
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
        buy_progress_index = 523
        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                           l2.l2_data_util.local_today_datas.get(
                                                                               code),
                                                                           l2.l2_data_util.local_today_num_operate_map.get(
                                                                               code))
        l2.cancel_buy_strategy.HourCancelBigNumComputer().set_trade_progress(code, buy_progress_index,
                                                                             l2.l2_data_util.local_today_datas.get(
                                                                                 code),
                                                                             l2.l2_data_util.local_today_num_operate_map.get(
                                                                                 code))
# class TestTrade(unittest.TestCase):
third_data/block_info.py
@@ -204,12 +204,13 @@
            code_ = data[0]
            break_codes.add(code_)
    # 统计回封
    for data in latest_datas:
        if data[5] != target_block:
            continue
        # 回封
        if data[2] != data[3]:
            re_limit_codes.add(data[0])
    if latest_datas:
        for data in latest_datas:
            if data[5] != target_block:
                continue
            # 回封
            if data[2] != data[3]:
                re_limit_codes.add(data[0])
    # 排除自己
    break_codes.discard(code)
third_data/data_server.py
@@ -393,7 +393,7 @@
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
                    hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code)
                    # 根据日志读取实时的计算数据
                    h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code)
                    if hcancel_datas_dict:
trade/huaxin/trade_server.py
@@ -211,7 +211,7 @@
                                            "time"]
                                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if buy_exec_index:
                                            need_cancel, msg = DCancelBigNumComputer.set_trade_progress(code,
                                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                                        buy_progress_index,
                                                                                                        buy_exec_index,
                                                                                                        total_datas,
@@ -222,12 +222,12 @@
                                            if need_cancel:
                                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                                        f1 = dask.delayed(HourCancelBigNumComputer.set_trade_progress)(code, buy_time,
                                                                                                       buy_exec_index,
                                                                                                       buy_progress_index,
                                                                                                       total_datas,
                                                                                                       num_operate_map)
                                        f2 = dask.delayed(LCancelBigNumComputer.set_trade_progress)(code,
                                        f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                                         buy_exec_index,
                                                                                                         buy_progress_index,
                                                                                                         total_datas,
                                                                                                         num_operate_map)
                                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                                    buy_progress_index,
                                                                                                    total_datas)
                                        f3 = dask.delayed(deal_big_money_manager.set_trade_progress)(code,
@@ -255,8 +255,9 @@
                            if limit_up_price is not None:
                                # 处理买1,卖1信息
                                code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, limit_up_price,
                                                                            sell_1_price, sell_1_volume // 100)
                                code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
                                                                              limit_up_price,
                                                                              sell_1_price, sell_1_volume // 100)
                                pre_close_price = round(float(limit_up_price) / 1.1, 2)
                                # 如果涨幅大于8%就读取板块
                                if (buy_1_price - pre_close_price) / pre_close_price > 0.08:
trade/trade_result_manager.py
@@ -30,9 +30,9 @@
    # 安全笔数计算
    f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
                                                                     total_datas[-1]["index"])
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code)
    dask.compute(f1, f2, f5, f6, f7, f8)
@@ -54,9 +54,9 @@
    @dask.delayed
    def h_cancel(code, buy_single_index, buy_exec_index):
        try:
            HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
                                                         local_today_datas.get(code),
                                                         local_today_num_operate_map.get(code))
            HourCancelBigNumComputer().place_order_success(code, buy_single_index, buy_exec_index,
                                                           local_today_datas.get(code),
                                                           local_today_num_operate_map.get(code))
        except Exception as e:
            logging.exception(e)
            logger_l2_error.exception(e)
@@ -64,7 +64,7 @@
    @dask.delayed
    def l_cancel(code):
        try:
            LCancelBigNumComputer.del_watch_index(code)
            LCancelBigNumComputer().del_watch_index(code)
        except Exception as e:
            logging.exception(e)
            logger_l2_error.exception(e)
@@ -88,9 +88,9 @@
    # 取消买入标识
    f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code)
    dask.compute(f1, f2, f3, f6, f7, f8)
@@ -99,7 +99,7 @@
    code = "600246"
    f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code)
    f3 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer.cancel_success)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code)
    dask.compute(f2, f3, f6, f7, f8)