From 5715545bec1d88fe9cc4ea79db0a5d1148694590 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 14 十一月 2023 14:35:54 +0800 Subject: [PATCH] L2数据后置过滤 --- huaxin_client/l2_data_manager.py | 69 ++++++++++++++++++++++++---------- 1 files changed, 48 insertions(+), 21 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 67074fa..568d6d4 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -38,19 +38,22 @@ self.temp_transaction_queue_dict = {} self.filter_order_condition_dict = {} - # TODO 闇�瑕佽皟鐢� # 璁剧疆璁㈠崟杩囨护鏉′欢 - def set_order_fileter_condition(self, code, min_volume, special_volumes=None, special_volumes_expire_time=None): + def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None, + special_volumes_expire_time=None): if special_volumes is None: special_volumes = set() - self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time] + if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time: + self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price) + else: + self.filter_order_condition_dict[code] = [min_volume, special_volumes, special_volumes_expire_time] # 杩囨护璁㈠崟 def __filter_order(self, item): filter_condition = self.filter_order_condition_dict.get(item[0]) if filter_condition: # item[2]涓洪噺 - if item[2] >= filter_condition[0]: + if item[2] >= filter_condition[0][0]: return item if filter_condition[1] and item[2] in filter_condition[1]: if filter_condition[2] and time.time() > filter_condition[2]: @@ -62,26 +65,48 @@ return None return item + # 杩囨护璁㈠崟 + + def __filter_transaction(self, item): + filter_condition = self.filter_order_condition_dict.get(item[0]) + if filter_condition: + # item[2]涓洪噺 + if abs(item[1] - filter_condition[0][1]) < 0.201: + return item + return None + return item + # 娣诲姞濮旀墭璇︽儏 - def add_l2_order_detail(self, data, start_time, istransaction=False): + def add_l2_order_detail(self, data, start_time=0, istransaction=False): code = data["SecurityID"] - queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) - if not queue_info: - return - queue_info[1].put_nowait( - (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], - data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) + # 涓嶇洿鎺ュ姞鍏� + # queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) + # if not queue_info: + # return + # queue_info[1].put_nowait( + # (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], + # data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) + + q: collections.deque = self.temp_order_queue_dict.get(code) + q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], + data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) # 娣诲姞閫愮瑪鎴愪氦 def add_transaction_detail(self, data): code = data["SecurityID"] - queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) - if not queue_info: - return - # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜� - queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'], - data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], - data['SellNo'], data['ExecType'])) + # 涓嶇洿鎺ュ姞鍏� + # queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) + # if not queue_info: + # return + # # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜� + # queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'], + # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], + # data['SellNo'], data['ExecType'])) + + q: collections.deque = self.temp_transaction_queue_dict.get(code) + q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], + data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], + data['SellNo'], data['ExecType'])) def add_market_data(self, data): # 鍔犲叆涓婁紶闃熷垪 @@ -93,8 +118,8 @@ self.transaction_queue_distribute_manager.distribute_queue(code) self.temp_order_queue_dict[code] = collections.deque() self.temp_transaction_queue_dict[code] = collections.deque() - # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() - # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() + threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() + threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() def __upload_l2_data(self, code, _queue, datas): _queue.put_nowait((code, datas, time.time())) @@ -138,7 +163,9 @@ try: if not q: data = q.popleft() - temp_list.append(data) + data = self.__filter_transaction(data) + if data: + temp_list.append(data) else: if temp_list: # 涓婁紶鏁版嵁 -- Gitblit v1.8.0