From 32203dcb2d06b93e4b6c81f9121b00531a91395e Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 06 六月 2025 18:43:07 +0800 Subject: [PATCH] bug修复 --- api/outside_api_command_manager.py | 273 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 273 insertions(+), 0 deletions(-) diff --git a/api/outside_api_command_manager.py b/api/outside_api_command_manager.py new file mode 100644 index 0000000..ba16e7a --- /dev/null +++ b/api/outside_api_command_manager.py @@ -0,0 +1,273 @@ +""" +澶栭儴鎺ュ彛绠$悊 +""" +import json +import logging +import random +import socket +import threading +import time + +# 蹇冭烦淇℃伅 +from huaxin_client import socket_util +from huaxin_client.client_network import SendResponseSkManager +from log_module.log import logger_system, logger_request_api +from utils import middle_api_protocol, tool + +MSG_TYPE_HEART = "heart" +# 鍛戒护淇℃伅 +MSG_TYPE_CMD = "cmd" + +CLIENT_TYPE_TRADE = "trade_low_suction" + +# 蹇冭烦鏃堕棿闂撮殧 +HEART_SPACE_TIME = 3 + +TRADE_DIRECTION_BUY = 1 +TRADE_DIRECTION_SELL = 2 + +TRADE_TYPE_ORDER = 1 +TRADE_TYPE_CANCEL_ORDER = 2 + +# 鏁版嵁鎿嶄綔 +OPERRATE_SET = 1 # 璁剧疆 +OPERRATE_DELETE = 2 # 鍒犻櫎 +OPERRATE_GET = 3 # 鑾峰彇 +OPERRATE_ADD = 4 # 鏂板 + +# 浠g爜鍚嶅崟绫诲瀷 +CODE_LIST_WHITE = "white" +CODE_LIST_BLACK = "black" +CODE_LIST_WANT = "want" +CODE_LIST_PAUSE_BUY = "pause_buy" +CODE_LIST_MUST_BUY = "must_buy" +CODE_LIST_GREEN = "green" + +# 绫诲瀷 +API_TYPE_TRADE = "trade" # 浜ゆ槗 +API_TYPE_TRADE_STATE = "trade_state" # 浜ゆ槗鐘舵�� +API_TYPE_TRADE_MODE = "trade_mode" # 浜ゆ槗妯″紡 +API_TYPE_SELL_RULE = "sell_rule" # 鍗栧嚭瑙勫垯 +API_TYPE_CODE_LIST = "code_list" # 浠g爜鍚嶅崟 +API_TYPE_EXPORT_L2 = "export_l2" # 瀵煎嚭L2鏁版嵁 +API_TYPE_INIT = "init" # 鍒濆鍖� +API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data" # 浜ゆ槗鏁版嵁鍒锋柊 +API_TYPE_CODE_ATRRIBUTE = "code_attribute" # 浠g爜灞炴�� +API_TYPE_CODE_TRADE_STATE = "code_trade_state" # 浠g爜浜ゆ槗鐘舵�� +API_TYPE_GET_ENV = "get_env" # 鑾峰彇鐜淇℃伅 +API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # 鍚屾L1闇�瑕佽闃呯殑浠g爜 +API_TYPE_SYSTEM_LOG = "system_log" # 绯荤粺鏃ュ織 +API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server" # 浠庢暟鎹湇鍔″櫒鎷夊彇鏁版嵁 +API_TYPE_CODE_TRADE_INFO = "code_trade_info" # 浠g爜浜ゆ槗淇℃伅 +API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count" # L2鏈夋晥鐩戝惉鏁伴噺 +API_TYPE_SAVE_RUNNING_DATA = "save_running_data" # 淇濆瓨杩愯鏃舵暟鎹� +API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info" # 鑾峰彇浠g爜鎸佷粨淇℃伅 +API_TYPE_COMMON_REQUEST = "common_request" # 閫氱敤璇锋眰 + + +class ActionCallback(object): + # 浜ゆ槗 + def OnTrade(self, client_id, request_id, data): + pass + + # 浜ゆ槗鐘舵�� + def OnTradeState(self, client_id, request_id, data): + pass + + # 浜ゆ槗妯″紡 + def OnTradeMode(self, client_id, request_id, data): + pass + + # 鍗栧嚭瑙勫垯 + def OnSellRule(self, client_id, request_id, data): + pass + + # 浠g爜鍚嶅崟 + def OnCodeList(self, client_id, request_id, data): + pass + + def OnExportL2(self, client_id, request_id, data): + pass + + def OnEveryDayInit(self, client_id, request_id, data): + pass + + def OnRefreshTradeData(self, client_id, request_id, data): + pass + + def OnGetCodeAttribute(self, client_id, request_id, data): + pass + + def OnGetCodeTradeState(self, client_id, request_id, data): + pass + + def OnGetEnvInfo(self, client_id, request_id, data): + pass + + def OnSyncL2SubscriptCodes(self, client_id, request_id): + pass + + def OnGetFromDataServer(self, client_id, request_id, data): + pass + + # 浠g爜鐨勪氦鏄撲俊鎭� + def OnGetCodeTradeInfo(self, client_id, request_id, data): + pass + + def OnGetActiveListenCount(self, client_id, request_id): + pass + + def OnSaveRunningData(self, client_id, request_id): + pass + + def OnGetCodePositionInfo(self, client_id, request_id, data): + pass + + def OnCommonRequest(self, client_id, request_id, data): + pass + + +# 浜ゆ槗鎸囦护绠$悊 +# 浜ゆ槗鎸囦护绠$悊 +@tool.singleton +class ApiCommandManager: + trade_ls_client_dict = {} + trade_ls_client_count = 0 + + def __init__(self, addr, port, trade_action_callback, trade_ls_client_count=20): + self.trade_ls_client_dict.clear() + self.trade_ls_client_count = trade_ls_client_count + self.action_callback = trade_action_callback + self.ip_port = (addr, port) + + for i in range(trade_ls_client_count): + result = self.__create_and_run_client(CLIENT_TYPE_TRADE, i) + self.trade_ls_client_dict[result[0]] = result[1] + + def __create_client(self, client_type, rid): + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 鐢熸垚socket锛岃繛鎺erver + # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) + # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000)) + client.connect(self.ip_port) + client.send(SendResponseSkManager.format_response( + json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8"))) + client.recv(1024) + return client + + def __create_and_run_client(self, type, index=None): + key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" + if index is not None: + key += f"_{index}" + sk = self.__create_client(type, key) + # 鍙戦�佸績璺� + self.__heartbeats_thread(type, key, sk) + self.__listen_command_thread(type, key, sk) + # print("create_and_run_client success", type, key) + logger_request_api.info(f"鍒涘缓鏈湴socket璇锋眰瀹㈡埛绔細{type}") + return key, sk + + # 鍚彇鎸囦护 + def __listen_command(self, _type, client_id, sk): + while True: + try: + result = socket_util.recv_data(sk)[0] + if result: + start_time = time.time() + try: + print("鎺ユ敹鏁版嵁", _type, result) + result_json = json.loads(result) + if result_json["type"] == MSG_TYPE_HEART: + # 杩斿洖鍐呭 + sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')) + continue + + data = result_json["data"] + content_type = data["type"] + # print("鎺ユ敹鍐呭", data) + request_id = result_json.get('request_id') + if not socket_util.is_client_params_sign_right(result_json): + # print("绛惧悕閿欒") + # 绛惧悕鍑洪敊 + SendResponseSkManager.send_error_response(_type, request_id, client_id, + {"code": -1, "msg": "绛惧悕閿欒"}) + continue + if content_type == API_TYPE_COMMON_REQUEST: + self.action_callback.OnCommonRequest(client_id, request_id, data) + except Exception as e: + logging.exception(e) + finally: + use_time = int(time.time() - start_time) + if use_time > 5: + result_json = json.loads(result) + logger_request_api.info(f"瓒呮椂5s浠ヤ笂锛歿result_json['data']['type']}") + # 鍙戦�佸搷搴� + sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) + else: + raise Exception("鎺ユ敹鐨勫唴瀹逛负绌�") + + except Exception as e: + logging.exception(e) + if _type == CLIENT_TYPE_TRADE: + if client_id in self.trade_ls_client_dict: + self.trade_ls_client_dict.pop(client_id) + try: + sk.close() + except: + pass + # 缁撴潫褰撳墠鐨勬秷鎭惊鐜� + break + + def __heart_beats(self, _type, client_id, sk): + while True: + try: + sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) + # print("蹇冭烦淇℃伅鍙戦�佹垚鍔�", client_id) + except Exception as e: + if _type == CLIENT_TYPE_TRADE: + if client_id in self.trade_ls_client_dict: + self.trade_ls_client_dict.pop(client_id) + try: + sk.close() + except: + pass + # 缁撴潫褰撳墠鐨勬秷鎭惊鐜� + break + time.sleep(HEART_SPACE_TIME) + + def __listen_command_thread(self, _type, rid, sk): + t1 = threading.Thread(target=lambda: self.__listen_command(_type, rid, sk)) + t1.setDaemon(True) + t1.start() + + def __heartbeats_thread(self, _type, rid, sk): + t1 = threading.Thread(target=lambda: self.__heart_beats(_type, rid, sk)) + t1.setDaemon(True) + t1.start() + + def __maintain_client(self): + logger_system.info(f"outside_api __maintain_client 绾跨▼ID:{tool.get_thread_id()}") + while True: + try: + if len(self.trade_ls_client_dict) < self.trade_ls_client_count: + for i in range(self.trade_ls_client_count - len(self.trade_ls_client_dict)): + result = self.__create_and_run_client(CLIENT_TYPE_TRADE) + self.trade_ls_client_dict[result[0]] = result[1] + except: + pass + time.sleep(1) + + # 缁存姢杩炴帴鏁扮殑绋冲畾 + def run(self, blocking=True): + # 缁存姢client + if blocking: + self.__maintain_client() + else: + t1 = threading.Thread(target=lambda: self.__maintain_client()) + t1.setDaemon(True) + t1.start() + + +if __name__ == "__main__": + manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, ActionCallback()) + manager.run() + input() -- Gitblit v1.8.0