Administrator
2024-04-26 c4861d2429c2bf3a3f11309ad879b549e62e722d
huaxin_client/command_manager.py
@@ -7,6 +7,9 @@
import logging
import multiprocessing
import threading
import time
import zmq
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
@@ -74,7 +77,9 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue):
    def init(cls, trade_action_callback: TradeActionCallback,
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_trade_read_for_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
@@ -155,17 +160,79 @@
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    ###############ZEROMQ协议接收命令#################
    @classmethod
    def __create_order_command_reciever(cls, ipc_addr):
        """
        接收下单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {use_time}s")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    @classmethod
    def __create_cancel_order_command_reciever(cls, ipc_addr):
        """
        接收撤单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {use_time}s")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    # Maintain a stable number of connections
    # NOTE(review): the comment above is translated from the original; the
    # code below starts worker threads and does not itself track a connection
    # count, so the comment may be stale.
    # NOTE(review): this span carries two `def run` headers and, in two places,
    # a single-line `t1 = threading.Thread(...)` that is rebound by a
    # multi-line form before `start()` is called. It looks like pre-change and
    # post-change lines shown together -- confirm against the real file.
    def run(self, blocking=True):
    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
        # Starts three daemon threads: the read-command loop on
        # queue_strategy_trade_read_trade_read, the order receiver bound to
        # order_ipc_addr and the cancel-order receiver bound to
        # cancel_order_ipc_addr. run_process_command on
        # queue_strategy_trade_read runs on the calling thread when `blocking`
        # is truthy, otherwise on a fourth daemon thread.
        if blocking:
            # This Thread object is never started: t1 is rebound on the next statement.
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            # t1 is reused as a scratch name; each thread is started before the name is rebound.
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
            # Runs on the calling thread.
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # Accept commands
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            # This Thread object is never started: t1 is rebound on the next statement.
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()