# -*- coding: utf-8 -*- """ 命令管理器 """ import concurrent.futures import json import logging import multiprocessing import threading import time import zmq from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager from log_module import async_log_util from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug MSG_TYPE_HEART = "heart" # 命令信息 MSG_TYPE_CMD = "cmd" CLIENT_TYPE_TRADE = "trade" CLIENT_TYPE_DELEGATE_LIST = "delegate_list" CLIENT_TYPE_DEAL_LIST = "deal_list" CLIENT_TYPE_POSITION_LIST = "position_list" CLIENT_TYPE_MONEY = "money" CLIENT_TYPE_DEAL = "deal" CLIENT_TYPE_CMD_L2 = "l2_cmd" # 心跳时间间隔 HEART_SPACE_TIME = 3 class TradeActionCallback(object): # 交易 def OnTrade(self, client_id, request_id, sk, type_, data): pass # 委托列表 def OnDelegateList(self, client_id, request_id, sk, can_cancel): pass # 成交列表 def OnDealList(self, client_id, request_id, sk): pass # 成交列表 def OnPositionList(self, client_id, request_id, sk): pass # 获取资金信息 def OnMoney(self, client_id, request_id, sk): pass # 测试 def OnTest(self, client_id, request_id, data, sk): pass class L2ActionCallback(object): # 监听L2数据 def OnSetL2Position(self, codes_data): pass # 交易指令管理 class TradeCommandManager: trade_client_dict = {} _instance = None process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30) def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance @classmethod def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue, queue_strategy_trade_read_for_read: multiprocessing.Queue): cls.action_callback = trade_action_callback cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read @classmethod def process_command(cls, _type, client_id, result_json, sk=None): async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}") # 查看是否是设置L2的代码 try: data = result_json["data"] request_id = result_json.get('request_id') # 暂时取消签名 # if not socket_util.is_client_params_sign_right(result_json): # print("签名错误") # # 签名出错 # SendResponseSkManager.send_error_response(_type, request_id, client_id, # {"code": -1, "msg": "签名错误"}) # return if _type == CLIENT_TYPE_TRADE: # 交易 ctype = data["trade_type"] async_log_util.info(logger_trade, f"交易开始:{request_id}") cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data) async_log_util.info(logger_trade, f"交易结束:{request_id}") elif _type == CLIENT_TYPE_MONEY: cls.action_callback.OnMoney(client_id, request_id, sk) elif _type == CLIENT_TYPE_DEAL_LIST: cls.action_callback.OnDealList(client_id, request_id, sk) elif _type == CLIENT_TYPE_DELEGATE_LIST: can_cancel = data["can_cancel"] cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel) elif _type == CLIENT_TYPE_POSITION_LIST: cls.action_callback.OnPositionList(client_id, request_id, sk) elif _type == "test": cls.action_callback.OnTest(client_id, request_id, data, sk) except Exception as e: async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}") # logging.exception(e) # logging.error(result_json) @classmethod def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue): if queue_strategy_trade is None: return # 本地命令接收 try: while True: try: val = queue_strategy_trade.get() if val: _type = val["type"] if _type != "test": async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}") cls.process_command(_type, None, val) except Exception as e: logger_local_huaxin_trade_debug.exception(e) except Exception as e: async_log_util.exception(logger_local_huaxin_trade_debug, e) @classmethod def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue): if queue_strategy_trade_read_trade is None: return # 本地命令接收 try: while True: try: val = queue_strategy_trade_read_trade.get() if val: _type = val["type"] if _type != "test": async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}") cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) except Exception as e: async_log_util.exception(logger_local_huaxin_trade_debug, e) logging.exception(e) except Exception as e: async_log_util.exception(logger_local_huaxin_trade_debug, e) ###############ZEROMQ协议接收命令################# @classmethod def __create_order_command_reciever(cls, ipc_addr): """ 接收下单命令 @param ipc_addr: ipc地址 @return: """ context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(ipc_addr) while True: data = socket.recv_json() try: request_id = data.get('request_id') use_time = time.time() - data.get('time') data = data.get('data') cls.action_callback.OnTrade(None, request_id, None, 1, data) async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}") except Exception as e: logger_local_huaxin_trade_debug.exception(e) finally: socket.send_string("SUCCESS") @classmethod def __create_cancel_order_command_reciever(cls, ipc_addr): """ 接收撤单命令 @param ipc_addr: ipc地址 @return: """ context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(ipc_addr) while True: data = socket.recv_json() try: request_id = data.get('request_id') use_time = time.time() - data.get('time') data = data.get('data') cls.action_callback.OnTrade(None, request_id, None, 2, data) async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}") except Exception as e: logger_local_huaxin_trade_debug.exception(e) finally: socket.send_string("SUCCESS") # 维护连接数的稳定 def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True): if blocking: t1 = threading.Thread( target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) t1.start() t1 = threading.Thread( target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) t1.start() t1 = threading.Thread( target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) t1.start() self.run_process_command(self.queue_strategy_trade_read) else: # 接受命令 t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) t1.start() t1 = threading.Thread( target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) t1.start() t1 = threading.Thread( target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) t1.start() t1 = threading.Thread( target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) t1.start() # L2指令管理 class L2CommandManager: action_callback = None @classmethod def init(cls, l2_action_callback): cls.action_callback = l2_action_callback @classmethod def process_command(cls, _type, client_id, result_json): data = result_json["data"] ctype = result_json["type"] if ctype == CLIENT_TYPE_CMD_L2: cls.action_callback.OnSetL2Position(data) if __name__ == "__main__": manager = TradeCommandManager("127.0.0.1", 10008, None) manager.run() input()