From dcb8ca6aebca9cef3672c007438f3f459988a921 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 15 十一月 2023 12:37:51 +0800 Subject: [PATCH] 交易查询分离 --- huaxin_client/command_manager.py | 29 +++++++++++++++++++++++++++-- 1 files changed, 27 insertions(+), 2 deletions(-) diff --git a/huaxin_client/command_manager.py b/huaxin_client/command_manager.py index e95b1d2..3ff2973 100644 --- a/huaxin_client/command_manager.py +++ b/huaxin_client/command_manager.py @@ -74,9 +74,10 @@ return cls._instance @classmethod - def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue): + def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue): cls.action_callback = trade_action_callback - cls.queue_strategy_trade_read = queue_strategy_trade_read + cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade + cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read @classmethod def process_command(cls, _type, client_id, result_json, sk=None): @@ -128,6 +129,26 @@ _type = val["type"] if _type != "test": async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}") + cls.process_command(_type, None, val) + except Exception as e: + async_log_util.exception(logger_local_huaxin_trade_debug, e) + logging.exception(e) + except Exception as e: + async_log_util.exception(logger_local_huaxin_trade_debug, e) + + @classmethod + def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue): + if queue_strategy_trade_read_trade is None: + return + # 鏈湴鍛戒护鎺ユ敹 + try: + while True: + try: + val = queue_strategy_trade_read_trade.get() + if val: + _type = val["type"] + if _type != "test": + async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}") cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) except Exception as e: async_log_util.exception(logger_local_huaxin_trade_debug, e) @@ -138,11 +159,15 @@ # 缁存姢杩炴帴鏁扮殑绋冲畾 def run(self, blocking=True): if blocking: + t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1.start() self.run_process_command(self.queue_strategy_trade_read) else: # 鎺ュ彈鍛戒护 t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) t1.start() + t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1.start() # L2鎸囦护绠$悊 -- Gitblit v1.8.0