From 6a0d3ff5832e57ee1b1374d086f24b3c1679b332 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 05 九月 2025 18:22:24 +0800
Subject: [PATCH] bug修复/降低测撤单率

---
 huaxin_client/command_manager.py |  173 ++++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 133 insertions(+), 40 deletions(-)

diff --git a/huaxin_client/command_manager.py b/huaxin_client/command_manager.py
index 785ea4c..63ff02f 100644
--- a/huaxin_client/command_manager.py
+++ b/huaxin_client/command_manager.py
@@ -5,11 +5,16 @@
 import concurrent.futures
 import json
 import logging
+import multiprocessing
 import threading
+import time
+
+import zmq
 
 from huaxin_client import socket_util
 from huaxin_client.client_network import SendResponseSkManager
-from log_module.log import logger_local_huaxin_trade_debug
+from log_module import async_log_util
+from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug
 
 MSG_TYPE_HEART = "heart"
 # 鍛戒护淇℃伅
@@ -21,6 +26,7 @@
 CLIENT_TYPE_POSITION_LIST = "position_list"
 CLIENT_TYPE_MONEY = "money"
 CLIENT_TYPE_DEAL = "deal"
+CLIENT_TYPE_ORDER_FOUND_DETAIL = "order_found_detail"
 
 CLIENT_TYPE_CMD_L2 = "l2_cmd"
 
@@ -53,10 +59,13 @@
     def OnTest(self, client_id, request_id, data, sk):
         pass
 
+    def OnOrderFoundDetail(self, client_id, request_id, sk, data):
+        pass
+
 
 class L2ActionCallback(object):
     # 鐩戝惉L2鏁版嵁
-    def OnSetL2Position(self, client_id, request_id, codes_data):
+    def OnSetL2Position(self, codes_data):
         pass
 
 
@@ -64,7 +73,7 @@
 class TradeCommandManager:
     trade_client_dict = {}
     _instance = None
-    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
+    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
 
     def __new__(cls, *args, **kwargs):
         if not cls._instance:
@@ -72,34 +81,34 @@
         return cls._instance
 
     @classmethod
-    def init(cls, trade_action_callback, pipe_l2, pipe_strategy):
+    def init(cls, trade_action_callback: TradeActionCallback,
+             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
+             queue_strategy_trade_read_for_read: multiprocessing.Queue):
         cls.action_callback = trade_action_callback
-        cls.pipe_strategy = pipe_strategy
-        cls.pipe_l2 = pipe_l2
+        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
+        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
 
     @classmethod
     def process_command(cls, _type, client_id, result_json, sk=None):
+        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command锛� {result_json}")
         # 鏌ョ湅鏄惁鏄缃甃2鐨勪唬鐮�
-        if _type == CLIENT_TYPE_CMD_L2:
-            cls.pipe_l2.send(
-                json.dumps({"type": "set_l2_codes", "data": result_json["data"]}))
-            return
-
         try:
             data = result_json["data"]
-            print("鎺ユ敹鍐呭", result_json)
             request_id = result_json.get('request_id')
-            if not socket_util.is_client_params_sign_right(result_json):
-                print("绛惧悕閿欒")
-                # 绛惧悕鍑洪敊
-                SendResponseSkManager.send_error_response(_type, request_id, client_id,
-                                                          {"code": -1, "msg": "绛惧悕閿欒"})
-                return
+            # 鏆傛椂鍙栨秷绛惧悕
+            # if not socket_util.is_client_params_sign_right(result_json):
+            #     print("绛惧悕閿欒")
+            #     # 绛惧悕鍑洪敊
+            #     SendResponseSkManager.send_error_response(_type, request_id, client_id,
+            #                                               {"code": -1, "msg": "绛惧悕閿欒"})
+            #     return
 
             if _type == CLIENT_TYPE_TRADE:
                 # 浜ゆ槗
                 ctype = data["trade_type"]
+                async_log_util.info(logger_trade, f"浜ゆ槗寮�濮�:{request_id}")
                 cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
+                async_log_util.info(logger_trade, f"浜ゆ槗缁撴潫:{request_id}")
             elif _type == CLIENT_TYPE_MONEY:
                 cls.action_callback.OnMoney(client_id, request_id, sk)
             elif _type == CLIENT_TYPE_DEAL_LIST:
@@ -109,45 +118,136 @@
                 cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
             elif _type == CLIENT_TYPE_POSITION_LIST:
                 cls.action_callback.OnPositionList(client_id, request_id, sk)
+
+            elif _type == CLIENT_TYPE_ORDER_FOUND_DETAIL:
+                cls.action_callback.OnOrderFoundDetail(client_id, request_id, sk, data)
+
+
             elif _type == "test":
                 cls.action_callback.OnTest(client_id, request_id, data, sk)
         except Exception as e:
-            logger_local_huaxin_trade_debug.debug(f"process_command鍑洪敊锛歿result_json}")
-            logging.exception(e)
-            logging.error(result_json)
+            async_log_util.error(logger_local_huaxin_contact_debug, f"process_command鍑洪敊锛� {result_json}")
+            # logging.exception(e)
+            # logging.error(result_json)
 
     @classmethod
-    def run_process_command(cls, pipe_strategy):
-        if pipe_strategy is None:
+    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
+        if queue_strategy_trade is None:
             return
         # 鏈湴鍛戒护鎺ユ敹
         try:
             while True:
                 try:
-                    val = pipe_strategy.recv()
+                    val = queue_strategy_trade.get()
                     if val:
-                        val = json.loads(val)
-                        print("run_process_command", val)
                         _type = val["type"]
-                        threading.Thread(target=lambda: cls.process_command(_type, None, val), daemon=True).start()
+                        if _type != "test":
+                            async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}")
+                        cls.process_command(_type, None, val)
                 except Exception as e:
                     logger_local_huaxin_trade_debug.exception(e)
+        except Exception as e:
+            async_log_util.exception(logger_local_huaxin_trade_debug, e)
+
+    @classmethod
+    def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
+        if queue_strategy_trade_read_trade is None:
+            return
+        # 鏈湴鍛戒护鎺ユ敹
+        try:
+            while True:
+                try:
+                    val = queue_strategy_trade_read_trade.get()
+                    if val:
+                        _type = val["type"]
+                        if _type != "test":
+                            async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}")
+                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
+                except Exception as e:
+                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                     logging.exception(e)
         except Exception as e:
-            logger_local_huaxin_trade_debug.exception(e)
+            async_log_util.exception(logger_local_huaxin_trade_debug, e)
+
+    ###############ZEROMQ鍗忚鎺ユ敹鍛戒护#################
+    @classmethod
+    def __create_order_command_reciever(cls, ipc_addr):
+        """
+        鎺ユ敹涓嬪崟鍛戒护
+        @param ipc_addr: ipc鍦板潃
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
+        while True:
+            data = socket.recv_json()
+            try:
+                request_id = data.get('request_id')
+                use_time = time.time() - data.get('time')
+                data = data.get('data')
+                cls.action_callback.OnTrade(None, request_id, None, 1, data)
+                async_log_util.info(logger_local_huaxin_trade_debug, f"涓嬪崟閫氫俊鑰楁椂锛� {round(use_time*1000,3)}ms   request_id:{request_id}")
+            except Exception as e:
+                logger_local_huaxin_trade_debug.exception(e)
+            finally:
+                socket.send_string("SUCCESS")
+
+    @classmethod
+    def __create_cancel_order_command_reciever(cls, ipc_addr):
+        """
+        鎺ユ敹鎾ゅ崟鍛戒护
+        @param ipc_addr: ipc鍦板潃
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
+        while True:
+            data = socket.recv_json()
+            try:
+                request_id = data.get('request_id')
+                use_time = time.time() - data.get('time')
+                data = data.get('data')
+                cls.action_callback.OnTrade(None, request_id, None, 2, data)
+                async_log_util.info(logger_local_huaxin_trade_debug, f"鎾ゅ崟閫氫俊鑰楁椂锛� {round(use_time*1000,3)}ms   request_id:{request_id}")
+
+            except Exception as e:
+                logger_local_huaxin_trade_debug.exception(e)
+            finally:
+                socket.send_string("SUCCESS")
 
     # 缁存姢杩炴帴鏁扮殑绋冲畾
-    def run(self, blocking=True):
+    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
         if blocking:
-            self.run_process_command(self.pipe_strategy)
+            t1 = threading.Thread(
+                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
+            t1.start()
+            self.run_process_command(self.queue_strategy_trade_read)
         else:
             # 鎺ュ彈鍛戒护
-            t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True)
+            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
             t1.start()
 
 
 # L2鎸囦护绠$悊
 class L2CommandManager:
+    action_callback = None
 
     @classmethod
     def init(cls, l2_action_callback):
@@ -156,16 +256,9 @@
     @classmethod
     def process_command(cls, _type, client_id, result_json):
         data = result_json["data"]
-        request_id = result_json["request_id"]
-        ctype = data["type"]
-        if not socket_util.is_client_params_sign_right(result_json):
-            # 绛惧悕鍑洪敊
-            SendResponseSkManager.send_error_response(_type, request_id, client_id,
-                                                      {"code": -1, "msg": "绛惧悕閿欒"})
-            return
-        codes_data = data["data"]
+        ctype = result_json["type"]
         if ctype == CLIENT_TYPE_CMD_L2:
-            cls.action_callback.OnSetL2Position(client_id, request_id, codes_data)
+            cls.action_callback.OnSetL2Position(data)
 
 
 if __name__ == "__main__":

--
Gitblit v1.8.0