From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 18 六月 2025 18:41:30 +0800
Subject: [PATCH] 异常保护

---
 huaxin_client/l2_client.py |  384 +++++++++++++++++++++++++++++++-----------------------
 1 files changed, 221 insertions(+), 163 deletions(-)

diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py
index 4672fcd..5876188 100644
--- a/huaxin_client/l2_client.py
+++ b/huaxin_client/l2_client.py
@@ -7,26 +7,24 @@
 import threading
 import time
 import concurrent.futures
-from typing import List
 
-from huaxin_client import command_manager, l2_data_transform_protocol
+from huaxin_client import command_manager
 from huaxin_client import constant
 from huaxin_client import l2_data_manager
 import lev2mdapi
-from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
+from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
 from huaxin_client.command_manager import L2ActionCallback
 from huaxin_client.l2_data_manager import L2DataUploadManager
 from log_module import log, async_log_util
 from log_module.async_log_util import huaxin_l2_log
-from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
-    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
+from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
 from utils import tool
 
 ###B绫�###
 Front_Address = "tcp://10.0.1.101:6900"
 Multicast_Address = "udp://224.224.2.19:7889"
 Multicast_Address2 = "udp://224.224.224.234:7890"
-Local_Interface_Address = "192.168.84.75"
+Local_Interface_Address = constant.LOCAL_IP
 
 ###A绫�###
 if constant.IS_A:
@@ -52,8 +50,9 @@
 SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                  b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
 SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
-set_codes_data_queue = queue.Queue()
-market_code_dict = {}
+set_codes_data_queue = queue.Queue(maxsize=1000)
+
+ENABLE_NGST = True
 
 
 class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -78,9 +77,10 @@
         szse_codes = []
         sse_codes = []
         for code in codes:
-            if code.find("00") == 0:
+            market_type = tool.get_market_type(code)
+            if market_type == tool.MARKET_TYPE_SZSE:
                 szse_codes.append(code.encode())
-            elif code.find("60") == 0:
+            elif market_type == tool.MARKET_TYPE_SSE:
                 sse_codes.append(code.encode())
         return sse_codes, szse_codes
 
@@ -92,27 +92,38 @@
         logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}")
         logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}")
         if sh:
-            # 鍙栨秷璁㈤槄閫愮瑪濮旀墭
-            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦
-            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            if ENABLE_NGST:
+                result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}")
+            else:
+                # 鍙栨秷璁㈤槄閫愮瑪濮旀墭
+                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦
+                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
             self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
         if sz:
             self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
             self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
             self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
 
+    def subscribe_codes(self, _codes):
+        self.__subscribe(_codes)
+
     def __subscribe(self, _codes):
         sh, sz = self.__split_codes(_codes)
         logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}")
         logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}")
         if sh:
-            # 璁㈤槄閫愮瑪濮旀墭
-            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
-            # 璁㈤槄閫愮瑪鎴愪氦
-            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
+            if ENABLE_NGST:
+                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}")
+            else:
+                # 璁㈤槄閫愮瑪濮旀墭
+                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
+                # 璁㈤槄閫愮瑪鎴愪氦
+                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
 
             result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
             logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}")
@@ -126,6 +137,9 @@
 
     def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
 
+        if from_cache and self.codes_volume_and_price_dict:
+            return
+
         if not self.is_login and not constant.TEST:
             raise Exception("L2灏氭湭鐧诲綍")
         if delay > 0:
@@ -134,8 +148,9 @@
         for d in codes_data:
             code = d[0]
             codes.add(code)
-            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3])
-            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2]), d[3])
+            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
+            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
+        logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟锛歿}", len(codes))
         add_codes = codes - self.subscripted_codes
         del_codes = self.subscripted_codes - codes
         print("add del codes", add_codes, del_codes)
@@ -144,9 +159,9 @@
                 self.l2_data_upload_manager.release_distributed_upload_queue(c)
                 l2_data_manager.del_target_code(c)
             for c in codes:
-                self.l2_data_upload_manager.distribute_upload_queue(c)
+                self.l2_data_upload_manager.distribute_upload_queue(c, codes)
                 l2_data_manager.add_target_code(c)
-        except Exception as e:
+        except Exception as e:  # TODO 娓呴櫎鍘熸潵杩樻病閲婃斁鎺夌殑鏁版嵁
             logger_system.error(f"L2浠g爜鍒嗛厤涓婁紶闃熷垪鍑洪敊:{str(e)}")
             logger_system.exception(e)
         self.__subscribe(add_codes)
@@ -154,6 +169,8 @@
 
         if add_codes:
             logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({'缂撳瓨' if from_cache else ''}):{len(add_codes)}")
+            for c in add_codes:
+                logger_l2_codes_subscript.info(f"l2濮旀墭鏁版嵁杩囨护鏉′欢锛歿c} - {self.codes_volume_and_price_dict.get(c)}")
 
         logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫锛宎dd-{} del-{}", len(add_codes), len(del_codes))
 
@@ -186,17 +203,6 @@
                     return data_json[1]
         return []
 
-    def set_code_special_watch_volume(self, code, volume):
-        # 鏈夋晥鏈熶负3s
-        # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3)
-        d = self.codes_volume_and_price_dict.get(code)
-        if d:
-            min_volume, limit_up_price, special_price = d[0], d[1], d[2]
-            self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price,special_price,
-                                                                    {volume, constant.SHADOW_ORDER_VOLUME},
-                                                                    time.time() + 3)
-            huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"璁剧疆涓嬪崟閲忕洃鍚細{code}-{volume}")
-
     def OnFrontConnected(self):
         print("OnFrontConnected")
         logger_system.info(f"l2_client OnFrontConnected 绾跨▼ID:{tool.get_thread_id()}")
@@ -213,10 +219,12 @@
         if pRspInfo['ErrorID'] == 0:
             print("----L2琛屾儏鐧诲綍鎴愬姛----")
             self.is_login = True
+            logger_system.info(f"L2琛屾儏鐧诲綍鎴愬姛")
             # 鍒濆璁剧疆鍊�
-            threading.Thread(
-                target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0),
-                daemon=True).start()
+            if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:
+                threading.Thread(
+                    target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=60),
+                    daemon=True).start()
 
     def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubMarketData")
@@ -237,6 +245,8 @@
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
+            # 鍒濆鍖�
+            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
         if bIsLast == 1:
             print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
             l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -245,6 +255,29 @@
         print("OnRspUnSubOrderDetail", bIsLast)
         try:
             code = pSpecificSecurity['SecurityID']
+            self.subscripted_codes.discard(code)
+            if bIsLast == 1:
+                print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
+                l2_data_manager.add_subscript_codes(self.subscripted_codes)
+        except Exception as e:
+            logging.exception(e)
+
+    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
+        async_log_util.info(logger_local_huaxin_l2_subscript,
+                            f"NGTS璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
+        if pRspInfo["ErrorID"] == 0:
+            print("璁㈤槄鎴愬姛")
+            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
+            # 鍒濆鍖�
+            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
+        if bIsLast == 1:
+            print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
+            l2_data_manager.add_subscript_codes(self.subscripted_codes)
+
+    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
+        try:
+            code = pSpecificSecurity['SecurityID']
+            logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄锛歿code}")
             self.subscripted_codes.discard(code)
             if bIsLast == 1:
                 print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
@@ -267,34 +300,42 @@
     def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubXTSTick")
 
-    # 4.0.5鐗堟湰鎺ュ彛
-    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
-        print("OnRspSubNGTSTick")
-
     def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                         FirstLevelSellOrderVolumes):
         # 浼犲叆锛氭椂闂达紝鐜颁环,鎴愪氦鎬婚噺,涔�1锛屼拱2锛屼拱3,涔�4,涔�5,鍗�1,鍗�2,鍗�3,鍗�4,鍗�5
         try:
+            buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
+                    (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
+                    (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
+                    (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
+                    (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])]
+            for i in range(6, 11):
+                if not pDepthMarketData[f"BidVolume{i}"]:
+                    break
+                buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}']))
+
+            sells = [
+                (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
+                (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
+                (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
+                (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
+                (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
+            ]
+            for i in range(6, 11):
+                if not pDepthMarketData[f"AskVolume{i}"]:
+                    break
+                sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}']))
+
             d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'],
                  "lastPrice": pDepthMarketData['LastPrice'],
                  "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'],
                  "totalValueTrade": pDepthMarketData['TotalValueTrade'],
                  "totalAskVolume": pDepthMarketData['TotalAskVolume'],
                  "avgAskPrice": pDepthMarketData["AvgAskPrice"],
-                 "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']),
-                         (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']),
-                         (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']),
-                         (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']),
-                         (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])],
-                 "sell": [
-                     (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']),
-                     (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']),
-                     (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']),
-                     (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
-                     (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
-                 ]}
-            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
+                 "buy": buys,
+                 "sell": sells}
             self.l2_data_upload_manager.add_market_data(d)
+            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
         except:
             pass
 
@@ -314,12 +355,6 @@
         # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
         # 杈撳嚭閫愮瑪鎴愪氦鏁版嵁
         if pTransaction['ExecType'] == b"2":
-            # if min_volume is None:
-            #     # 榛樿绛涢��50w
-            #     if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
-            #         return
-            # elif pTransaction['TradeVolume'] < min_volume:
-            #     return
             # 鎾ゅ崟
             item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                     "Volume": pTransaction['TradeVolume'],
@@ -357,28 +392,6 @@
             self.l2_data_upload_manager.add_transaction_detail(item)
 
     def OnRtnOrderDetail(self, pOrderDetail):
-        # can_listen = False
-        # code = str(pOrderDetail['SecurityID'])
-        # start_time = 0
-        # if code in self.special_code_volume_for_order_dict:
-        #     start_time = time.time()
-        #     if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
-        #         'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
-        #         # 鐩戞帶鐩爣璁㈠崟涓庡奖瀛愯鍗�
-        #         if self.special_code_volume_for_order_dict[code][1] > time.time():
-        #             # 鐗规畩閲忕洃鍚�
-        #             can_listen = True
-        #         else:
-        #             self.special_code_volume_for_order_dict.pop(code)
-        # if not can_listen:
-        #     min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
-        #     if min_volume is None:
-        #         # 榛樿绛涢��50w
-        #         if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000:
-        #             return
-        #     elif pOrderDetail['Volume'] < min_volume:
-        #         return
-        # 杈撳嚭閫愮瑪濮旀墭鏁版嵁
         # 涓婅瘉OrderStatus=b"D"琛ㄧず鎾ゅ崟
         item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                 "Volume": pOrderDetail['Volume'],
@@ -387,6 +400,38 @@
                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
         self.l2_data_upload_manager.add_l2_order_detail(item, 0)
+
+    def OnRtnNGTSTick(self, pTick):
+        """
+        涓婅瘉鑲$エ鐨勯�愮瑪濮旀墭涓庢垚浜�
+        @param pTick:
+        @return:
+        """
+        try:
+            if pTick['TickType'] == b'T':
+                # 鎴愪氦
+                item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'],
+                        "TradeVolume": pTick['Volume'],
+                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
+                        "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
+                        "SellNo": pTick['SellNo'],
+                        "ExecType": '1'}
+                self.l2_data_upload_manager.add_transaction_detail(item)
+            elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D':
+                # 鎾ゅ崟
+                item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'],
+                        "Volume": pTick['Volume'],
+                        "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(),
+                        "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'],
+                        "SubSeq": pTick['SubSeq'], "OrderNO": '',
+                        "OrderStatus": pTick['TickType'].decode()}
+                if pTick['Side'] == b'1':
+                    item['OrderNO'] = pTick['BuyNo']
+                elif pTick['Side'] == b'2':
+                    item['OrderNO'] = pTick['SellNo']
+                self.l2_data_upload_manager.add_l2_order_detail(item, 0)
+        except Exception as e:
+            logger_local_huaxin_l2_subscript.exception(e)
 
     def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                             FirstLevelSellOrderVolumes):
@@ -460,33 +505,44 @@
         for sell_index in range(0, FirstLevelSellNum):
             print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
 
-    def OnRtnXTSTick(self, pTick):
-        # 杈撳嚭涓婃捣鍊哄埜閫愮瑪鏁版嵁鈥�
-        print(
-            "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
-                pTick['TickType'],
-                pTick['SecurityID'],
-                pTick['Price'],
-                pTick['Volume'],
-                pTick['TickTime'],
-                pTick['MainSeq'],
-                pTick['SubSeq'],
-                pTick['BuyNo'],
-                pTick['SellNo']))
 
-    def OnRtnNGTSTick(self, pTick):
-        # 杈撳嚭涓婃捣鑲″熀閫愮瑪鏁版嵁鈥�
-        print(
-            "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % (
-                pTick['TickType'],
-                pTick['SecurityID'],
-                pTick['Price'],
-                pTick['Volume'],
-                pTick['TickTime'],
-                pTick['MainSeq'],
-                pTick['SubSeq'],
-                pTick['BuyNo'],
-                pTick['SellNo']))
+class SubscriptDefend:
+    """
+    璁㈤槄瀹堟姢
+    瀹氫箟锛氬綋璁㈤槄鐨勪唬鐮佽秴杩囦竴瀹氭椂闂存病鏈夊洖璋冩暟鎹椂閲嶆柊璁㈤槄
+    """
+    __l2_market_update_time = {}
+
+    @classmethod
+    def set_l2_market_update(cls, code):
+        cls.__l2_market_update_time[code] = time.time()
+
+    @classmethod
+    def run(cls):
+        while True:
+            try:
+                now_time = tool.get_now_time_as_int()
+                if now_time < int("093015"):
+                    continue
+                if int("112945") < now_time < int("130015"):
+                    continue
+                if int("145645") < now_time:
+                    continue
+                if spi.subscripted_codes:
+                    codes = []
+                    for code in spi.subscripted_codes:
+                        # 鑾峰彇涓婃鏇存柊鏃堕棿
+                        update_time = cls.__l2_market_update_time.get(code)
+                        if update_time and time.time() - update_time > 15:
+                            # 闇�瑕侀噸鏂拌闃�
+                            codes.append(code)
+                    if codes:
+                        logger_debug.info(f"閲嶆柊璁㈤槄锛歿codes}")
+                        spi.subscribe_codes(codes)
+            except:
+                pass
+            finally:
+                time.sleep(15)
 
 
 class MyL2ActionCallback(L2ActionCallback):
@@ -507,9 +563,9 @@
     g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST
 
     # case 1缂撳瓨妯″紡
-    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
+    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True)
     # case 2闈炵紦瀛樻ā寮�
-    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
+    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
     global spi
     spi = Lev2MdSpi(api, l2_data_upload_manager)
     api.RegisterSpi(spi)
@@ -545,11 +601,7 @@
                     value = value.decode("utf-8")
                 data = json.loads(value)
                 _type = data["type"]
-                if _type == "listen_volume":
-                    volume = data["data"]["volume"]
-                    code = data["data"]["code"]
-                    spi.set_code_special_watch_volume(code, volume)
-                elif _type == "l2_cmd":
+                if _type == "l2_cmd":
                     __start_time = time.time()
                     # 绾跨▼姹�
                     __l2_cmd_thread_pool.submit(
@@ -565,43 +617,50 @@
 pipe_strategy = None
 
 
-def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
-        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
-    # def test_add_codes():
-    #     time.sleep(5)
-    #     # if value:
-    #     #     if type(value) == bytes:
-    #     #         value = value.decode("utf-8")
-    #     #     data = json.loads(value)
-    #     #     _type = data["type"]
-    #     #     if _type == "listen_volume":
-    #     #         volume = data["data"]["volume"]
-    #     #         code = data["data"]["code"]
-    #     #         spi.set_code_special_watch_volume(code, volume)
-    #     #     elif _type == "l2_cmd":
-    #     #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
-    #
-    #     demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59),
-    #                   ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)]
-    #
-    #     queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
-    #     time.sleep(1)
-    #
-    #     spi.l2_data_upload_manager.add_l2_order_detail(
-    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
-    #          'OrderTime': '13000015',
-    #          'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
-    #     spi.l2_data_upload_manager.add_l2_order_detail(
-    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
-    #          'OrderTime': '13000015',
-    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
-    #     queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
-    #     time.sleep(0.1)
-    #     spi.l2_data_upload_manager.add_l2_order_detail(
-    #         {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
-    #          'OrderTime': '13000015',
-    #          'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
+def test_add_codes(queue_r):
+    time.sleep(10)
+    # if value:
+    #     if type(value) == bytes:
+    #         value = value.decode("utf-8")
+    #     data = json.loads(value)
+    #     _type = data["type"]
+    #     if _type == "listen_volume":
+    #         volume = data["data"]["volume"]
+    #         code = data["data"]["code"]
+    #         spi.set_code_special_watch_volume(code, volume)
+    #     elif _type == "l2_cmd":
+    #         l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
 
+    demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200),
+                  ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200),
+                  ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200),
+                  ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)]
+
+    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]}))
+    time.sleep(10)
+    while True:
+        try:
+            spi.l2_data_upload_manager.add_l2_order_detail(
+                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0',
+                 'OrderTime': '13000015',
+                 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0)
+            spi.l2_data_upload_manager.add_l2_order_detail(
+                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0',
+                 'OrderTime': '13000015',
+                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
+            # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}}))
+            time.sleep(0.1)
+            spi.l2_data_upload_manager.add_l2_order_detail(
+                {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0',
+                 'OrderTime': '13000015',
+                 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
+        except Exception as e:
+            logging.exception(e)
+        finally:
+            time.sleep(10)
+
+
+def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
     logger_system.info("L2杩涚▼ID锛歿}", os.getpid())
     logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}")
     try:
@@ -609,17 +668,16 @@
         if queue_r is not None:
             t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
             t1.start()
-
+        # 璁㈤槄瀹堟姢
+        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
         # 鍒濆鍖�
-        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
-        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
-        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
-                                                     transaction_queue_distribute_manager, market_queue)
+        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
+        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
         __init_l2(l2_data_upload_manager)
         l2_data_manager.run_upload_common()
         l2_data_manager.run_log()
-        # 娴嬭瘯
-        # threading.Thread(target=lambda: test_add_codes(),daemon=True).start()
+        # TODO 娴嬭瘯
+        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
         global l2CommandManager
         l2CommandManager = command_manager.L2CommandManager()
         l2CommandManager.init(MyL2ActionCallback())
@@ -666,13 +724,13 @@
              'OrderTime': '13000015',
              'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0)
 
-    queue_r = multiprocessing.Queue()
+    queue_r = multiprocessing.Queue(maxsize=1024)
     order_queues = []
     transaction_queues = []
-    market_queue = multiprocessing.Queue()
+    market_queue = multiprocessing.Queue(maxsize=1024)
     for i in range(20):
-        order_queues.append(multiprocessing.Queue())
-        transaction_queues.append(multiprocessing.Queue())
+        order_queues.append(multiprocessing.Queue(maxsize=1024))
+        transaction_queues.append(multiprocessing.Queue(maxsize=1024))
     threading.Thread(target=test_add_codes).start()
 
     run(queue_r, order_queues, transaction_queues, market_queue)

--
Gitblit v1.8.0