Administrator
2023-09-13 3b435873873d5cd498086d9f6afaef3af758a856
H撤重新定义
16个文件已修改
741 ■■■■ 已修改文件
constant.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 540 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/code_price_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -75,10 +75,11 @@
S_CANCEL_EXPIRE_TIME = 1
# H撤比例
H_CANCEL_FIRST_RATE = 0.69
H_CANCEL_RATE = 0.69
H_CANCEL_MIN_MONEY = 98
H_CANCEL_MIN_COUNT = 40
H_CANCEL_MIN_BIG_NUM_COUNT = 3
H_CANCEL_START_TIME = 900
# L2监控的最低金额
L2_MIN_MONEY = 500000
@@ -105,7 +106,8 @@
# D撤单
# 守护时间
D_CANCEL_EXPIRE_TIME = 120
D_CANCEL_RATE = 0.8
D_CANCEL_START_TIME = 2
D_CANCEL_RATE = 0.5
# L撤
# L撤下单之后多久开始守护
huaxin_client/l1_client.py
@@ -167,12 +167,14 @@
    api.RegisterSpi(spi)
    # 注册单个行情前置服务地址
    # api.RegisterFront("tcp://224.224.1.19:7880")
    # api.RegisterFront("tcp://210.14.72.16:9402")
    # 注册多个行情前置服务地址,用逗号隔开
    # api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
    # 注册名字服务器地址,支持多服务地址逗号隔开
    # api.RegisterNameServer('tcp://224.224.3.19:7888')
    # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
    # -------------------------正式地址-------------------------------
    api.RegisterMulticast("udp://224.224.1.19:7880", None, "")
    # 启动接口
huaxin_client/l2_client.py
@@ -523,7 +523,7 @@
    global spi
    spi = Lev2MdSpi(api)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
        api.RegisterFront(Front_Address)
    else:
l2/cancel_buy_strategy.py
@@ -204,6 +204,10 @@
                    buy_volume_rate_index,
                    volume_rate_index,
                    need_cancel=True):
        if buy_single_index is None or buy_exec_index is None:
            return False, "尚未找到下单位置"
        # 只守护30s
        buy_exec_time = total_data[buy_exec_index]["val"]["time"]
        if tool.trade_time_sub(total_data[start_index]["val"]["time"],
@@ -381,12 +385,11 @@
    __tradeBuyQueue = TradeBuyQueue()
    __buyL2SafeCountManager = BuyL2SafeCountManager()
    __hCancelParamsManager = l2_trade_factor.HCancelParamsManager()
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
    # 缓存
    __cancel_watch_indexs_cache = {}
    __cancel_watch_indexs_exec_cache = {}
    __cancel_watch_canceled_indexs_cache = {}
    __cancel_traded_progress_cache = {}
    __cancel_compute_data_cache = {}
    __real_place_order_index_dict = {}
    __instance = None
@@ -410,54 +413,24 @@
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs_exec-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_watch_canceled_indexs-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.smembers(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_canceled_indexs_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_traded_progress-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code, val)
            keys = RedisUtils.keys(__redis, "h_cancel_compute_data-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 保存成交位置到执行位置的揽括范围数据
    def __save_watch_index_set(self, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
    def __save_watch_index_set(self, code, indexes):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, indexes)
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, finish)))
                               json.dumps(list(indexes)))
    # 保存成交进度
    def __get_watch_index_set(self, code):
        key = f"h_cancel_watch_indexs-{code}"
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, False
            return None
        val = json.loads(val)
        return val[0], val[1], val[2]
        return val
    def __get_watch_index_set_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code)
@@ -465,430 +438,117 @@
            return cache_result[1]
        return None, -1, False
    # 保存执行位置后面的守护数据
    def __save_watch_index_set_after_exec(self, code, datas, process_index, total_count, big_num_count, finished):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_exec_cache, code,
                                    (list(datas), process_index, total_count, big_num_count, finished))
        key = f"h_cancel_watch_indexs_exec-{code}"
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # 保存成交进度
    def __get_watch_index_set_after_exec(self, code):
        key = f"h_cancel_watch_indexs_exec-{code}"
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return [], -1, 0, 0, False
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]
    def __get_watch_index_set_after_exec_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_exec_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return [], -1, 0, 0, False
    # 保存已经撤单的监听位置
    def __add_watch_canceled_indexes(self, code, indexes):
        if code not in self.__cancel_watch_canceled_indexs_cache:
            self.__cancel_watch_canceled_indexs_cache[code] = set()
        key = f"h_cancel_watch_canceled_indexs-{code}"
        for index in indexes:
            self.__cancel_watch_canceled_indexs_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, key, index)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())
    def __get_watch_canceled_index(self, code):
        key = f"h_cancel_watch_canceled_indexs-{code}"
        return RedisUtils.smembers(self.__get_redis(), key)
    def __get_watch_canceled_index_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_canceled_indexs_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return set()
    def __del_watch_canceled_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code)
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.delete(self.__get_redis(), key)
    # 保存成交进度
    def __save_traded_progress(self, code, origin_process_index, latest_process_index):
        CodeDataCacheUtil.set_cache(self.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((origin_process_index, latest_process_index)))
    def __get_traded_progress(self, code):
        key = "h_cancel_traded_progress-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    def __get_traded_progress_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_traded_progress_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None
    # 保存结算位置
    def __save_compute_data(self, code, process_index, cancel_num):
        CodeDataCacheUtil.set_cache(self.__cancel_compute_data_cache, code,
                                    (process_index, cancel_num))
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    def __get_compute_data(self, code):
        key = "h_cancel_compute_data-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return -1, 0
        val = json.loads(val)
        return val[0], val[1]
    def __get_compute_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_compute_data_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1, 0
    def __del_compute_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.delete_async(self.__db, key)
    def __clear_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_traded_progress_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_canceled_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_exec_cache, code)
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
              f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}",
              f"h_cancel_watch_canceled_indexs-{code}"]
        ks = [f"h_cancel_watch_indexs-{code}"]
        for key in ks:
            RedisUtils.delete_async(self.__db, key)
        # 计算观察索引,倒序计算
    def compute_watch_index(self, code):
        if self.__cancel_watch_indexs_cache.get(code):
            return
        real_place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
        if not real_place_order_index:
            l2_log.h_cancel_debug(code, "尚未找到真实下单位置")
            return
        total_datas = local_today_datas.get(code)
        # 计算结束位置
        total_num = 0
        # 获取m值数据
        thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        thresh_hold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
        end_index = real_place_order_index + 1
        for i in range(real_place_order_index + 1, total_datas[-1]["index"]):
            # 看是否撤单
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, i,
                                                                                                  total_datas,
                                                                                                  local_today_num_operate_map.get(
                                                                                                      code))
            if left_count > 0:
                total_num += left_count * val["num"]
                if total_num > thresh_hold_num:
                    end_index = i
                    break
        MIN_MONEYS = [300, 200, 100, 50]
        for min_money in MIN_MONEYS:
            watch_indexes = set()
            for i in range(real_place_order_index + 1, end_index + 1):
                # 看是否撤单
                data = total_datas[i]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                # 小金额过滤
                if float(val['price']) * val['num'] < min_money * 100:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, i,
                                                                                                      total_datas,
                                                                                                      local_today_num_operate_map.get(
                                                                                                          code))
                if left_count > 0:
                    watch_indexes.add(i)
                    if len(watch_indexes) >= 5:
                        break
            if watch_indexes:
                self.__save_watch_index_set(code, watch_indexes)
                l2_log.l_cancel_debug(code, f"设置监听范围, 数据范围:{real_place_order_index}-{end_index} 监听范围-{watch_indexes}")
                break
        # 设置真实下单位置
    def set_real_place_order_index(self, code, index):
        self.__real_place_order_index_dict[code] = index
    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    local_today_num_operate_map,
                    buy_volume_index, volume_index,
                    is_first_code):
        if buy_single_index is None or buy_exec_index is None:
            return False, "尚未找到下单位置"
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1:
        if time_space >= constant.H_CANCEL_START_TIME - 1:
            # 开始计算需要监控的单
            self.__compute_watch_indexs_after_single(code, buy_single_index, buy_exec_index, total_data,
                                                     local_today_num_operate_map, buy_volume_index)
            self.compute_watch_index(code)
        # 守护30s以外的数据
        if time_space <= constant.S_CANCEL_EXPIRE_TIME:
        if time_space <= constant.H_CANCEL_START_TIME:
            return False, None
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取成交进度
        origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code)
        if latest_progress_index is None:
            latest_progress_index = -1
        # 监听的数据
        watch_indexs_dict = {}
        # 初始化为1防止分母为0
        total_nums = 1
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = self.__get_watch_index_set_cache(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
                if index < latest_progress_index:
                    continue
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                if index < latest_progress_index:
                    continue
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        watch_index_set = self.__get_watch_index_set_cache(code)
        if watch_index_set:
            cancel_num = 0
            total_num = 0
            for index in watch_index_set:
                data = total_data[index]
                val = data['val']
                total_num += val['num'] * data['re']
                # 判断是否撤单
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, index,
                                                                                                      total_data,
                                                                                                      local_today_num_operate_map)
        processed_index, cancel_num = self.__get_compute_data_cache(code)
        # 获取下单次数
        cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
        has_watch_canceled = False
        # 获取之前已经撤单的数据
        old_canceld_indexs = self.__get_watch_canceled_index_cache(code)
        # 重新计算撤单
        cancel_num = 0
        if old_canceld_indexs:
            for d in old_canceld_indexs:
                nx = int(d.split("-")[0])
                num = int(d.split("-")[1])
                if nx < latest_progress_index:
                    continue
                cancel_num += total_data[nx]["re"] * num
        try:
            temp_watch_canceled_index = set()
            for i in range(start_index, end_index + 1):
                if i <= processed_index:
                    # 已经处理过了
                    continue
                process_index = i
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    l2_log.cancel_debug(code, "查询买入位置开始:{}", i)
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map)
                    l2_log.cancel_debug(code, "查询买入位置结束:{}", i)
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        has_watch_canceled = True
                        cancel_num += data["re"] * val["num"]
                        # 加入
                        temp_watch_canceled_index.add(f"{buy_index}-{val['num']}")
                        rate__ = round(cancel_num / total_nums, 4)
                        if rate__ > cancel_rate_threshold:
                            indexs__ = list(watch_indexs_dict.keys())
                            indexs__.sort()
                            l2_log.trade_record(code, "H撤范围", "'start_index':{},'end_index':{}, 'count':{}",
                                                indexs__[0], indexs__[-1],
                                                len(watch_indexs_dict.keys()))
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            self.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
                            return True, data
            self.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
            rate__ = round(cancel_num / total_nums, 4)
            if rate__ > cancel_rate_threshold:
                indexs__ = list(watch_indexs_dict.keys())
                indexs__.sort()
                l2_log.trade_record(code, "H撤范围", "'start_index':{},'end_index':{}, 'count':{}",
                                    indexs__[0], indexs__[-1],
                                    len(watch_indexs_dict.keys()))
                l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                    cancel_rate_threshold)
                return True, data
        finally:
            l2_log.cancel_debug(code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{} 目标撤单比例:{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums, cancel_rate_threshold)
            l2_log.h_cancel_debug(code,
                                  f"H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # H撤已撤订单
            l2_log.h_cancel_debug(code, f"H撤已撤订单:{self.__get_watch_canceled_index_cache(code)}")
            # 保存处理进度与数据
            self.__save_compute_data(code, process_index, cancel_num)
            # 有观测数据撤单
            if has_watch_canceled:
                now_rate = round(cancel_num / total_nums, 4)
                if now_rate < cancel_rate_threshold and cancel_rate_threshold - now_rate < 0.1:
                    # 距离撤单在5%以内
                    kp_client_msg_manager.add_msg(code,
                                                  f"逼近H撤({now_rate * 100}%/{cancel_rate_threshold * 100}%),需要人工判别。")
                cancel_num += val['num'] * (data['re'] - left_count)
            if cancel_num / total_num >= constant.H_CANCEL_RATE:
                return True, total_data[-1]
        return False, None
    # 下单成功
    def place_order_success(self, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        self.__clear_data(code)
    # 设置成交进度
    def set_trade_progress(self, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        self.__tradeBuyQueue.set_traded_index(code, index)
        # 如果获取时间与执行时间小于29则不需要处理
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time,
                                                                               total_data[buy_exec_index]["val"][
                                                                                   "time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
            return
        # 保存成交进度
        origin_index, latest_index = self.__get_traded_progress_cache(code)
        if origin_index is None:
            self.__save_traded_progress(code, index, index)
            # 计算揽括范围
            self.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                            local_today_num_operate_map)
        else:
            self.__save_traded_progress(code, origin_index, index)
        l2_log.h_cancel_debug(code, f"成交进度:{index} 数据结束位置:" + str(total_data[-1]["index"]))
    # 涨停买是否撤单
    def __get_limit_up_buy_no_canceled_count(self, code, index, total_data, local_today_num_operate_map,
                                             MAX_EXPIRE_CANCEL_TIME=None):
        data = None
        try:
            data = total_data[index]
        except:
            print("")
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy(val):
            # 判断当前买是否已经买撤
            cancel_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(val["num"], "1", val["price"]))
            canceled = False
            if cancel_datas:
                for cancel_data in cancel_datas:
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                     local_today_num_operate_map)
                    if buy_index == index:
                        if MAX_EXPIRE_CANCEL_TIME and tool.trade_time_sub(cancel_data["val"]["time"],
                                                                          MAX_EXPIRE_CANCEL_TIME) > 0:
                            continue
                        canceled = True
                        count = data["re"] - cancel_data["re"]
                        if count > 0:
                            return count
                        cancel_index = cancel_data["index"]
                        break
            if not canceled:
                count = data["re"]
                return count
        return 0
        # 计算排名前N的大单
    # 过时数据
    def __compute_top_n_num(self, code, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                         local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # 针按照手数排序
        watch_list = list(watch_set)
        watch_list.sort(key=lambda tup: tup[1])
        watch_list.reverse()
        watch_list = watch_list[:count]
        watch_set = set(watch_list)
        return watch_set
    # 从成交位置到执行位置
    def __compute_watch_indexs_between_traded_exec(self, code, progress_index, buy_exec_index, total_data,
                                                   local_today_num_operate_map):
        total_count = 0
        watch_set = set()
        big_num_count = 0
        for i in range(progress_index, buy_exec_index):
            left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                if val["num"] * float(val["price"]) <= constant.H_CANCEL_MIN_MONEY * 100:
                    continue
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                if l2_data_util.is_big_money(val):
                    big_num_count += data["re"]
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        l2_log.h_cancel_debug(code, f"H撤监控成交位到执行位:{final_watch_list}")
        self.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 计算执行位置之后的需要监听的数据
    def __compute_watch_indexs_after_single(self, code, buy_single_index, buy_exec_index, total_data,
                                            local_today_num_operate_map, buy_volume_index):
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = self.__get_watch_index_set_after_exec_cache(
            code)
        if watch_list and finish:
            # 已经计算完了不需要再进行计算
            return
        watch_set = set()
        if watch_list:
            for data in watch_list:
                watch_set.add((data[0], data[1], data[2]))
        # 暂时不需要使用
        process_index = process_index_old
        finished = False
        big_num_count = big_num_count_old
        total_count = total_count_old
        # H撤单
        MIN_H_COUNT = self.__hCancelParamsManager.get_max_watch_count(buy_volume_index)
        # 从买入信号位3条数据开始计算
        for i in range(buy_single_index + 3, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
            process_index = i
            left_count = self.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                   local_today_num_operate_map,
                                                                   tool.trade_time_add_second(
                                                                       total_data[buy_exec_index]["val"]["time"],
                                                                       constant.S_CANCEL_EXPIRE_TIME))
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                if val["num"] * float(val["price"]) <= constant.H_CANCEL_MIN_MONEY * 100:
                    continue
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                if l2_data_util.is_big_money(val):
                    big_num_count += data["re"]
                # 判断是否达到阈值
                if total_count >= MIN_H_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT:  # and total_num >= threshold_num
                    if len(total_data) <= i + 1 or (len(total_data) > i + 1 and total_data[i + 1]["val"]["time"] !=
                                                    total_data[buy_exec_index]["val"]["time"]):
                        # 至少囊括执行位本秒的数据
                        finished = True
                        l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{},目标计算数量:{}", json.dumps(list(watch_set)),
                                            total_data[-1]["index"], MIN_H_COUNT)
                        break
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        l2_log.h_cancel_debug(code, f"H撤监控执行位相邻单:{final_watch_list} 目标计算数量:{MIN_H_COUNT}")
        # 保存计算范围
        self.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                               finished)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 获取H撤监听的数据索引范围
    # 返回监听范围与已撤单索引
    def get_watch_index_dict(self, code):
        origin_progress_index, latest_progress_index = self.__get_traded_progress_cache(code)
        # 监听的数据
        watch_indexs_dict = {}
        total_nums = 0
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = self.__get_watch_index_set_cache(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
                if index < latest_progress_index:
                    continue
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = self.__get_watch_index_set_after_exec_cache(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                watch_indexs_dict[index] = indexs
        return watch_indexs_dict, self.__get_watch_canceled_index_cache(code)
        return {}, set()
# ---------------------------------D撤-------------------------------
@@ -957,8 +617,8 @@
    def set_trade_progress(self, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
                           limit_up_price):
        # 离下单执行位2分钟内的有效
        if tool.trade_time_sub(total_data[-1]['val']['time'],
                               total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
        sub_time = tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time'])
        if sub_time > constant.D_CANCEL_EXPIRE_TIME or sub_time < constant.D_CANCEL_START_TIME:
            return False, "超过D撤守护时间"
        real_order_index = self.__get_real_order_index_cache(code)
@@ -1136,7 +796,7 @@
        try:
            # 已经有计算的无法触发计算
            old_watch_indexes = self.__get_watch_indexes_cache(code)
            if old_watch_indexes and  self.__last_trade_progress_dict.get(code):
            if old_watch_indexes and self.__last_trade_progress_dict.get(code):
                return
        finally:
            self.__last_trade_progress_dict[code] = index
@@ -1193,6 +853,8 @@
    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, _local_today_num_operate_map,
                    is_first_code):
        if buy_exec_index is None:
            return False, "尚未找到下单位置"
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        # 守护S撤以外的数据
l2/code_price_manager.py
@@ -165,7 +165,7 @@
            pre_close_price = round(float(limit_up_price) / 1.1, 2)
            if (float(buy_1_price) - pre_close_price) / pre_close_price < 0.05:
                # 涨幅小于5%
                l2_trade_util.add_to_forbidden_trade_codes(code)
                l2_trade_util.forbidden_trade(code, "涨停炸开后现价涨幅小于5%之后不买")
                logger_trade_queue_price_info.add(f"涨停炸开后现价涨幅小于5%之后不买:code-{code}")
            # 之前涨停过且现在尚未涨停
            self.set_open_limit_up_lowest_price(code, buy_1_price)
l2/l2_data_manager_new.py
@@ -262,7 +262,7 @@
                if not is_normal:
                    print("历史数据异常:", code)
                    # 数据不正常需要禁止交易
                    l2_trade_util.forbidden_trade(code)
                    l2_trade_util.forbidden_trade(code, msg="L2历史数据异常")
                # 纠正数据
                if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
                    # 同花顺需要纠正数据,其他渠道不需要
@@ -303,7 +303,7 @@
            if not is_normal:
                print("历史数据异常:", code)
                # 数据不正常需要禁止交易
                l2_trade_util.forbidden_trade(code)
                l2_trade_util.forbidden_trade(code, msg="L2历史数据异常")
            origin_start_time = round(t.time() * 1000)
            # 转换数据格式
            _start_index = 0
@@ -328,10 +328,8 @@
                #     f.write(output.getvalue())
            # lp.dump_stats(f"/home/logs/profile/{code}_{round(t.time() * 1000)}.txt")
        except Exception as e:
            print("huaxin L2数据处理异常", code, str(e))
            logging.exception(e)
            logger_l2_error.exception(f"code:{code}")
            logger_l2_error.exception(e)
            async_log_util.error(logger_l2_error,f"code:{code}")
            async_log_util.exception(logger_l2_error, e)
        finally:
            # l2_data_log.l2_time(code, round(t.time() * 1000) - origin_start_time,
            #                     "l2数据处理总耗时",
@@ -357,14 +355,14 @@
                        cls.set_real_place_order_index(code, place_order_index, buy_single_index)
                        async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
                except:
                    logger_l2_error.exception(f"{code} 处理真实下单位置出错")
                    async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错")
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
                if global_util.cuurent_prices.get(code):
                    price_data = global_util.cuurent_prices.get(code)
                    if price_data[1]:
                        # 当前涨停价,设置涨停时间
                        logger_l2_process.info("开盘涨停:{}", code)
                        async_log_util.info(logger_l2_process, "开盘涨停:{}", code)
                        # 保存涨停时间
                        cls.__LimitUpTimeManager.save_limit_up_time(code, "09:30:00")
@@ -464,7 +462,7 @@
        # 增加推出机制
        unique_key = f"{start_index}-{end_index}"
        if cls.__latest_process_order_unique_keys.get(code) == unique_key:
            logger_l2_error.error(f"重复处理数据:code-{code} start_index-{start_index} end_index-{end_index}")
            async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{start_index} end_index-{end_index}")
            return
        cls.__latest_process_order_unique_keys[code] = unique_key
@@ -486,8 +484,8 @@
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
                logger_l2_error.error(f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                logger_l2_error.exception(e)
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                #                     "已下单-s级大单估算")
@@ -510,9 +508,8 @@
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
                logger_l2_error.error(f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                logger_l2_error.exception(e)
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
                pass
@@ -530,9 +527,8 @@
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "L撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
                logger_l2_error.error(f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                logger_l2_error.exception(e)
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算")
                pass
@@ -617,7 +613,7 @@
                                    buy_single_index, buy_exec_index, cls.volume_rate_info[code][0],
                                    params_desc)
            except Exception as e:
                logger_l2_error.exception(e)
                async_log_util.exception(logger_l2_error, e)
                l2_log.debug(code, "执行买入异常:{}", str(e))
                pass
            finally:
@@ -906,7 +902,7 @@
        # 判断板块
        can_buy_result = CodePlateKeyBuyManager.can_buy(code)
        if can_buy_result is None:
            logger_debug.warning("没有获取到板块缓存,将获取板块")
            async_log_util.warning(logger_debug, "没有获取到板块缓存,将获取板块")
            yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
            CodePlateKeyBuyManager.update_can_buy_blocks(code,
                                                         kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
@@ -980,7 +976,7 @@
        unique_key = f"{compute_start_index}-{compute_end_index}"
        if cls.__latest_process_not_order_unique_keys.get(code) == unique_key:
            logger_l2_error.error(f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            return
        cls.__latest_process_not_order_unique_keys[code] = unique_key
l2/l2_data_util.py
@@ -159,7 +159,7 @@
            local_latest_datas[code] = datas
            set_l2_data_latest_count(code, len(datas))
        try:
            async_log_util.info(log.logger_l2_data, f"{code}-{add_datas}")
            async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
        except Exception as e:
            logging.exception(e)
        # 暂时不将数据保存到redis
log_module/async_log_util.py
@@ -7,6 +7,47 @@
from log_module.log import logger_debug
from utils import tool
class AsyncLogManager:
    __log_queue = queue.Queue()
    def __add_log(self, logger, method, *args):
        self.__log_queue.put_nowait((logger, time.time(), method, args))
    def debug(self, logger, *args):
        self.__add_log(logger, "debug", *args)
    def info(self, logger, *args):
        self.__add_log(logger, "info", *args)
    def warning(self, logger, *args):
        self.__add_log(logger, "warning", *args)
    def error(self, logger, *args):
        self.__add_log(logger, "error", *args)
    def exception(self, logger, *args):
        self.__add_log(logger, "exception", *args)
    # 运行同步日志
    def run_sync(self):
        while True:
            try:
                val = self.__log_queue.get()
                time_s = val[1]
                cmd = val[2]
                method = getattr(val[0], cmd)
                d = list(val[3])
                d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:3]}] " + d[0]
                d = tuple(d)
                method(*d)
            except:
                pass
l2_data_log = AsyncLogManager()
log_queue = queue.Queue()
main.py
@@ -34,7 +34,8 @@
    t1.start()
    #
    # 交易接口服务
    t1 = threading.Thread(target=trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2), daemon=True)
    t1 = threading.Thread(target=trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2),
                          daemon=True)
    t1.start()
    #
    # redis后台服务
@@ -42,18 +43,18 @@
    t1.start()
    #
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback),
    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
                          args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
    #
    # 启动华鑫交易服务
    t1 = threading.Thread(target=huaxin_client.trade_client.run, name="trade_client",
                          args=(trade_server.my_trade_response, ptl2_trade, pst_trade),
                          daemon=True)
    t1 = threading.Thread(
        target=lambda: trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd),
        name="trade_server", daemon=True)
    t1.start()
    # 交易服务
    trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd)
    huaxin_client.trade_client.run(trade_server.my_trade_response, ptl2_trade, pst_trade)
# 主服务
@@ -106,7 +107,8 @@
        l1Process.start()
        # 主进程
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, pst_trade)
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade,
                          pst_trade)
        # 将tradeServer作为主进程
        l1Process.join()
server.py
@@ -371,7 +371,8 @@
                    msg = ""
                    try:
                        if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.FirstGPCodesManager().is_in_first_gp_codes_cache(code):
                        if not gpcode_manager.is_in_gp_pool(
                                code) and not gpcode_manager.FirstGPCodesManager().is_in_first_gp_codes_cache(code):
                            # 没在目标代码中且没有在首板今日历史代码中
                            raise Exception("代码没在监听中")
@@ -424,13 +425,6 @@
                                                                                                     buy_queue_result_list,
                                                                                                     exec_time)
                                        if buy_progress_index is not None:
                                            HourCancelBigNumComputer().set_trade_progress(code, buy_time,
                                                                                          buy_exec_index,
                                                                                          buy_progress_index,
                                                                                          l2.l2_data_util.local_today_datas.get(
                                                                                              code),
                                                                                          l2.l2_data_util.local_today_num_operate_map.get(
                                                                                              code))
                                            LCancelBigNumComputer().set_trade_progress(code, buy_progress_index,
                                                                                       l2.l2_data_util.local_today_datas.get(
                                                                                           code))
@@ -439,11 +433,12 @@
                                                                           buy_progress_index,
                                                                           json.dumps(buy_queue_result_list))
                                            # 计算大单成交额
                                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress(code, buy_progress_index,
                                                                                      l2.l2_data_util.local_today_datas.get(
                                                                                          code),
                                                                                      l2.l2_data_util.local_today_num_operate_map.get(
                                                                                          code))
                                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress(code,
                                                                                                                   buy_progress_index,
                                                                                                                   l2.l2_data_util.local_today_datas.get(
                                                                                                                       code),
                                                                                                                   l2.l2_data_util.local_today_num_operate_map.get(
                                                                                                                       code))
                                        else:
                                            raise Exception("暂未获取到交易进度")
@@ -651,7 +646,7 @@
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    for code in codes:
                        l2_trade_util.forbidden_trade(code)
                        l2_trade_util.forbidden_trade(code, msg="手动加入")
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
test/l2_trade_test.py
@@ -78,13 +78,6 @@
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_,
                                                                              buy_queue_result_list, exec_time)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer().set_trade_progress(code, time_,
                                                                                             buy_exec_index,
                                                                                             buy_progress_index,
                                                                                             l2.l2_data_util.local_today_datas.get(
                                                                                                 code),
                                                                                             l2.l2_data_util.local_today_num_operate_map.get(
                                                                                                 code))
                        log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.dumps(buy_queue_result_list))
@@ -188,13 +181,6 @@
        total_datas = total_datas[:899]
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
        buy_progress_index = 523
        l2.cancel_buy_strategy.HourCancelBigNumComputer().set_trade_progress(code, buy_progress_index,
                                                                             l2.l2_data_util.local_today_datas.get(
                                                                                 code),
                                                                             l2.l2_data_util.local_today_num_operate_map.get(
                                                                                 code))
# class TestTrade(unittest.TestCase):
third_data/code_plate_key_manager.py
@@ -425,7 +425,7 @@
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) != trade_manager.TRADE_STATE_NOT_TRADE:
            # 只要下过单的就不移除
            return
        l2_trade_util.forbidden_trade(code)
        l2_trade_util.forbidden_trade(code, msg=msg)
        logger_kpl_block_can_buy.info(msg)
    @classmethod
trade/huaxin/trade_server.py
@@ -345,11 +345,6 @@
                            if need_cancel:
                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                        f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                         buy_exec_index,
                                                                                         buy_progress_index,
                                                                                         total_datas,
                                                                                         num_operate_map)
                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                      buy_progress_index,
                                                                                      total_datas)
@@ -365,7 +360,7 @@
                            code,
                            buy_progress_index)
                        dask.compute(f1, f2, f3, f4)
                        dask.compute(f2, f3, f4)
                    else:
                        pass
            except Exception as e:
@@ -582,7 +577,7 @@
                    fresult = {"code": 0, "data": datas}
            elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK:
                if operate == outside_api_command_manager.OPERRATE_SET:
                    l2_trade_util.forbidden_trade(code)
                    l2_trade_util.forbidden_trade(code, msg="手动加入 trade_server")
                    name = gpcode_manager.get_code_name(code)
                    if not name:
                        results = HistoryKDatasUtils.get_gp_codes_names([code])
@@ -801,7 +796,7 @@
            order_status = data["orderStatus"]
            huaxin_trade_record_manager.DelegateRecordManager.add([data])
            if huaxin_util.is_deal(order_status):
                l2_trade_util.forbidden_trade(data["securityID"])
                l2_trade_util.forbidden_trade(data["securityID"],msg="已成交")
                # 成交,更新成交列表与资金列表
                huaxin_trade_data_update.add_deal_list()
                huaxin_trade_data_update.add_money_list()
@@ -842,6 +837,9 @@
        t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
        t1.start()
        t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
        t1.start()
trade/l2_trade_factor.py
@@ -19,7 +19,7 @@
        continue_count = self.get_begin_continue_buy_count()
        time_range = self.get_time_range()
        m = self.get_m_val()[0]
        m = m//10000
        m = m // 10000
        desc = ""
        if self.buy_rank == 0:
@@ -189,24 +189,18 @@
            volume_rate_index = -1
        return rates[volume_rate_index]
    # 获取m值
    def get_m_val(self):
        # 获取固定m值
        zyltgb = global_util.zyltgb_map.get(self.code)
    @classmethod
    def get_base_m_val(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(self.code)
        # if self.is_first_code:
        #     if self.buy_rank == 0:
        #         return 0, ""
        #     elif self.is_want_buy and zyltgb and zyltgb < 20 * 10000 * 10000:
        #         # 小于20亿的想买单
        #         return 500 * 10000, ""
        #     elif self.buy_rank < 4:
        #         return 1000 * 10000, ""
            zyltgb = global_util.zyltgb_map.get(code)
        base_m = L2TradeFactorUtil.get_base_safe_val(zyltgb)
        return base_m
    # 获取m值
    def get_m_val(self):
        base_m = self.get_base_m_val(self.code)
        rate = self.get_m_val_rate(self.volume_rate_index)
        m = round(base_m * (1 + rate))
        return m, ""
@@ -237,7 +231,7 @@
        rates = [0.34, 0.44, 0.54, 0.64, 0.74, 0.84, 0.94, 1.04]
        if volume_rate_index >= len(rates):
            volume_rate_index = -1
        return 0.59 #rates[volume_rate_index]
        return 0.59  # rates[volume_rate_index]
# H撤参数
@@ -271,7 +265,7 @@
        yi = round(zyltgb / 100000000)
        if yi < 1:
            yi = 1
        return int(-0.058*yi*yi + 60.9*yi)*10000
        return int(-0.058 * yi * yi + 60.9 * yi) * 10000
        # m = 5000000 + (yi - 1) * 500000
        # return round(m * (1 - 0.3))
trade/l2_trade_util.py
@@ -1,6 +1,8 @@
# 是否在禁止交易代码中
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_trade
from utils import tool
__redis_manager = redis_manager.RedisManager(2)
@@ -125,8 +127,9 @@
# 禁止代码交易
def forbidden_trade(code):
def forbidden_trade(code, msg=None):
    add_to_forbidden_trade_codes(code)
    async_log_util.warning(logger_trade, f"加入黑名单原因:{msg}")
    # l2_data_manager.remove_from_l2_fixed_codes(code)
    # l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易")
trade/trade_manager.py
@@ -582,7 +582,7 @@
            continue
        # 买入成功
        if code is not None and int(data["type"]) == 0:
            l2_trade_util.forbidden_trade(code)
            l2_trade_util.forbidden_trade(code, msg="交易成功")
            state = CodesTradeStateManager().get_trade_state_cache(code)
            if state != TRADE_STATE_BUY_SUCCESS:
                CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_SUCCESS)