Administrator
2023-11-15 dcb8ca6aebca9cef3672c007438f3f459988a921
huaxin_client/command_manager.py
@@ -74,9 +74,10 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue):
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.queue_strategy_trade_read = queue_strategy_trade_read
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
@@ -128,6 +129,26 @@
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command(_type, None, val)
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    @classmethod
    def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
        if queue_strategy_trade_read_trade is None:
            return
        # 本地命令接收
        try:
            while True:
                try:
                    val = queue_strategy_trade_read_trade.get()
                    if val:
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
@@ -138,11 +159,15 @@
    # 维护连接数的稳定
    def run(self, blocking=True):
        if blocking:
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
# L2指令管理