From b86057f378a6567842e41f93df25f9188f0dc25c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 13 十一月 2023 15:43:39 +0800 Subject: [PATCH] 修改l2数据上传规则(未生效) --- huaxin_client/l2_data_manager.py | 70 ++++++++++++++++++++++++++++++++++ 1 files changed, 69 insertions(+), 1 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 0c00579..c89727a 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -11,9 +11,10 @@ # 娲诲姩鏃堕棿 from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager -from log_module import async_log_util +from log_module import async_log_util from log_module.log import logger_local_huaxin_l2_error, logger_system from utils import tool +import collections order_detail_upload_active_time_dict = {} transaction_upload_active_time_dict = {} @@ -33,6 +34,8 @@ self.order_queue_distribute_manager = order_queue_distribute_manager self.transaction_queue_distribute_manager = transaction_queue_distribute_manager self.market_data_queue = market_data_queue + self.temp_order_queue_dict = {} + self.temp_transaction_queue_dict = {} # 娣诲姞濮旀墭璇︽儏 def add_l2_order_detail(self, data, start_time, istransaction=False): @@ -63,11 +66,75 @@ def distribute_upload_queue(self, code): self.order_queue_distribute_manager.distribute_queue(code) self.transaction_queue_distribute_manager.distribute_queue(code) + self.temp_order_queue_dict[code] = collections.deque() + self.temp_transaction_queue_dict[code] = collections.deque() + # threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() + # threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() + + def __upload_l2_data(self, code, _queue, datas): + _queue.put_nowait((code, datas, time.time())) + + # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� + def __run_upload_order_task(self, code): + q: collections.deque = self.temp_order_queue_dict.get(code) + temp_list = [] + queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) + upload_queue = queue_info[1] + while True: + try: + if not q: + data = q.popleft() + temp_list.append(data) + else: + if temp_list: + # 涓婁紶鏁版嵁 + self.__upload_l2_data(code, upload_queue, temp_list) + temp_list.clear() + else: + if code not in self.temp_order_queue_dict: + break + time.sleep(0.001) + + except: + pass + finally: + pass + + # 澶勭悊鎴愪氦鏁版嵁骞朵笂浼� + def __run_upload_transaction_task(self, code): + q: collections.deque = self.temp_transaction_queue_dict.get(code) + queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) + upload_queue = queue_info[1] + temp_list = [] + while True: + try: + if not q: + data = q.popleft() + temp_list.append(data) + else: + if temp_list: + # 涓婁紶鏁版嵁 + self.__upload_l2_data(code, upload_queue, temp_list) + temp_list.clear() + else: + if code not in self.temp_transaction_queue_dict: + break + time.sleep(0.002) + except: + pass + finally: + pass # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒� def release_distributed_upload_queue(self, code): self.order_queue_distribute_manager.release_distribute_queue(code) self.transaction_queue_distribute_manager.release_distribute_queue(code) + if code in self.temp_order_queue_dict: + self.temp_order_queue_dict[code].clear() + self.temp_order_queue_dict.pop(code) + if code in self.temp_transaction_queue_dict: + self.temp_transaction_queue_dict[code].clear() + self.temp_transaction_queue_dict.pop(code) def add_target_code(code): @@ -81,6 +148,7 @@ if code in target_codes_add_time: target_codes_add_time.pop(code) + def add_subscript_codes(codes): print("add_subscript_codes", codes) # 鍔犲叆涓婁紶闃熷垪 -- Gitblit v1.8.0