Administrator
2 天以前 6a0d3ff5832e57ee1b1374d086f24b3c1679b332
huaxin_client/command_manager.py
@@ -7,6 +7,9 @@
import logging
import multiprocessing
import threading
import time
import zmq
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
@@ -23,6 +26,7 @@
# Command-type identifiers carried by incoming requests. The command dispatcher
# in this file compares a request's `_type` against these values to choose the
# callback to invoke (visible here for CLIENT_TYPE_POSITION_LIST and
# CLIENT_TYPE_ORDER_FOUND_DETAIL; the others are presumably matched the same
# way -- confirm against the dispatcher).
CLIENT_TYPE_POSITION_LIST = "position_list"
CLIENT_TYPE_MONEY = "money"
CLIENT_TYPE_DEAL = "deal"
CLIENT_TYPE_ORDER_FOUND_DETAIL = "order_found_detail"
CLIENT_TYPE_CMD_L2 = "l2_cmd"
@@ -55,6 +59,9 @@
    def OnTest(self, client_id, request_id, data, sk):
        pass
    def OnOrderFoundDetail(self, client_id, request_id, sk, data):
        pass
class L2ActionCallback(object):
    # 监听L2数据
@@ -74,7 +81,9 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue):
    def init(cls, trade_action_callback: TradeActionCallback,
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_trade_read_for_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
@@ -109,6 +118,11 @@
                cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
            elif _type == CLIENT_TYPE_POSITION_LIST:
                cls.action_callback.OnPositionList(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_ORDER_FOUND_DETAIL:
                cls.action_callback.OnOrderFoundDetail(client_id, request_id, sk, data)
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data, sk)
        except Exception as e:
@@ -155,17 +169,79 @@
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    ###############ZEROMQ协议接收命令#################
    @classmethod
    def __create_order_command_reciever(cls, ipc_addr):
        """
        接收下单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 1, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms   request_id:{request_id}")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    @classmethod
    def __create_cancel_order_command_reciever(cls, ipc_addr):
        """
        接收撤单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms   request_id:{request_id}")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    # 维护连接数的稳定
    def run(self, blocking=True):
    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
        if blocking:
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()