From 5715545bec1d88fe9cc4ea79db0a5d1148694590 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 14 十一月 2023 14:35:54 +0800
Subject: [PATCH] L2数据后置过滤

---
 huaxin_client/l2_data_manager.py |   69 ++++++++++++++++++++++++----------
 1 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 67074fa..568d6d4 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -38,19 +38,22 @@
         self.temp_transaction_queue_dict = {}
         self.filter_order_condition_dict = {}
 
-    # TODO 闇�瑕佽皟鐢�
     # 璁剧疆璁㈠崟杩囨护鏉′欢
-    def set_order_fileter_condition(self, code, min_volume, special_volumes=None, special_volumes_expire_time=None):
+    def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
+                                    special_volumes_expire_time=None):
         if special_volumes is None:
             special_volumes = set()
-        self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time]
+        if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
+            self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
+        else:
+            self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time]
 
     # 杩囨护璁㈠崟
     def __filter_order(self, item):
         filter_condition = self.filter_order_condition_dict.get(item[0])
         if filter_condition:
             # item[2]涓洪噺
-            if item[2] >= filter_condition[0]:
+            if item[2] >= filter_condition[0][0]:
                 return item
             if filter_condition[1] and item[2] in filter_condition[1]:
                 if filter_condition[2] and time.time() > filter_condition[2]:
@@ -62,26 +65,48 @@
             return None
         return item
 
+        # 杩囨护璁㈠崟
+
+    def __filter_transaction(self, item):
+        filter_condition = self.filter_order_condition_dict.get(item[0])
+        if filter_condition:
+            # item[2]涓洪噺
+            if abs(item[1] - filter_condition[0][1]) < 0.201:
+                return item
+            return None
+        return item
+
     # 娣诲姞濮旀墭璇︽儏
-    def add_l2_order_detail(self, data, start_time, istransaction=False):
+    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
         code = data["SecurityID"]
-        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
-        if not queue_info:
-            return
-        queue_info[1].put_nowait(
-            (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
-             data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
+        # 涓嶇洿鎺ュ姞鍏�
+        # queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
+        # if not queue_info:
+        #     return
+        # queue_info[1].put_nowait(
+        #     (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
+        #      data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
+
+        q: collections.deque = self.temp_order_queue_dict.get(code)
+        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
+                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
 
     # 娣诲姞閫愮瑪鎴愪氦
     def add_transaction_detail(self, data):
         code = data["SecurityID"]
-        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
-        if not queue_info:
-            return
-        # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜�
-        queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
-                                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
-                                  data['SellNo'], data['ExecType']))
+        # 涓嶇洿鎺ュ姞鍏�
+        # queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
+        # if not queue_info:
+        #     return
+        # # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜�
+        # queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
+        #                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
+        #                           data['SellNo'], data['ExecType']))
+
+        q: collections.deque = self.temp_transaction_queue_dict.get(code)
+        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
+                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
+                  data['SellNo'], data['ExecType']))
 
     def add_market_data(self, data):
         # 鍔犲叆涓婁紶闃熷垪
@@ -93,8 +118,8 @@
         self.transaction_queue_distribute_manager.distribute_queue(code)
         self.temp_order_queue_dict[code] = collections.deque()
         self.temp_transaction_queue_dict[code] = collections.deque()
-        # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
-        # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
+        threading.Thread(target=lambda: self.__run_upload_order_task(code)).start()
+        threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start()
 
     def __upload_l2_data(self, code, _queue, datas):
         _queue.put_nowait((code, datas, time.time()))
@@ -138,7 +163,9 @@
             try:
                 if not q:
                     data = q.popleft()
-                    temp_list.append(data)
+                    data = self.__filter_transaction(data)
+                    if data:
+                        temp_list.append(data)
                 else:
                     if temp_list:
                         # 涓婁紶鏁版嵁

--
Gitblit v1.8.0