Administrator
2023-11-02 eb33b717023d9871bd74e6dce47a065228cffefc
L2进程与策略进程分开
9个文件已修改
3个文件已添加
781 ■■■■ 已修改文件
huaxin_client/code_queue_distribute_manager.py 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 108 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 315 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_listen_manager.py 120 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
outside_api_command_manager.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mul_queue.py 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api_server.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/code_queue_distribute_manager.py
New file
@@ -0,0 +1,45 @@
"""
代码队列管理
"""
class CodeQueueDistributeManager:
    """Allocate queues from a fixed pool to codes, one queue per code.

    Each queue handed to the constructor is paired with its position in the
    pool. A queue counts as occupied while some code is mapped to it in
    ``distibuted_code_queue_dict``.
    """

    def __init__(self, queue_list: list):
        """Build the pool.

        :param queue_list: queues to hand out; kept as ``(index, queue)`` pairs.
        """
        self.queue_list = list(enumerate(queue_list))
        # code -> (index, queue) pair currently assigned to that code
        self.distibuted_code_queue_dict = {}

    def get_available_queue(self):
        """Return the first ``(index, queue)`` pair that no code holds, or ``None``."""
        occupied = {pair[0] for pair in self.distibuted_code_queue_dict.values()}
        return next((pair for pair in self.queue_list if pair[0] not in occupied), None)

    def distribute_queue(self, code):
        """Assign a queue to ``code`` and return its ``(index, queue)`` pair.

        A code that already holds a queue gets that same pair back.

        :raises Exception: when every queue in the pool is occupied.
        """
        assigned = self.distibuted_code_queue_dict
        try:
            return assigned[code]
        except KeyError:
            pass
        candidate = self.get_available_queue()
        if not candidate:
            raise Exception("无可用的队列")
        assigned[code] = candidate
        return candidate

    def get_distributed_queue(self, code):
        """Return the ``(index, queue)`` pair held by ``code``, or ``None``."""
        return self.distibuted_code_queue_dict.get(code)

    def release_distribute_queue(self, code):
        """Free the queue held by ``code``; a code without a queue is ignored."""
        self.distibuted_code_queue_dict.pop(code, None)

    def get_free_queue_count(self):
        """Return the pool size minus the number of codes holding a queue."""
        return len(self.queue_list) - len(self.distibuted_code_queue_dict)
huaxin_client/command_manager.py
@@ -58,7 +58,7 @@
class L2ActionCallback(object):
    # 监听L2数据
    def OnSetL2Position(self, client_id, request_id, codes_data):
    def OnSetL2Position(self, codes_data):
        pass
@@ -147,6 +147,7 @@
# L2指令管理
class L2CommandManager:
    action_callback = None
    @classmethod
    def init(cls, l2_action_callback):
@@ -155,16 +156,9 @@
    @classmethod
    def process_command(cls, _type, client_id, result_json):
        data = result_json["data"]
        request_id = result_json["request_id"]
        ctype = data["type"]
        if not socket_util.is_client_params_sign_right(result_json):
            # 签名出错
            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                      {"code": -1, "msg": "签名错误"})
            return
        codes_data = data["data"]
        ctype = result_json["type"]
        if ctype == CLIENT_TYPE_CMD_L2:
            cls.action_callback.OnSetL2Position(client_id, request_id, codes_data)
            cls.action_callback.OnSetL2Position(data)
if __name__ == "__main__":
huaxin_client/l1_client.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import os
import threading
import time
@@ -122,7 +123,7 @@
__latest_subscript_codes = set()
def __upload_codes_info(pipe_l2, datas):
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    if not tool.is_trade_time():
        return
    # 上传数据
@@ -130,8 +131,8 @@
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
    if pipe_l2 is not None:
        pipe_l2.send(fdata)
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
    # 记录新增加的代码
    codes = set([x[0] for x in datas])
    add_codes = codes - __latest_subscript_codes
@@ -160,7 +161,7 @@
        pass
def run(pipe_l2):
def run(queue_l1_w_strategy_r):
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
@@ -203,8 +204,8 @@
    # 测试链路
    # level1_data_dict["000969"] = (
    #     "000969", 9.46, 9.11, 771000*100, time.time())
    # level1_data_dict["000961"] = (
    #     "000961",1.93, 10.29, 2638000 * 100, time.time())
    level1_data_dict["002292"] = (
        "002292", 8.06, 9.96, 969500 * 100, time.time())
    # 等待程序结束
    while True:
@@ -226,7 +227,7 @@
            codes = [x[0] for x in datas]
            print("代码数量:", len(datas))
            logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
            __upload_codes_info(pipe_l2, datas)
            __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            logging.exception(e)
        finally:
huaxin_client/l2_client.py
@@ -6,12 +6,15 @@
import queue
import threading
import time
from typing import List
from huaxin_client import command_manager, l2_data_transform_protocol
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.command_manager import L2ActionCallback
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
@@ -40,7 +43,6 @@
SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
spi = None
set_codes_data_queue = queue.Queue()
market_code_dict = {}
@@ -56,10 +58,11 @@
    # 买入的大单订单号
    def __init__(self, api):
    def __init__(self, api, l2_data_upload_manager):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
    def __split_codes(self, codes):
        szse_codes = []
@@ -124,12 +127,16 @@
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        for c in codes:
            l2_data_manager.add_target_code(c)
        for c in del_codes:
            l2_data_manager.del_target_code(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c, l2_data_callback)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c)
                l2_data_manager.add_target_code(c)
        except Exception as e:
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
@@ -268,8 +275,7 @@
                     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                 ]}
            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
            l2_data_manager.add_market_data(d)
            self.l2_data_upload_manager.add_market_data(d)
        except:
            pass
@@ -289,12 +295,6 @@
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            # G撤数据暂时注释
            # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
            # if transaction_big_order_nos and pTransaction['BuyNo'] in transaction_big_order_nos:
            #     # 正在成交的订单撤单了
            #     l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo'])
            #     async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pTransaction['BuyNo']}")
            if min_volume is None:
                # 默认筛选50w
                if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
@@ -320,8 +320,7 @@
                item["Side"] = "2"
            # 深证撤单
            print("逐笔委托", item)
            l2_data_manager.add_l2_order_detail(item, 0, True)
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                # 涨停价
@@ -338,7 +337,7 @@
                #     return
                # self.__last_transaction_keys_dict[code] = key
                # print("逐笔成交", item)
                l2_data_manager.add_transaction_detail(item)
                self.l2_data_upload_manager.add_transaction_detail(item)
    def OnRtnOrderDetail(self, pOrderDetail):
        can_listen = False
@@ -346,7 +345,8 @@
        start_time = 0
        if code in self.special_code_volume_for_order_dict:
            start_time = time.time()
            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail['Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
                'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
                # 监控目标订单与影子订单
                if self.special_code_volume_for_order_dict[code][1] > time.time():
                    # 特殊量监听
@@ -354,13 +354,6 @@
                else:
                    self.special_code_volume_for_order_dict.pop(code)
        if not can_listen:
            # 暂时注释掉G撤相关数据产生
            # if pOrderDetail['OrderStatus'] == b'D':
            #     transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
            #     if transaction_big_order_nos and pOrderDetail['OrderNO'] in transaction_big_order_nos:
            #         # 正在成交的订单撤单了
            #         l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO'])
            #         async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pOrderDetail['OrderNO']}")
            min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
            if min_volume is None:
                # 默认筛选50w
@@ -376,7 +369,7 @@
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        l2_data_manager.add_l2_order_detail(item, start_time)
        self.l2_data_upload_manager.add_l2_order_detail(item, start_time)
    def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                            FirstLevelSellOrderVolumes):
@@ -481,7 +474,7 @@
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, client_id, request_id, codes_data):
    def OnSetL2Position(self, codes_data):
        print("L2订阅数量:", len(codes_data))
        logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        try:
@@ -490,7 +483,7 @@
            logging.exception(e)
def __init_l2():
def __init_l2(l2_data_upload_manager):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -502,7 +495,7 @@
    # case 2非缓存模式
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api)
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -522,32 +515,21 @@
    api.Init()
def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue):
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if value:
                value = value.decode("utf-8")
                if type(value) == bytes:
                    value = value.decode("utf-8")
                data = json.loads(value)
                if data["type"] == "listen_volume":
                _type = data["type"]
                if _type == "listen_volume":
                    volume = data["data"]["volume"]
                    code = data["data"]["code"]
                    spi.set_code_special_watch_volume(code, volume)
        except Exception as e:
            logging.exception(e)
def __receive_from_pipe_strategy(pipe_):
    logger_system.info(f"l2_client __receive_from_pipe_strategy 线程ID:{tool.get_thread_id()}")
    while True:
        # print("__receive_from_pipe_strategy")
        try:
            val = pipe_.recv()
            if val:
                print("L2客户端接受到数据")
                data = json.loads(val)
                if data["data"]["type"] == "l2_cmd":
                elif _type == "l2_cmd":
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
        except Exception as e:
            logging.exception(e)
@@ -556,28 +538,24 @@
pipe_strategy = None
def run(queue_trade_w_l2_r: multiprocessing.Queue, _pipe_strategy,
        _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None:
def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        if queue_trade_w_l2_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True)
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        if _pipe_strategy is not None:
            global pipe_strategy
            pipe_strategy = _pipe_strategy
            t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
            t1.start()
        __init_l2()
        global l2_data_callback
        l2_data_callback = _l2_data_callback
        l2_data_manager.run_upload_common(l2_data_callback)
        l2_data_manager.run_upload_trading_canceled(l2_data_callback)
        # 初始化
        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
                                                     transaction_queue_distribute_manager, market_queue)
        __init_l2(l2_data_upload_manager)
        l2_data_manager.run_upload_common()
        l2_data_manager.run_log()
        l2_data_manager.run_upload_daemon(l2_data_callback)
        # l2_data_manager.run_test(l2_data_callback)
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
@@ -589,6 +567,6 @@
if __name__ == "__main__":
    run(None, None, None)
    # run(None, None, None)
    # spi.set_codes_data([("000333", 12000)])
    input()
huaxin_client/l2_data_manager.py
@@ -1,19 +1,18 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import queue
import random
import threading
import time
from huaxin_client import socket_util, l2_data_transform_protocol
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export, async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from log_module import  async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_system
from utils import tool
order_detail_upload_active_time_dict = {}
@@ -24,12 +23,51 @@
target_codes = set()
target_codes_add_time = {}
common_queue = queue.Queue()
trading_canceled_queue = queue.Queue()
log_buy_no_queue = queue.Queue()
# 买入订单号的字典
buy_order_nos_dict = {}
# 最近的大单成交单号
latest_big_order_transaction_orders_dict = {}
# L2上传数据管理器
class L2DataUploadManager:
    """Push L2 order, transaction and market records onto their upload queues.

    Order and transaction records go to the queue that the matching
    distribute manager has assigned to the record's ``SecurityID``; records
    for a code without an assigned queue are dropped. Market data always goes
    to the single market-data queue.
    """

    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
        """Store the two per-code queue allocators and the market-data queue."""
        self.market_data_queue = market_data_queue
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.order_queue_distribute_manager = order_queue_distribute_manager

    def add_l2_order_detail(self, data, start_time, istransaction=False):
        """Enqueue one order-detail record as a flat tuple.

        The tuple holds the ten order fields in fixed order, followed by the
        current ``time.time()`` and ``start_time``.

        :param data: mapping holding the order fields read below.
        :param start_time: value appended unchanged as the last tuple element.
        :param istransaction: accepted but not read by this method.
        """
        allocation = self.order_queue_distribute_manager.get_distributed_queue(data["SecurityID"])
        if not allocation:
            # No queue assigned to this code: drop the record.
            return
        field_names = ('SecurityID', 'Price', 'Volume', 'Side', 'OrderType', 'OrderTime',
                       'MainSeq', 'SubSeq', 'OrderNO', 'OrderStatus')
        record = tuple(data[name] for name in field_names) + (time.time(), start_time)
        allocation[1].put_nowait(record)

    def add_transaction_detail(self, data):
        """Enqueue one transaction record as a flat nine-field tuple.

        :param data: mapping holding the transaction fields read below.
        """
        allocation = self.transaction_queue_distribute_manager.get_distributed_queue(data["SecurityID"])
        if not allocation:
            # No queue assigned to this code: drop the record.
            return
        field_names = ('SecurityID', 'TradePrice', 'TradeVolume', 'OrderTime', 'MainSeq',
                       'SubSeq', 'BuyNo', 'SellNo', 'ExecType')
        allocation[1].put_nowait(tuple(data[name] for name in field_names))

    def add_market_data(self, data):
        """Put ``data`` unchanged onto the market-data queue."""
        self.market_data_queue.put_nowait(data)

    def distribute_upload_queue(self, code):
        """Assign an order queue, then a transaction queue, to ``code``."""
        for allocator in (self.order_queue_distribute_manager,
                          self.transaction_queue_distribute_manager):
            allocator.distribute_queue(code)

    def release_distributed_upload_queue(self, code):
        """Release the order queue, then the transaction queue, held by ``code``."""
        for allocator in (self.order_queue_distribute_manager,
                          self.transaction_queue_distribute_manager):
            allocator.release_distribute_queue(code)
def add_target_code(code):
@@ -42,89 +80,6 @@
    target_codes.discard(code)
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
# 获取最近的大单成交订单号
def get_latest_transaction_order_nos(code):
    return latest_big_order_transaction_orders_dict.get(code)
# 正在成交的订单撤单了
def trading_order_canceled(code_, order_no):
    trading_canceled_queue.put((code_, order_no))
# 添加委托详情
def add_l2_order_detail(data, start_time, istransaction=False):
    code = data["SecurityID"]
    # 异步日志记录
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    # 原来的格式
    # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
    #                 "Volume": pOrderDetail['Volume'],
    #                 "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
    #                 "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
    #                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
    #                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
    # 用于G撤的数据,暂时注释
    # if data['Side'] == "1":
    #     # 记录所有买入的订单号
    #     if data['SecurityID'] not in buy_order_nos_dict:
    #         buy_order_nos_dict[data['SecurityID']] = set()
    #     buy_order_nos_dict[data['SecurityID']].add(data['OrderNO'])
    #     # 买入订单号需要记录日志
    #     async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}")
    tmep_order_detail_queue_dict[code].put(
        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
# 添加逐笔成交
def add_transaction_detail(data):
    code = data["SecurityID"]
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    # 原来的格式
    #  item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
    #                     "TradeVolume": pTransaction['TradeVolume'],
    #                     "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
    #                     "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'],
    #                     "ExecType": pTransaction['ExecType'].decode()}
    # 判断是否为大单成交
    code = data['SecurityID']
    # G撤相关数据操作暂时注释
    # if code in buy_order_nos_dict:
    #     if data['BuyNo'] in buy_order_nos_dict[code]:
    #         try:
    #             temp_list = latest_big_order_transaction_orders_dict.get(code)
    #             if not temp_list:
    #                 temp_list = []
    #             if temp_list:
    #                 if temp_list[-1] != data['BuyNo']:
    #                     # 不加入重复订单号
    #                     temp_list.append(data['BuyNo'])
    #                     if len(temp_list) > 10:
    #                         # 最多加10个订单号
    #                         temp_list = temp_list[-10:]
    #             else:
    #                 temp_list.append(data['BuyNo'])
    #             latest_big_order_transaction_orders_dict[code] = temp_list
    #         except:
    #             pass
    tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                           data['SellNo'], data['ExecType']))
def add_market_data(data):
    code = data['securityID']
    # 加入上传队列
    common_queue.put((code, "l2_market_data", data))
def add_subscript_codes(codes):
    print("add_subscript_codes", codes)
@@ -165,7 +120,6 @@
# 上传数据
def upload_data(code, _type, datas, new_sk=False):
    uid = random.randint(0, 100000)
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
@@ -180,76 +134,16 @@
        logging.exception(e)
    finally:
        pass
        # print("请求结束", uid, result)
        # logger_local_huaxin_l2_upload.info(
        #     f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}")
    # print("上传结果", result)
# 循环读取上传数据
def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    if True:
        while True:
            # print("order task")
            try:
                if code not in target_codes:
                    break
                order_detail_upload_active_time_dict[code] = time.time()
                udatas = []
                while not tmep_order_detail_queue_dict[code].empty():
                    temp = tmep_order_detail_queue_dict[code].get()
                    udatas.append(temp)
                if udatas:
                    # start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    # use_time = int((time.time() - start_time) * 1000)
                    # if use_time > 10:
                    #     async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{use_time}ms")
                else:
                    # 没有数据的时候需等待,有数据时不需等待
                    time.sleep(0.001)
            except Exception as e:
                hx_logger_contact_debug.exception(e)
                logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
                pass
def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    while True:
        # print("trans task")
        try:
            if code not in target_codes:
                break
            transaction_upload_active_time_dict[code] = time.time()
            udatas = []
            while not tmep_transaction_queue_dict[code].empty():
                temp = tmep_transaction_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                # upload_data(code, "l2_trans", udatas)
                l2_data_callback.OnL2Transaction(code, udatas)
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
def __run_upload_common(l2_data_callback: L2DataCallBack):
def __run_upload_common():
    print("__run_upload_common")
    logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            while not common_queue.empty():
                temp = common_queue.get()
                if temp[1] == "l2_market_data":
                    l2_data_callback.OnMarketData(temp[0], temp[2])
                else:
                    upload_data(temp[0], temp[1], temp[2])
                upload_data(temp[0], temp[1], temp[2])
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
@@ -258,121 +152,30 @@
            time.sleep(0.01)
def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    print("__run_upload_trading_canceled")
    logger_system.info(f"l2_client __run_upload_trading_canceled 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            temp = trading_canceled_queue.get()
            if temp:
                logger_local_huaxin_g_cancel.info(f"准备上报:{temp}")
                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
                logger_local_huaxin_g_cancel.info(f"上报成功:{temp}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
def __run_log():
    print("__run_log")
    logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
    async_log_util.huaxin_l2_log.run_sync()
__upload_order_threads = {}
__upload_trans_threads = {}
# 运行上传任务
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    try:
        # 如果代码没有在目标代码中就不需要运行
        if code not in target_codes:
            return
        # 如果最近的活动时间小于2s就不需要运行
        if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[
            code] > 2:
            t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
            t.start()
            __upload_order_threads[code] = t
        if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[
            code] > 2:
            t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
            t.start()
            __upload_trans_threads[code] = t
    finally:
        pass
def run_upload_common(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
    t.start()
def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
# 采用socket传输数据
def run_upload_common():
    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
    t.start()
def run_log():
    # G撤相关数据,暂时注释
    # fdatas = log_export.load_huaxin_local_buy_no()
    # global buy_order_nos_dict
    # buy_order_nos_dict = fdatas
    t = threading.Thread(target=lambda: __run_log(), daemon=True)
    t.start()
# 运行守护线程
def run_upload_daemon(_l2_data_callback):
    def upload_daemon():
        logger_system.info(f"l2_client upload_daemon 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                for code in target_codes_add_time:
                    # 目标代码加入2s之后启动守护
                    if time.time() - target_codes_add_time[code] > 2:
                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback),
                                                 daemon=True)
                            t.start()
                            __upload_order_threads[code] = t
                            logger_local_huaxin_l2_upload.info(f"重新创建L2订单上传线程:{code}")
                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback),
                                                 daemon=True)
                            t.start()
                            __upload_trans_threads[code] = t
                            logger_local_huaxin_l2_upload.info(f"重新创建L2成交上传线程:{code}")
            except:
                pass
            finally:
                time.sleep(3)
    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
    t.start()
def __test(_l2_data_callback):
def __test():
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
    t.start()
    while True:
        try:
            tmep_order_detail_queue_dict[code].put_nowait(
                ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
            time.sleep(5)
        except:
            pass
    pass
def run_test(_l2_data_callback):
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
def run_test():
    t = threading.Thread(target=lambda: __test(), daemon=True)
    t.start()
huaxin_client/trade_client.py
@@ -159,8 +159,8 @@
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        if queue_trade_w_l2_r is not None:
            queue_trade_w_l2_r.put_nowait(
        if queue_other_w_l2_r is not None:
            queue_other_w_l2_r.put_nowait(
                json.dumps({"type": "listen_volume", "data": {"code": code,
                                                              "volume": count}}).encode(
                    'utf-8'))
@@ -1110,15 +1110,15 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def run(trade_response_: TradeResponse = None, queue_trade_w_l2_r_: multiprocessing.Queue = None,
def run(trade_response_: TradeResponse = None, queue_other_w_l2_r_: multiprocessing.Queue = None,
        queue_strategy_trade_write_=None,
        queue_strategy_trade_read=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
        __init_trade_data_server()
        global queue_trade_w_l2_r
        queue_trade_w_l2_r = queue_trade_w_l2_r_
        global queue_other_w_l2_r
        queue_other_w_l2_r = queue_other_w_l2_r_
        global queue_strategy_trade_write
        queue_strategy_trade_write = queue_strategy_trade_write_
l2/l2_data_listen_manager.py
New file
@@ -0,0 +1,120 @@
"""
L2数据监听
"""
import multiprocessing
import threading
import time
from log_module import async_log_util
from log_module.log import logger_debug
__l2_order_active_time_dict = {}
__l2_transaction_active_time_dict = {}
class L2DataListenManager:
    """Receive L2 data from queues and forward it to a callback object.

    receive_l2_data() starts one daemon thread per order queue, one per
    transaction queue and one for the market queue. Each thread polls its
    queue, forwards whatever it finds to the callback and periodically records
    a heartbeat timestamp keyed by id(queue). get_active_count() reports how
    many listeners of a given type recorded a heartbeat in the last 5 seconds.
    """

    TYPE_ORDER = "order"
    TYPE_TRANSACTION = "transaction"
    TYPE_MARKET = "market"

    def __init__(self, l2_data_callback):
        """Remember the callback and create the empty heartbeat tables.

        :param l2_data_callback: object providing OnL2Order(code, datas, timestamp),
            OnL2Transaction(code, datas) and OnMarketData(code, data).
        """
        self.my_l2_data_callback = l2_data_callback
        # Heartbeat tables: id(queue) -> time.time() of the last recorded heartbeat
        self.__l2_order_active_time_dict = {}
        self.__l2_transaction_active_time_dict = {}
        self.__l2_market_active_time_dict = {}

    @staticmethod
    def __drain_by_code(q):
        """Take every item currently in q and group the items by item[0] (the code)."""
        grouped = {}
        while not q.empty():
            item = q.get()
            grouped.setdefault(item[0], []).append(item)
        return grouped

    # Receive L2 order-by-order data
    def __recive_l2_orders(self, q: multiprocessing.Queue):
        """Poll q forever, forwarding order batches grouped by code to OnL2Order.

        The third callback argument is element 10 of the first item of each batch.
        """
        queue_id = id(q)
        count = 0
        while True:
            try:
                datas_dict = self.__drain_by_code(q)
                if datas_dict:
                    for code, items in datas_dict.items():
                        self.my_l2_data_callback.OnL2Order(code, items, items[0][10])
                else:
                    # Nothing queued: back off briefly instead of spinning
                    time.sleep(0.002)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 100:
                    count = 0
                    # Record the heartbeat once the loop counter exceeds 100
                    self.__l2_order_active_time_dict[queue_id] = time.time()

    # Receive L2 transaction-by-transaction data
    def __recive_transaction_orders(self, q: multiprocessing.Queue):
        """Poll q forever, forwarding transaction batches grouped by code to OnL2Transaction."""
        queue_id = id(q)
        count = 0
        while True:
            try:
                datas_dict = self.__drain_by_code(q)
                if datas_dict:
                    for code, items in datas_dict.items():
                        self.my_l2_data_callback.OnL2Transaction(code, items)
                else:
                    time.sleep(0.01)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 50:
                    count = 0
                    # Record the heartbeat once the loop counter exceeds 50
                    self.__l2_transaction_active_time_dict[queue_id] = time.time()

    # Receive L2 market data
    def __recive_l2_markets(self, q: multiprocessing.Queue):
        """Poll q forever, forwarding each item to OnMarketData keyed by item['securityID']."""
        queue_id = id(q)
        while True:
            try:
                if not q.empty():
                    item = q.get()
                    self.my_l2_data_callback.OnMarketData(item['securityID'], item)
                else:
                    time.sleep(0.002)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                # The market listener records a heartbeat on every iteration
                self.__l2_market_active_time_dict[queue_id] = time.time()

    # Receive L2 data
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        """Start one daemon listener thread per queue.

        The queue is handed to each thread through ``args`` rather than captured
        by a lambda: a lambda would look up the loop variable only when the
        thread actually runs, so several threads could end up polling the same
        (later) queue while earlier queues get no listener.

        :param order_queues: iterable of queues carrying order items.
        :param transaction_queues: iterable of queues carrying transaction items.
        :param market_queue: single queue carrying market-data items.
        """
        for q in order_queues:
            threading.Thread(target=self.__recive_l2_orders, args=(q,), daemon=True).start()
        for q in transaction_queues:
            threading.Thread(target=self.__recive_transaction_orders, args=(q,), daemon=True).start()
        threading.Thread(target=self.__recive_l2_markets, args=(market_queue,), daemon=True).start()

    def get_active_count(self, type_):
        """Return how many listeners of type_ recorded a heartbeat within the last 5 seconds.

        :param type_: one of TYPE_ORDER, TYPE_TRANSACTION or TYPE_MARKET.
        :return: number of active listeners; 0 for any other type_.
        """
        expire_time = time.time() - 5
        if type_ == self.TYPE_ORDER:
            active_times = self.__l2_order_active_time_dict
        elif type_ == self.TYPE_TRANSACTION:
            active_times = self.__l2_transaction_active_time_dict
        elif type_ == self.TYPE_MARKET:
            active_times = self.__l2_market_active_time_dict
        else:
            return 0
        # Iterate a snapshot: listener threads may insert new keys concurrently
        return sum(1 for active_time in list(active_times.values()) if active_time > expire_time)
main.py
@@ -22,7 +22,9 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue):
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -33,7 +35,8 @@
    t1.start()
    #
    # 交易接口服务
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2),
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                          args=(pipe_server, queue_other_w_l2_r),
                          daemon=True)
    t1.start()
    #
@@ -41,14 +44,9 @@
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    t1.start()
    #
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
                          args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_,
                            transaction_queues_, market_queue_)
# 主服务
@@ -83,13 +81,10 @@
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # 交易写L2读
        queue_trade_w_l2_r = multiprocessing.Queue()
        # 策略与l2之间的通信
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
        # l1与策略间的通信
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
        # L2读其他写
        queue_other_w_l2_r = multiprocessing.Queue()
        #
        queue_l1_w_strategy_r = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -102,19 +97,37 @@
        logger_system.info("主进程ID:{}", os.getpid())
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r,))
        l1Process.start()
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
            target=huaxin_client.trade_client.run,
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,))
        tradeProcess.start()
        # 创建L2通信队列
        order_queues = []
        transaction_queues = []
        market_queue = multiprocessing.Queue()
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            transaction_queues.append(multiprocessing.Queue())
        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
        l2Process.start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          order_queues, transaction_queues, market_queue)
        # 将tradeServer作为主进程
        l1Process.join()
        l2Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)
outside_api_command_manager.py
@@ -54,7 +54,9 @@
API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes"  # 同步L1需要订阅的代码
API_TYPE_SYSTEM_LOG = "system_log"  # 系统日志
API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server"  # 从数据服务器拉取数据
API_TYPE_CODE_TRADE_INFO = "code_trade_info"
API_TYPE_CODE_TRADE_INFO = "code_trade_info"  # 代码交易信息
API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count"  # L2有效监听数量
class ActionCallback(object):
    # 交易
@@ -99,6 +101,9 @@
    # 代码的交易信息
    def OnGetCodeTradeInfo(self, client_id, request_id, data):
        pass
    def OnGetActiveListenCount(self, client_id, request_id):
        """Hook called for "l2_listen_active_count" requests; this base implementation does nothing."""
        pass
@@ -202,6 +207,9 @@
                            cls.action_callback.OnGetFromDataServer(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_TRADE_INFO:
                            cls.action_callback.OnGetCodeTradeInfo(client_id, request_id, data)
                        elif content_type == API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT:
                            cls.action_callback.OnGetActiveListenCount(client_id, request_id)
                    except Exception as e:
                        logging.exception(e)
                        pass
test/test_mul_queue.py
New file
@@ -0,0 +1,43 @@
import logging
import multiprocessing
import time
def run_process1(queue: multiprocessing.Queue):
    """Producer loop: put the string "process1" on the queue, then sleep one second, forever.

    Any Exception raised inside an iteration is logged and the loop carries on.
    """
    payload = "process1"
    pause_seconds = 1
    while True:
        try:
            queue.put_nowait(payload)
            time.sleep(pause_seconds)
        except Exception as err:
            logging.exception(err)
def run_process2(queue: multiprocessing.Queue):
    """Producer loop: put the string "process2" on the queue, then sleep one second, forever.

    Any Exception raised inside an iteration is logged and the loop carries on.
    """
    payload = "process2"
    pause_seconds = 1
    while True:
        try:
            queue.put_nowait(payload)
            time.sleep(pause_seconds)
        except Exception as err:
            logging.exception(err)
def run_process3(queue: multiprocessing.Queue):
    """Consumer loop: print each item taken from the queue, pausing 1 ms between reads.

    Any Exception raised inside an iteration is logged and the loop carries on.
    """
    while True:
        try:
            received = queue.get()
            print(received)
            time.sleep(0.001)
        except Exception as err:
            logging.exception(err)
if __name__ == "__main__":
    # Two producers and one consumer share a single multiprocessing queue.
    q = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker_fn, args=(q,))
        for worker_fn in (run_process1, run_process2, run_process3)
    ]
    for worker in workers:
        worker.start()
    # Keep the parent process alive while the children run.
    while True:
        time.sleep(0.1)
trade/huaxin/huaxin_trade_api_server.py
@@ -1,6 +1,7 @@
import hashlib
import json
import logging
import multiprocessing
import socket
import socketserver
import threading
@@ -427,7 +428,7 @@
        super().finish()
def __set_target_codes(pipe_l2):
def __set_target_codes(queue_other_w_l2_r: multiprocessing.Queue):
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
@@ -470,11 +471,9 @@
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    root_data = {"data": {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                          "data": datas},
                                 "request_id": f"{ClientSocketManager.CLIENT_TYPE_CMD_L2}_{round(time.time() * 1000)}"}
                    root_data = socket_util.encryp_client_params_sign(root_data)
                    pipe_l2.send(json.dumps(root_data))
                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 "data": datas}
                    queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    print("设置L2代码结束")
                    # 如果在9:24-9:30 需要加载板块
                    if int("092400") < int(tool.get_now_time_str().replace(":", "")) < int("093000"):
@@ -535,13 +534,13 @@
            time.sleep(1)
def run(pipe_server, pipe_l2):
def run(pipe_server, queue_other_w_l2_r):
    logger_system.info("create TradeApiServer")
    logger_system.info(f"trade_api_server 线程ID:{tool.get_thread_id()}")
    # 拉取交易信息
    huaxin_trade_data_update.run()
    #
    t1 = threading.Thread(target=lambda: __set_target_codes(pipe_l2), daemon=True)
    t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: __read_sync_task(pipe_server), daemon=True)
trade/huaxin/huaxin_trade_server.py
@@ -36,6 +36,7 @@
    GCancelBigNumComputer, SecondCancelBigNumComputer, LCancelRateManager, LatestCancelIndexManager
from l2.huaxin import huaxin_target_codes_manager
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_listen_manager import L2DataListenManager
from l2.l2_data_util import L2DataUtil
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinTransactionDatasProcessor
@@ -293,7 +294,7 @@
    def l2_order(cls, code, _datas, timestamp):
        now_timestamp = int(time.time() * 1000)
        async_log_util.info(hx_logger_l2_orderdetail,
                            f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}")
                            f"{code}#耗时:{int((time.time() - timestamp)*1000)}-{now_timestamp}#{_datas}")
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
@@ -378,12 +379,12 @@
            time.sleep(2)
def __recv_pipe_l1(pipe_l1):
def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}")
    if pipe_l1 is not None:
    if queue_l1_w_strategy_r is not None:
        while True:
            try:
                val = pipe_l1.recv()
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    print("收到来自L1的数据:", val["type"])
@@ -817,6 +818,17 @@
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
    def OnGetActiveListenCount(self, client_id, request_id):
        """Reply with the number of active L2 listeners for each data type.

        Asks the module-level l2DataListenManager for the order, transaction
        and market counts and sends them back through send_response. If
        anything raises, the exception is logged and a code-1 error response
        is sent instead.
        """
        try:
            order = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}}
            # NOTE(review): the success payload is passed as a dict while the error path
            # below passes a json.dumps() string — confirm which form send_response expects.
            self.send_response(result, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id)
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
    def OnL2Order(self, code, datas, timestamp):
@@ -825,8 +837,8 @@
    def OnL2Transaction(self, code, datas):
        TradeServerProcessor.l2_transaction(code, datas)
    def OnMarketData(self, code, datas):
        TradeServerProcessor.l2_market_data(code, datas)
    def OnMarketData(self, code, data):
        TradeServerProcessor.l2_market_data(code, data)
    def OnTradingOrderCancel(self, code, buy_no):
        TradeServerProcessor.trading_order_canceled(code, buy_no)
@@ -860,9 +872,11 @@
# 回调
my_l2_data_callback = MyL2DataCallback()
my_trade_response = MyTradeResponse()
l2DataListenManager: L2DataListenManager = None
def run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r):
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, order_queues, transaction_queues,
        market_queue):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -875,11 +889,16 @@
                     OutsideApiCommandCallback())
        manager.run(blocking=False)
        # 监听L2数据
        global l2DataListenManager
        l2DataListenManager = L2DataListenManager(my_l2_data_callback)
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)
        t1.start()
        # 同步异步日志