From b86057f378a6567842e41f93df25f9188f0dc25c Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 13 十一月 2023 15:43:39 +0800
Subject: [PATCH] 修改l2数据上传规则(未生效)

---
 huaxin_client/l2_data_manager.py |   70 ++++++++++++++++++++++++++++++++++
 1 files changed, 69 insertions(+), 1 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 0c00579..c89727a 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -11,9 +11,10 @@
 
 # 娲诲姩鏃堕棿
 from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
-from log_module import  async_log_util
+from log_module import async_log_util
 from log_module.log import logger_local_huaxin_l2_error, logger_system
 from utils import tool
+import collections
 
 order_detail_upload_active_time_dict = {}
 transaction_upload_active_time_dict = {}
@@ -33,6 +34,8 @@
         self.order_queue_distribute_manager = order_queue_distribute_manager
         self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
         self.market_data_queue = market_data_queue
+        self.temp_order_queue_dict = {}
+        self.temp_transaction_queue_dict = {}
 
     # 娣诲姞濮旀墭璇︽儏
     def add_l2_order_detail(self, data, start_time, istransaction=False):
@@ -63,11 +66,75 @@
     def distribute_upload_queue(self, code):
         self.order_queue_distribute_manager.distribute_queue(code)
         self.transaction_queue_distribute_manager.distribute_queue(code)
+        self.temp_order_queue_dict[code] = collections.deque()
+        self.temp_transaction_queue_dict[code] = collections.deque()
+        # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
+        # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
+
+    def __upload_l2_data(self, code, _queue, datas):
+        _queue.put_nowait((code, datas, time.time()))
+
+    # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼�
+    def __run_upload_order_task(self, code):
+        q: collections.deque = self.temp_order_queue_dict.get(code)
+        temp_list = []
+        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
+        upload_queue = queue_info[1]
+        while True:
+            try:
+                if not q:
+                    data = q.popleft()
+                    temp_list.append(data)
+                else:
+                    if temp_list:
+                        # 涓婁紶鏁版嵁
+                        self.__upload_l2_data(code, upload_queue, temp_list)
+                        temp_list.clear()
+                    else:
+                        if code not in self.temp_order_queue_dict:
+                            break
+                        time.sleep(0.001)
+
+            except:
+                pass
+            finally:
+                pass
+
+    # 澶勭悊鎴愪氦鏁版嵁骞朵笂浼�
+    def __run_upload_transaction_task(self, code):
+        q: collections.deque = self.temp_transaction_queue_dict.get(code)
+        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
+        upload_queue = queue_info[1]
+        temp_list = []
+        while True:
+            try:
+                if not q:
+                    data = q.popleft()
+                    temp_list.append(data)
+                else:
+                    if temp_list:
+                        # 涓婁紶鏁版嵁
+                        self.__upload_l2_data(code, upload_queue, temp_list)
+                        temp_list.clear()
+                    else:
+                        if code not in self.temp_transaction_queue_dict:
+                            break
+                        time.sleep(0.002)
+            except:
+                pass
+            finally:
+                pass
 
     # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒�
     def release_distributed_upload_queue(self, code):
         self.order_queue_distribute_manager.release_distribute_queue(code)
         self.transaction_queue_distribute_manager.release_distribute_queue(code)
+        if code in self.temp_order_queue_dict:
+            self.temp_order_queue_dict[code].clear()
+            self.temp_order_queue_dict.pop(code)
+        if code in self.temp_transaction_queue_dict:
+            self.temp_transaction_queue_dict[code].clear()
+            self.temp_transaction_queue_dict.pop(code)
 
 
 def add_target_code(code):
@@ -81,6 +148,7 @@
     if code in target_codes_add_time:
         target_codes_add_time.pop(code)
 
+
 def add_subscript_codes(codes):
     print("add_subscript_codes", codes)
     # 鍔犲叆涓婁紶闃熷垪

--
Gitblit v1.8.0