From bb2c58fb407a3783b3704b33df6a154207ae4199 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 13 三月 2024 14:39:05 +0800
Subject: [PATCH] 更改L2数据接收方式

---
 huaxin_client/l2_data_manager.py |   16 +++++++++++-----
 1 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 9a34109..ec5e72f 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -34,7 +34,7 @@
 class L2DataUploadManager:
     def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                  transaction_queue_distribute_manager: CodeQueueDistributeManager,
-                 market_data_queue: multiprocessing.Queue):
+                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
         self.order_queue_distribute_manager = order_queue_distribute_manager
         self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
         self.market_data_queue = market_data_queue
@@ -45,6 +45,7 @@
         self.upload_l2_data_task_dict = {}
         self.l2_order_codes = set()
         self.l2_transaction_codes = set()
+        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
 
     # 璁剧疆璁㈠崟杩囨护鏉′欢
     # special_price:杩囨护鐨�1鎵嬬殑浠锋牸
@@ -133,7 +134,8 @@
             self.temp_transaction_queue_dict[code] = collections.deque()
         if code not in self.temp_log_queue_dict:
             self.temp_log_queue_dict[code] = queue.Queue()
-
+        # 鍒嗛厤璁㈠崟涓婁紶鍗忚
+        self.l2_order_upload_protocol.distribute_upload_host(code)
 
         if code not in self.upload_l2_data_task_dict:
             t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -148,6 +150,7 @@
     def release_distributed_upload_queue(self, code):
         self.order_queue_distribute_manager.release_distribute_queue(code)
         self.transaction_queue_distribute_manager.release_distribute_queue(code)
+        self.l2_order_upload_protocol.release_distributed_upload_host(code)
         if code in self.temp_order_queue_dict:
             self.temp_order_queue_dict[code].clear()
             self.temp_order_queue_dict.pop(code)
@@ -162,6 +165,9 @@
 
     def __upload_l2_data(self, code, _queue, datas):
         _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
+
+    def __upload_l2_order_data(self, code, datas):
+        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
 
     # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼�
     def __run_upload_order_task(self, code):
@@ -180,8 +186,8 @@
 
                 if temp_list:
                     # 涓婁紶鏁版嵁
-                    self.__upload_l2_data(code, upload_queue, temp_list)
-                    # self.__upload_l2_order_data(code, temp_list)
+                    # self.__upload_l2_data(code, upload_queue, temp_list)
+                    self.__upload_l2_order_data(code, temp_list)
                     temp_list = []
                 else:
                     if code not in self.temp_order_queue_dict:
@@ -289,7 +295,7 @@
         if code not in self.code_socket_client_dict:
             raise Exception("灏氭湭鍒嗛厤host")
         host, socket = self.code_socket_client_dict[code]
-        socket.send_json(data)
+        socket.send(marshal.dumps(data))
         socket.recv_string()
 
 

--
Gitblit v1.8.0