| | |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from huaxin_client import socket_util |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | |
| | | |
| | | class L2ActionCallback(object): |
| | | # 监听L2数据 |
| | | def OnSetL2Position(self, client_id, request_id, codes_data): |
| | | def OnSetL2Position(self, codes_data): |
| | | pass |
| | | |
| | | |
| | |
| | | return cls._instance |
| | | |
| | | @classmethod |
| | | def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue): |
| | | def init(cls, trade_action_callback: TradeActionCallback, |
| | | queue_strategy_trade_read_for_trade: multiprocessing.Queue, |
| | | queue_strategy_trade_read_for_read: multiprocessing.Queue): |
| | | cls.action_callback = trade_action_callback |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade |
| | | cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read |
| | | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json, sk=None): |
| | |
| | | _type = val["type"] |
| | | if _type != "test": |
| | | async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}") |
| | | cls.process_command(_type, None, val) |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | |
| | | @classmethod |
| | | def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue): |
| | | if queue_strategy_trade_read_trade is None: |
| | | return |
| | | # 本地命令接收 |
| | | try: |
| | | while True: |
| | | try: |
| | | val = queue_strategy_trade_read_trade.get() |
| | | if val: |
| | | _type = val["type"] |
| | | if _type != "test": |
| | | async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}") |
| | | cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | |
| | | ###############ZEROMQ协议接收命令################# |
| | | @classmethod |
| | | def __create_order_command_reciever(cls, ipc_addr): |
| | | """ |
| | | 接收下单命令 |
| | | @param ipc_addr: ipc地址 |
| | | @return: |
| | | """ |
| | | context = zmq.Context() |
| | | socket = context.socket(zmq.REP) |
| | | socket.bind(ipc_addr) |
| | | while True: |
| | | data = socket.recv_json() |
| | | try: |
| | | request_id = data.get('request_id') |
| | | use_time = time.time() - data.get('time') |
| | | data = data.get('data') |
| | | cls.action_callback.OnTrade(None, request_id, None, 1, data) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}") |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | finally: |
| | | socket.send_string("SUCCESS") |
| | | |
| | | @classmethod |
| | | def __create_cancel_order_command_reciever(cls, ipc_addr): |
| | | """ |
| | | 接收撤单命令 |
| | | @param ipc_addr: ipc地址 |
| | | @return: |
| | | """ |
| | | context = zmq.Context() |
| | | socket = context.socket(zmq.REP) |
| | | socket.bind(ipc_addr) |
| | | while True: |
| | | data = socket.recv_json() |
| | | try: |
| | | request_id = data.get('request_id') |
| | | use_time = time.time() - data.get('time') |
| | | data = data.get('data') |
| | | cls.action_callback.OnTrade(None, request_id, None, 2, data) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {round(use_time*1000,3)}ms request_id:{request_id}") |
| | | |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | finally: |
| | | socket.send_string("SUCCESS") |
| | | |
| | | # 维护连接数的稳定 |
| | | def run(self, blocking=True): |
| | | def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True): |
| | | if blocking: |
| | | t1 = threading.Thread( |
| | | target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | self.run_process_command(self.queue_strategy_trade_read) |
| | | else: |
| | | # 接受命令 |
| | | t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | |
| | | |
| | | # L2指令管理 |
| | | class L2CommandManager: |
| | | action_callback = None |
| | | |
| | | @classmethod |
| | | def init(cls, l2_action_callback): |
| | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json): |
| | | data = result_json["data"] |
| | | request_id = result_json["request_id"] |
| | | ctype = data["type"] |
| | | if not socket_util.is_client_params_sign_right(result_json): |
| | | # 签名出错 |
| | | SendResponseSkManager.send_error_response(_type, request_id, client_id, |
| | | {"code": -1, "msg": "签名错误"}) |
| | | return |
| | | codes_data = data["data"] |
| | | ctype = result_json["type"] |
| | | if ctype == CLIENT_TYPE_CMD_L2: |
| | | cls.action_callback.OnSetL2Position(client_id, request_id, codes_data) |
| | | cls.action_callback.OnSetL2Position(data) |
| | | |
| | | |
| | | if __name__ == "__main__": |