Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/command_manager.py
@@ -5,12 +5,16 @@
import concurrent.futures
import json
import logging
import multiprocessing
import threading
import time
import zmq
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
from log_module import async_log_util
from log_module.log import logger_local_huaxin_trade_debug, logger_trade
from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug
MSG_TYPE_HEART = "heart"
# 命令信息
@@ -57,7 +61,7 @@
class L2ActionCallback(object):
    # 监听L2数据
    def OnSetL2Position(self, client_id, request_id, codes_data):
    def OnSetL2Position(self, codes_data):
        pass
@@ -73,19 +77,17 @@
        return cls._instance
    @classmethod
    # Store the trade callback and the command channels on the class.
    # NOTE(review): two `init` signatures appear back to back and the body assigns
    # both the pipe_* and the queue_* attributes; this reads like the pre- and
    # post-change versions of one method listed together - confirm which is live.
    def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, pipe_strategy):
    def init(cls, trade_action_callback: TradeActionCallback,
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_trade_read_for_read: multiprocessing.Queue):
        # Callback object whose On* methods process_command dispatches to.
        cls.action_callback = trade_action_callback
        # Pipe-based channels (parameters of the first signature).
        cls.pipe_strategy = pipe_strategy
        cls.pipe_l2 = pipe_l2
        # Queue-based channels (parameters of the second signature): run() hands
        # the first to run_process_command and the second to run_process_read_command.
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}")
        # 查看是否是设置L2的代码
        if _type == CLIENT_TYPE_CMD_L2:
            cls.pipe_l2.send(
                json.dumps({"type": "set_l2_codes", "data": result_json["data"]}))
            return
        try:
            data = result_json["data"]
            request_id = result_json.get('request_id')
@@ -115,41 +117,128 @@
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data, sk)
        except Exception as e:
            logger_local_huaxin_trade_debug.debug(f"process_command出错:{result_json}")
            logging.exception(e)
            logging.error(result_json)
            async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}")
            # logging.exception(e)
            # logging.error(result_json)
    @classmethod
    # Receive commands in an endless loop, JSON-decode each one and pass it to
    # process_command. Returns immediately when the channel is None.
    # NOTE(review): a pipe-based and a queue-based variant of the signature, guard
    # and receive call are listed together here - confirm which set is live.
    def run_process_command(cls, pipe_strategy):
        if pipe_strategy is None:
    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
        if queue_strategy_trade is None:
            return
        # Receive local commands
        try:
            while True:
                try:
                    val = pipe_strategy.recv()
                    val = queue_strategy_trade.get()
                    if val:
                        # The received value is a JSON string carrying a "type" key.
                        val = json.loads(val)
                        _type = val["type"]
                        # NOTE(review): as listed, the command is both submitted to the
                        # thread pool and processed inline below; if both lines are live
                        # every command is handled twice, and the lambda reads
                        # _type/val when it runs rather than when it is submitted.
                        cls.process_command_thread_pool.submit(lambda:  cls.process_command(_type, None, val))
                        # "test" commands are not logged.
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command(_type, None, val)
                except Exception as e:
                    # Log and keep looping so one bad message does not stop the receiver.
                    logger_local_huaxin_trade_debug.exception(e)
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    @classmethod
    def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
        if queue_strategy_trade_read_trade is None:
            return
        # 本地命令接收
        try:
            while True:
                try:
                    val = queue_strategy_trade_read_trade.get()
                    if val:
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    ###############ZEROMQ协议接收命令#################
    @classmethod
    def __create_order_command_reciever(cls, ipc_addr):
        """
        接收下单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 1, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms   request_id:{request_id}")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    @classmethod
    def __create_cancel_order_command_reciever(cls, ipc_addr):
        """
        接收撤单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms   request_id:{request_id}")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    # Keep the number of connections stable
    # NOTE(review): the heading above does not describe the visible body, which
    # starts the command receivers. Two `run` signatures and both pipe-based and
    # queue-based calls are listed together - confirm which lines are live.
    def run(self, blocking=True):
    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
        if blocking:
            # NOTE(review): run_process_command loops forever, so if this pipe-based
            # call is live the threads below are never started.
            self.run_process_command(self.pipe_strategy)
            # Daemon thread: read-command queue consumer.
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            # Daemon thread: ZeroMQ receiver for order commands.
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            # Daemon thread: ZeroMQ receiver for cancel-order commands.
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
            # Blocking mode: consume the trade command queue on the calling thread.
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # Receive commands: every consumer runs on its own daemon thread and
            # run() returns right after starting them.
            t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True)
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
# L2指令管理
class L2CommandManager:
    action_callback = None
    @classmethod
    def init(cls, l2_action_callback):
@@ -158,16 +247,9 @@
    @classmethod
    # Dispatch an L2 command to the registered L2 action callback.
    # NOTE(review): the body lists two variants together - one that verifies the
    # request signature and passes (client_id, request_id, codes_data), and one
    # that reads the type from result_json and passes only `data` - confirm which
    # lines are live.
    def process_command(cls, _type, client_id, result_json):
        data = result_json["data"]
        request_id = result_json["request_id"]
        ctype = data["type"]
        if not socket_util.is_client_params_sign_right(result_json):
            # Signature check failed
            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                      {"code": -1, "msg": "签名错误"})
            return
        codes_data = data["data"]
        ctype = result_json["type"]
        # Only the L2 command type is handled; any other type is ignored.
        if ctype == CLIENT_TYPE_CMD_L2:
            cls.action_callback.OnSetL2Position(client_id, request_id, codes_data)
            cls.action_callback.OnSetL2Position(data)
if __name__ == "__main__":