From 7a08ac115597c920a11731ba584fae9f6028ecb2 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 29 四月 2025 19:10:11 +0800
Subject: [PATCH] L2成交数据精确订阅

---
 huaxin_client/l2_client_test.py |  112 ++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 100 insertions(+), 12 deletions(-)

diff --git a/huaxin_client/l2_client_test.py b/huaxin_client/l2_client_test.py
index 445715a..5081fc4 100644
--- a/huaxin_client/l2_client_test.py
+++ b/huaxin_client/l2_client_test.py
@@ -39,20 +39,81 @@
 
 
 class L2TransactionDataManager:
-    def __init__(self, code):
+    def __init__(self, code, accurate_buy=False):
+        """
+        @param code:
+        @param accurate_buy: 鏄惁闇�瑕佺簿纭殑涔板崟淇℃伅
+        """
         self.code = code
         self.__latest_buy_order = None
         self.__big_buy_orders = []
+        # 绮剧‘鐨勪拱鍗曚俊鎭紝{涔板崟鍙凤細璁㈠崟淇℃伅}
+        self.__big_accurate_buy_order_dict = {}
         self.__latest_sell_order = None
         self.__big_sell_orders = []
+        self.big_accurate_buy_order_queue = queue.Queue(maxsize=10240)
         self.big_buy_order_queue = queue.Queue(maxsize=10240)
         self.big_sell_order_queue = queue.Queue(maxsize=10240)
+        self.accurate_buy = accurate_buy
+        self.__last_accurate_buy_count = 0
 
     def get_big_buy_orders(self):
         return self.__big_buy_orders
 
     def get_big_sell_orders(self):
         return self.__big_sell_orders
+
+    def add_transaction_data_for_accurate_buy(self, data):
+        """
+        鑾峰彇绮剧‘鐨勪拱鍗曚俊鎭�
+        @param data:
+        @return:
+        """
+
+        def format_timestamp(timestamp):
+            time_str = str(timestamp)
+            return int(time_str[:5] if time_str[0] == '9' else time_str[:6])
+
+        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
+        # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
+        #         "TradeVolume": pTransaction['TradeVolume'],
+        #         "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
+        #         "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
+        #         "SellNo": pTransaction['SellNo'],
+        #         "ExecType": pTransaction['ExecType'].decode()}
+        money = round(item[2] * item[3])
+        volume = item[3]
+        price = item[2]
+        order_time = data["OrderTime"]
+        if item[0] not in self.__big_accurate_buy_order_dict:
+            # (涔板崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�)
+            self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price]
+        buy_order_info = self.__big_accurate_buy_order_dict[item[0]]
+        buy_order_info[1] += volume
+        buy_order_info[2] += money
+        buy_order_info[3] = order_time
+        buy_order_info[4] = price
+        # 灏嗗ぇ鍗曞啓鍏ユ湰鍦版枃浠�
+        if self.__latest_buy_order[0] != item[0]:
+            # 鏈夊彲鑳芥槸澶у崟鎴愪氦瀹屾垚锛� 鍒ゆ柇涓婁釜璁㈠崟鏄惁鏄ぇ鍗�
+            last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0])
+            if last_buy_order[2] > 299e4:
+                self.big_accurate_buy_order_queue.put_nowait(last_buy_order)
+            # 濡傛灉鏁版嵁杩囧闇�瑕佺Щ闄よ繃闀挎椂闂寸殑灏忛噾棰濇暟鎹�
+            accurate_buy_count = len(self.__big_accurate_buy_order_dict)
+            if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000:
+                # 瓒呰繃1w鏉℃暟鎹笖鏂板2000鏉℃暟鎹�
+                # 瓒呰繃1w鏉℃暟鎹氨瑕佺Щ闄�30鍒嗛挓涔嬪墠鐨勬暟鎹�
+                now_time_int = int(tool.trade_time_add_second(tool.get_now_time_str(), -1800).replace(":", ""))
+                try:
+                    remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if
+                                        now_time_int - format_timestamp(
+                                            self.__big_accurate_buy_order_dict[x][3]) > 0]
+                    if remove_order_nos:
+                        for order_no in remove_order_nos:
+                            self.__big_accurate_buy_order_dict.pop(order_no)
+                finally:
+                    self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict)
 
     def add_transaction_data(self, data):
         item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
@@ -66,6 +127,10 @@
         volume = item[3]
         price = item[2]
         order_time = data["OrderTime"]
+
+        if self.accurate_buy:
+            self.add_transaction_data_for_accurate_buy(data)
+
         if not self.__latest_buy_order:
             # (涔板崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�)
             self.__latest_buy_order = [item[0], 0, 0, order_time, price]
@@ -76,7 +141,8 @@
             self.__latest_buy_order[4] = price
         else:
             if self.__latest_buy_order[2] > 1e6:
-                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3], self.__latest_buy_order[4])
+                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2],
+                     self.__latest_buy_order[3], self.__latest_buy_order[4])
                 self.__big_buy_orders.append(d)
                 self.big_buy_order_queue.put_nowait(d)
 
@@ -91,7 +157,8 @@
             self.__latest_sell_order[4] = price
         else:
             if self.__latest_sell_order[2] > 1e6:
-                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3], self.__latest_sell_order[4])
+                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2],
+                     self.__latest_sell_order[3], self.__latest_sell_order[4])
                 self.__big_sell_orders.append(d)
                 self.big_sell_order_queue.put_nowait(d)
             self.__latest_sell_order = [item[1], volume, money, order_time, price]
@@ -110,12 +177,13 @@
     # 浠g爜鐨勪笂娆℃垚浜ょ殑璁㈠崟鍞竴绱㈠紩
     __last_transaction_keys_dict = {}
 
-    def __init__(self, api, codes):
+    def __init__(self, api, codes, special_codes):
         lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
         self.__api = api
         self.is_login = False
         self.codes = codes
         self.codes_volume_and_price_dict = {}
+        self.special_codes = special_codes
 
     def __split_codes(self, codes):
         szse_codes = []
@@ -227,7 +295,8 @@
                     "ExecType": pTransaction['ExecType'].decode()}
             if item["SecurityID"] not in l2_transaction_data_dict:
                 l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
-            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
+            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item,
+                                                                              item["SecurityID"] in self.special_codes)
 
     def OnRtnNGTSTick(self, pTick):
         """
@@ -246,12 +315,13 @@
                         "ExecType": '1'}
                 if item["SecurityID"] not in l2_transaction_data_dict:
                     l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
-                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
+                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, item[
+                    "SecurityID"] in self.special_codes)
         except Exception as e:
             logger_local_huaxin_l2_subscript.exception(e)
 
 
-def __init_l2(codes):
+def __init_l2(codes, special_codes):
     print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
     # case 1: Tcp鏂瑰紡
     # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -264,7 +334,7 @@
     # case 2闈炵紦瀛樻ā寮�
     # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
     global spi
-    spi = Lev2MdSpi(api, codes)
+    spi = Lev2MdSpi(api, codes, special_codes)
     api.RegisterSpi(spi)
     # -------------------姝e紡妯″紡-------------------------------------
     if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -285,11 +355,18 @@
     api.Init()
 
 
-def run(codes, _queue: multiprocessing.Queue) -> None:
+def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None:
+    """
+    杩愯璁㈤槄
+    @param accurate_buy_order_queue: 绮剧‘澶у崟闃熷垪
+    @param codes: 璁㈤槄鐨勪唬鐮�
+    @param _queue: 鏁版嵁浼犺緭鐨勯槦鍒�
+    @param special_codes: 闇�瑕佺‘瀹氬畬鏁村ぇ鍗曠殑浠g爜
+    @return:
+    """
     try:
         log.close_print()
-
-        __init_l2(codes)
+        __init_l2(codes, special_codes)
         logger_system.info(f"L2璁㈤槄鏈嶅姟鍚姩鎴愬姛:")
     except Exception as e:
         logger_system.exception(e)
@@ -306,6 +383,18 @@
                             break
                 except:
                     pass
+
+                try:
+                    while True:
+                        result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False)
+                        if result:
+                            accurate_buy_order_queue.put_nowait((code, 0, result))
+                        else:
+                            break
+                except:
+                    pass
+
+
                 try:
                     while True:
                         result = l2_transaction_data_manager.big_sell_order_queue.get(block=False)
@@ -319,4 +408,3 @@
             pass
         finally:
             time.sleep(1)
-

--
Gitblit v1.8.0