From 122def357140a4a504710a57fe2bc1a8020aa7b1 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 27 二月 2024 17:57:39 +0800
Subject: [PATCH] 新版L2数据传输协议测试

---
 huaxin_client/l2_data_manager.py |   41 ++++++++++++++++++++++++-----------------
 1 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 0dd5bf7..ccddadf 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -30,9 +30,10 @@
 
 # L2涓婁紶鏁版嵁绠$悊鍣�
 class L2DataUploadManager:
+    # order_ipc_hosts:杩滅▼host
     def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                  transaction_queue_distribute_manager: CodeQueueDistributeManager,
-                 market_data_queue: multiprocessing.Queue):
+                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
         self.order_queue_distribute_manager = order_queue_distribute_manager
         self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
         self.market_data_queue = market_data_queue
@@ -42,6 +43,8 @@
         self.upload_l2_data_task_dict = {}
         self.l2_order_codes = set()
         self.l2_transaction_codes = set()
+        # 璁㈠崟
+        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
 
     # 璁剧疆璁㈠崟杩囨护鏉′欢
     # special_price:杩囨护鐨�1鎵嬬殑浠锋牸
@@ -141,6 +144,8 @@
             self.temp_order_queue_dict[code] = collections.deque()
         if code not in self.temp_transaction_queue_dict:
             self.temp_transaction_queue_dict[code] = collections.deque()
+        # 鍒嗛厤璁㈠崟涓婁紶鍗忚
+        self.l2_order_upload_protocol.distribute_upload_host(code)
 
         if code not in self.upload_l2_data_task_dict:
             t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -161,9 +166,13 @@
             self.temp_transaction_queue_dict.pop(code)
         if code in self.upload_l2_data_task_dict:
             self.upload_l2_data_task_dict.pop(code)
+        self.l2_order_upload_protocol.release_distributed_upload_host(code)
 
     def __upload_l2_data(self, code, _queue, datas):
         _queue.put_nowait((code, datas, time.time()))
+
+    def __upload_l2_order_data(self, code, datas):
+        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
 
     # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼�
     def __run_upload_order_task(self, code):
@@ -182,7 +191,8 @@
 
                 if temp_list:
                     # 涓婁紶鏁版嵁
-                    self.__upload_l2_data(code, upload_queue, temp_list)
+                    # self.__upload_l2_data(code, upload_queue, temp_list)
+                    self.__upload_l2_order_data(code, temp_list)
                     temp_list = []
                 else:
                     if code not in self.temp_order_queue_dict:
@@ -225,7 +235,6 @@
                 pass
 
 
-# L2涓婁紶鏁版嵁鍗忚绠$悊鍣�
 class L2DataUploadProtocolManager:
 
     # ipchosts IPC鍗忚
@@ -235,7 +244,7 @@
         self.socket_client_dict = {}
         # 淇濆瓨浠g爜鍒嗛厤鐨刢lient 鏍煎紡锛歿code:(host, socket)}
         self.code_socket_client_dict = {}
-        self.lock = threading.Lock()
+        self.rlock = threading.RLock()
         context = zmq.Context()
         for host in self.ipchosts:
             socket = context.socket(zmq.REQ)
@@ -244,34 +253,31 @@
 
     # 鑾峰彇
     def __get_available_ipchost(self):
-        try:
-            if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
-                raise Exception("鏃犲彲鐢╤ost")
-            used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
-            for host in self.socket_client_dict:
-                if host not in used_hosts:
-                    return host, self.socket_client_dict[host]
+        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
             raise Exception("鏃犲彲鐢╤ost")
-        finally:
-            self.lock.release()
+        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
+        for host in self.socket_client_dict:
+            if host not in used_hosts:
+                return host, self.socket_client_dict[host]
+        raise Exception("鏃犲彲鐢╤ost")
 
     # 鍒嗛厤HOST
     def distribute_upload_host(self, code):
-        self.lock.acquire()
+        self.rlock.acquire()
         try:
             host_info = self.__get_available_ipchost()
             if host_info:
                 self.code_socket_client_dict[code] = host_info
         finally:
-            self.lock.release()
+            self.rlock.release()
 
     def release_distributed_upload_host(self, code):
-        self.lock.acquire()
+        self.rlock.acquire()
         try:
             if code in self.code_socket_client_dict:
                 self.code_socket_client_dict.pop(code)
         finally:
-            self.lock.release()
+            self.rlock.release()
 
     def upload_data_as_json(self, code, data):
         if code not in self.code_socket_client_dict:
@@ -386,6 +392,7 @@
     pass
 
 
+
 def run_test():
     t = threading.Thread(target=lambda: __test(), daemon=True)
     t.start()

--
Gitblit v1.8.0