From 7a08ac115597c920a11731ba584fae9f6028ecb2 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 29 四月 2025 19:10:11 +0800 Subject: [PATCH] L2成交数据精确订阅 --- huaxin_client/l2_client_test.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 100 insertions(+), 12 deletions(-) diff --git a/huaxin_client/l2_client_test.py b/huaxin_client/l2_client_test.py index 445715a..5081fc4 100644 --- a/huaxin_client/l2_client_test.py +++ b/huaxin_client/l2_client_test.py @@ -39,20 +39,81 @@ class L2TransactionDataManager: - def __init__(self, code): + def __init__(self, code, accurate_buy=False): + """ + @param code: + @param accurate_buy: 鏄惁闇�瑕佺簿纭殑涔板崟淇℃伅 + """ self.code = code self.__latest_buy_order = None self.__big_buy_orders = [] + # 绮剧‘鐨勪拱鍗曚俊鎭紝{涔板崟鍙凤細璁㈠崟淇℃伅} + self.__big_accurate_buy_order_dict = {} self.__latest_sell_order = None self.__big_sell_orders = [] + self.big_accurate_buy_order_queue = queue.Queue(maxsize=10240) self.big_buy_order_queue = queue.Queue(maxsize=10240) self.big_sell_order_queue = queue.Queue(maxsize=10240) + self.accurate_buy = accurate_buy + self.__last_accurate_buy_count = 0 def get_big_buy_orders(self): return self.__big_buy_orders def get_big_sell_orders(self): return self.__big_sell_orders + + def add_transaction_data_for_accurate_buy(self, data): + """ + 鑾峰彇绮剧‘鐨勪拱鍗曚俊鎭� + @param data: + @return: + """ + + def format_timestamp(timestamp): + time_str = str(timestamp) + return int(time_str[:5] if time_str[0] == '9' else time_str[:6]) + + item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"]) + # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], + # "TradeVolume": pTransaction['TradeVolume'], + # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], + # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], + # "SellNo": pTransaction['SellNo'], + # "ExecType": pTransaction['ExecType'].decode()} + money = round(item[2] * item[3]) + volume = item[3] + price = item[2] + order_time = data["OrderTime"] + if item[0] not in self.__big_accurate_buy_order_dict: + # (涔板崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�) + self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price] + buy_order_info = self.__big_accurate_buy_order_dict[item[0]] + buy_order_info[1] += volume + buy_order_info[2] += money + buy_order_info[3] = order_time + buy_order_info[4] = price + # 灏嗗ぇ鍗曞啓鍏ユ湰鍦版枃浠� + if self.__latest_buy_order[0] != item[0]: + # 鏈夊彲鑳芥槸澶у崟鎴愪氦瀹屾垚锛� 鍒ゆ柇涓婁釜璁㈠崟鏄惁鏄ぇ鍗� + last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0]) + if last_buy_order[2] > 299e4: + self.big_accurate_buy_order_queue.put_nowait(last_buy_order) + # 濡傛灉鏁版嵁杩囧闇�瑕佺Щ闄よ繃闀挎椂闂寸殑灏忛噾棰濇暟鎹� + accurate_buy_count = len(self.__big_accurate_buy_order_dict) + if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000: + # 瓒呰繃1w鏉℃暟鎹笖鏂板2000鏉℃暟鎹� + # 瓒呰繃1w鏉℃暟鎹氨瑕佺Щ闄�30鍒嗛挓涔嬪墠鐨勬暟鎹� + now_time_int = int(tool.trade_time_add_second(tool.get_now_time_str(), -1800).replace(":", "")) + try: + remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if + now_time_int - format_timestamp( + self.__big_accurate_buy_order_dict[x][3]) > 0] + if remove_order_nos: + for order_no in remove_order_nos: + self.__big_accurate_buy_order_dict.pop(order_no) + finally: + self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict) def add_transaction_data(self, data): item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"]) @@ -66,6 +127,10 @@ volume = item[3] price = item[2] order_time = data["OrderTime"] + + if self.accurate_buy: + self.add_transaction_data_for_accurate_buy(data) + if not self.__latest_buy_order: # (涔板崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�) self.__latest_buy_order = [item[0], 0, 0, order_time, price] @@ -76,7 +141,8 @@ self.__latest_buy_order[4] = price else: if self.__latest_buy_order[2] > 1e6: - d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3], self.__latest_buy_order[4]) + d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], + self.__latest_buy_order[3], self.__latest_buy_order[4]) self.__big_buy_orders.append(d) self.big_buy_order_queue.put_nowait(d) @@ -91,7 +157,8 @@ self.__latest_sell_order[4] = price else: if self.__latest_sell_order[2] > 1e6: - d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3], self.__latest_sell_order[4]) + d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], + self.__latest_sell_order[3], self.__latest_sell_order[4]) self.__big_sell_orders.append(d) self.big_sell_order_queue.put_nowait(d) self.__latest_sell_order = [item[1], volume, money, order_time, price] @@ -110,12 +177,13 @@ # 浠g爜鐨勪笂娆℃垚浜ょ殑璁㈠崟鍞竴绱㈠紩 __last_transaction_keys_dict = {} - def __init__(self, api, codes): + def __init__(self, api, codes, special_codes): lev2mdapi.CTORATstpLev2MdSpi.__init__(self) self.__api = api self.is_login = False self.codes = codes self.codes_volume_and_price_dict = {} + self.special_codes = special_codes def __split_codes(self, codes): szse_codes = [] @@ -227,7 +295,8 @@ "ExecType": pTransaction['ExecType'].decode()} if item["SecurityID"] not in l2_transaction_data_dict: l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"]) - l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) + l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, + item["SecurityID"] in self.special_codes) def OnRtnNGTSTick(self, pTick): """ @@ -246,12 +315,13 @@ "ExecType": '1'} if item["SecurityID"] not in l2_transaction_data_dict: l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"]) - l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item) + l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, item[ + "SecurityID"] in self.special_codes) except Exception as e: logger_local_huaxin_l2_subscript.exception(e) -def __init_l2(codes): +def __init_l2(codes, special_codes): print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) # case 1: Tcp鏂瑰紡 # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP @@ -264,7 +334,7 @@ # case 2闈炵紦瀛樻ā寮� # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) global spi - spi = Lev2MdSpi(api, codes) + spi = Lev2MdSpi(api, codes, special_codes) api.RegisterSpi(spi) # -------------------姝e紡妯″紡------------------------------------- if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: @@ -285,11 +355,18 @@ api.Init() -def run(codes, _queue: multiprocessing.Queue) -> None: +def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None: + """ + 杩愯璁㈤槄 + @param accurate_buy_order_queue: 绮剧‘澶у崟闃熷垪 + @param codes: 璁㈤槄鐨勪唬鐮� + @param _queue: 鏁版嵁浼犺緭鐨勯槦鍒� + @param special_codes: 闇�瑕佺‘瀹氬畬鏁村ぇ鍗曠殑浠g爜 + @return: + """ try: log.close_print() - - __init_l2(codes) + __init_l2(codes, special_codes) logger_system.info(f"L2璁㈤槄鏈嶅姟鍚姩鎴愬姛:") except Exception as e: logger_system.exception(e) @@ -306,6 +383,18 @@ break except: pass + + try: + while True: + result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False) + if result: + accurate_buy_order_queue.put_nowait((code, 0, result)) + else: + break + except: + pass + + try: while True: result = l2_transaction_data_manager.big_sell_order_queue.get(block=False) @@ -319,4 +408,3 @@ pass finally: time.sleep(1) - -- Gitblit v1.8.0