From c4e5d846ac630133fbabb96b4d36a5b16f51a962 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 03 九月 2025 09:50:46 +0800
Subject: [PATCH] bug修复

---
 huaxin_client/l2_client.py |  249 ++++++++++++++++++++++++++++++++-----------------
 1 files changed, 160 insertions(+), 89 deletions(-)

diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py
index c9d599f..5876188 100644
--- a/huaxin_client/l2_client.py
+++ b/huaxin_client/l2_client.py
@@ -17,7 +17,7 @@
 from huaxin_client.l2_data_manager import L2DataUploadManager
 from log_module import log, async_log_util
 from log_module.async_log_util import huaxin_l2_log
-from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript
+from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
 from utils import tool
 
 ###B绫�###
@@ -50,8 +50,9 @@
 SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                  b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
 SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
-set_codes_data_queue = queue.Queue()
-market_code_dict = {}
+set_codes_data_queue = queue.Queue(maxsize=1000)
+
+ENABLE_NGST = True
 
 
 class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -76,9 +77,10 @@
         szse_codes = []
         sse_codes = []
         for code in codes:
-            if code.find("00") == 0:
+            market_type = tool.get_market_type(code)
+            if market_type == tool.MARKET_TYPE_SZSE:
                 szse_codes.append(code.encode())
-            elif code.find("60") == 0:
+            elif market_type == tool.MARKET_TYPE_SSE:
                 sse_codes.append(code.encode())
         return sse_codes, szse_codes
 
@@ -90,27 +92,38 @@
         logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}")
         logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}")
         if sh:
-            # 鍙栨秷璁㈤槄閫愮瑪濮旀墭
-            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦
-            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            if ENABLE_NGST:
+                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}")
+            else:
+                # 鍙栨秷璁㈤槄閫愮瑪濮旀墭
+                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦
+                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
             self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
         if sz:
             self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
             self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
             self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
 
+    def subscribe_codes(self, _codes):
+        self.__subscribe(_codes)
+
     def __subscribe(self, _codes):
         sh, sz = self.__split_codes(_codes)
         logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}")
         logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}")
         if sh:
-            # 璁㈤槄閫愮瑪濮旀墭
-            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
-            # 璁㈤槄閫愮瑪鎴愪氦
-            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
+            if ENABLE_NGST:
+                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}")
+            else:
+                # 璁㈤槄閫愮瑪濮旀墭
+                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
+                # 璁㈤槄閫愮瑪鎴愪氦
+                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
 
             result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
             logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}")
@@ -135,8 +148,9 @@
         for d in codes_data:
             code = d[0]
             codes.add(code)
-            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4])
-            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4])
+            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
+            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
+        logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟锛歿}", len(codes))
         add_codes = codes - self.subscripted_codes
         del_codes = self.subscripted_codes - codes
         print("add del codes", add_codes, del_codes)
@@ -145,9 +159,9 @@
                 self.l2_data_upload_manager.release_distributed_upload_queue(c)
                 l2_data_manager.del_target_code(c)
             for c in codes:
-                self.l2_data_upload_manager.distribute_upload_queue(c)
+                self.l2_data_upload_manager.distribute_upload_queue(c, codes)
                 l2_data_manager.add_target_code(c)
-        except Exception as e:
+        except Exception as e:  # TODO 娓呴櫎鍘熸潵杩樻病閲婃斁鎺夌殑鏁版嵁
             logger_system.error(f"L2浠g爜鍒嗛厤涓婁紶闃熷垪鍑洪敊:{str(e)}")
             logger_system.exception(e)
         self.__subscribe(add_codes)
@@ -155,6 +169,8 @@
 
         if add_codes:
             logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({'缂撳瓨' if from_cache else ''}):{len(add_codes)}")
+            for c in add_codes:
+                logger_l2_codes_subscript.info(f"l2濮旀墭鏁版嵁杩囨护鏉′欢锛歿c} - {self.codes_volume_and_price_dict.get(c)}")
 
         logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫锛宎dd-{} del-{}", len(add_codes), len(del_codes))
 
@@ -229,6 +245,8 @@
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
+            # 鍒濆鍖�
+            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
         if bIsLast == 1:
             print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
             l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -237,6 +255,29 @@
         print("OnRspUnSubOrderDetail", bIsLast)
         try:
             code = pSpecificSecurity['SecurityID']
+            self.subscripted_codes.discard(code)
+            if bIsLast == 1:
+                print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
+                l2_data_manager.add_subscript_codes(self.subscripted_codes)
+        except Exception as e:
+            logging.exception(e)
+
+    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
+        async_log_util.info(logger_local_huaxin_l2_subscript,
+                            f"NGTS璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
+        if pRspInfo["ErrorID"] == 0:
+            print("璁㈤槄鎴愬姛")
+            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
+            # 鍒濆鍖�
+            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
+        if bIsLast == 1:
+            print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
+            l2_data_manager.add_subscript_codes(self.subscripted_codes)
+
+    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
+        try:
+            code = pSpecificSecurity['SecurityID']
+            logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄锛歿code}")
             self.subscripted_codes.discard(code)
             if bIsLast == 1:
                 print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
@@ -259,34 +300,42 @@
     def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubXTSTick")
 
-    # 4.0.5鐗堟湰鎺ュ彛
-    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
-        print("OnRspSubNGTSTick")
-
     def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                         FirstLevelSellOrderVolumes):
         # 浼犲叆锛氭椂闂达紝鐜颁环,鎴愪氦鎬婚噺,涔�1锛屼拱2锛屼拱3,涔�4,涔�5,鍗�1,鍗�2,鍗�3,鍗�4,鍗�5
         try:
+            buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
+                    (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
+                    (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
+                    (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
+                    (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])]
+            for i in range(6, 11):
+                if not pDepthMarketData[f"BidVolume{i}"]:
+                    break
+                buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']))
+
+            sells = [
+                (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
+                (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
+                (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
+                (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
+                (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
+            ]
+            for i in range(6, 11):
+                if not pDepthMarketData[f"AskVolume{i}"]:
+                    break
+                sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']))
+
             d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                  "lastPrice": pDepthMarketData['LastPrice'],
                  "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                  "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                  "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                  "avgAskPrice": pDepthMarketData["AvgAskPrice"],
-                 "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
-                         (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
-                         (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
-                         (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
-                         (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
-                 "sell": [
-                     (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
-                     (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
-                     (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
-                     (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
-                     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
-                 ]}
-            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
+                 "buy": buys,
+                 "sell": sells}
             self.l2_data_upload_manager.add_market_data(d)
+            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
         except:
             pass
 
@@ -343,28 +392,6 @@
             self.l2_data_upload_manager.add_transaction_detail(item)
 
     def OnRtnOrderDetail(self, pOrderDetail):
-        # can_listen = False
-        # code = str(pOrderDetail['SecurityID'])
-        # start_time = 0
-        # if code in self.special_code_volume_for_order_dict:
-        #     start_time = time.time()
-        #     if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
-        #         'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
-        #         # 鐩戞帶鐩爣璁㈠崟涓庡奖瀛愯鍗�
-        #         if self.special_code_volume_for_order_dict[code][1] > time.time():
-        #             # 鐗规畩閲忕洃鍚�
-        #             can_listen = True
-        #         else:
-        #             self.special_code_volume_for_order_dict.pop(code)
-        # if not can_listen:
-        #     min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
-        #     if min_volume is None:
-        #         # 榛樿绛涢��50w
-        #         if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000:
-        #             return
-        #     elif pOrderDetail['Volume'] < min_volume:
-        #         return
-        # 杈撳嚭閫愮瑪濮旀墭鏁版嵁
         # 涓婅瘉OrderStatus=b"D"琛ㄧず鎾ゅ崟
         item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                 "Volume": pOrderDetail['Volume'],
@@ -373,6 +400,38 @@
                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
         self.l2_data_upload_manager.add_l2_order_detail(item, 0)
+
+    def OnRtnNGTSTick(self, pTick):
+        """
+        涓婅瘉鑲$エ鐨勯�愮瑪濮旀墭涓庢垚浜�
+        @param pTick:
+        @return:
+        """
+        try:
+            if pTick['TickType'] == b'T':
+                # 鎴愪氦
+                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
+                        "TradeVolume": pTick['Volume'],
+                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
+                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
+                        "SellNo": pTick['SellNo'],
+                        "ExecType": '1'}
+                self.l2_data_upload_manager.add_transaction_detail(item)
+            elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D':
+                # 鎾ゅ崟
+                item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'],
+                        "Volume": pTick['Volume'],
+                        "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(),
+                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
+                        "SubSeq": pTick['SubSeq'], "OrderNO": '',
+                        "OrderStatus": pTick['TickType'].decode()}
+                if pTick['Side'] == b'1':
+                    item['OrderNO'] = pTick['BuyNo']
+                elif pTick['Side'] == b'2':
+                    item['OrderNO'] = pTick['SellNo']
+                self.l2_data_upload_manager.add_l2_order_detail(item, 0)
+        except Exception as e:
+            logger_local_huaxin_l2_subscript.exception(e)
 
     def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                             FirstLevelSellOrderVolumes):
@@ -446,33 +505,44 @@
         for sell_index in range(0, FirstLevelSellNum):
             print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
 
-    def OnRtnXTSTick(self, pTick):
-        # 杈撳嚭涓婃捣鍊哄埜閫愮瑪鏁版嵁鈥�
-        print(
-            "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
-                pTick['TickType'],
-                pTick['SecurityID'],
-                pTick['Price'],
-                pTick['Volume'],
-                pTick['TickTime'],
-                pTick['MainSeq'],
-                pTick['SubSeq'],
-                pTick['BuyNo'],
-                pTick['SellNo']))
 
-    def OnRtnNGTSTick(self, pTick):
-        # 杈撳嚭涓婃捣鑲″熀閫愮瑪鏁版嵁鈥�
-        print(
-            "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
-                pTick['TickType'],
-                pTick['SecurityID'],
-                pTick['Price'],
-                pTick['Volume'],
-                pTick['TickTime'],
-                pTick['MainSeq'],
-                pTick['SubSeq'],
-                pTick['BuyNo'],
-                pTick['SellNo']))
+class SubscriptDefend:
+    """
+    璁㈤槄瀹堟姢
+    瀹氫箟锛氬綋璁㈤槄鐨勪唬鐮佽秴杩囦竴瀹氭椂闂存病鏈夊洖璋冩暟鎹椂閲嶆柊璁㈤槄
+    """
+    __l2_market_update_time = {}
+
+    @classmethod
+    def set_l2_market_update(cls, code):
+        cls.__l2_market_update_time[code] = time.time()
+
+    @classmethod
+    def run(cls):
+        while True:
+            try:
+                now_time = tool.get_now_time_as_int()
+                if now_time < int("093015"):
+                    continue
+                if int("112945") < now_time < int("130015"):
+                    continue
+                if int("145645") < now_time:
+                    continue
+                if spi.subscripted_codes:
+                    codes = []
+                    for code in spi.subscripted_codes:
+                        # 鑾峰彇涓婃鏇存柊鏃堕棿
+                        update_time = cls.__l2_market_update_time.get(code)
+                        if update_time and time.time() - update_time > 15:
+                            # 闇�瑕侀噸鏂拌闃�
+                            codes.append(code)
+                    if codes:
+                        logger_debug.info(f"閲嶆柊璁㈤槄锛歿codes}")
+                        spi.subscribe_codes(codes)
+            except:
+                pass
+            finally:
+                time.sleep(15)
 
 
 class MyL2ActionCallback(L2ActionCallback):
@@ -598,7 +668,8 @@
         if queue_r is not None:
             t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
             t1.start()
-
+        # 璁㈤槄瀹堟姢
+        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
         # 鍒濆鍖�
         data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
         l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
@@ -653,13 +724,13 @@
              'OrderTime': '13000015',
              'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
 
-    queue_r = multiprocessing.Queue()
+    queue_r = multiprocessing.Queue(maxsize=1024)
     order_queues = []
     transaction_queues = []
-    market_queue = multiprocessing.Queue()
+    market_queue = multiprocessing.Queue(maxsize=1024)
     for i in range(20):
-        order_queues.append(multiprocessing.Queue())
-        transaction_queues.append(multiprocessing.Queue())
+        order_queues.append(multiprocessing.Queue(maxsize=1024))
+        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
     threading.Thread(target=test_add_codes).start()
 
     run(queue_r, order_queues, transaction_queues, market_queue)

--
Gitblit v1.8.0