| | |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | from huaxin_client import socket_util |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | |
| | | return cls._instance |
| | | |
| | | @classmethod |
| | | def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue): |
| | | def init(cls, trade_action_callback: TradeActionCallback, |
| | | queue_strategy_trade_read_for_trade: multiprocessing.Queue, |
| | | queue_strategy_trade_read_for_read: multiprocessing.Queue): |
| | | cls.action_callback = trade_action_callback |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade |
| | | cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read |
| | |
| | | except Exception as e: |
| | | async_log_util.exception(logger_local_huaxin_trade_debug, e) |
| | | |
| | | ###############ZEROMQ协议接收命令################# |
| | | @classmethod |
| | | def __create_order_command_reciever(cls, ipc_addr): |
| | | """ |
| | | 接收下单命令 |
| | | @param ipc_addr: ipc地址 |
| | | @return: |
| | | """ |
| | | context = zmq.Context() |
| | | socket = context.socket(zmq.REP) |
| | | socket.bind(ipc_addr) |
| | | while True: |
| | | data = socket.recv_json() |
| | | try: |
| | | request_id = data.get('request_id') |
| | | use_time = time.time() - data.get('time') |
| | | data = data.get('data') |
| | | cls.action_callback.OnTrade(None, request_id, None, 2, data) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"下单通信耗时: {use_time}s") |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | finally: |
| | | socket.send_string("SUCCESS") |
| | | |
| | | @classmethod |
| | | def __create_cancel_order_command_reciever(cls, ipc_addr): |
| | | """ |
| | | 接收撤单命令 |
| | | @param ipc_addr: ipc地址 |
| | | @return: |
| | | """ |
| | | context = zmq.Context() |
| | | socket = context.socket(zmq.REP) |
| | | socket.bind(ipc_addr) |
| | | while True: |
| | | data = socket.recv_json() |
| | | try: |
| | | request_id = data.get('request_id') |
| | | use_time = time.time() - data.get('time') |
| | | data = data.get('data') |
| | | cls.action_callback.OnTrade(None, request_id, None, 2, data) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"撤单通信耗时: {use_time}s") |
| | | |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | finally: |
| | | socket.send_string("SUCCESS") |
| | | |
| | | # 维护连接数的稳定 |
| | | def run(self, blocking=True): |
| | | def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True): |
| | | if blocking: |
| | | t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1 = threading.Thread( |
| | | target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | self.run_process_command(self.queue_strategy_trade_read) |
| | | else: |
| | | # 接受命令 |
| | | t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1 = threading.Thread( |
| | | target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) |
| | | t1.start() |
| | | |
| | | |