From 25a69761838501a775896eb8a2234cb8ee0a446d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 14 十一月 2023 17:27:13 +0800 Subject: [PATCH] bug修复 --- huaxin_client/l2_data_manager.py | 69 ++++++++++++++++++++++------------ 1 files changed, 44 insertions(+), 25 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index de05165..313c4d5 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -37,6 +37,9 @@ self.temp_order_queue_dict = {} self.temp_transaction_queue_dict = {} self.filter_order_condition_dict = {} + self.upload_l2_data_task_dict = {} + self.l2_order_codes = set() + self.l2_transaction_codes = set() # 璁剧疆璁㈠崟杩囨护鏉′欢 def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None, @@ -46,7 +49,8 @@ if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time: self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price) else: - self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, special_volumes_expire_time] + self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, + special_volumes_expire_time] # 杩囨护璁㈠崟 def __filter_order(self, item): @@ -114,12 +118,34 @@ # 鍒嗛厤涓婁紶闃熷垪 def distribute_upload_queue(self, code): - self.order_queue_distribute_manager.distribute_queue(code) - self.transaction_queue_distribute_manager.distribute_queue(code) - self.temp_order_queue_dict[code] = collections.deque() - self.temp_transaction_queue_dict[code] = collections.deque() - threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() - threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() + if not self.order_queue_distribute_manager.get_distributed_queue(code): + self.order_queue_distribute_manager.distribute_queue(code) + if not self.transaction_queue_distribute_manager.get_distributed_queue(code): + self.transaction_queue_distribute_manager.distribute_queue(code) + if code not in self.temp_order_queue_dict: + self.temp_order_queue_dict[code] = collections.deque() + if code not in self.temp_transaction_queue_dict: + self.temp_transaction_queue_dict[code] = collections.deque() + + if code not in self.upload_l2_data_task_dict: + t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) + t1.start() + t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True) + t2.start() + self.upload_l2_data_task_dict[code] = (t1, t2) + # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒� + + def release_distributed_upload_queue(self, code): + self.order_queue_distribute_manager.release_distribute_queue(code) + self.transaction_queue_distribute_manager.release_distribute_queue(code) + if code in self.temp_order_queue_dict: + self.temp_order_queue_dict[code].clear() + self.temp_order_queue_dict.pop(code) + if code in self.temp_transaction_queue_dict: + self.temp_transaction_queue_dict[code].clear() + self.temp_transaction_queue_dict.pop(code) + if code in self.upload_l2_data_task_dict: + self.upload_l2_data_task_dict.pop(code) def __upload_l2_data(self, code, _queue, datas): _queue.put_nowait((code, datas, time.time())) @@ -132,7 +158,7 @@ upload_queue = queue_info[1] while True: try: - if not q: + if len(q) > 0: data = q.popleft() # 鍓嶇疆鏁版嵁澶勭悊锛岃繃婊ゆ帀鏃犵敤鐨勬暟鎹� data = self.__filter_order(data) @@ -142,14 +168,16 @@ if temp_list: # 涓婁紶鏁版嵁 self.__upload_l2_data(code, upload_queue, temp_list) - temp_list.clear() + temp_list = [] else: if code not in self.temp_order_queue_dict: + self.l2_order_codes.discard(code) break + self.l2_order_codes.add(code) time.sleep(0.001) - except: - pass + except Exception as e: + logging.exception(e) finally: pass @@ -161,7 +189,7 @@ temp_list = [] while True: try: - if not q: + if len(q) > 0: data = q.popleft() data = self.__filter_transaction(data) if data: @@ -170,26 +198,17 @@ if temp_list: # 涓婁紶鏁版嵁 self.__upload_l2_data(code, upload_queue, temp_list) - temp_list.clear() + temp_list = [] else: if code not in self.temp_transaction_queue_dict: + self.l2_transaction_codes.discard(code) break + self.l2_transaction_codes.add(code) time.sleep(0.002) except: pass finally: pass - - # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒� - def release_distributed_upload_queue(self, code): - self.order_queue_distribute_manager.release_distribute_queue(code) - self.transaction_queue_distribute_manager.release_distribute_queue(code) - if code in self.temp_order_queue_dict: - self.temp_order_queue_dict[code].clear() - self.temp_order_queue_dict.pop(code) - if code in self.temp_transaction_queue_dict: - self.temp_transaction_queue_dict[code].clear() - self.temp_transaction_queue_dict.pop(code) def add_target_code(code): @@ -293,7 +312,7 @@ def __test(): - code = "002073" + # 鍒嗛厤鏁版嵁 pass -- Gitblit v1.8.0