Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/command_manager.py
@@ -5,11 +5,16 @@
import concurrent.futures
import json
import logging
import multiprocessing
import threading
import time
import zmq
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
from log_module.log import logger_local_huaxin_trade_debug
from log_module import async_log_util
from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug
MSG_TYPE_HEART = "heart"
# 命令信息
@@ -56,7 +61,7 @@
class L2ActionCallback(object):
    # 监听L2数据
    def OnSetL2Position(self, client_id, request_id, codes_data):
    def OnSetL2Position(self, codes_data):
        pass
@@ -64,7 +69,7 @@
class TradeCommandManager:
    trade_client_dict = {}
    _instance = None
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
@@ -72,34 +77,34 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback, pipe_l2, pipe_strategy):
    def init(cls, trade_action_callback: TradeActionCallback,
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_trade_read_for_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.pipe_strategy = pipe_strategy
        cls.pipe_l2 = pipe_l2
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}")
        # 查看是否是设置L2的代码
        if _type == CLIENT_TYPE_CMD_L2:
            cls.pipe_l2.send(
                json.dumps({"type": "set_l2_codes", "data": result_json["data"]}))
            return
        try:
            data = result_json["data"]
            print("接收内容", result_json)
            request_id = result_json.get('request_id')
            if not socket_util.is_client_params_sign_right(result_json):
                print("签名错误")
                # 签名出错
                SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                          {"code": -1, "msg": "签名错误"})
                return
            # 暂时取消签名
            # if not socket_util.is_client_params_sign_right(result_json):
            #     print("签名错误")
            #     # 签名出错
            #     SendResponseSkManager.send_error_response(_type, request_id, client_id,
            #                                               {"code": -1, "msg": "签名错误"})
            #     return
            if _type == CLIENT_TYPE_TRADE:
                # 交易
                ctype = data["trade_type"]
                async_log_util.info(logger_trade, f"交易开始:{request_id}")
                cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
                async_log_util.info(logger_trade, f"交易结束:{request_id}")
            elif _type == CLIENT_TYPE_MONEY:
                cls.action_callback.OnMoney(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DEAL_LIST:
@@ -112,42 +117,128 @@
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data, sk)
        except Exception as e:
            logger_local_huaxin_trade_debug.debug(f"process_command出错:{result_json}")
            logging.exception(e)
            logging.error(result_json)
            async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}")
            # logging.exception(e)
            # logging.error(result_json)
    @classmethod
    def run_process_command(cls, pipe_strategy):
        if pipe_strategy is None:
    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
        if queue_strategy_trade is None:
            return
        # 本地命令接收
        try:
            while True:
                try:
                    val = pipe_strategy.recv()
                    val = queue_strategy_trade.get()
                    if val:
                        val = json.loads(val)
                        print("run_process_command", val)
                        _type = val["type"]
                        threading.Thread(target=lambda: cls.process_command(_type, None, val), daemon=True).start()
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command(_type, None, val)
                except Exception as e:
                    logger_local_huaxin_trade_debug.exception(e)
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    @classmethod
    def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
        if queue_strategy_trade_read_trade is None:
            return
        # 本地命令接收
        try:
            while True:
                try:
                    val = queue_strategy_trade_read_trade.get()
                    if val:
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    ###############ZEROMQ协议接收命令#################
    @classmethod
    def __create_order_command_reciever(cls, ipc_addr):
        """
        接收下单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 1, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms   request_id:{request_id}")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    @classmethod
    def __create_cancel_order_command_reciever(cls, ipc_addr):
        """
        接收撤单命令
        @param ipc_addr: ipc地址
        @return:
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv_json()
            try:
                request_id = data.get('request_id')
                use_time = time.time() - data.get('time')
                data = data.get('data')
                cls.action_callback.OnTrade(None, request_id, None, 2, data)
                async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms   request_id:{request_id}")
            except Exception as e:
                logger_local_huaxin_trade_debug.exception(e)
            finally:
                socket.send_string("SUCCESS")
    # 维护连接数的稳定
    def run(self, blocking=True):
    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
        if blocking:
            self.run_process_command(self.pipe_strategy)
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True)
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
            t1.start()
            t1 = threading.Thread(
                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
            t1.start()
# L2指令管理
class L2CommandManager:
    action_callback = None
    @classmethod
    def init(cls, l2_action_callback):
@@ -156,16 +247,9 @@
    @classmethod
    def process_command(cls, _type, client_id, result_json):
        data = result_json["data"]
        request_id = result_json["request_id"]
        ctype = data["type"]
        if not socket_util.is_client_params_sign_right(result_json):
            # 签名出错
            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                      {"code": -1, "msg": "签名错误"})
            return
        codes_data = data["data"]
        ctype = result_json["type"]
        if ctype == CLIENT_TYPE_CMD_L2:
            cls.action_callback.OnSetL2Position(client_id, request_id, codes_data)
            cls.action_callback.OnSetL2Position(data)
if __name__ == "__main__":