From d47fbd65ab8197348ad293b7948fcdd2f8995594 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 31 八月 2023 13:16:00 +0800
Subject: [PATCH] L2数据上传任务守护线程

---
 huaxin_client/l2_data_manager.py |   65 +++++++++++++++++++++++++++-----
 1 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 13ad2a3..d6552bc 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -23,6 +23,7 @@
 tmep_order_detail_queue_dict = {}
 tmep_transaction_queue_dict = {}
 target_codes = set()
+target_codes_add_time = {}
 common_queue = queue.Queue()
 trading_canceled_queue = queue.Queue()
 log_buy_no_queue = queue.Queue()
@@ -30,6 +31,18 @@
 buy_order_nos_dict = {}
 # 鏈�杩戠殑澶у崟鎴愪氦鍗曞彿
 latest_big_order_transaction_orders_dict = {}
+
+
+def add_target_code(code):
+    target_codes.add(code)
+    # 璁板綍浠g爜鍔犲叆鏃堕棿
+    target_codes_add_time[code] = time.time()
+
+
+def del_target_code(code):
+    target_codes.discard(code)
+    if code in target_codes_add_time:
+        target_codes_add_time.pop(code)
 
 
 # 鑾峰彇鏈�杩戠殑澶у崟鎴愪氦璁㈠崟鍙�
@@ -257,19 +270,28 @@
             pass
 
 
+__upload_order_threads = {}
+__upload_trans_threads = {}
+
+
 # 杩愯涓婁紶浠诲姟
 def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
-    # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛�
-    if code not in target_codes:
-        return
-    # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛�
-    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
-        t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
-        t.start()
+    try:
+        # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛�
+        if code not in target_codes:
+            return
+        # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛�
+        if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
+            t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
+            t.start()
+            __upload_order_threads[code] = t
 
-    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
-        t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
-        t.start()
+        if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
+            t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
+            t.start()
+            __upload_trans_threads[code] = t
+    finally:
+        pass
 
 
 def run_upload_common(l2_data_callback: L2DataCallBack):
@@ -290,6 +312,29 @@
     t.start()
 
 
+# 杩愯瀹堟姢绾跨▼
+def run_upload_daemon(_l2_data_callback):
+    def upload_daemon():
+        while True:
+            try:
+                for code in target_codes_add_time:
+                    # 鐩爣浠g爜鍔犲叆2s涔嬪悗鍚姩瀹堟姢
+                    if time.time() - target_codes_add_time[code] > 2:
+                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
+                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
+                            t.start()
+                            __upload_order_threads[code] = t
+                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
+                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), daemon=True)
+                            t.start()
+                            __upload_trans_threads[code] = t
+                    time.sleep(3)
+            except:
+                pass
+    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
+    t.start()
+
+
 def __test(_l2_data_callback):
     code = "002073"
     if code not in tmep_order_detail_queue_dict:

--
Gitblit v1.8.0