Administrator
2025-01-06 6567e8cd1fb11ea10912bb3ac5bf2965c74c0e4b
huaxin_client/l2_client.py
@@ -17,7 +17,7 @@
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
from utils import tool
###B类###
@@ -51,7 +51,6 @@
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
set_codes_data_queue = queue.Queue(maxsize=1000)
market_code_dict = {}
ENABLE_NGST = True
@@ -106,6 +105,9 @@
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def subscribe_codes(self, _codes):
        self.__subscribe(_codes)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
@@ -241,6 +243,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -262,6 +266,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -326,8 +332,8 @@
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": buys,
                 "sell": sells}
            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
            self.l2_data_upload_manager.add_market_data(d)
            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
        except:
            pass
@@ -498,6 +504,45 @@
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
class SubscriptDefend:
    """
    订阅守护
    定义:当订阅的代码超过一定时间没有回调数据时重新订阅
    """
    __l2_market_update_time = {}
    @classmethod
    def set_l2_market_update(cls, code):
        cls.__l2_market_update_time[code] = time.time()
    @classmethod
    def run(cls):
        while True:
            try:
                now_time = tool.get_now_time_as_int()
                if now_time < int("093015"):
                    continue
                if int("112945") < now_time < int("130015"):
                    continue
                if int("145645") < now_time:
                    continue
                if spi.subscripted_codes:
                    codes = []
                    for code in spi.subscripted_codes:
                        # 获取上次更新时间
                        update_time = cls.__l2_market_update_time.get(code)
                        if update_time and time.time() - update_time > 15:
                            # 需要重新订阅
                            codes.append(code)
                    if codes:
                        logger_debug.info(f"重新订阅:{codes}")
                        spi.subscribe_codes(codes)
            except:
                pass
            finally:
                time.sleep(15)
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes_data):
@@ -621,7 +666,8 @@
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        # 订阅守护
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        # 初始化
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)