From c4e5d846ac630133fbabb96b4d36a5b16f51a962 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 03 九月 2025 09:50:46 +0800 Subject: [PATCH] bug修复 --- huaxin_client/l2_client.py | 249 ++++++++++++++++++++++++++++++++----------------- 1 files changed, 160 insertions(+), 89 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index c9d599f..5876188 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -17,7 +17,7 @@ from huaxin_client.l2_data_manager import L2DataUploadManager from log_module import log, async_log_util from log_module.async_log_util import huaxin_l2_log -from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript +from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug from utils import tool ###B绫�### @@ -50,8 +50,9 @@ SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] -set_codes_data_queue = queue.Queue() -market_code_dict = {} +set_codes_data_queue = queue.Queue(maxsize=1000) + +ENABLE_NGST = True class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): @@ -76,9 +77,10 @@ szse_codes = [] sse_codes = [] for code in codes: - if code.find("00") == 0: + market_type = tool.get_market_type(code) + if market_type == tool.MARKET_TYPE_SZSE: szse_codes.append(code.encode()) - elif code.find("60") == 0: + elif market_type == tool.MARKET_TYPE_SSE: sse_codes.append(code.encode()) return sse_codes, szse_codes @@ -90,27 +92,38 @@ logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}") logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}") if sh: - # 鍙栨秷璁㈤槄閫愮瑪濮旀墭 - self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦 - self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + if ENABLE_NGST: + result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}") + else: + # 鍙栨秷璁㈤槄閫愮瑪濮旀墭 + self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦 + self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) if sz: self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + def subscribe_codes(self, _codes): + self.__subscribe(_codes) + def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}") logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}") if sh: - # 璁㈤槄閫愮瑪濮旀墭 - result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") - # 璁㈤槄閫愮瑪鎴愪氦 - result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") + if ENABLE_NGST: + result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}") + else: + # 璁㈤槄閫愮瑪濮旀墭 + result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") + # 璁㈤槄閫愮瑪鎴愪氦 + result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}") @@ -135,8 +148,9 @@ for d in codes_data: code = d[0] codes.add(code) - self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4]) - self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4]) + self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) + self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) + logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟锛歿}", len(codes)) add_codes = codes - self.subscripted_codes del_codes = self.subscripted_codes - codes print("add del codes", add_codes, del_codes) @@ -145,9 +159,9 @@ self.l2_data_upload_manager.release_distributed_upload_queue(c) l2_data_manager.del_target_code(c) for c in codes: - self.l2_data_upload_manager.distribute_upload_queue(c) + self.l2_data_upload_manager.distribute_upload_queue(c, codes) l2_data_manager.add_target_code(c) - except Exception as e: + except Exception as e: # TODO 娓呴櫎鍘熸潵杩樻病閲婃斁鎺夌殑鏁版嵁 logger_system.error(f"L2浠g爜鍒嗛厤涓婁紶闃熷垪鍑洪敊:{str(e)}") logger_system.exception(e) self.__subscribe(add_codes) @@ -155,6 +169,8 @@ if add_codes: logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({'缂撳瓨' if from_cache else ''}):{len(add_codes)}") + for c in add_codes: + logger_l2_codes_subscript.info(f"l2濮旀墭鏁版嵁杩囨护鏉′欢锛歿c} - {self.codes_volume_and_price_dict.get(c)}") logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫锛宎dd-{} del-{}", len(add_codes), len(del_codes)) @@ -229,6 +245,8 @@ if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) + # 鍒濆鍖� + SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) l2_data_manager.add_subscript_codes(self.subscripted_codes) @@ -237,6 +255,29 @@ print("OnRspUnSubOrderDetail", bIsLast) try: code = pSpecificSecurity['SecurityID'] + self.subscripted_codes.discard(code) + if bIsLast == 1: + print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) + l2_data_manager.add_subscript_codes(self.subscripted_codes) + except Exception as e: + logging.exception(e) + + def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): + async_log_util.info(logger_local_huaxin_l2_subscript, + f"NGTS璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") + if pRspInfo["ErrorID"] == 0: + print("璁㈤槄鎴愬姛") + self.subscripted_codes.add(pSpecificSecurity['SecurityID']) + # 鍒濆鍖� + SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) + if bIsLast == 1: + print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) + l2_data_manager.add_subscript_codes(self.subscripted_codes) + + def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): + try: + code = pSpecificSecurity['SecurityID'] + logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄锛歿code}") self.subscripted_codes.discard(code) if bIsLast == 1: print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) @@ -259,34 +300,42 @@ def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubXTSTick") - # 4.0.5鐗堟湰鎺ュ彛 - def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): - print("OnRspSubNGTSTick") - def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): # 浼犲叆锛氭椂闂达紝鐜颁环,鎴愪氦鎬婚噺,涔�1锛屼拱2锛屼拱3,涔�4,涔�5,鍗�1,鍗�2,鍗�3,鍗�4,鍗�5 try: + buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), + (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), + (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), + (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), + (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])] + for i in range(6, 11): + if not pDepthMarketData[f"BidVolume{i}"]: + break + buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}'])) + + sells = [ + (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), + (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), + (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), + (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), + (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) + ] + for i in range(6, 11): + if not pDepthMarketData[f"AskVolume{i}"]: + break + sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}'])) + d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'], "lastPrice": pDepthMarketData['LastPrice'], "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'], "totalValueTrade": pDepthMarketData['TotalValueTrade'], "totalAskVolume": pDepthMarketData['TotalAskVolume'], "avgAskPrice": pDepthMarketData["AvgAskPrice"], - "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), - (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), - (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), - (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), - (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])], - "sell": [ - (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), - (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), - (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), - (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), - (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) - ]} - market_code_dict[pDepthMarketData['SecurityID']] = time.time() + "buy": buys, + "sell": sells} self.l2_data_upload_manager.add_market_data(d) + SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID']) except: pass @@ -343,28 +392,6 @@ self.l2_data_upload_manager.add_transaction_detail(item) def OnRtnOrderDetail(self, pOrderDetail): - # can_listen = False - # code = str(pOrderDetail['SecurityID']) - # start_time = 0 - # if code in self.special_code_volume_for_order_dict: - # start_time = time.time() - # if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ - # 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: - # # 鐩戞帶鐩爣璁㈠崟涓庡奖瀛愯鍗� - # if self.special_code_volume_for_order_dict[code][1] > time.time(): - # # 鐗规畩閲忕洃鍚� - # can_listen = True - # else: - # self.special_code_volume_for_order_dict.pop(code) - # if not can_listen: - # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) - # if min_volume is None: - # # 榛樿绛涢��50w - # if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000: - # return - # elif pOrderDetail['Volume'] < min_volume: - # return - # 杈撳嚭閫愮瑪濮旀墭鏁版嵁 # 涓婅瘉OrderStatus=b"D"琛ㄧず鎾ゅ崟 item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], "Volume": pOrderDetail['Volume'], @@ -373,6 +400,38 @@ "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], "OrderStatus": pOrderDetail['OrderStatus'].decode()} self.l2_data_upload_manager.add_l2_order_detail(item, 0) + + def OnRtnNGTSTick(self, pTick): + """ + 涓婅瘉鑲$エ鐨勯�愮瑪濮旀墭涓庢垚浜� + @param pTick: + @return: + """ + try: + if pTick['TickType'] == b'T': + # 鎴愪氦 + item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'], + "TradeVolume": pTick['Volume'], + "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], + "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'], + "SellNo": pTick['SellNo'], + "ExecType": '1'} + self.l2_data_upload_manager.add_transaction_detail(item) + elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D': + # 鎾ゅ崟 + item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'], + "Volume": pTick['Volume'], + "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(), + "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], + "SubSeq": pTick['SubSeq'], "OrderNO": '', + "OrderStatus": pTick['TickType'].decode()} + if pTick['Side'] == b'1': + item['OrderNO'] = pTick['BuyNo'] + elif pTick['Side'] == b'2': + item['OrderNO'] = pTick['SellNo'] + self.l2_data_upload_manager.add_l2_order_detail(item, 0) + except Exception as e: + logger_local_huaxin_l2_subscript.exception(e) def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): @@ -446,33 +505,44 @@ for sell_index in range(0, FirstLevelSellNum): print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) - def OnRtnXTSTick(self, pTick): - # 杈撳嚭涓婃捣鍊哄埜閫愮瑪鏁版嵁鈥� - print( - "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( - pTick['TickType'], - pTick['SecurityID'], - pTick['Price'], - pTick['Volume'], - pTick['TickTime'], - pTick['MainSeq'], - pTick['SubSeq'], - pTick['BuyNo'], - pTick['SellNo'])) - def OnRtnNGTSTick(self, pTick): - # 杈撳嚭涓婃捣鑲″熀閫愮瑪鏁版嵁鈥� - print( - "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( - pTick['TickType'], - pTick['SecurityID'], - pTick['Price'], - pTick['Volume'], - pTick['TickTime'], - pTick['MainSeq'], - pTick['SubSeq'], - pTick['BuyNo'], - pTick['SellNo'])) +class SubscriptDefend: + """ + 璁㈤槄瀹堟姢 + 瀹氫箟锛氬綋璁㈤槄鐨勪唬鐮佽秴杩囦竴瀹氭椂闂存病鏈夊洖璋冩暟鎹椂閲嶆柊璁㈤槄 + """ + __l2_market_update_time = {} + + @classmethod + def set_l2_market_update(cls, code): + cls.__l2_market_update_time[code] = time.time() + + @classmethod + def run(cls): + while True: + try: + now_time = tool.get_now_time_as_int() + if now_time < int("093015"): + continue + if int("112945") < now_time < int("130015"): + continue + if int("145645") < now_time: + continue + if spi.subscripted_codes: + codes = [] + for code in spi.subscripted_codes: + # 鑾峰彇涓婃鏇存柊鏃堕棿 + update_time = cls.__l2_market_update_time.get(code) + if update_time and time.time() - update_time > 15: + # 闇�瑕侀噸鏂拌闃� + codes.append(code) + if codes: + logger_debug.info(f"閲嶆柊璁㈤槄锛歿codes}") + spi.subscribe_codes(codes) + except: + pass + finally: + time.sleep(15) class MyL2ActionCallback(L2ActionCallback): @@ -598,7 +668,8 @@ if queue_r is not None: t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) t1.start() - + # 璁㈤槄瀹堟姢 + threading.Thread(target=SubscriptDefend.run, daemon=True).start() # 鍒濆鍖� data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) @@ -653,13 +724,13 @@ 'OrderTime': '13000015', 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) - queue_r = multiprocessing.Queue() + queue_r = multiprocessing.Queue(maxsize=1024) order_queues = [] transaction_queues = [] - market_queue = multiprocessing.Queue() + market_queue = multiprocessing.Queue(maxsize=1024) for i in range(20): - order_queues.append(multiprocessing.Queue()) - transaction_queues.append(multiprocessing.Queue()) + order_queues.append(multiprocessing.Queue(maxsize=1024)) + transaction_queues.append(multiprocessing.Queue(maxsize=1024)) threading.Thread(target=test_add_codes).start() run(queue_r, order_queues, transaction_queues, market_queue) -- Gitblit v1.8.0