From 27e49e5782e07566aac42d6363bd5233bf5e396d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 16 五月 2024 19:19:51 +0800 Subject: [PATCH] 可转债仿真交易/print方法替换 --- utils/outside_api_command_manager.py | 179 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 173 insertions(+), 6 deletions(-) diff --git a/utils/outside_api_command_manager.py b/utils/outside_api_command_manager.py index ce1fddc..d65631a 100644 --- a/utils/outside_api_command_manager.py +++ b/utils/outside_api_command_manager.py @@ -10,7 +10,7 @@ # 蹇冭烦淇℃伅 from huaxin_client.client_network import SendResponseSkManager -from log_module.log import logger_system, logger_request_api +from log_module.log import logger_system, logger_request_api, printlog from utils import middle_api_protocol, tool, socket_util MSG_TYPE_HEART = "heart" @@ -97,7 +97,7 @@ # 鍙戦�佸績璺� cls.__heartbeats_thread(type, key, sk) cls.__listen_command_thread(type, key, sk) - print("create_and_run_client success", type, key) + printlog("create_and_run_client success", type, key) return key, sk @classmethod @@ -119,7 +119,7 @@ if result: start_time = time.time() try: - print("鎺ユ敹鏁版嵁", _type, result) + printlog("鎺ユ敹鏁版嵁", _type, result) result_json = json.loads(result) if result_json["type"] == MSG_TYPE_HEART: # 杩斿洖鍐呭 @@ -128,10 +128,10 @@ data = result_json["data"] content_type = data["type"] - print("鎺ユ敹鍐呭", data) + printlog("鎺ユ敹鍐呭", data) request_id = result_json.get('request_id') if not socket_util.is_client_params_sign_right(result_json): - print("绛惧悕閿欒") + printlog("绛惧悕閿欒") # 绛惧悕鍑洪敊 SendResponseSkManager.send_error_response(_type, request_id, client_id, {"code": -1, "msg": "绛惧悕閿欒"}) @@ -176,7 +176,7 @@ while True: try: sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) - # print("蹇冭烦淇℃伅鍙戦�佹垚鍔�", client_id) + # printlog("蹇冭烦淇℃伅鍙戦�佹垚鍔�", client_id) except Exception as e: if _type == CLIENT_TYPE_TRADE_SELL: if client_id in cls.trade_client_dict: @@ -225,6 +225,173 @@ t1.start() +class NewApiCommandManager: + """ + 鏂扮増浜ゆ槗鎸囦护绠$悊 + """ + client_dict = {} # 淇濆瓨褰撳墠鐨勫鎴风,鏍煎紡:{client_type:{client_id:socket}} + client_count_dict = {} # 姣忕client鐨勬渶澶т釜鏁�,鏍煎紡锛歿client_type:count} + action_callback = None + + _instance = None + + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super().__new__(cls, *args, **kwargs) + return cls._instance + + @classmethod + def __create_client(cls, client_type, rid): + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 鐢熸垚socket锛岃繛鎺erver + # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) + # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000)) + client.connect(cls.ip_port) + client.send(SendResponseSkManager.format_response( + json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8"))) + client.recv(1024) + return client + + @classmethod + def __create_and_run_client(cls, type, index=None): + key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}" + if index is not None: + key += f"_{index}" + sk = cls.__create_client(type, key) + # 鍙戦�佸績璺� + cls.__heartbeats_thread(type, key, sk) + cls.__listen_command_thread(type, key, sk) + printlog("create_and_run_client success", type, key) + return key, sk + + @classmethod + def init(cls, addr, port, action_callback, clients_info): + """ + 鍒濆鍖� + :param addr: 鏈嶅姟鍣ㄥ湴鍧� + :param port: 鏈嶅姟鍣ㄧ鍙� + :param trade_action_callback: 鍥炶皟 + :param clients_info: 瀹㈡埛绔俊鎭細[(绫诲瀷,鏁伴噺)] + :return: + """ + cls.client_dict.clear() + cls.client_count_dict.clear() + cls.action_callback = action_callback + cls.ip_port = (addr, port) + # 鍒濆鍖� + for client_info in clients_info: + cls.client_dict[client_info[0]] = {} + cls.client_count_dict[client_info[0]] = client_info[1] + # 鍒涘缓杩炴帴瀹㈡埛绔� + for client_type in cls.client_count_dict: + for i in range(cls.client_count_dict[client_type]): + result = cls.__create_and_run_client(client_type, i) + cls.client_dict[client_type][result[0]] = result[1] + + # 鍚彇鎸囦护 + @classmethod + def __listen_command(cls, _type, client_id, sk): + while True: + try: + result = socket_util.recv_data(sk)[0] + if result: + start_time = time.time() + try: + printlog("鎺ユ敹鏁版嵁", _type, result) + result_json = json.loads(result) + if result_json["type"] == MSG_TYPE_HEART: + # 杩斿洖鍐呭 + sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')) + continue + + data = result_json["data"] + content_type = data["type"] + printlog("鎺ユ敹鍐呭", data) + request_id = result_json.get('request_id') + if not socket_util.is_client_params_sign_right(result_json): + printlog("绛惧悕閿欒") + # 绛惧悕鍑洪敊 + SendResponseSkManager.send_error_response(_type, request_id, client_id, + {"code": -1, "msg": "绛惧悕閿欒"}) + continue + cls.action_callback(client_id, request_id, data) + except Exception as e: + logging.exception(e) + finally: + use_time = int(time.time() - start_time) + if use_time > 5: + result_json = json.loads(result) + logger_request_api.info(f"瓒呮椂5s浠ヤ笂锛歿result_json['data']['type']}") + # 鍙戦�佸搷搴� + sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8')) + else: + raise Exception("鎺ユ敹鐨勫唴瀹逛负绌�") + except Exception as e: + logging.exception(e) + if _type in cls.client_dict: + if client_id in cls.client_dict[_type]: + cls.client_dict[_type].pop(client_id) + try: + sk.close() + except: + pass + # 缁撴潫褰撳墠鐨勬秷鎭惊鐜� + break + + @classmethod + def __heart_beats(cls, _type, client_id, sk): + while True: + try: + sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))) + # printlog("蹇冭烦淇℃伅鍙戦�佹垚鍔�", client_id) + except Exception as e: + if _type in cls.client_dict: + if client_id in cls.client_dict[_type]: + cls.client_dict[_type].pop(client_id) + try: + sk.close() + except: + pass + # 缁撴潫褰撳墠鐨勬秷鎭惊鐜� + break + time.sleep(HEART_SPACE_TIME) + + @classmethod + def __listen_command_thread(cls, _type, rid, sk): + t1 = threading.Thread(target=lambda: cls.__listen_command(_type, rid, sk)) + t1.setDaemon(True) + t1.start() + + @classmethod + def __heartbeats_thread(cls, _type, rid, sk): + t1 = threading.Thread(target=lambda: cls.__heart_beats(_type, rid, sk)) + t1.setDaemon(True) + t1.start() + + @classmethod + def __maintain_client(cls): + logger_system.info(f"outside_api __maintain_client 绾跨▼ID:{tool.get_thread_id()}") + while True: + try: + for client_type in cls.client_count_dict: + if len(cls.client_dict[client_type]) < cls.client_count_dict[client_type]: + for i in range(cls.client_count_dict[client_type] - len(cls.client_dict[client_type])): + result = cls.__create_and_run_client(client_type) + cls.client_dict[client_type][result[0]] = result[1] + except: + pass + time.sleep(1) + + # 缁存姢杩炴帴鏁扮殑绋冲畾 + def run(self, blocking=True): + # 缁存姢client + if blocking: + self.__maintain_client() + else: + t1 = threading.Thread(target=lambda: self.__maintain_client()) + t1.setDaemon(True) + t1.start() + + if __name__ == "__main__": manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, None) manager.run() -- Gitblit v1.8.0