From c4861d2429c2bf3a3f11309ad879b549e62e722d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 26 四月 2024 15:26:52 +0800 Subject: [PATCH] 下单/撤单通信方式修改/G撤修改 --- huaxin_client/command_manager.py | 75 +++++++++++++++++++++++++++++++++++-- 1 files changed, 71 insertions(+), 4 deletions(-) diff --git a/huaxin_client/command_manager.py b/huaxin_client/command_manager.py index caf1ff9..e5d8f20 100644 --- a/huaxin_client/command_manager.py +++ b/huaxin_client/command_manager.py @@ -7,6 +7,9 @@ import logging import multiprocessing import threading +import time + +import zmq from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager @@ -74,7 +77,9 @@ return cls._instance @classmethod - def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue): + def init(cls, trade_action_callback: TradeActionCallback, + queue_strategy_trade_read_for_trade: multiprocessing.Queue, + queue_strategy_trade_read_for_read: multiprocessing.Queue): cls.action_callback = trade_action_callback cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read @@ -155,17 +160,79 @@ except Exception as e: async_log_util.exception(logger_local_huaxin_trade_debug, e) + ###############ZEROMQ鍗忚鎺ユ敹鍛戒护################# + @classmethod + def __create_order_command_reciever(cls, ipc_addr): + """ + 鎺ユ敹涓嬪崟鍛戒护 + @param ipc_addr: ipc鍦板潃 + @return: + """ + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(ipc_addr) + while True: + data = socket.recv_json() + try: + request_id = data.get('request_id') + use_time = time.time() - data.get('time') + data = data.get('data') + cls.action_callback.OnTrade(None, request_id, None, 2, data) + async_log_util.info(logger_local_huaxin_trade_debug, f"涓嬪崟閫氫俊鑰楁椂锛� {use_time}s") + except Exception as e: + logger_local_huaxin_trade_debug.exception(e) + finally: + socket.send_string("SUCCESS") + + @classmethod + def __create_cancel_order_command_reciever(cls, ipc_addr): + """ + 鎺ユ敹鎾ゅ崟鍛戒护 + @param ipc_addr: ipc鍦板潃 + @return: + """ + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(ipc_addr) + while True: + data = socket.recv_json() + try: + request_id = data.get('request_id') + use_time = time.time() - data.get('time') + data = data.get('data') + cls.action_callback.OnTrade(None, request_id, None, 2, data) + async_log_util.info(logger_local_huaxin_trade_debug, f"鎾ゅ崟閫氫俊鑰楁椂锛� {use_time}s") + + except Exception as e: + logger_local_huaxin_trade_debug.exception(e) + finally: + socket.send_string("SUCCESS") + # 缁存姢杩炴帴鏁扮殑绋冲畾 - def run(self, blocking=True): + def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True): if blocking: - t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1 = threading.Thread( + target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) t1.start() self.run_process_command(self.queue_strategy_trade_read) else: # 鎺ュ彈鍛戒护 t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) t1.start() - t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1 = threading.Thread( + target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) t1.start() -- Gitblit v1.8.0