From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- outside_api_command_manager.py | 113 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 95 insertions(+), 18 deletions(-) diff --git a/outside_api_command_manager.py b/outside_api_command_manager.py index 03c80c0..93b140f 100644 --- a/outside_api_command_manager.py +++ b/outside_api_command_manager.py @@ -11,12 +11,14 @@ # 蹇冭烦淇℃伅 from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager -from utils import middle_api_protocol +from log_module.log import logger_system, logger_request_api +from utils import middle_api_protocol, tool MSG_TYPE_HEART = "heart" # 鍛戒护淇℃伅 MSG_TYPE_CMD = "cmd" +CLIENT_TYPE_COMMON = "common" CLIENT_TYPE_TRADE = "trade" # 蹇冭烦鏃堕棿闂撮殧 @@ -32,17 +34,21 @@ OPERRATE_SET = 1 # 璁剧疆 OPERRATE_DELETE = 2 # 鍒犻櫎 OPERRATE_GET = 3 # 鑾峰彇 +OPERRATE_ADD = 4 # 鏂板 # 浠g爜鍚嶅崟绫诲瀷 CODE_LIST_WHITE = "white" CODE_LIST_BLACK = "black" CODE_LIST_WANT = "want" CODE_LIST_PAUSE_BUY = "pause_buy" +CODE_LIST_MUST_BUY = "must_buy" +CODE_LIST_GREEN = "green" # 绫诲瀷 API_TYPE_TRADE = "trade" # 浜ゆ槗 API_TYPE_TRADE_STATE = "trade_state" # 浜ゆ槗鐘舵�� API_TYPE_TRADE_MODE = "trade_mode" # 浜ゆ槗妯″紡 +API_TYPE_SELL_RULE = "sell_rule" # 鍗栧嚭瑙勫垯 API_TYPE_CODE_LIST = "code_list" # 浠g爜鍚嶅崟 API_TYPE_EXPORT_L2 = "export_l2" # 瀵煎嚭L2鏁版嵁 API_TYPE_INIT = "init" # 鍒濆鍖� @@ -50,6 +56,14 @@ API_TYPE_CODE_ATRRIBUTE = "code_attribute" # 浠g爜灞炴�� API_TYPE_CODE_TRADE_STATE = "code_trade_state" # 浠g爜浜ゆ槗鐘舵�� API_TYPE_GET_ENV = "get_env" # 鑾峰彇鐜淇℃伅 +API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # 鍚屾L1闇�瑕佽闃呯殑浠g爜 +API_TYPE_SYSTEM_LOG = "system_log" # 绯荤粺鏃ュ織 +API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server" # 浠庢暟鎹湇鍔″櫒鎷夊彇鏁版嵁 +API_TYPE_CODE_TRADE_INFO = "code_trade_info" # 浠g爜浜ゆ槗淇℃伅 +API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count" # L2鏈夋晥鐩戝惉鏁伴噺 +API_TYPE_SAVE_RUNNING_DATA = "save_running_data" # 淇濆瓨杩愯鏃舵暟鎹� +API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info" # 鑾峰彇浠g爜鎸佷粨淇℃伅 +API_TYPE_COMMON_REQUEST = "common_request" # 閫氱敤璇锋眰 class ActionCallback(object): @@ -63,6 +77,10 @@ # 浜ゆ槗妯″紡 def OnTradeMode(self, client_id, request_id, data): + pass + + # 鍗栧嚭瑙勫垯 + def OnSellRule(self, client_id, request_id, data): pass # 浠g爜鍚嶅崟 @@ -87,11 +105,37 @@ def OnGetEnvInfo(self, client_id, request_id, data): pass + def OnSyncL2SubscriptCodes(self, client_id, request_id): + pass + + def OnGetFromDataServer(self, client_id, request_id, data): + pass + + # 浠g爜鐨勪氦鏄撲俊鎭� + def OnGetCodeTradeInfo(self, client_id, request_id, data): + pass + + def OnGetActiveListenCount(self, client_id, request_id): + pass + + def OnSaveRunningData(self, client_id, request_id): + pass + + def OnGetCodePositionInfo(self, client_id, request_id, data): + pass + + def OnCommonRequest(self, client_id, request_id, data): + pass + # 浜ゆ槗鎸囦护绠$悊 # 浜ゆ槗鎸囦护绠$悊 class ApiCommandManager: + common_client_dict = {} trade_client_dict = {} + trade_client_count = 0 + common_client_count = 0 + _instance = None def __new__(cls, *args, **kwargs): @@ -119,16 +163,22 @@ # 鍙戦�佸績璺� cls.__heartbeats_thread(type, key, sk) cls.__listen_command_thread(type, key, sk) - print("create_and_run_client success", type, key) + # print("create_and_run_client success", type, key) + logger_request_api.info(f"鍒涘缓鏈湴socket璇锋眰瀹㈡埛绔細{type}") return key, sk @classmethod - def init(cls, addr, port, trade_action_callback, trade_client_count=20): - cls.trade_client_dict = {} + def init(cls, addr, port, trade_action_callback, common_client_count=20, trade_client_count=20): + cls.common_client_dict.clear() + cls.common_client_count = common_client_count + cls.trade_client_dict.clear() cls.trade_client_count = trade_client_count cls.action_callback = trade_action_callback cls.ip_port = (addr, port) + for i in range(common_client_count): + result = cls.__create_and_run_client(CLIENT_TYPE_COMMON, i) + cls.common_client_dict[result[0]] = result[1] for i in range(trade_client_count): result = cls.__create_and_run_client(CLIENT_TYPE_TRADE, i) cls.trade_client_dict[result[0]] = result[1] @@ -140,8 +190,9 @@ try: result = socket_util.recv_data(sk)[0] if result: + start_time = time.time() try: - print("鎺ユ敹鏁版嵁", _type, result) + # print("鎺ユ敹鏁版嵁", _type, result) result_json = json.loads(result) if result_json["type"] == MSG_TYPE_HEART: # 杩斿洖鍐呭 @@ -150,15 +201,14 @@ data = result_json["data"] content_type = data["type"] - print("鎺ユ敹鍐呭", data) + # print("鎺ユ敹鍐呭", data) request_id = result_json.get('request_id') if not socket_util.is_client_params_sign_right(result_json): - print("绛惧悕閿欒") + # print("绛惧悕閿欒") # 绛惧悕鍑洪敊 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "绛惧悕閿欒"}) continue - if content_type == API_TYPE_TRADE: # 浜ゆ槗 cls.action_callback.OnTrade(client_id, request_id, data) @@ -166,6 +216,8 @@ cls.action_callback.OnTradeState(client_id, request_id, data) elif content_type == API_TYPE_TRADE_MODE: cls.action_callback.OnTradeMode(client_id, request_id, data) + elif content_type == API_TYPE_SELL_RULE: + cls.action_callback.OnSellRule(client_id, request_id, data) elif content_type == API_TYPE_CODE_LIST: cls.action_callback.OnCodeList(client_id, request_id, data) elif content_type == API_TYPE_EXPORT_L2: @@ -180,10 +232,29 @@ cls.action_callback.OnGetCodeTradeState(client_id, request_id, data) elif content_type == API_TYPE_GET_ENV: cls.action_callback.OnGetEnvInfo(client_id, request_id, data) + elif content_type == API_TYPE_SYNC_L1_TARGET_CODES: + cls.action_callback.OnSyncL2SubscriptCodes(client_id, request_id) + elif content_type == API_TYPE_SYSTEM_LOG: + cls.action_callback.OnSystemLog(client_id, request_id, data) + elif content_type == API_TYPE_GET_FROM_DATA_SERVER: + cls.action_callback.OnGetFromDataServer(client_id, request_id, data) + elif content_type == API_TYPE_CODE_TRADE_INFO: + cls.action_callback.OnGetCodeTradeInfo(client_id, request_id, data) + elif content_type == API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT: + cls.action_callback.OnGetActiveListenCount(client_id, request_id) + elif content_type == API_TYPE_SAVE_RUNNING_DATA: + cls.action_callback.OnSaveRunningData(client_id, request_id) + elif content_type == API_TYPE_GET_CODE_POSITION_INFO: + cls.action_callback.OnGetCodePositionInfo(client_id, request_id, data) + elif content_type == API_TYPE_COMMON_REQUEST: + cls.action_callback.OnCommonRequest(client_id, request_id, data) except Exception as e: logging.exception(e) - pass finally: + use_time = int(time.time() - start_time) + if use_time > 5: + result_json = json.loads(result) + logger_request_api.info(f"瓒呮椂5s浠ヤ笂锛歿result_json['data']['type']}") # 鍙戦�佸搷搴� sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) else: @@ -191,10 +262,12 @@ except Exception as e: logging.exception(e) - if _type == CLIENT_TYPE_TRADE: + if _type == CLIENT_TYPE_COMMON: + if client_id in cls.common_client_dict: + cls.common_client_dict.pop(client_id) + elif _type == CLIENT_TYPE_TRADE: if client_id in cls.trade_client_dict: cls.trade_client_dict.pop(client_id) - print("pop trade client", client_id) try: sk.close() except: @@ -206,14 +279,13 @@ def __heart_beats(cls, _type, client_id, sk): while True: try: - sk.send(SendResponseSkManager.format_response( - json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) + sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) # print("蹇冭烦淇℃伅鍙戦�佹垚鍔�", client_id) except Exception as e: - logging.error("閿欒") - print("蹇冭烦淇℃伅鍙戦�佸け璐�", _type, client_id) - logging.exception(e) - if _type == CLIENT_TYPE_TRADE: + if _type == CLIENT_TYPE_COMMON: + if client_id in cls.common_client_dict: + cls.common_client_dict.pop(client_id) + elif _type == CLIENT_TYPE_TRADE: if client_id in cls.trade_client_dict: cls.trade_client_dict.pop(client_id) try: @@ -238,10 +310,15 @@ @classmethod def __maintain_client(cls): + logger_system.info(f"outside_api __maintain_client 绾跨▼ID:{tool.get_thread_id()}") while True: try: + if len(cls.common_client_dict) < cls.common_client_count: + for i in range(cls.common_client_count - len(cls.common_client_dict)): + result = cls.__create_and_run_client(CLIENT_TYPE_COMMON) + cls.common_client_dict[result[0]] = result[1] + if len(cls.trade_client_dict) < cls.trade_client_count: - print("__maintain_client", CLIENT_TYPE_TRADE, cls.trade_client_count - len(cls.trade_client_dict)) for i in range(cls.trade_client_count - len(cls.trade_client_dict)): result = cls.__create_and_run_client(CLIENT_TYPE_TRADE) cls.trade_client_dict[result[0]] = result[1] -- Gitblit v1.8.0