From 6a0d3ff5832e57ee1b1374d086f24b3c1679b332 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 05 九月 2025 18:22:24 +0800
Subject: [PATCH] bug修复/降低测撤单率

---
 huaxin_client/command_manager.py |  209 ++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 153 insertions(+), 56 deletions(-)

diff --git a/huaxin_client/command_manager.py b/huaxin_client/command_manager.py
index 4c2e2bc..63ff02f 100644
--- a/huaxin_client/command_manager.py
+++ b/huaxin_client/command_manager.py
@@ -2,13 +2,19 @@
 """
 鍛戒护绠$悊鍣�
 """
+import concurrent.futures
 import json
 import logging
+import multiprocessing
 import threading
+import time
+
+import zmq
 
 from huaxin_client import socket_util
 from huaxin_client.client_network import SendResponseSkManager
-from log_module.log import logger_local_huaxin_trade_debug
+from log_module import async_log_util
+from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug
 
 MSG_TYPE_HEART = "heart"
 # 鍛戒护淇℃伅
@@ -20,6 +26,7 @@
 CLIENT_TYPE_POSITION_LIST = "position_list"
 CLIENT_TYPE_MONEY = "money"
 CLIENT_TYPE_DEAL = "deal"
+CLIENT_TYPE_ORDER_FOUND_DETAIL = "order_found_detail"
 
 CLIENT_TYPE_CMD_L2 = "l2_cmd"
 
@@ -29,33 +36,36 @@
 
 class TradeActionCallback(object):
     # 浜ゆ槗
-    def OnTrade(self, client_id, request_id, type_, data):
+    def OnTrade(self, client_id, request_id, sk, type_, data):
         pass
 
     # 濮旀墭鍒楄〃
-    def OnDelegateList(self, client_id, request_id):
+    def OnDelegateList(self, client_id, request_id, sk, can_cancel):
         pass
 
     # 鎴愪氦鍒楄〃
-    def OnDealList(self, client_id, request_id):
+    def OnDealList(self, client_id, request_id, sk):
         pass
 
     # 鎴愪氦鍒楄〃
-    def OnPositionList(self, client_id, request_id):
+    def OnPositionList(self, client_id, request_id, sk):
         pass
 
     # 鑾峰彇璧勯噾淇℃伅
-    def OnMoney(self, client_id, request_id):
+    def OnMoney(self, client_id, request_id, sk):
         pass
 
     # 娴嬭瘯
-    def OnTest(self, client_id, request_id, data):
+    def OnTest(self, client_id, request_id, data, sk):
+        pass
+
+    def OnOrderFoundDetail(self, client_id, request_id, sk, data):
         pass
 
 
 class L2ActionCallback(object):
     # 鐩戝惉L2鏁版嵁
-    def OnSetL2Position(self, client_id, request_id, codes_data):
+    def OnSetL2Position(self, codes_data):
         pass
 
 
@@ -63,6 +73,7 @@
 class TradeCommandManager:
     trade_client_dict = {}
     _instance = None
+    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
 
     def __new__(cls, *args, **kwargs):
         if not cls._instance:
@@ -70,80 +81,173 @@
         return cls._instance
 
     @classmethod
-    def init(cls, trade_action_callback, pipe_l2, pipe_strategy):
+    def init(cls, trade_action_callback: TradeActionCallback,
+             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
+             queue_strategy_trade_read_for_read: multiprocessing.Queue):
         cls.action_callback = trade_action_callback
-        cls.pipe_strategy = pipe_strategy
-        cls.pipe_l2 = pipe_l2
+        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
+        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
 
     @classmethod
-    def __process_command(cls, _type, client_id, result_json):
+    def process_command(cls, _type, client_id, result_json, sk=None):
+        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command锛� {result_json}")
+        # 鏌ョ湅鏄惁鏄缃甃2鐨勪唬鐮�
         try:
             data = result_json["data"]
-            print("鎺ユ敹鍐呭", result_json)
             request_id = result_json.get('request_id')
-            if not socket_util.is_client_params_sign_right(result_json):
-                print("绛惧悕閿欒")
-                # 绛惧悕鍑洪敊
-                SendResponseSkManager.send_error_response(_type, request_id, client_id,
-                                                          {"code": -1, "msg": "绛惧悕閿欒"})
-                return
+            # 鏆傛椂鍙栨秷绛惧悕
+            # if not socket_util.is_client_params_sign_right(result_json):
+            #     print("绛惧悕閿欒")
+            #     # 绛惧悕鍑洪敊
+            #     SendResponseSkManager.send_error_response(_type, request_id, client_id,
+            #                                               {"code": -1, "msg": "绛惧悕閿欒"})
+            #     return
 
             if _type == CLIENT_TYPE_TRADE:
                 # 浜ゆ槗
                 ctype = data["trade_type"]
-                cls.action_callback.OnTrade(client_id, request_id, ctype, data)
+                async_log_util.info(logger_trade, f"浜ゆ槗寮�濮�:{request_id}")
+                cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
+                async_log_util.info(logger_trade, f"浜ゆ槗缁撴潫:{request_id}")
             elif _type == CLIENT_TYPE_MONEY:
-                cls.action_callback.OnMoney(client_id, request_id)
+                cls.action_callback.OnMoney(client_id, request_id, sk)
             elif _type == CLIENT_TYPE_DEAL_LIST:
-                cls.action_callback.OnDealList(client_id, request_id)
+                cls.action_callback.OnDealList(client_id, request_id, sk)
             elif _type == CLIENT_TYPE_DELEGATE_LIST:
                 can_cancel = data["can_cancel"]
-                cls.action_callback.OnDelegateList(client_id, request_id, can_cancel)
+                cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
             elif _type == CLIENT_TYPE_POSITION_LIST:
-                cls.action_callback.OnPositionList(client_id, request_id)
+                cls.action_callback.OnPositionList(client_id, request_id, sk)
+
+            elif _type == CLIENT_TYPE_ORDER_FOUND_DETAIL:
+                cls.action_callback.OnOrderFoundDetail(client_id, request_id, sk, data)
+
+
             elif _type == "test":
-                cls.action_callback.OnTest(client_id, request_id, data)
+                cls.action_callback.OnTest(client_id, request_id, data, sk)
         except Exception as e:
-            logger_local_huaxin_trade_debug.debug(f"__process_command鍑洪敊锛歿result_json}")
-            logging.exception(e)
-            logging.error(result_json)
+            async_log_util.error(logger_local_huaxin_contact_debug, f"process_command鍑洪敊锛� {result_json}")
+            # logging.exception(e)
+            # logging.error(result_json)
 
     @classmethod
-    def run_process_command(cls, pipe_strategy):
-        if pipe_strategy is None:
+    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
+        if queue_strategy_trade is None:
             return
         # 鏈湴鍛戒护鎺ユ敹
+        try:
+            while True:
+                try:
+                    val = queue_strategy_trade.get()
+                    if val:
+                        _type = val["type"]
+                        if _type != "test":
+                            async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}")
+                        cls.process_command(_type, None, val)
+                except Exception as e:
+                    logger_local_huaxin_trade_debug.exception(e)
+        except Exception as e:
+            async_log_util.exception(logger_local_huaxin_trade_debug, e)
+
+    @classmethod
+    def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
+        if queue_strategy_trade_read_trade is None:
+            return
+        # 鏈湴鍛戒护鎺ユ敹
+        try:
+            while True:
+                try:
+                    val = queue_strategy_trade_read_trade.get()
+                    if val:
+                        _type = val["type"]
+                        if _type != "test":
+                            async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}")
+                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
+                except Exception as e:
+                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
+                    logging.exception(e)
+        except Exception as e:
+            async_log_util.exception(logger_local_huaxin_trade_debug, e)
+
+    ###############ZEROMQ鍗忚鎺ユ敹鍛戒护#################
+    @classmethod
+    def __create_order_command_reciever(cls, ipc_addr):
+        """
+        鎺ユ敹涓嬪崟鍛戒护
+        @param ipc_addr: ipc鍦板潃
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
         while True:
+            data = socket.recv_json()
             try:
-                val = pipe_strategy.recv()
-                if val:
-                    val = json.loads(val)
-                    print("run_process_command",val)
-                    _type = val["type"]
-                    _data = val["data"]
-                    # 鏌ョ湅鏄惁鏄缃甃2鐨勪唬鐮�
-                    if _type == CLIENT_TYPE_CMD_L2:
-                        cls.pipe_l2.send(
-                            json.dumps({"type": "set_l2_codes", "data": _data}))
-                    else:
-                        t1 = threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True)
-                        t1.start()
+                request_id = data.get('request_id')
+                use_time = time.time() - data.get('time')
+                data = data.get('data')
+                cls.action_callback.OnTrade(None, request_id, None, 1, data)
+                async_log_util.info(logger_local_huaxin_trade_debug, f"涓嬪崟閫氫俊鑰楁椂锛� {round(use_time*1000,3)}ms   request_id:{request_id}")
             except Exception as e:
                 logger_local_huaxin_trade_debug.exception(e)
-                logging.exception(e)
+            finally:
+                socket.send_string("SUCCESS")
+
+    @classmethod
+    def __create_cancel_order_command_reciever(cls, ipc_addr):
+        """
+        鎺ユ敹鎾ゅ崟鍛戒护
+        @param ipc_addr: ipc鍦板潃
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
+        while True:
+            data = socket.recv_json()
+            try:
+                request_id = data.get('request_id')
+                use_time = time.time() - data.get('time')
+                data = data.get('data')
+                cls.action_callback.OnTrade(None, request_id, None, 2, data)
+                async_log_util.info(logger_local_huaxin_trade_debug, f"鎾ゅ崟閫氫俊鑰楁椂锛� {round(use_time*1000,3)}ms   request_id:{request_id}")
+
+            except Exception as e:
+                logger_local_huaxin_trade_debug.exception(e)
+            finally:
+                socket.send_string("SUCCESS")
 
     # 缁存姢杩炴帴鏁扮殑绋冲畾
-    def run(self, blocking=True):
+    def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True):
         if blocking:
-            self.run_process_command(self.pipe_strategy)
+            t1 = threading.Thread(
+                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
+            t1.start()
+            self.run_process_command(self.queue_strategy_trade_read)
         else:
             # 鎺ュ彈鍛戒护
-            t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True)
+            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True)
+            t1.start()
+            t1 = threading.Thread(
+                target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True)
             t1.start()
 
 
 # L2鎸囦护绠$悊
 class L2CommandManager:
+    action_callback = None
 
     @classmethod
     def init(cls, l2_action_callback):
@@ -152,16 +256,9 @@
     @classmethod
     def process_command(cls, _type, client_id, result_json):
         data = result_json["data"]
-        request_id = result_json["request_id"]
-        ctype = data["type"]
-        if not socket_util.is_client_params_sign_right(result_json):
-            # 绛惧悕鍑洪敊
-            SendResponseSkManager.send_error_response(_type, request_id, client_id,
-                                                      {"code": -1, "msg": "绛惧悕閿欒"})
-            return
-        codes_data = data["data"]
+        ctype = result_json["type"]
         if ctype == CLIENT_TYPE_CMD_L2:
-            cls.action_callback.OnSetL2Position(client_id, request_id, codes_data)
+            cls.action_callback.OnSetL2Position(data)
 
 
 if __name__ == "__main__":

--
Gitblit v1.8.0