Administrator
2023-09-15 1ff185866bcf0796d2367699bc000abb326360d5
交易设置独立进程/删除trade_client_server/记录L2逐笔委托日志
1个文件已删除
12个文件已修改
234 ■■■■■ 已修改文件
huaxin_client/command_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client_server.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_log.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 61 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -65,7 +65,7 @@
class TradeCommandManager:
    trade_client_dict = {}
    _instance = None
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
@@ -130,9 +130,9 @@
                    val = pipe_strategy.recv()
                    if val:
                        val = json.loads(val)
                        # print("run_process_command", val)
                        _type = val["type"]
                        threading.Thread(target=lambda: cls.process_command(_type, None, val), daemon=True).start()
                        cls.process_command_thread_pool.submit(lambda:  cls.process_command(_type, None, val))
                except Exception as e:
                    logger_local_huaxin_trade_debug.exception(e)
                    logging.exception(e)
huaxin_client/l2_client.py
@@ -397,7 +397,6 @@
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        print("逐笔委托", item)
        l2_data_manager.add_l2_order_detail(item)
        # logger_local_huaxin_l2_orderdetail.info(
huaxin_client/l2_data_manager.py
@@ -15,7 +15,7 @@
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export, async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system
    logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail
from utils import tool
order_detail_upload_active_time_dict = {}
@@ -59,12 +59,10 @@
# 添加委托详情
def add_l2_order_detail(data, istransaction=False):
    code = data["SecurityID"]
    # 异步日志记录
    async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, data)
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    if istransaction:
        pass
    else:
        pass
    # 原来的格式
    # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
    #                 "Volume": pOrderDetail['Volume'],
@@ -78,7 +76,7 @@
            buy_order_nos_dict[data['SecurityID']] = set()
        buy_order_nos_dict[data['SecurityID']].add(data['OrderNO'])
        # 买入订单号需要记录日志
        log_buy_no_queue.put_nowait((data['SecurityID'], data['OrderNO']))
        async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}")
    tmep_order_detail_queue_dict[code].put(
        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
@@ -279,13 +277,7 @@
def __run_log():
    print("__run_log")
    logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            temp = log_buy_no_queue.get()
            if temp:
                logger_local_huaxin_l2_buy_no.info(f"{temp[0]}#{temp[1]}")
        except:
            pass
    async_log_util.huaxin_l2_log.run_sync()
__upload_order_threads = {}
huaxin_client/trade_client.py
@@ -3,10 +3,9 @@
import json
import logging
import os
import threading
import time
from huaxin_client import command_manager, trade_client_server
from huaxin_client import command_manager
from huaxin_client import constant
from huaxin_client import socket_util
import traderapi
@@ -161,7 +160,7 @@
        return
    # 撤买
    def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None):
    def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None):
        if sinfo in self.__cancel_buy_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        async_log_util.info(logger_local_huaxin_trade_debug,
@@ -189,6 +188,8 @@
            req_field.OrderRef = order_ref
            req_field.SessionID = self.__session_id
            req_field.FrontID = self.__front_id
        if order_action_ref:
            req_field.OrderActionRef = order_action_ref
        # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填
@@ -539,7 +540,8 @@
                         nRequestID: "int") -> "void":
        try:
            if pRspInfoField.ErrorID == 0:
                async_log_util.info(logger_local_huaxin_trade_debug, '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID))
                async_log_util.info(logger_local_huaxin_trade_debug,
                                    '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID))
            else:
                async_log_util.error(logger_local_huaxin_trade_debug,
                                     f"OnRspOrderInsert 报单出错:{pRspInfoField.ErrorID}-{pRspInfoField.ErrorMsg}")
@@ -866,6 +868,7 @@
            code = data["code"]
            orderSysID = data.get("orderSysID")
            orderRef = data.get("orderRef")
            orderActionRef = data.get("orderActionRef")
            sinfo = data["sinfo"]
            if direction == 1:
                # 撤买
@@ -875,7 +878,7 @@
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.trade_thread_pool.submit(
                        lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID,
                                                                 order_ref=orderRef))
                                                                 order_ref=orderRef, order_action_ref=orderActionRef))
                    async_log_util.info(logger_local_huaxin_trade_debug,
                                        f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}")
                except Exception as e:
@@ -1030,21 +1033,27 @@
            # send_response(
            #     json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
            #                 "request_id": request_id}), type, client_id, request_id, temp_params[2])
            trade_response.OnTradeResponse(
                {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                 "request_id": request_id})
            if trade_response:
                trade_response.OnTradeResponse(
                    {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                     "request_id": request_id})
            else:
                send_response(
                    json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                                "request_id": request_id}), type, client_id, request_id, temp_params[2])
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id)
        else:
            async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id)
            trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            if trade_response:
                trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            # # 非API回调
            # send_response(
            #     json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
            #     type,
            #     None,
            #     req_id)
            else:
                send_response(
                    json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
                    type,
                    None,
                    req_id)
    except Exception as e:
        logging.exception(e)
@@ -1076,7 +1085,7 @@
        time.sleep(2)
def run(trade_response_: TradeResponse, pipe_l2=None, pipe_strategy=None):
def run(trade_response_: TradeResponse=None, pipe_l2=None, pipe_strategy=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
@@ -1089,9 +1098,6 @@
        global trade_response
        trade_response = trade_response_
        t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True)
        t1.start()
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
huaxin_client/trade_client_server.py
File was deleted
l2/l2_data_log.py
@@ -1,7 +1,7 @@
# l2数据的日志
import time
from log_module import log
from log_module import log, async_log_util
from l2 import l2_log
@@ -9,11 +9,17 @@
    timestamp = int(time.time() * 1000)
    # 只记录耗时较长的信息
    if time_ > 1 or force:
        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp, description, code, time_,
                                        "\n" if new_line else "")
        async_log_util.info(log.logger_l2_process_time, "{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp,
                            description, code, time_,
                            "\n" if new_line else "")
    return timestamp
def l2_time_log(code, description):
    async_log_util.info(log.logger_l2_process_time, "{}-{}: {}", l2_log.threadIds.get(code), code,
                        description)
class TradeLog:
    def __init__(self, thread_id):
l2/l2_data_manager_new.py
@@ -18,7 +18,7 @@
from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin
from l2 import safe_count_manager, l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \
    transaction_progress, cancel_buy_strategy
    transaction_progress, cancel_buy_strategy, l2_data_log
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \
    LCancelBigNumComputer
from l2.l2_data_manager import L2DataException
@@ -294,47 +294,32 @@
    # 处理华鑫L2数据
    @classmethod
    def process_huaxin(cls, code, origin_datas):
        print("process_huaxin", code, len(origin_datas))
        datas = None
        origin_start_time = round(t.time() * 1000)
        try:
            l2_data_log.l2_time_log(code, "开始加载历史数据")
            # 加载历史的L2数据
            is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False)
            if not is_normal:
                print("历史数据异常:", code)
                # 数据不正常需要禁止交易
                l2_trade_util.forbidden_trade(code, msg="L2历史数据异常")
            origin_start_time = round(t.time() * 1000)
            # 转换数据格式
            _start_index = 0
            total_datas = local_today_datas.get(code)
            if total_datas:
                _start_index = total_datas[-1]["index"] + 1
            l2_data_log.l2_time_log(code, "开始格式化原始数据")
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            __start_time = round(t.time() * 1000)
            l2_data_log.l2_time_log(code, "开始处理数据")
            if len(datas) > 0:
                cls.process_add_datas(code, datas, 0, __start_time)
            else:
                pass
                # lp = LineProfiler()
                # lp.enable()
                # lp_wrap = lp(cls.process_add_datas)
                # lp_wrap(code, datas, 0, __start_time)
                # output = io.StringIO()
                # lp.print_stats(stream=output)
                # lp.disable()
                # with open(f"/home/logs/profile/{code}_{datas[0]['index']}_{datas[-1]['index']}.txt", 'w') as f:
                #     f.write(output.getvalue())
            # lp.dump_stats(f"/home/logs/profile/{code}_{round(t.time() * 1000)}.txt")
        except Exception as e:
            async_log_util.error(logger_l2_error,f"code:{code}")
            async_log_util.error(logger_l2_error, f"code:{code}")
            async_log_util.exception(logger_l2_error, e)
        finally:
            # l2_data_log.l2_time(code, round(t.time() * 1000) - origin_start_time,
            #                     "l2数据处理总耗时",
            #                     True)
            if datas:
                l2_data_log.l2_time_log(code, "开始保存数据")
                l2.l2_data_util.save_l2_data(code, None, datas)
    @classmethod
@@ -402,21 +387,7 @@
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    if True:  # len(add_datas) < 10:
                        cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code)
                    else:
                        pass
                        # lp = LineProfiler()
                        # lp.enable()
                        # lp_wrap = lp(cls.__process_order)
                        # lp_wrap(code, start_index, end_index, capture_timestamp, is_first_code)
                        # output = io.StringIO()
                        # lp.print_stats(stream=output)
                        # lp.disable()
                        # with open(
                        #         f"/home/logs/profile/{code}_process_order_{add_datas[0]['index']}_{add_datas[-1]['index']}.txt",
                        #         'w') as f:
                        #     f.write(output.getvalue())
                    cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code)
                else:
                    # 未挂单,时间相差不大才能挂单
                    if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time):
@@ -426,8 +397,6 @@
                                add_datas[0]["index"],
                                add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                capture_timestamp)
            # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
            #                                    "l2数据处理时间")
    # 处理未挂单
    @classmethod
@@ -484,7 +453,8 @@
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.error(logger_l2_error,
                                     f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
@@ -508,7 +478,8 @@
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.error(logger_l2_error,
                                     f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
@@ -527,7 +498,8 @@
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "L撤销比例触发阈值"
            except Exception as e:
                async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.error(logger_l2_error,
                                     f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算")
@@ -975,7 +947,8 @@
        unique_key = f"{compute_start_index}-{compute_end_index}"
        if cls.__latest_process_not_order_unique_keys.get(code) == unique_key:
            async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            async_log_util.error(logger_l2_error,
                                 f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            return
        cls.__latest_process_not_order_unique_keys[code] = unique_key
@@ -1147,11 +1120,11 @@
    @classmethod
    def __get_threshmoney(cls, code):
        m,msg = cls.__l2PlaceOrderParamsManagerDict[code].get_m_val()
        m, msg = cls.__l2PlaceOrderParamsManagerDict[code].get_m_val()
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) == trade_manager.TRADE_STATE_NOT_TRADE:
            # 首次下单m值扩大1.5倍
            m = int(m * 1.5)
        return m,msg
        return m, msg
    # 计算万手哥笔数
    @classmethod
l2/l2_data_util.py
@@ -19,6 +19,7 @@
from db import redis_manager_delegate as redis_manager
from utils import tool
__db = 1
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
# 本地最新一次上传的数据
@@ -148,11 +149,11 @@
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
    if add_datas:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        if datas:
            RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(),
            RedisUtils.setex_async(__db, "l2-data-latest-{}".format(code), tool.get_expire(),
                                   json.dumps(datas))
            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
@@ -162,8 +163,6 @@
            async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
        except Exception as e:
            logging.exception(e)
        # 暂时不将数据保存到redis
        # saveL2Data(code, add_datas)
# 设置最新的l2数据采集的数量
log_module/async_log_util.py
@@ -49,6 +49,9 @@
l2_data_log = AsyncLogManager()
huaxin_l2_log = AsyncLogManager()
log_queue = queue.Queue()
log_module/log_export.py
@@ -316,6 +316,8 @@
            for line in lines:
                if line:
                    data = line.split(" - ")[1].strip()
                    if data.startswith("["):
                        data = data[data.find("]") + 1:].strip()
                    code = data.split("#")[0]
                    buy_no = int(data.split("#")[1])
                    if code not in fdatas:
main.py
@@ -49,12 +49,7 @@
    t1.start()
    #
    # 启动华鑫交易服务
    t1 = threading.Thread(
        target=lambda: trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd),
        name="trade_server", daemon=True)
    t1.start()
    huaxin_client.trade_client.run(trade_server.my_trade_response, ptl2_trade, pst_trade)
    trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd)
# 主服务
@@ -107,11 +102,17 @@
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process.start()
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade))
        tradeProcess.start()
        # 主进程
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade,
                          pst_trade)
        # 将tradeServer作为主进程
        l1Process.join()
        tradeProcess.join()
    except Exception as e:
        logger_system.exception(e)
trade/huaxin/huaxin_trade_api.py
@@ -80,7 +80,7 @@
# pipe的交易通道是否正常
def is_pipe_channel_normal():
    return False
    return True
# 测试交易通道
@@ -352,11 +352,13 @@
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None):
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
    order_action_ref = huaxin_util.create_order_ref()
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
                            "direction": direction,
                            "code": code,
                            "orderRef": orderRef,
                            "orderActionRef": order_action_ref,
                            "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    try:
trade/huaxin/trade_server.py
@@ -27,7 +27,8 @@
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress, \
    l2_data_log
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
    GCancelBigNumComputer, SecondCancelBigNumComputer
from l2.huaxin import huaxin_target_codes_manager
@@ -36,7 +37,7 @@
from log_module import async_log_util, log_export
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \
    logger_l2_g_cancel, logger_debug, logger_system, logger_trade
    logger_l2_g_cancel, logger_debug, logger_system, logger_trade, logger_l2_process
from third_data import block_info, kpl_api, kpl_data_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
@@ -288,12 +289,14 @@
                            f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}")
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        # async_log_util.info(hx_logger_l2_upload, f"{code}数据处理开始:{thread_id}")
        l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
        try:
            l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
        finally:
            pass
            # async_log_util.info(hx_logger_l2_upload, f"{code}数据处理结束:{thread_id}")
            async_log_util.info(logger_l2_process, "code:{} 处理数据数量: {} 最终处理时间:{}", code,
                                len(_datas),
                                round(time.time() * 1000) - now_timestamp)
            l2_data_log.l2_time_log(code, "处理L2逐笔委托结束")
    @classmethod
    def l2_transaction(cls, code, datas):