From 6a0d3ff5832e57ee1b1374d086f24b3c1679b332 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 05 九月 2025 18:22:24 +0800 Subject: [PATCH] bug修复/降低测撤单率 --- huaxin_client/command_manager.py | 209 ++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 153 insertions(+), 56 deletions(-) diff --git a/huaxin_client/command_manager.py b/huaxin_client/command_manager.py index 4c2e2bc..63ff02f 100644 --- a/huaxin_client/command_manager.py +++ b/huaxin_client/command_manager.py @@ -2,13 +2,19 @@ """ 鍛戒护绠$悊鍣� """ +import concurrent.futures import json import logging +import multiprocessing import threading +import time + +import zmq from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager -from log_module.log import logger_local_huaxin_trade_debug +from log_module import async_log_util +from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug MSG_TYPE_HEART = "heart" # 鍛戒护淇℃伅 @@ -20,6 +26,7 @@ CLIENT_TYPE_POSITION_LIST = "position_list" CLIENT_TYPE_MONEY = "money" CLIENT_TYPE_DEAL = "deal" +CLIENT_TYPE_ORDER_FOUND_DETAIL = "order_found_detail" CLIENT_TYPE_CMD_L2 = "l2_cmd" @@ -29,33 +36,36 @@ class TradeActionCallback(object): # 浜ゆ槗 - def OnTrade(self, client_id, request_id, type_, data): + def OnTrade(self, client_id, request_id, sk, type_, data): pass # 濮旀墭鍒楄〃 - def OnDelegateList(self, client_id, request_id): + def OnDelegateList(self, client_id, request_id, sk, can_cancel): pass # 鎴愪氦鍒楄〃 - def OnDealList(self, client_id, request_id): + def OnDealList(self, client_id, request_id, sk): pass # 鎴愪氦鍒楄〃 - def OnPositionList(self, client_id, request_id): + def OnPositionList(self, client_id, request_id, sk): pass # 鑾峰彇璧勯噾淇℃伅 - def OnMoney(self, client_id, request_id): + def OnMoney(self, client_id, request_id, sk): pass # 娴嬭瘯 - def OnTest(self, client_id, request_id, data): + def OnTest(self, client_id, request_id, data, sk): + pass + + def OnOrderFoundDetail(self, client_id, request_id, sk, data): pass class L2ActionCallback(object): # 鐩戝惉L2鏁版嵁 - def OnSetL2Position(self, client_id, request_id, codes_data): + def OnSetL2Position(self, codes_data): pass @@ -63,6 +73,7 @@ class TradeCommandManager: trade_client_dict = {} _instance = None + process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30) def __new__(cls, *args, **kwargs): if not cls._instance: @@ -70,80 +81,173 @@ return cls._instance @classmethod - def init(cls, trade_action_callback, pipe_l2, pipe_strategy): + def init(cls, trade_action_callback: TradeActionCallback, + queue_strategy_trade_read_for_trade: multiprocessing.Queue, + queue_strategy_trade_read_for_read: multiprocessing.Queue): cls.action_callback = trade_action_callback - cls.pipe_strategy = pipe_strategy - cls.pipe_l2 = pipe_l2 + cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade + cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read @classmethod - def __process_command(cls, _type, client_id, result_json): + def process_command(cls, _type, client_id, result_json, sk=None): + async_log_util.info(logger_local_huaxin_contact_debug, f"process_command锛� {result_json}") + # 鏌ョ湅鏄惁鏄缃甃2鐨勪唬鐮� try: data = result_json["data"] - print("鎺ユ敹鍐呭", result_json) request_id = result_json.get('request_id') - if not socket_util.is_client_params_sign_right(result_json): - print("绛惧悕閿欒") - # 绛惧悕鍑洪敊 - SendResponseSkManager.send_error_response(_type, request_id, client_id, - {"code": -1, "msg": "绛惧悕閿欒"}) - return + # 鏆傛椂鍙栨秷绛惧悕 + # if not socket_util.is_client_params_sign_right(result_json): + # print("绛惧悕閿欒") + # # 绛惧悕鍑洪敊 + # SendResponseSkManager.send_error_response(_type, request_id, client_id, + # {"code": -1, "msg": "绛惧悕閿欒"}) + # return if _type == CLIENT_TYPE_TRADE: # 浜ゆ槗 ctype = data["trade_type"] - cls.action_callback.OnTrade(client_id, request_id, ctype, data) + async_log_util.info(logger_trade, f"浜ゆ槗寮�濮�:{request_id}") + cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data) + async_log_util.info(logger_trade, f"浜ゆ槗缁撴潫:{request_id}") elif _type == CLIENT_TYPE_MONEY: - cls.action_callback.OnMoney(client_id, request_id) + cls.action_callback.OnMoney(client_id, request_id, sk) elif _type == CLIENT_TYPE_DEAL_LIST: - cls.action_callback.OnDealList(client_id, request_id) + cls.action_callback.OnDealList(client_id, request_id, sk) elif _type == CLIENT_TYPE_DELEGATE_LIST: can_cancel = data["can_cancel"] - cls.action_callback.OnDelegateList(client_id, request_id, can_cancel) + cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel) elif _type == CLIENT_TYPE_POSITION_LIST: - cls.action_callback.OnPositionList(client_id, request_id) + cls.action_callback.OnPositionList(client_id, request_id, sk) + + elif _type == CLIENT_TYPE_ORDER_FOUND_DETAIL: + cls.action_callback.OnOrderFoundDetail(client_id, request_id, sk, data) + + elif _type == "test": - cls.action_callback.OnTest(client_id, request_id, data) + cls.action_callback.OnTest(client_id, request_id, data, sk) except Exception as e: - logger_local_huaxin_trade_debug.debug(f"__process_command鍑洪敊锛歿result_json}") - logging.exception(e) - logging.error(result_json) + async_log_util.error(logger_local_huaxin_contact_debug, f"process_command鍑洪敊锛� {result_json}") + # logging.exception(e) + # logging.error(result_json) @classmethod - def run_process_command(cls, pipe_strategy): - if pipe_strategy is None: + def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue): + if queue_strategy_trade is None: return # 鏈湴鍛戒护鎺ユ敹 + try: + while True: + try: + val = queue_strategy_trade.get() + if val: + _type = val["type"] + if _type != "test": + async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}") + cls.process_command(_type, None, val) + except Exception as e: + logger_local_huaxin_trade_debug.exception(e) + except Exception as e: + async_log_util.exception(logger_local_huaxin_trade_debug, e) + + @classmethod + def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue): + if queue_strategy_trade_read_trade is None: + return + # 鏈湴鍛戒护鎺ユ敹 + try: + while True: + try: + val = queue_strategy_trade_read_trade.get() + if val: + _type = val["type"] + if _type != "test": + async_log_util.info(logger_local_huaxin_contact_debug, f"鎺ュ彈鍒颁俊鎭細 {val}") + cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) + except Exception as e: + async_log_util.exception(logger_local_huaxin_trade_debug, e) + logging.exception(e) + except Exception as e: + async_log_util.exception(logger_local_huaxin_trade_debug, e) + + ###############ZEROMQ鍗忚鎺ユ敹鍛戒护################# + @classmethod + def __create_order_command_reciever(cls, ipc_addr): + """ + 鎺ユ敹涓嬪崟鍛戒护 + @param ipc_addr: ipc鍦板潃 + @return: + """ + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(ipc_addr) while True: + data = socket.recv_json() try: - val = pipe_strategy.recv() - if val: - val = json.loads(val) - print("run_process_command",val) - _type = val["type"] - _data = val["data"] - # 鏌ョ湅鏄惁鏄缃甃2鐨勪唬鐮� - if _type == CLIENT_TYPE_CMD_L2: - cls.pipe_l2.send( - json.dumps({"type": "set_l2_codes", "data": _data})) - else: - t1 = threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True) - t1.start() + request_id = data.get('request_id') + use_time = time.time() - data.get('time') + data = data.get('data') + cls.action_callback.OnTrade(None, request_id, None, 1, data) + async_log_util.info(logger_local_huaxin_trade_debug, f"涓嬪崟閫氫俊鑰楁椂锛� {round(use_time*1000,3)}ms request_id:{request_id}") except Exception as e: logger_local_huaxin_trade_debug.exception(e) - logging.exception(e) + finally: + socket.send_string("SUCCESS") + + @classmethod + def __create_cancel_order_command_reciever(cls, ipc_addr): + """ + 鎺ユ敹鎾ゅ崟鍛戒护 + @param ipc_addr: ipc鍦板潃 + @return: + """ + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(ipc_addr) + while True: + data = socket.recv_json() + try: + request_id = data.get('request_id') + use_time = time.time() - data.get('time') + data = data.get('data') + cls.action_callback.OnTrade(None, request_id, None, 2, data) + async_log_util.info(logger_local_huaxin_trade_debug, f"鎾ゅ崟閫氫俊鑰楁椂锛� {round(use_time*1000,3)}ms request_id:{request_id}") + + except Exception as e: + logger_local_huaxin_trade_debug.exception(e) + finally: + socket.send_string("SUCCESS") # 缁存姢杩炴帴鏁扮殑绋冲畾 - def run(self, blocking=True): + def run(self, order_ipc_addr, cancel_order_ipc_addr, blocking=True): if blocking: - self.run_process_command(self.pipe_strategy) + t1 = threading.Thread( + target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) + t1.start() + self.run_process_command(self.queue_strategy_trade_read) else: # 鎺ュ彈鍛戒护 - t1 = threading.Thread(target=lambda: self.run_process_command(self.pipe_strategy), daemon=True) + t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_order_command_reciever(order_ipc_addr), daemon=True) + t1.start() + t1 = threading.Thread( + target=lambda: self.__create_cancel_order_command_reciever(cancel_order_ipc_addr), daemon=True) t1.start() # L2鎸囦护绠$悊 class L2CommandManager: + action_callback = None @classmethod def init(cls, l2_action_callback): @@ -152,16 +256,9 @@ @classmethod def process_command(cls, _type, client_id, result_json): data = result_json["data"] - request_id = result_json["request_id"] - ctype = data["type"] - if not socket_util.is_client_params_sign_right(result_json): - # 绛惧悕鍑洪敊 - SendResponseSkManager.send_error_response(_type, request_id, client_id, - {"code": -1, "msg": "绛惧悕閿欒"}) - return - codes_data = data["data"] + ctype = result_json["type"] if ctype == CLIENT_TYPE_CMD_L2: - cls.action_callback.OnSetL2Position(client_id, request_id, codes_data) + cls.action_callback.OnSetL2Position(data) if __name__ == "__main__": -- Gitblit v1.8.0