From 6567e8cd1fb11ea10912bb3ac5bf2965c74c0e4b Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 06 一月 2025 18:09:47 +0800 Subject: [PATCH] 统计大卖单 --- huaxin_client/l2_client.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 50 insertions(+), 4 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index f88ef1d..aab2143 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -17,7 +17,7 @@ from huaxin_client.l2_data_manager import L2DataUploadManager from log_module import log, async_log_util from log_module.async_log_util import huaxin_l2_log -from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript +from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug from utils import tool ###B绫�### @@ -51,7 +51,6 @@ b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] set_codes_data_queue = queue.Queue(maxsize=1000) -market_code_dict = {} ENABLE_NGST = True @@ -106,6 +105,9 @@ self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + + def subscribe_codes(self, _codes): + self.__subscribe(_codes) def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) @@ -241,6 +243,8 @@ if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) + # 鍒濆鍖� + SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) l2_data_manager.add_subscript_codes(self.subscripted_codes) @@ -262,6 +266,8 @@ if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) + # 鍒濆鍖� + SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) l2_data_manager.add_subscript_codes(self.subscripted_codes) @@ -326,8 +332,8 @@ "avgAskPrice": pDepthMarketData["AvgAskPrice"], "buy": buys, "sell": sells} - market_code_dict[pDepthMarketData['SecurityID']] = time.time() self.l2_data_upload_manager.add_market_data(d) + SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID']) except: pass @@ -498,6 +504,45 @@ print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) +class SubscriptDefend: + """ + 璁㈤槄瀹堟姢 + 瀹氫箟锛氬綋璁㈤槄鐨勪唬鐮佽秴杩囦竴瀹氭椂闂存病鏈夊洖璋冩暟鎹椂閲嶆柊璁㈤槄 + """ + __l2_market_update_time = {} + + @classmethod + def set_l2_market_update(cls, code): + cls.__l2_market_update_time[code] = time.time() + + @classmethod + def run(cls): + while True: + try: + now_time = tool.get_now_time_as_int() + if now_time < int("093015"): + continue + if int("112945") < now_time < int("130015"): + continue + if int("145645") < now_time: + continue + if spi.subscripted_codes: + codes = [] + for code in spi.subscripted_codes: + # 鑾峰彇涓婃鏇存柊鏃堕棿 + update_time = cls.__l2_market_update_time.get(code) + if update_time and time.time() - update_time > 15: + # 闇�瑕侀噸鏂拌闃� + codes.append(code) + if codes: + logger_debug.info(f"閲嶆柊璁㈤槄锛歿codes}") + spi.subscribe_codes(codes) + except: + pass + finally: + time.sleep(15) + + class MyL2ActionCallback(L2ActionCallback): def OnSetL2Position(self, codes_data): @@ -621,7 +666,8 @@ if queue_r is not None: t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) t1.start() - + # 璁㈤槄瀹堟姢 + threading.Thread(target=SubscriptDefend.run, daemon=True).start() # 鍒濆鍖� data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) -- Gitblit v1.8.0