From 46f51dfb83f6e6a2784676bde64577e5f6f28cf0 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 11 三月 2025 14:31:34 +0800 Subject: [PATCH] 新版L2订阅/L2成交处理时间日志 --- huaxin_client/l2_client_v2.py | 68 ++++++++++++++++++--------------- 1 files changed, 37 insertions(+), 31 deletions(-) diff --git a/huaxin_client/l2_client_v2.py b/huaxin_client/l2_client_v2.py index 2e71fc2..d8020e8 100644 --- a/huaxin_client/l2_client_v2.py +++ b/huaxin_client/l2_client_v2.py @@ -46,12 +46,13 @@ # 涔板叆鐨勫ぇ鍗曡鍗曞彿 - def __init__(self, api, l2_data_upload_manager: L2DataUploadManager): + def __init__(self, api, l2_data_upload_manager: L2DataUploadManager, processor_index): lev2mdapi.CTORATstpLev2MdSpi.__init__(self) self.__api = api self.is_login = False self.l2_data_upload_manager = l2_data_upload_manager self.codes_volume_and_price_dict = {} + self.processor_index = processor_index def __split_codes(self, codes): szse_codes = [] @@ -91,29 +92,29 @@ def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) - logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}") - logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}") + logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉({self.processor_index})锛歿sh}") + logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉({self.processor_index})锛歿sz}") if sh: if ENABLE_NGST: result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}") + logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh({self.processor_index})锛歿result}") else: # 璁㈤槄閫愮瑪濮旀墭 result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") + logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh({self.processor_index})锛歿result}") # 璁㈤槄閫愮瑪鎴愪氦 result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") + logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh({self.processor_index})锛歿result}") result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}") + logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh({self.processor_index})锛歿result}") if sz: result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz锛歿result}") + logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz({self.processor_index})锛歿result}") result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz锛歿result}") + logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz({self.processor_index})锛歿result}") result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz锛歿result}") + logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz({self.processor_index})锛歿result}") def __process_codes_data(self, codes_data, from_cache=False, delay=0.0): @@ -130,7 +131,7 @@ codes.add(code) self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) - logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟锛歿}", len(codes)) + logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟({})锛歿}", self.processor_index, len(codes)) add_codes = codes - self.subscripted_codes del_codes = self.subscripted_codes - codes print("add del codes", add_codes, del_codes) @@ -148,11 +149,13 @@ self.__unsubscribe(del_codes) if add_codes: - logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({'缂撳瓨' if from_cache else ''}):{len(add_codes)}") + logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({self.processor_index}) ({'缂撳瓨' if from_cache else ''}):{len(add_codes)}") for c in add_codes: - logger_l2_codes_subscript.info(f"l2濮旀墭鏁版嵁杩囨护鏉′欢锛歿c} - {self.codes_volume_and_price_dict.get(c)}") + logger_l2_codes_subscript.info( + f"l2濮旀墭鏁版嵁杩囨护鏉′欢({self.processor_index})锛歿c} - {self.codes_volume_and_price_dict.get(c)}") - logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫锛宎dd-{} del-{}", len(add_codes), len(del_codes)) + logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫({})锛宎dd-{} del-{}", self.processor_index, len(add_codes), + len(del_codes)) # 璁剧疆鏈�杩戠殑浠g爜鍒楄〃 self.latest_codes_set = codes @@ -167,16 +170,16 @@ # 淇濆瓨涓�浠芥渶鏂扮殑鏁版嵁 self.__set_latest_datas(codes_data) - @classmethod - def __set_latest_datas(cls, codes_data): + def __set_latest_datas(self, codes_data): + path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}" data_str = json.dumps([tool.get_now_date_str(), codes_data]) - with open(constant.L2_CODES_INFO_PATH, mode='w') as f: + with open(path_str, mode='w') as f: f.write(data_str) - @classmethod - def __get_latest_datas(cls): - if os.path.exists(constant.L2_CODES_INFO_PATH): - with open(constant.L2_CODES_INFO_PATH, mode='r') as f: + def __get_latest_datas(self): + path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}" + if os.path.exists(path_str): + with open(path_str, mode='r') as f: str_ = f.readline() data_json = json.loads(str_) if data_json[0] == tool.get_now_date_str(): @@ -199,7 +202,7 @@ if pRspInfo['ErrorID'] == 0: print("----L2琛屾儏鐧诲綍鎴愬姛----") self.is_login = True - logger_system.info(f"L2琛屾儏鐧诲綍鎴愬姛") + logger_system.info(f"L2琛屾儏鐧诲綍鎴愬姛({self.processor_index})") # 鍒濆璁剧疆鍊� if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0: threading.Thread( @@ -218,10 +221,11 @@ def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubOrderDetail", pRspInfo) # try: - print("璁㈤槄缁撴灉锛�", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"], + print(f"璁㈤槄缁撴灉({self.processor_index})锛�", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], + pRspInfo["ErrorID"], pRspInfo["ErrorMsg"]) async_log_util.info(logger_local_huaxin_l2_subscript, - f"璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") + f"璁㈤槄缁撴灉({self.processor_index})锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) @@ -244,7 +248,7 @@ def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): async_log_util.info(logger_local_huaxin_l2_subscript, - f"NGTS璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") + f"NGTS璁㈤槄缁撴灉({self.processor_index})锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) @@ -257,7 +261,7 @@ def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): try: code = pSpecificSecurity['SecurityID'] - logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄锛歿code}") + logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄({self.processor_index})锛歿code}") self.subscripted_codes.discard(code) if bIsLast == 1: print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) @@ -529,7 +533,7 @@ logging.exception(e) -def __init_l2(l2_data_upload_manager): +def __init_l2(l2_data_upload_manager, processor_index): print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) # case 1: Tcp鏂瑰紡 # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP @@ -541,7 +545,7 @@ # case 2闈炵紦瀛樻ā寮� # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) global spi - spi = Lev2MdSpi(api, l2_data_upload_manager) + spi = Lev2MdSpi(api, l2_data_upload_manager, processor_index) api.RegisterSpi(spi) # -------------------姝e紡妯″紡------------------------------------- if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: @@ -634,12 +638,14 @@ time.sleep(10) -def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None: +def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list, + processor_index) -> None: """ 杩愯 @param queue_r: @param queue_data_callback锛� 浣庨鏁版嵁鍥炶皟闃熷垪 @param channel_list: [((缂栧彿,multiprocessing.Array, zmq_address),(缂栧彿, multiprocessing.Array, zmq_address))] + @param processor_index锛氬鐞嗗櫒绱㈠紩 @return: """ logger_system.info("L2杩涚▼ID锛歿}", os.getpid()) @@ -653,8 +659,8 @@ threading.Thread(target=SubscriptDefend.run, daemon=True).start() # 鍒濆鍖� data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list) - l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager) - __init_l2(l2_data_upload_manager) + l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager, queue_data_callback) + __init_l2(l2_data_upload_manager, processor_index) l2_data_manager_v2.run_upload_common() l2_data_manager_v2.run_log() # TODO 娴嬭瘯 -- Gitblit v1.8.0