From 46f51dfb83f6e6a2784676bde64577e5f6f28cf0 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 11 三月 2025 14:31:34 +0800
Subject: [PATCH] 新版L2订阅/L2成交处理时间日志

---
 huaxin_client/l2_client_v2.py |   68 ++++++++++++++++++---------------
 1 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/huaxin_client/l2_client_v2.py b/huaxin_client/l2_client_v2.py
index 2e71fc2..d8020e8 100644
--- a/huaxin_client/l2_client_v2.py
+++ b/huaxin_client/l2_client_v2.py
@@ -46,12 +46,13 @@
 
     # 涔板叆鐨勫ぇ鍗曡鍗曞彿
 
-    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
+    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager, processor_index):
         lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
         self.__api = api
         self.is_login = False
         self.l2_data_upload_manager = l2_data_upload_manager
         self.codes_volume_and_price_dict = {}
+        self.processor_index = processor_index
 
     def __split_codes(self, codes):
         szse_codes = []
@@ -91,29 +92,29 @@
 
     def __subscribe(self, _codes):
         sh, sz = self.__split_codes(_codes)
-        logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}")
-        logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}")
+        logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉({self.processor_index})锛歿sh}")
+        logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉({self.processor_index})锛歿sz}")
         if sh:
             if ENABLE_NGST:
                 result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}")
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh({self.processor_index})锛歿result}")
             else:
                 # 璁㈤槄閫愮瑪濮旀墭
                 result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh({self.processor_index})锛歿result}")
                 # 璁㈤槄閫愮瑪鎴愪氦
                 result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
+                logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh({self.processor_index})锛歿result}")
 
             result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}")
+            logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh({self.processor_index})锛歿result}")
         if sz:
             result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-            logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz锛歿result}")
+            logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz({self.processor_index})锛歿result}")
             result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-            logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz锛歿result}")
+            logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz({self.processor_index})锛歿result}")
             result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-            logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz锛歿result}")
+            logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz({self.processor_index})锛歿result}")
 
     def __process_codes_data(self, codes_data, from_cache=False, delay=0.0):
 
@@ -130,7 +131,7 @@
             codes.add(code)
             self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5])
             self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5])
-        logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟锛歿}", len(codes))
+        logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟({})锛歿}", self.processor_index, len(codes))
         add_codes = codes - self.subscripted_codes
         del_codes = self.subscripted_codes - codes
         print("add del codes", add_codes, del_codes)
@@ -148,11 +149,13 @@
         self.__unsubscribe(del_codes)
 
         if add_codes:
-            logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({'缂撳瓨' if from_cache else ''}):{len(add_codes)}")
+            logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({self.processor_index}) ({'缂撳瓨' if from_cache else ''}):{len(add_codes)}")
             for c in add_codes:
-                logger_l2_codes_subscript.info(f"l2濮旀墭鏁版嵁杩囨护鏉′欢锛歿c} - {self.codes_volume_and_price_dict.get(c)}")
+                logger_l2_codes_subscript.info(
+                    f"l2濮旀墭鏁版嵁杩囨护鏉′欢({self.processor_index})锛歿c} - {self.codes_volume_and_price_dict.get(c)}")
 
-        logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫锛宎dd-{} del-{}", len(add_codes), len(del_codes))
+        logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫({})锛宎dd-{} del-{}", self.processor_index, len(add_codes),
+                                       len(del_codes))
 
         # 璁剧疆鏈�杩戠殑浠g爜鍒楄〃
         self.latest_codes_set = codes
@@ -167,16 +170,16 @@
             # 淇濆瓨涓�浠芥渶鏂扮殑鏁版嵁
             self.__set_latest_datas(codes_data)
 
-    @classmethod
-    def __set_latest_datas(cls, codes_data):
+    def __set_latest_datas(self, codes_data):
+        path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}"
         data_str = json.dumps([tool.get_now_date_str(), codes_data])
-        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
+        with open(path_str, mode='w') as f:
             f.write(data_str)
 
-    @classmethod
-    def __get_latest_datas(cls):
-        if os.path.exists(constant.L2_CODES_INFO_PATH):
-            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
+    def __get_latest_datas(self):
+        path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}"
+        if os.path.exists(path_str):
+            with open(path_str, mode='r') as f:
                 str_ = f.readline()
                 data_json = json.loads(str_)
                 if data_json[0] == tool.get_now_date_str():
@@ -199,7 +202,7 @@
         if pRspInfo['ErrorID'] == 0:
             print("----L2琛屾儏鐧诲綍鎴愬姛----")
             self.is_login = True
-            logger_system.info(f"L2琛屾儏鐧诲綍鎴愬姛")
+            logger_system.info(f"L2琛屾儏鐧诲綍鎴愬姛({self.processor_index})")
             # 鍒濆璁剧疆鍊�
             if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0:
                 threading.Thread(
@@ -218,10 +221,11 @@
     def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubOrderDetail", pRspInfo)
         # try:
-        print("璁㈤槄缁撴灉锛�", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
+        print(f"璁㈤槄缁撴灉({self.processor_index})锛�", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"],
+              pRspInfo["ErrorID"],
               pRspInfo["ErrorMsg"])
         async_log_util.info(logger_local_huaxin_l2_subscript,
-                            f"璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
+                            f"璁㈤槄缁撴灉({self.processor_index})锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
@@ -244,7 +248,7 @@
 
     def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         async_log_util.info(logger_local_huaxin_l2_subscript,
-                            f"NGTS璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
+                            f"NGTS璁㈤槄缁撴灉({self.processor_index})锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
@@ -257,7 +261,7 @@
     def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         try:
             code = pSpecificSecurity['SecurityID']
-            logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄锛歿code}")
+            logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄({self.processor_index})锛歿code}")
             self.subscripted_codes.discard(code)
             if bIsLast == 1:
                 print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes)
@@ -529,7 +533,7 @@
             logging.exception(e)
 
 
-def __init_l2(l2_data_upload_manager):
+def __init_l2(l2_data_upload_manager, processor_index):
     print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
     # case 1: Tcp鏂瑰紡
     # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -541,7 +545,7 @@
     # case 2闈炵紦瀛樻ā寮�
     # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
     global spi
-    spi = Lev2MdSpi(api, l2_data_upload_manager)
+    spi = Lev2MdSpi(api, l2_data_upload_manager, processor_index)
     api.RegisterSpi(spi)
     # -------------------姝e紡妯″紡-------------------------------------
     if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -634,12 +638,14 @@
             time.sleep(10)
 
 
-def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None:
+def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list,
+        processor_index) -> None:
     """
     杩愯
     @param queue_r:
     @param queue_data_callback锛� 浣庨鏁版嵁鍥炶皟闃熷垪
     @param channel_list: [((缂栧彿,multiprocessing.Array, zmq_address),(缂栧彿, multiprocessing.Array, zmq_address))]
+    @param processor_index锛氬鐞嗗櫒绱㈠紩
     @return:
     """
     logger_system.info("L2杩涚▼ID锛歿}", os.getpid())
@@ -653,8 +659,8 @@
         threading.Thread(target=SubscriptDefend.run, daemon=True).start()
         # 鍒濆鍖�
         data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list)
-        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager)
-        __init_l2(l2_data_upload_manager)
+        l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager, queue_data_callback)
+        __init_l2(l2_data_upload_manager, processor_index)
         l2_data_manager_v2.run_upload_common()
         l2_data_manager_v2.run_log()
         # TODO 娴嬭瘯

--
Gitblit v1.8.0