Administrator
2025-03-11 46f51dfb83f6e6a2784676bde64577e5f6f28cf0
huaxin_client/l2_client_v2.py
@@ -46,12 +46,13 @@
    # 买入的大单订单号
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager, processor_index):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager
        self.codes_volume_and_price_dict = {}
        self.processor_index = processor_index
    def __split_codes(self, codes):
        szse_codes = []
@@ -91,29 +92,29 @@
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        logger_local_huaxin_l2_subscript.info(f"订阅上证({self.processor_index}):{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证({self.processor_index}):{sz}")
        if sh:
            if ENABLE_NGST:
                result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}")
                logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh({self.processor_index}):{result}")
            else:
                # 订阅逐笔委托
                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
                logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh({self.processor_index}):{result}")
                # 订阅逐笔成交
                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
                logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh({self.processor_index}):{result}")
            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}")
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh({self.processor_index}):{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz({self.processor_index}):{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz({self.processor_index}):{result}")
            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}")
            logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz({self.processor_index}):{result}")
    def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
@@ -130,7 +131,7 @@
            codes.add(code)
            self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
            self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
        logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes))
        logger_l2_codes_subscript.info("华鑫L2订阅总数({}):{}", self.processor_index, len(codes))
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
@@ -148,11 +149,13 @@
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
            logger_system.info(f"新增L2订阅代码数量({self.processor_index}) ({'缓存' if from_cache else ''}):{len(add_codes)}")
            for c in add_codes:
                logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}")
                logger_l2_codes_subscript.info(
                    f"l2委托数据过滤条件({self.processor_index}):{c} - {self.codes_volume_and_price_dict.get(c)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        logger_l2_codes_subscript.info("华鑫L2订阅结束({}),add-{} del-{}", self.processor_index, len(add_codes),
                                       len(del_codes))
        # 设置最近的代码列表
        self.latest_codes_set = codes
@@ -167,16 +170,16 @@
            # 保存一份最新的数据
            self.__set_latest_datas(codes_data)
    @classmethod
    def __set_latest_datas(cls, codes_data):
    def __set_latest_datas(self, codes_data):
        path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}"
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
        with open(path_str, mode='w') as f:
            f.write(data_str)
    @classmethod
    def __get_latest_datas(cls):
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
    def __get_latest_datas(self):
        path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}"
        if os.path.exists(path_str):
            with open(path_str, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
@@ -199,7 +202,7 @@
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info(f"L2行情登录成功")
            logger_system.info(f"L2行情登录成功({self.processor_index})")
            # 初始设置值
            if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:
                threading.Thread(
@@ -218,10 +221,11 @@
    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubOrderDetail", pRspInfo)
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
        print(f"订阅结果({self.processor_index}):", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"],
              pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
                            f"订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
@@ -244,7 +248,7 @@
    def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
                            f"NGTS订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
@@ -257,7 +261,7 @@
    def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        try:
            code = pSpecificSecurity['SecurityID']
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}")
            logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅({self.processor_index}):{code}")
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
@@ -529,7 +533,7 @@
            logging.exception(e)
def __init_l2(l2_data_upload_manager):
def __init_l2(l2_data_upload_manager, processor_index):
    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
    # case 1: Tcp方式
    # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -541,7 +545,7 @@
    # case 2非缓存模式
    # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
    global spi
    spi = Lev2MdSpi(api, l2_data_upload_manager)
    spi = Lev2MdSpi(api, l2_data_upload_manager, processor_index)
    api.RegisterSpi(spi)
    # -------------------正式模式-------------------------------------
    if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -634,12 +638,14 @@
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None:
def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list,
        processor_index) -> None:
    """
    运行
    @param queue_r:
    @param queue_data_callback: 低频数据回调队列
    @param channel_list: [((编号,multiprocessing.Array, zmq_address),(编号, multiprocessing.Array, zmq_address))]
    @param processor_index:处理器索引
    @return:
    """
    logger_system.info("L2进程ID:{}", os.getpid())
@@ -653,8 +659,8 @@
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        # 初始化
        data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list)
        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager)
        __init_l2(l2_data_upload_manager)
        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager, queue_data_callback)
        __init_l2(l2_data_upload_manager, processor_index)
        l2_data_manager_v2.run_upload_common()
        l2_data_manager_v2.run_log()
        # TODO 测试