From 25a69761838501a775896eb8a2234cb8ee0a446d Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 14 十一月 2023 17:27:13 +0800
Subject: [PATCH] bug修复

---
 huaxin_client/l2_data_manager.py |   69 ++++++++++++++++++++++------------
 1 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index de05165..313c4d5 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -37,6 +37,9 @@
         self.temp_order_queue_dict = {}
         self.temp_transaction_queue_dict = {}
         self.filter_order_condition_dict = {}
+        self.upload_l2_data_task_dict = {}
+        self.l2_order_codes = set()
+        self.l2_transaction_codes = set()
 
     # 璁剧疆璁㈠崟杩囨护鏉′欢
     def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
@@ -46,7 +49,8 @@
         if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
             self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
         else:
-            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, special_volumes_expire_time]
+            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes,
+                                                      special_volumes_expire_time]
 
     # 杩囨护璁㈠崟
     def __filter_order(self, item):
@@ -114,12 +118,34 @@
 
     # 鍒嗛厤涓婁紶闃熷垪
     def distribute_upload_queue(self, code):
-        self.order_queue_distribute_manager.distribute_queue(code)
-        self.transaction_queue_distribute_manager.distribute_queue(code)
-        self.temp_order_queue_dict[code] = collections.deque()
-        self.temp_transaction_queue_dict[code] = collections.deque()
-        threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
-        threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
+        if not self.order_queue_distribute_manager.get_distributed_queue(code):
+            self.order_queue_distribute_manager.distribute_queue(code)
+        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
+            self.transaction_queue_distribute_manager.distribute_queue(code)
+        if code not in self.temp_order_queue_dict:
+            self.temp_order_queue_dict[code] = collections.deque()
+        if code not in self.temp_transaction_queue_dict:
+            self.temp_transaction_queue_dict[code] = collections.deque()
+
+        if code not in self.upload_l2_data_task_dict:
+            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
+            t1.start()
+            t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
+            t2.start()
+            self.upload_l2_data_task_dict[code] = (t1, t2)
+        # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒�
+
+    def release_distributed_upload_queue(self, code):
+        self.order_queue_distribute_manager.release_distribute_queue(code)
+        self.transaction_queue_distribute_manager.release_distribute_queue(code)
+        if code in self.temp_order_queue_dict:
+            self.temp_order_queue_dict[code].clear()
+            self.temp_order_queue_dict.pop(code)
+        if code in self.temp_transaction_queue_dict:
+            self.temp_transaction_queue_dict[code].clear()
+            self.temp_transaction_queue_dict.pop(code)
+        if code in self.upload_l2_data_task_dict:
+            self.upload_l2_data_task_dict.pop(code)
 
     def __upload_l2_data(self, code, _queue, datas):
         _queue.put_nowait((code, datas, time.time()))
@@ -132,7 +158,7 @@
         upload_queue = queue_info[1]
         while True:
             try:
-                if not q:
+                if len(q) > 0:
                     data = q.popleft()
                     # 鍓嶇疆鏁版嵁澶勭悊锛岃繃婊ゆ帀鏃犵敤鐨勬暟鎹�
                     data = self.__filter_order(data)
@@ -142,14 +168,16 @@
                     if temp_list:
                         # 涓婁紶鏁版嵁
                         self.__upload_l2_data(code, upload_queue, temp_list)
-                        temp_list.clear()
+                        temp_list = []
                     else:
                         if code not in self.temp_order_queue_dict:
+                            self.l2_order_codes.discard(code)
                             break
+                        self.l2_order_codes.add(code)
                         time.sleep(0.001)
 
-            except:
-                pass
+            except Exception as e:
+                logging.exception(e)
             finally:
                 pass
 
@@ -161,7 +189,7 @@
         temp_list = []
         while True:
             try:
-                if not q:
+                if len(q) > 0:
                     data = q.popleft()
                     data = self.__filter_transaction(data)
                     if data:
@@ -170,26 +198,17 @@
                     if temp_list:
                         # 涓婁紶鏁版嵁
                         self.__upload_l2_data(code, upload_queue, temp_list)
-                        temp_list.clear()
+                        temp_list = []
                     else:
                         if code not in self.temp_transaction_queue_dict:
+                            self.l2_transaction_codes.discard(code)
                             break
+                        self.l2_transaction_codes.add(code)
                         time.sleep(0.002)
             except:
                 pass
             finally:
                 pass
-
-    # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒�
-    def release_distributed_upload_queue(self, code):
-        self.order_queue_distribute_manager.release_distribute_queue(code)
-        self.transaction_queue_distribute_manager.release_distribute_queue(code)
-        if code in self.temp_order_queue_dict:
-            self.temp_order_queue_dict[code].clear()
-            self.temp_order_queue_dict.pop(code)
-        if code in self.temp_transaction_queue_dict:
-            self.temp_transaction_queue_dict[code].clear()
-            self.temp_transaction_queue_dict.pop(code)
 
 
 def add_target_code(code):
@@ -293,7 +312,7 @@
 
 
 def __test():
-    code = "002073"
+    # 鍒嗛厤鏁版嵁
     pass
 
 

--
Gitblit v1.8.0