From 6567e8cd1fb11ea10912bb3ac5bf2965c74c0e4b Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 06 一月 2025 18:09:47 +0800
Subject: [PATCH] 统计大卖单

---
 huaxin_client/l2_client.py |   54 ++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py
index f88ef1d..aab2143 100644
--- a/huaxin_client/l2_client.py
+++ b/huaxin_client/l2_client.py
@@ -17,7 +17,7 @@
 from huaxin_client.l2_data_manager import L2DataUploadManager
 from log_module import log, async_log_util
 from log_module.async_log_util import huaxin_l2_log
-from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript
+from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
 from utils import tool
 
 ###B绫�###
@@ -51,7 +51,6 @@
                  b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
 SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
 set_codes_data_queue = queue.Queue(maxsize=1000)
-market_code_dict = {}
 
 ENABLE_NGST = True
 
@@ -106,6 +105,9 @@
             self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
             self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
             self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+
+    def subscribe_codes(self, _codes):
+        self.__subscribe(_codes)
 
     def __subscribe(self, _codes):
         sh, sz = self.__split_codes(_codes)
@@ -241,6 +243,8 @@
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
+            # 鍒濆鍖�
+            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
         if bIsLast == 1:
             print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
             l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -262,6 +266,8 @@
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
+            # 鍒濆鍖�
+            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
         if bIsLast == 1:
             print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
             l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -326,8 +332,8 @@
                  "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                  "buy": buys,
                  "sell": sells}
-            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
             self.l2_data_upload_manager.add_market_data(d)
+            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
         except:
             pass
 
@@ -498,6 +504,45 @@
             print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
 
 
+class SubscriptDefend:
+    """
+    璁㈤槄瀹堟姢
+    瀹氫箟锛氬綋璁㈤槄鐨勪唬鐮佽秴杩囦竴瀹氭椂闂存病鏈夊洖璋冩暟鎹椂閲嶆柊璁㈤槄
+    """
+    __l2_market_update_time = {}
+
+    @classmethod
+    def set_l2_market_update(cls, code):
+        cls.__l2_market_update_time[code] = time.time()
+
+    @classmethod
+    def run(cls):
+        while True:
+            try:
+                now_time = tool.get_now_time_as_int()
+                if now_time < int("093015"):
+                    continue
+                if int("112945") < now_time < int("130015"):
+                    continue
+                if int("145645") < now_time:
+                    continue
+                if spi.subscripted_codes:
+                    codes = []
+                    for code in spi.subscripted_codes:
+                        # 鑾峰彇涓婃鏇存柊鏃堕棿
+                        update_time = cls.__l2_market_update_time.get(code)
+                        if update_time and time.time() - update_time > 15:
+                            # 闇�瑕侀噸鏂拌闃�
+                            codes.append(code)
+                    if codes:
+                        logger_debug.info(f"閲嶆柊璁㈤槄锛歿codes}")
+                        spi.subscribe_codes(codes)
+            except:
+                pass
+            finally:
+                time.sleep(15)
+
+
 class MyL2ActionCallback(L2ActionCallback):
 
     def OnSetL2Position(self, codes_data):
@@ -621,7 +666,8 @@
         if queue_r is not None:
             t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
             t1.start()
-
+        # 璁㈤槄瀹堟姢
+        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
         # 鍒濆鍖�
         data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
         l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)

--
Gitblit v1.8.0