From c4861d2429c2bf3a3f11309ad879b549e62e722d Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 26 四月 2024 15:26:52 +0800
Subject: [PATCH] 下单/撤单通信方式修改/G撤修改

---
 huaxin_client/command_manager.py |   75 +++++++++++++++++++++++++++++++++++--
 1 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/huaxin_client/command_manager.py b/huaxin_client/command_manager.py
index caf1ff9..e5d8f20 100644
--- a/huaxin_client/command_manager.py
+++ b/huaxin_client/command_manager.py
@@ -7,6 +7,9 @@
 import logging
 import multiprocessing
 import threading
+import time
+
+import zmq
 
 from huaxin_client import socket_util
 from huaxin_client.client_network import SendResponseSkManager
@@ -74,7 +77,9 @@
         return cls._instance
 
     @classmethod
-    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue):
+    def init(cls, trade_action_callback: TradeActionCallback,
+             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
+             queue_strategy_trade_read_for_read: multiprocessing.Queue):
         cls.action_callback = trade_action_callback
         cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
         cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
@@ -155,17 +160,79 @@
         except Exception as e:
             async_log_util.exception(logger_local_huaxin_trade_debug, e)
 
+    ###############ZEROMQ鍗忚鎺ユ敹鍛戒护#################
+    @classmethod
+    def __create_order_command_reciever(cls, ipc_addr):
+        """
+        鎺ユ敹涓嬪崟鍛戒护
+        @param ipc_addr: ipc鍦板潃
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
+        while True:
+            data = socket.recv_json()
+            try:
+                request_id = data.get('request_id')
+                use_time = time.time() - data.get('time')
+                data = data.get('data')
+                cls.action_callback.OnTrade(None, request_id, None, 2, data)
+                async_log_util.info(logger_local_huaxin_trade_debug, f"涓嬪崟閫氫俊鑰楁椂锛� {use_time}s")
+            except Exception as e:
+                logger_local_huaxin_trade_debug.exception(e)
+            finally:
+                socket.send_string("SUCCESS")
+
+    @classmethod
+    def __create_cancel_order_command_reciever(cls, ipc_addr):
+        """
+        鎺ユ敹鎾ゅ崟鍛戒护
+        @param ipc_addr: ipc鍦板潃
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
+        while True:
+            data = socket.recv_json()
+            try:
+                request_id = data.get('request_id')
+                use_time = time.time() - data.get('time')
+                data = data.get('data')
+                cls.action_callback.OnTrade(None, request_id, None, 2, data)
+                async_log_util.info(logger_local_huaxin_trade_debug, f"鎾ゅ崟閫氫俊鑰楁椂锛� {use_time}s")
+
+            except Exception as e:
+                logger_local_huaxin_trade_debug.exception(e)
+            finally:
+                socket.send_string("SUCCESS")
+
     # 缁存姢杩炴帴鏁扮殑绋冲畾
-    def run(self, blocking=True):
+    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
         if blocking:
-            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1 = threading.Thread(
+                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
             t1.start()
             self.run_process_command(self.queue_strategy_trade_read)
         else:
             # 鎺ュ彈鍛戒护
             t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
             t1.start()
-            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1 = threading.Thread(
+                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
             t1.start()
 
 

--
Gitblit v1.8.0