From d91210fd9d205558fb3a0acb7e5cc8c6f0c600f2 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 04 九月 2025 18:18:27 +0800
Subject: [PATCH] 人为移想管理

---
 huaxin_client/l2_client_test.py |  223 ++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 193 insertions(+), 30 deletions(-)

diff --git a/huaxin_client/l2_client_test.py b/huaxin_client/l2_client_test.py
index c7d9abb..ad0e818 100644
--- a/huaxin_client/l2_client_test.py
+++ b/huaxin_client/l2_client_test.py
@@ -1,13 +1,15 @@
 # -*- coding: utf-8 -*-
 import logging
+import multiprocessing
 import queue
 import time
 import lev2mdapi
+from l2.huaxin import l2_huaxin_util
 from log_module import log
 from log_module.log import logger_local_huaxin_l2_subscript, logger_system
 from utils import tool
 
-IS_TEST = True
+IS_TEST = False
 
 ###B绫�###
 Front_Address = "tcp://10.0.1.101:6900"
@@ -31,19 +33,33 @@
 g_SubBondMarketData = False
 g_SubBondTransaction = False
 g_SubBondOrderDetail = False
-set_codes_data_queue = queue.Queue()
+set_codes_data_queue = queue.Queue(maxsize=102400)
 market_code_dict = {}
 
 ENABLE_NGST = True
 
 
 class L2TransactionDataManager:
-    def __init__(self, code):
+    def __init__(self, code, accurate_buy=False):
+        """
+        @param code:
+        @param accurate_buy: 鏄惁闇�瑕佺簿纭殑涔板崟淇℃伅
+        """
         self.code = code
         self.__latest_buy_order = None
         self.__big_buy_orders = []
+        # 绮剧‘鐨勪拱鍗曚俊鎭紝{涔板崟鍙凤細璁㈠崟淇℃伅}
+        self.__big_accurate_buy_order_dict = {}
+        self.__big_accurate_sell_order_dict = {}
         self.__latest_sell_order = None
         self.__big_sell_orders = []
+        self.big_accurate_buy_order_queue = queue.Queue(maxsize=102400)
+        self.big_accurate_sell_order_queue = queue.Queue(maxsize=102400)
+        self.big_buy_order_queue = queue.Queue(maxsize=102400)
+        self.big_sell_order_queue = queue.Queue(maxsize=102400)
+        self.accurate_buy = accurate_buy
+        self.__last_accurate_buy_count = 0
+        self.__last_accurate_sell_count = 0
 
     def get_big_buy_orders(self):
         return self.__big_buy_orders
@@ -51,8 +67,87 @@
     def get_big_sell_orders(self):
         return self.__big_sell_orders
 
-    def add_transaction_data(self, data):
-        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"])
+    def add_transaction_data_for_accurate(self, item, big_order_money_threshold=299e4):
+        """
+        鑾峰彇绮剧‘鐨勪拱鍗曚俊鎭�
+        @param big_order_money_threshold: 澶у崟闃堝��
+        @param data:
+        @return:
+        """
+
+        def format_timestamp(timestamp):
+            time_str = str(timestamp)
+            return int(time_str[:5] if time_str[0] == '9' else time_str[:6])
+
+        money = round(item[2] * item[3])
+        volume = item[3]
+        price = item[2]
+        order_time = item[4]
+        if item[0] not in self.__big_accurate_buy_order_dict:
+            # (涔板崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�, 寮�濮嬫垚浜ゆ椂闂�, 寮�濮嬫垚浜や环鏍�)
+            self.__big_accurate_buy_order_dict[item[0]] = [item[0], 0, 0, order_time, price, order_time, price]
+        buy_order_info = self.__big_accurate_buy_order_dict[item[0]]
+        buy_order_info[1] += volume
+        buy_order_info[2] += money
+        buy_order_info[3] = order_time
+        buy_order_info[4] = price
+        # 灏嗗ぇ鍗曞啓鍏ユ湰鍦版枃浠�
+        if self.__latest_buy_order and self.__latest_buy_order[0] != item[0]:
+            # 鏈夊彲鑳芥槸澶у崟鎴愪氦瀹屾垚锛� 鍒ゆ柇涓婁釜璁㈠崟鏄惁鏄ぇ鍗�
+            last_buy_order = self.__big_accurate_buy_order_dict.get(self.__latest_buy_order[0])
+
+            if last_buy_order[2] > big_order_money_threshold:
+                self.big_accurate_buy_order_queue.put_nowait(last_buy_order)
+
+            # 濡傛灉鏁版嵁杩囧闇�瑕佺Щ闄よ繃闀挎椂闂寸殑灏忛噾棰濇暟鎹�
+            accurate_buy_count = len(self.__big_accurate_buy_order_dict.keys())
+            if accurate_buy_count > 10000 and accurate_buy_count - self.__last_accurate_buy_count > 2000:
+                # 瓒呰繃1w鏉℃暟鎹笖鏂板2000鏉℃暟鎹�
+                # 瓒呰繃1w鏉℃暟鎹氨瑕佺Щ闄�30鍒嗛挓涔嬪墠鐨勬暟鎹�
+                now_time_int = int(
+                    tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", ""))
+                try:
+                    remove_order_nos = [x for x in self.__big_accurate_buy_order_dict if
+                                        format_timestamp(self.__big_accurate_buy_order_dict[x][3]) < now_time_int]
+                    if remove_order_nos:
+                        for order_no in remove_order_nos:
+                            self.__big_accurate_buy_order_dict.pop(order_no)
+                finally:
+                    self.__last_accurate_buy_count = len(self.__big_accurate_buy_order_dict)
+
+        # 缁熻鍗栧崟
+        if item[1] not in self.__big_accurate_sell_order_dict:
+            # (鍗栧崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�, 寮�濮嬫垚浜ゆ椂闂�, 寮�濮嬫垚浜や环鏍�)
+            self.__big_accurate_sell_order_dict[item[1]] = [item[1], 0, 0, order_time, price, order_time, price]
+        sell_order_info = self.__big_accurate_sell_order_dict[item[1]]
+        sell_order_info[1] += volume
+        sell_order_info[2] += money
+        sell_order_info[3] = order_time
+        sell_order_info[4] = price
+        if self.__latest_sell_order and self.__latest_sell_order[0] != item[1]:
+            # 鏈夊彲鑳芥槸澶у崟鎴愪氦瀹屾垚锛� 鍒ゆ柇涓婁釜璁㈠崟鏄惁鏄ぇ鍗�
+            last_sell_order = self.__big_accurate_sell_order_dict.get(self.__latest_sell_order[0])
+            if last_sell_order[2] > big_order_money_threshold:
+                self.big_accurate_sell_order_queue.put_nowait(last_sell_order)
+            # 濡傛灉鏁版嵁杩囧闇�瑕佺Щ闄よ繃闀挎椂闂寸殑灏忛噾棰濇暟鎹�
+            accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys())
+            if accurate_sell_count > 10000 and accurate_sell_count - self.__last_accurate_sell_count > 2000:
+                # 瓒呰繃1w鏉℃暟鎹笖鏂板2000鏉℃暟鎹�
+                # 瓒呰繃1w鏉℃暟鎹氨瑕佺Щ闄�30鍒嗛挓涔嬪墠鐨勬暟鎹�
+                now_time_int = int(
+                    tool.trade_time_add_second(l2_huaxin_util.convert_time(order_time), -3600).replace(":", ""))
+                try:
+                    remove_order_nos = [x for x in self.__big_accurate_sell_order_dict if
+                                        now_time_int > format_timestamp(
+                                            self.__big_accurate_sell_order_dict[x][3])]
+                    if remove_order_nos:
+                        for order_no in remove_order_nos:
+                            self.__big_accurate_sell_order_dict.pop(order_no)
+                finally:
+                    self.__last_accurate_sell_count = len(self.__big_accurate_sell_order_dict.keys())
+
+    def add_transaction_data(self, data, big_order_money_threshold=299e4):
+        item = (data["BuyNo"], data["SellNo"], data["TradePrice"], data["TradeVolume"], data["OrderTime"])
         # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
         #         "TradeVolume": pTransaction['TradeVolume'],
         #         "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
@@ -61,25 +156,47 @@
         #         "ExecType": pTransaction['ExecType'].decode()}
         money = round(item[2] * item[3])
         volume = item[3]
+        price = item[2]
+        order_time = item[4]
+
+        if self.accurate_buy:
+            self.add_transaction_data_for_accurate(item, big_order_money_threshold=100e4)
+
         if not self.__latest_buy_order:
-            self.__latest_buy_order = [item[0], 0, 0]
+            # (涔板崟鍙�, 閲�, 閲戦, 鏃堕棿, 鏈�鏂版垚浜や环鏍�)
+            self.__latest_buy_order = [item[0], 0, 0, order_time, price]
         if self.__latest_buy_order[0] == item[0]:
             self.__latest_buy_order[1] += volume
             self.__latest_buy_order[2] += money
+            self.__latest_buy_order[3] = order_time
+            self.__latest_buy_order[4] = price
         else:
-            if self.__latest_buy_order[2] > 1e6:
-                self.__big_buy_orders.append((self.__latest_buy_order[0],self.__latest_buy_order[1], self.__latest_buy_order[2]))
-            self.__latest_buy_order = [item[0],volume,  money]
+            if self.__latest_buy_order[2] >= big_order_money_threshold:
+                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2],
+                     self.__latest_buy_order[3], self.__latest_buy_order[4])
+                self.__big_buy_orders.append(d)
+                self.big_buy_order_queue.put_nowait(d)
+
+            self.__latest_buy_order = [item[0], volume, money, order_time, price]
 
         if not self.__latest_sell_order:
-            self.__latest_sell_order = [item[1], 0, 0]
+            self.__latest_sell_order = [item[1], 0, 0, order_time, price]
         if self.__latest_sell_order[0] == item[1]:
             self.__latest_sell_order[1] += volume
             self.__latest_sell_order[2] += money
+            self.__latest_sell_order[3] = order_time
+            self.__latest_sell_order[4] = price
         else:
-            if self.__latest_sell_order[2] > 1e6:
-                self.__big_sell_orders.append((self.__latest_sell_order[0],self.__latest_sell_order[1], self.__latest_sell_order[2]))
-            self.__latest_sell_order = [item[1], volume,  money]
+            if self.__latest_sell_order[2] > big_order_money_threshold:
+                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2],
+                     self.__latest_sell_order[3], self.__latest_sell_order[4])
+                self.__big_sell_orders.append(d)
+                self.big_sell_order_queue.put_nowait(d)
+            self.__latest_sell_order = [item[1], volume, money, order_time, price]
+
+
+# 涔板叆鐨勫ぇ鍗曡鍗曞彿
+l2_transaction_data_dict = {}
 
 
 class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -91,15 +208,13 @@
     # 浠g爜鐨勪笂娆℃垚浜ょ殑璁㈠崟鍞竴绱㈠紩
     __last_transaction_keys_dict = {}
 
-    # 涔板叆鐨勫ぇ鍗曡鍗曞彿
-    __l2_transaction_data_dict = {}
-
-    def __init__(self, api, codes):
+    def __init__(self, api, codes, special_codes):
         lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
         self.__api = api
         self.is_login = False
         self.codes = codes
         self.codes_volume_and_price_dict = {}
+        self.special_codes = special_codes
 
     def __split_codes(self, codes):
         szse_codes = []
@@ -209,9 +324,10 @@
                     "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                     "SellNo": pTransaction['SellNo'],
                     "ExecType": pTransaction['ExecType'].decode()}
-            if item["SecurityID"] not in self.__l2_transaction_data_dict:
-                self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
-            self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
+            if item["SecurityID"] not in l2_transaction_data_dict:
+                l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[
+                    "SecurityID"] in self.special_codes)
+            l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, big_order_money_threshold=60e4)
 
     def OnRtnNGTSTick(self, pTick):
         """
@@ -228,14 +344,15 @@
                         "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'],
                         "SellNo": pTick['SellNo'],
                         "ExecType": '1'}
-                if item["SecurityID"] not in self.__l2_transaction_data_dict:
-                    self.__l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"])
-                self.__l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item)
+                if item["SecurityID"] not in l2_transaction_data_dict:
+                    l2_transaction_data_dict[item["SecurityID"]] = L2TransactionDataManager(item["SecurityID"], item[
+                        "SecurityID"] in self.special_codes)
+                l2_transaction_data_dict[item["SecurityID"]].add_transaction_data(item, big_order_money_threshold=60e4)
         except Exception as e:
             logger_local_huaxin_l2_subscript.exception(e)
 
 
-def __init_l2(codes):
+def __init_l2(codes, special_codes):
     print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
     # case 1: Tcp鏂瑰紡
     # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -248,7 +365,7 @@
     # case 2闈炵紦瀛樻ā寮�
     # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
     global spi
-    spi = Lev2MdSpi(api, codes)
+    spi = Lev2MdSpi(api, codes, special_codes)
     api.RegisterSpi(spi)
     # -------------------姝e紡妯″紡-------------------------------------
     if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -269,17 +386,63 @@
     api.Init()
 
 
-def run(codes) -> None:
+def run(codes, _queue: multiprocessing.Queue, accurate_buy_order_queue: multiprocessing.Queue, special_codes) -> None:
+    """
+    杩愯璁㈤槄
+    @param accurate_buy_order_queue: 绮剧‘澶у崟闃熷垪
+    @param codes: 璁㈤槄鐨勪唬鐮�
+    @param _queue: 鏁版嵁浼犺緭鐨勯槦鍒�
+    @param special_codes: 闇�瑕佺‘瀹氬畬鏁村ぇ鍗曠殑浠g爜
+    @return:
+    """
     try:
         log.close_print()
-        __init_l2(codes)
+        __init_l2(codes, special_codes)
         logger_system.info(f"L2璁㈤槄鏈嶅姟鍚姩鎴愬姛:")
     except Exception as e:
         logger_system.exception(e)
     while True:
-        time.sleep(2)
+        try:
+            # 璇诲彇涓�閬�
+            for code in l2_transaction_data_dict:
+                l2_transaction_data_manager: L2TransactionDataManager = l2_transaction_data_dict[code]
+
+                while True:
+                    if not l2_transaction_data_manager.big_buy_order_queue.empty():
+                        result = l2_transaction_data_manager.big_buy_order_queue.get(block=False)
+                        if result:
+                            _queue.put_nowait((code, 0, result))
+                    else:
+                        break
+
+                while True:
+                    if not l2_transaction_data_manager.big_accurate_buy_order_queue.empty():
+                        result = l2_transaction_data_manager.big_accurate_buy_order_queue.get(block=False)
+                        if result:
+                            accurate_buy_order_queue.put_nowait((code, 0, result))
+                    else:
+                        break
+
+                while True:
+                    if not l2_transaction_data_manager.big_accurate_sell_order_queue.empty():
+                        result = l2_transaction_data_manager.big_accurate_sell_order_queue.get(block=False)
+                        if result:
+                            accurate_buy_order_queue.put_nowait((code, 1, result))
+                    else:
+                        break
+
+                while True:
+                    if not l2_transaction_data_manager.big_sell_order_queue.empty():
+                        result = l2_transaction_data_manager.big_sell_order_queue.get(block=False)
+                        if result:
+                            _queue.put_nowait((code, 1, result))
+                    else:
+                        break
+        except:
+            pass
+        finally:
+            time.sleep(1)
 
 
 if __name__ == "__main__":
-    run({"000009", "601618"})
-    input()
+    pass

--
Gitblit v1.8.0