Administrator
2024-04-26 c4861d2429c2bf3a3f11309ad879b549e62e722d
下单/撤单通信方式修改/G撤修改
6个文件已修改
1个文件已添加
321 ■■■■ 已修改文件
huaxin_client/command_manager.py 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 53 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_cancel.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 115 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -7,6 +7,9 @@
import logging
import multiprocessing
import threading
import time
import zmq
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
@@ -74,7 +77,9 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue):
    def init(cls, trade_action_callback: TradeActionCallback,
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_trade_read_for_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
@@ -155,17 +160,79 @@
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    ###############ZEROMQ协议接收命令#################
    @classmethod
    def __create_order_command_reciever(cls, ipc_addr):
        """
        接收下单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {use_time}s")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    @classmethod
    def __create_cancel_order_command_reciever(cls, ipc_addr):
        """
        接收撤单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {use_time}s")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    # 维护连接数的稳定
    def run(self, blocking=True):
    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
        if blocking:
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
huaxin_client/trade_client.py
@@ -7,6 +7,8 @@
import threading
import time
import zmq
from huaxin_client import command_manager
from huaxin_client import constant
from huaxin_client import socket_util
@@ -1106,11 +1108,6 @@
            # send_response(
            #     json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
            #                 "request_id": request_id}), type, client_id, request_id, temp_params[2])
            if trade_response:
                trade_response.OnTradeResponse(
                    {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                     "request_id": request_id})
            else:
                send_response(
                    json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                                "request_id": request_id}), type, client_id, request_id)
@@ -1118,11 +1115,6 @@
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id)
        else:
            async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id)
            if trade_response:
                trade_response.OnTradeCallback(
                    {"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            # # 非API回调
            else:
                send_response(
                    json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}),
                    type,
@@ -1140,9 +1132,17 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def run(trade_response_: TradeResponse = None, queue_other_w_l2_r_: multiprocessing.Queue = None,
        queue_strategy_trade_write_=None,
def run(ipc_order_addr, ipc_cancel_order_addr, queue_strategy_trade_write_=None,
        queue_strategy_trade_read=None, queue_strategy_trade_read_for_read=None):
    """
    运行
    @param ipc_order_addr: zmq下单命令ipc地址
    @param ipc_cancel_order_addr: zmq撤单命令ipc地址
    @param queue_strategy_trade_write_:
    @param queue_strategy_trade_read:
    @param queue_strategy_trade_read_for_read:
    @return:
    """
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
@@ -1151,9 +1151,6 @@
        global queue_strategy_trade_write
        queue_strategy_trade_write = queue_strategy_trade_write_
        global trade_response
        trade_response = trade_response_
        # 运行日志同步
        threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
@@ -1161,7 +1158,7 @@
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read, queue_strategy_trade_read_for_read)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run()
        tradeCommandManager.run(ipc_order_addr, ipc_cancel_order_addr)
    except Exception as e:
        logger_system.exception(e)
    # 不需要运行命令解析
l2/cancel_buy_strategy.py
@@ -1468,10 +1468,10 @@
            if not must_buy:
                temp_thresh_hold_rate = round((total_num - max_num) * 0.9 / total_num, 2)
                thresh_hold_rate = min(thresh_hold_rate, temp_thresh_hold_rate)
            l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}")
            l2_log.l_cancel_debug(code, f"L后计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}")
            if rate >= thresh_hold_rate:
                canceled_indexes.sort()
                l2_log.l_cancel_debug(code, f"L下撤单,撤单位置:{canceled_indexes[-1]}")
                l2_log.l_cancel_debug(code, f"L后撤单,撤单位置:{canceled_indexes[-1]}")
                return True, total_data[canceled_indexes[-1]]
        return False, None
@@ -2221,8 +2221,11 @@
    def cancel_success(self, code):
        self.clear(code)
# ---------------------------------新G撤-------------------------------
class NewGCancelBigNumComputer:
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __real_place_order_index_dict = {}
    __trade_progress_index_dict = {}
    __watch_indexes_dict = {}
@@ -2234,13 +2237,42 @@
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(NewGCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "g_cancel_watch_index_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                if val:
                    val = json.loads(val)
                    val = set(val)
                CodeDataCacheUtil.set_cache(cls.__watch_indexes_dict, code, val)
        finally:
            RedisUtils.realse(__redis)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def __set_watch_index(self, code, indexes):
        CodeDataCacheUtil.set_cache(self.__watch_indexes_dict, code, indexes)
        RedisUtils.setex_async(self.__db, f"g_cancel_watch_index_info-{code}", tool.get_expire(),
                               json.dumps(list(indexes)))
    def set_real_place_order_index(self, code, index, buy_single_index, is_default):
        self.__real_place_order_index_dict[code] = (index, is_default)
        start_index = buy_single_index
        if code in self.__trade_progress_index_dict:
            start_index = self.__trade_progress_index_dict.get(code)
        if not is_default:
            trade_index, is_default = TradeBuyQueue().get_traded_index(code)
            if trade_index is None:
                start_index = 0
            else:
                start_index = trade_index
        self.__commpute_watch_indexes(code, start_index, (index, is_default), from_real_order_index_changed=True)
    def clear(self, code=None):
@@ -2249,12 +2281,16 @@
                self.__real_place_order_index_dict.pop(code)
            if code in self.__watch_indexes_dict:
                self.__watch_indexes_dict.pop(code)
            RedisUtils.delete_async(self.__db, f"g_cancel_watch_index_info-{code}")
            if code in self.__trade_progress_index_dict:
                self.__trade_progress_index_dict.pop(code)
        else:
            self.__real_place_order_index_dict.clear()
            self.__watch_indexes_dict.clear()
            self.__trade_progress_index_dict.clear()
            keys = RedisUtils.keys(self.__get_redis(), f"g_cancel_watch_index_info-*")
            for k in keys:
                RedisUtils.delete(self.__get_redis(), k)
    # recompute:是否重新计算
    def __commpute_watch_indexes(self, code, traded_index, real_order_index_info, from_real_order_index_changed=False,
@@ -2296,13 +2332,13 @@
            if from_real_order_index_changed or recompute:
                # 真实下单位改变后才会更新
                final_watch_indexes = origin_watch_index | watch_indexes
                self.__watch_indexes_dict[code] = final_watch_indexes
                self.__set_watch_index(code, final_watch_indexes)
                l2_log.g_cancel_debug(code, f"大单监听:{final_watch_indexes} 是否重新计算:{recompute}")
    def set_trade_progress(self, code, buy_single_index, index):
        if self.__trade_progress_index_dict.get(code) != index:
        # if self.__trade_progress_index_dict.get(code) != index:
            self.__trade_progress_index_dict[code] = index
            self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code))
        # self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code))
    def need_cancel(self, code, buy_exec_index, start_index, end_index):
        if code not in self.__real_place_order_index_dict:
@@ -2417,7 +2453,8 @@
            return True, total_datas[cancel_half_index], f"B撤:撤单索引-{cancel_half_index}"
        return False, None, "还有大单没撤单"
    def test(self):
        self.__set_watch_index("000333", {1, 2, 3, 4, 5, 6, 7})
# ---------------------------------独苗撤-------------------------------
main.py
@@ -28,7 +28,20 @@
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_,
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr):
    """
    策略进程
    @param pipe_server:
    @param queue_strategy_r_trade_w_:
    @param queue_l1_w_strategy_r_:
    @param queue_strategy_w_trade_r_:
    @param queue_strategy_w_trade_r_for_read_:
    @param queue_l1_trade_r_strategy_w_:
    @param queue_l1_trade_w_strategy_r_:
    @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
    @return:
    """
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -51,7 +64,7 @@
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            queue_l1_trade_w_strategy_r_)
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr)
# 主服务
@@ -105,6 +118,9 @@
        # 策略读交易写
        queue_strategy_r_trade_w = multiprocessing.Queue()
        # 下单,撤单ipc地址
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
@@ -122,7 +138,7 @@
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
            args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # 此处将L2的进程与策略进程合并
@@ -138,7 +154,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r)
                          queue_l1_trade_w_strategy_r,trade_ipc_addr)
        # 将tradeServer作为主进程
        l1Process.join()
test/test_cancel.py
New file
@@ -0,0 +1,11 @@
from db.redis_manager_delegate import RedisUtils
from l2.cancel_buy_strategy import NewGCancelBigNumComputer
def test_g_cancel():
    """Run the G-cancel computer's built-in manual check on its shared instance."""
    computer = NewGCancelBigNumComputer()
    computer.test()
# Script entry point: run the manual G-cancel check when executed directly.
if __name__ == '__main__':
    test_g_cancel()
    # NOTE(review): presumably this drives the loop that performs the Redis
    # write queued by the check above; confirm against RedisUtils.run_loop.
    RedisUtils.run_loop()
trade/huaxin/huaxin_trade_api.py
@@ -11,6 +11,8 @@
import time
import concurrent.futures
import zmq
from code_attribute import gpcode_manager
from huaxin_client import constant as huaxin_client_constant
from log_module import async_log_util
@@ -142,9 +144,55 @@
                pass
def __create_trade_ipc_context(trade_ipc_addr):
    """
    Create the two module-level ZeroMQ REQ sockets used to send trade commands.

    ``order_socket`` is connected to the first address and
    ``cancel_order_socket`` to the second; both share one ZeroMQ context.

    @param trade_ipc_addr: (order address, cancel-order address)
    @return: None
    """
    global order_socket, cancel_order_socket
    ctx = zmq.Context()
    # Order channel.
    order_socket = ctx.socket(zmq.REQ)
    order_socket.connect(trade_ipc_addr[0])
    # Cancel-order channel.
    cancel_order_socket = ctx.socket(zmq.REQ)
    cancel_order_socket.connect(trade_ipc_addr[1])
def __order_by_zmq(data_json):
    """
    Send an order request over the order REQ socket and wait for the reply.

    The reply text is read to complete the request/reply cycle and then discarded.
    NOTE(review): the module-level socket is shared; confirm callers never
    invoke this concurrently from several threads.

    @param data_json: JSON-serializable request payload
    @return: None
    """
    order_socket.send_json(data_json)
    # Blocks until the receiver answers; the content is not used.
    order_socket.recv_string()
def __cancel_order_by_zmq(data_json):
    """
    Send a cancel-order request over the cancel REQ socket and wait for the reply.

    The reply text is read to complete the request/reply cycle and then discarded.
    NOTE(review): the module-level socket is shared; confirm callers never
    invoke this concurrently from several threads.

    @param data_json: JSON-serializable request payload
    @return: None
    """
    cancel_order_socket.send_json(data_json)
    # Blocks until the receiver answers; the content is not used.
    cancel_order_socket.recv_string()
def __test_order():
    """
    Manual end-to-end check of the order and cancel channels.

    Waits 60 seconds, submits a non-blocking order for "000333" (direction 1,
    volume 100, price 1.00), waits another 60 seconds, then requests a
    non-blocking cancel using the same order reference.
    NOTE(review): this goes through the real order()/cancel_order() entry
    points; confirm it is never started in a live trading environment.

    @return: None
    """
    stock_code = "000333"
    time.sleep(60)
    ref = huaxin_util.create_order_ref()
    order(1, stock_code, 100, 1.00, price_type=2, blocking=False, order_ref=ref, shadow_price=0.99)
    time.sleep(60)
    cancel_order(1, stock_code, '', orderRef=ref, blocking=False)
# 设置交易通信队列
# 暂时不会使用该方法
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_):
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_,
                   trade_ipc_addr):
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    queue_strategy_w_trade_r_for_read = queue_strategy_w_trade_r_for_read_
@@ -155,6 +203,11 @@
    t1.start()
    t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True)
    t1.start()
    # 创建IPC发送端口
    __create_trade_ipc_context(trade_ipc_addr)
    # 测试下单
    threading.Thread(target=lambda: __test_order(), daemon=True).start()
# 交易通道的错误次数
@@ -323,14 +376,12 @@
# 网络请求
def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True, is_trade=False):
def __request(_type, data, request_id=None, log_enable=True, is_trade=False):
    """
    请求,将交易(包含下单/撤单)与查询(包含查持仓/账户可用金额/委托列表/成交列表)队列分离
    @param _type:
    @param data:
    @param request_id:
    @param blocking:
    @param is_pipe:
    @param log_enable:
    @param is_trade:
    @return:
@@ -339,14 +390,22 @@
        request_id = __get_request_id(_type)
    try:
        if log_enable:
            async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id,
                                is_pipe)
            async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{}", 0, request_id)
        root_data = {"type": _type,
                     "data": data,
                     "request_id": request_id}
                     "request_id": request_id,
                     "time": time.time()
                     }
        root_data = socket_util.encryp_client_params_sign(root_data)
        start_time = time.time()
        if is_trade:
            # queue_strategy_w_trade_r.put_nowait(root_data)
            # 采用zmq通信
            if data['trade_type'] == 1:
                __order_by_zmq(root_data)
            elif data['trade_type'] == 2:
                __cancel_order_by_zmq(root_data)
            else:
            queue_strategy_w_trade_r.put_nowait(root_data)
        else:
            queue_strategy_w_trade_r_for_read.put_nowait(root_data)
@@ -402,14 +461,22 @@
        pass
# 下单委托
# direction 1-买  2-卖
# code:代码
# volume:交易量
# price:价格(如果是卖时不传价格就按照5挡价卖)
# blocking是否阻塞进程
def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None,
          order_ref=None, shadow_price=None):
    """
    下单委托
    @param direction: 1-买  2-卖
    @param code:
    @param volume:交易量
    @param price:价格(如果是卖时不传价格就按照5挡价卖)
    @param price_type:
    @param blocking:是否阻塞进程
    @param sinfo:
    @param request_id:
    @param order_ref:
    @param shadow_price:
    @return:
    """
    timestamp = round(time.time() * 1000)
    if not sinfo:
        sinfo = f"b_{code}_{timestamp}"
@@ -427,8 +494,7 @@
                                "price_type": price_type,
                                "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking},
                               request_id=request_id,
                               blocking=blocking,
                               is_pipe=is_pipe_channel_normal(), is_trade=True)
                               is_trade=True)
    try:
        if blocking:
            return __read_response(request_id, blocking)
@@ -469,8 +535,7 @@
                                "code": code,
                                "orderRef": orderRef,
                                "orderActionRef": order_action_ref,
                                "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                               is_pipe=is_pipe_channel_normal(), is_trade=True)
                                "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, is_trade=True)
    try:
        return __read_response(request_id, blocking)
    finally:
@@ -489,7 +554,7 @@
def get_delegate_list(can_cancel=True, blocking=True, timeout=TIMEOUT):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
                           {"type": ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
                            "can_cancel": 1 if can_cancel else 0}, blocking=blocking, is_pipe=is_pipe_channel_normal())
                            "can_cancel": 1 if can_cancel else 0})
    return __read_response(request_id, blocking, timeout=timeout)
@@ -497,39 +562,35 @@
# 获取成交列表
def get_deal_list(blocking=True, timeout=TIMEOUT):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST,
                           {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST})
    return __read_response(request_id, blocking, timeout=timeout)
# 获取持仓列表
def get_position_list(blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST,
                           {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST})
    return __read_response(request_id, blocking)
# 获取账户资金状况
def get_money(blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY,
                           {"type": ClientSocketManager.CLIENT_TYPE_MONEY}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_MONEY})
    return __read_response(request_id, blocking)
# 设置L2订阅数据
def set_l2_codes_data(codes_data, blocking=True):
    request_id = __request(ClientSocketManager.CLIENT_TYPE_CMD_L2,
                           {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
                           {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data})
    return __read_response(request_id, blocking)
# 设置L2订阅数据
def __test_trade_channel(sid):
    request_id = __request("test",
                           {"type": "test", "data": {"sid": sid}}, blocking=True, log_enable=False)
                           {"type": "test", "data": {"sid": sid}}, log_enable=False)
    return __read_response(request_id, True, log_enable=False)
trade/huaxin/huaxin_trade_server.py
@@ -1805,7 +1805,17 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r):
        queue_l1_trade_w_strategy_r, trade_ipc_addr):
    """
    @param queue_strategy_r_trade_w:
    @param queue_l1_w_strategy_r:
    @param queue_strategy_w_trade_r:
    @param queue_strategy_w_trade_r_for_read:
    @param queue_l1_trade_w_strategy_r:
    @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址)
    @return:
    """
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1820,7 +1830,7 @@
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                                        queue_strategy_w_trade_r_for_read)
                                        queue_strategy_w_trade_r_for_read, trade_ipc_addr)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)