From 1110af9cc42cbf6a3ebbb953f18585cb37ba5b8c Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期一, 08 一月 2024 15:24:35 +0800 Subject: [PATCH] bug修复/日志添加 --- middle_server.py | 303 +++++++++++++++++++++++++------------------------- 1 files changed, 152 insertions(+), 151 deletions(-) diff --git a/middle_server.py b/middle_server.py index 7226375..f757e7f 100644 --- a/middle_server.py +++ b/middle_server.py @@ -1,6 +1,4 @@ -import datetime import hashlib -import io import json import logging import queue @@ -14,7 +12,7 @@ import socket_manager from db import mysql_data from db.redis_manager import RedisUtils, RedisManager -from log import logger_debug +from log import logger_debug, logger_request_debug from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool from utils.juejin_util import JueJinHttpApi @@ -95,88 +93,67 @@ {"code": 100, "msg": f"JSON瑙f瀽澶辫触"}).encode( encoding='utf-8'))) continue - if data_json["type"] == 'register': - client_type = data_json["data"]["client_type"] - rid = data_json["rid"] - socket_manager.ClientSocketManager.add_client(client_type, rid, sk) - sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) - try: - # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict) - while True: - result, header = self.getRecvData(sk) - try: - resultJSON = json.loads(result) - if resultJSON["type"] == 'heart': - # 璁板綍娲昏穬瀹㈡埛绔� - socket_manager.ClientSocketManager.heart(resultJSON['client_id']) - except json.decoder.JSONDecodeError as e: - print("JSON瑙f瀽鍑洪敊", result, header) - if not result: - sk.close() - break - time.sleep(1) - except ConnectionResetError as ee: - socket_manager.ClientSocketManager.del_client(rid) - except Exception as e: - logging.exception(e) - - elif data_json["type"] == "response": - # 涓诲姩瑙﹀彂鐨勫搷搴� - try: - client_id = data_json["client_id"] - # hx_logger_trade_callback.info(f"response锛歳equest_id-{data_json['request_id']}") - # # 璁剧疆鍝嶅簲鍐呭 - hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data']) - finally: - sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) - elif data_json["type"] == "l2_subscript_codes": - # 璁剧疆璁㈤槄鐨勪唬鐮� - try: - data = data_json["data"] - datas = data["data"] - print("l2_subscript_codes", data_json) - global_data_cache_util.huaxin_subscript_codes = datas - global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str() - finally: - sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) - elif data_json["type"] == "redis": - try: - data = data_json["data"] - ctype = data["ctype"] - - result_str = '' - if ctype == "queue_size": - # TODO 璁剧疆闃熷垪澶у皬 - result_str = json.dumps({"code": 0}) - elif ctype == "cmd": - data = data["data"] - db = data["db"] - cmd = data["cmd"] - key = data["key"] - args = data.get("args") - redis = RedisManager(db).getRedis() - method = getattr(RedisUtils, cmd) - args_ = [redis, key] - if args is not None: - if type(args) == tuple or type(args) == list: - args = list(args) - for a in args: - args_.append(a) - else: - args_.append(args) - args_ = tuple(args_) - result = method(*args_) - if type(result) == set: - result = list(result) - result_str = json.dumps({"code": 0, "data": result}) - elif ctype == "cmds": + thread_id = random.randint(0, 1000000) + logger_request_debug.info(f"middle_server 璇锋眰寮�濮�({thread_id})锛歿data_json.get('type')}") + try: + if data_json["type"] == 'register': + client_type = data_json["data"]["client_type"] + rid = data_json["rid"] + socket_manager.ClientSocketManager.add_client(client_type, rid, sk) + sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) + try: + # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict) + while True: + result, header = self.getRecvData(sk) + try: + resultJSON = json.loads(result) + if resultJSON["type"] == 'heart': + # 璁板綍娲昏穬瀹㈡埛绔� + socket_manager.ClientSocketManager.heart(resultJSON['client_id']) + except json.decoder.JSONDecodeError as e: + print("JSON瑙f瀽鍑洪敊", result, header) + if not result: + sk.close() + break + time.sleep(1) + except ConnectionResetError as ee: + socket_manager.ClientSocketManager.del_client(rid) + except Exception as e: + logging.exception(e) + elif data_json["type"] == "response": + # 涓诲姩瑙﹀彂鐨勫搷搴� + try: + client_id = data_json["client_id"] + # hx_logger_trade_callback.info(f"response锛歳equest_id-{data_json['request_id']}") + # # 璁剧疆鍝嶅簲鍐呭 + hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data']) + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_subscript_codes": + # 璁剧疆璁㈤槄鐨勪唬鐮� + try: + data = data_json["data"] datas = data["data"] - result_list=[] - for d in datas: - db = d["db"] - cmd = d["cmd"] - key = d["key"] - args = d.get("args") + print("l2_subscript_codes", data_json) + global_data_cache_util.huaxin_subscript_codes = datas + global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str() + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "redis": + try: + data = data_json["data"] + ctype = data["ctype"] + + result_str = '' + if ctype == "queue_size": + # TODO 璁剧疆闃熷垪澶у皬 + result_str = json.dumps({"code": 0}) + elif ctype == "cmd": + data = data["data"] + db = data["db"] + cmd = data["cmd"] + key = data["key"] + args = data.get("args") redis = RedisManager(db).getRedis() method = getattr(RedisUtils, cmd) args_ = [redis, key] @@ -191,76 +168,100 @@ result = method(*args_) if type(result) == set: result = list(result) - result_list.append(result) - result_str = json.dumps({"code": 0, "data": result_list}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logger_debug.exception(e) - logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}") - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "mysql": - try: + result_str = json.dumps({"code": 0, "data": result}) + elif ctype == "cmds": + datas = data["data"] + result_list=[] + for d in datas: + db = d["db"] + cmd = d["cmd"] + key = d["key"] + args = d.get("args") + redis = RedisManager(db).getRedis() + method = getattr(RedisUtils, cmd) + args_ = [redis, key] + if args is not None: + if type(args) == tuple or type(args) == list: + args = list(args) + for a in args: + args_.append(a) + else: + args_.append(args) + args_ = tuple(args_) + result = method(*args_) + if type(result) == set: + result = list(result) + result_list.append(result) + result_str = json.dumps({"code": 0, "data": result_list}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + except Exception as e: + logger_debug.exception(e) + logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}") + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "mysql": + try: + data = data_json["data"] + data = data["data"] + db = data["db"] + cmd = data["cmd"] + args = data.get("args") + mysql = mysql_data.Mysqldb() + method = getattr(mysql, cmd) + args_ = [] + if args: + if type(args) == tuple or type(args) == list: + args_ = list(args) + else: + args_.append(args) + args_ = tuple(args_) + result = method(*args_) + result_str = json.dumps({"code": 0, "data": result}, default=str) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + except Exception as e: + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "juejin": + # 鎺橀噾璇锋眰 + try: + data = data_json["data"] + data = data["data"] + path_ = data["path"] + params = data.get("params") + result = JueJinHttpApi.request(path_, params) + result_str = json.dumps({"code": 0, "data": result}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + except Exception as e: + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "kpl": + # 寮�鐩樺暒璇锋眰 + try: + data = data_json["data"] + data = data["data"] + url = data["url"] + data_ = data.get("data") + result = kpl_api_util.request(url, data_) + result_str = json.dumps({"code": 0, "data": result}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + except Exception as e: + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "kp_msg": + # 鐪嬬洏娑堟伅 data = data_json["data"] data = data["data"] - db = data["db"] - cmd = data["cmd"] - args = data.get("args") - mysql = mysql_data.Mysqldb() - method = getattr(mysql, cmd) - args_ = [] - if args: - if type(args) == tuple or type(args) == list: - args_ = list(args) - else: - args_.append(args) - args_ = tuple(args_) - result = method(*args_) - result_str = json.dumps({"code": 0, "data": result}, default=str) + msg = data["msg"] + kp_client_msg_manager.add_msg(msg) + result_str = json.dumps({"code": 0, "data": {}}) sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "juejin": - # 鎺橀噾璇锋眰 - try: - data = data_json["data"] - data = data["data"] - path_ = data["path"] - params = data.get("params") - result = JueJinHttpApi.request(path_, params) - result_str = json.dumps({"code": 0, "data": result}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "kpl": - # 寮�鐩樺暒璇锋眰 - try: - data = data_json["data"] - data = data["data"] - url = data["url"] - data_ = data.get("data") - result = kpl_api_util.request(url, data_) - result_str = json.dumps({"code": 0, "data": result}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "kp_msg": - # 鐪嬬洏娑堟伅 - data = data_json["data"] - data = data["data"] - msg = data["msg"] - kp_client_msg_manager.add_msg(msg) - result_str = json.dumps({"code": 0, "data": {}}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - pass - + pass + finally: + logger_request_debug.info(f"middle_server 璇锋眰缁撴潫({thread_id})锛歿data_json.get('type')}") else: # 鏂紑杩炴帴 break -- Gitblit v1.8.0