From 95e18d831b6e1e3509e24e1fe3eed9f1d0b70f6d Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期四, 31 七月 2025 14:13:27 +0800 Subject: [PATCH] 添加推送日志 --- middle_server.py | 188 +++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 165 insertions(+), 23 deletions(-) diff --git a/middle_server.py b/middle_server.py index f757e7f..14393c2 100644 --- a/middle_server.py +++ b/middle_server.py @@ -1,3 +1,5 @@ +import builtins +import copy import hashlib import json import logging @@ -12,11 +14,31 @@ import socket_manager from db import mysql_data from db.redis_manager import RedisUtils, RedisManager -from log import logger_debug, logger_request_debug -from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool +from log_module import log, async_log_util +from log_module.log import logger_debug, logger_response_size +from middle_l1_data_server import L1DataManager +from output import push_msg_manager +from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \ + block_web_api from utils.juejin_util import JueJinHttpApi trade_data_request_queue = queue.Queue() + +__mysql_config_dict = {} + + +def get_mysql_config(db_name): + """ + 鑾峰彇mysql鐨勯厤缃� + :param db_name: + :return: + """ + if db_name in __mysql_config_dict: + return __mysql_config_dict.get(db_name) + config = copy.deepcopy(constant.MYSQL_CONFIG) + config["database"] = db_name + __mysql_config_dict[db_name] = config + return config class MyTCPServer(socketserver.TCPServer): @@ -93,14 +115,15 @@ {"code": 100, "msg": f"JSON瑙f瀽澶辫触"}).encode( encoding='utf-8'))) continue - thread_id = random.randint(0, 1000000) - logger_request_debug.info(f"middle_server 璇锋眰寮�濮�({thread_id})锛歿data_json.get('type')}") + type_ = data_json["type"] + __start_time = time.time() try: if data_json["type"] == 'register': client_type = data_json["data"]["client_type"] rid = data_json["rid"] socket_manager.ClientSocketManager.add_client(client_type, rid, sk) sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) + logger_debug.info(f"register: {self.request.getpeername()}") try: # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict) while True: @@ -134,9 +157,28 @@ try: data = data_json["data"] datas = data["data"] - print("l2_subscript_codes", data_json) + # print("l2_subscript_codes", data_json) global_data_cache_util.huaxin_subscript_codes = datas global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str() + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_subscript_codes_rate": + # 璁剧疆璁㈤槄鐨勪唬鐮佺殑娑ㄥ箙 + try: + data = data_json["data"] + datas = data["data"] + # print("l2_subscript_codes", data_json) + global_data_cache_util.huaxin_subscript_codes_rate = datas + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_position_subscript_codes": + # 璁剧疆璁㈤槄鐨勪唬鐮� + try: + data = data_json["data"] + datas = data["data"] + print("l2_position_subscript_codes", data_json) + global_data_cache_util.huaxin_position_subscript_codes = datas + global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str() finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "redis": @@ -158,20 +200,27 @@ method = getattr(RedisUtils, cmd) args_ = [redis, key] if args is not None: - if type(args) == tuple or type(args) == list: + if builtins.type(args) == tuple or builtins.type(args) == list: args = list(args) - for a in args: - args_.append(a) + if cmd == "setex": + args_.append(args[0]) + if type(args[1]) == list: + args_.append(json.dumps(args[1])) + else: + args_.append(args[1]) + else: + for a in args: + args_.append(a) else: args_.append(args) args_ = tuple(args_) result = method(*args_) - if type(result) == set: + if builtins.type(result) == set: result = list(result) result_str = json.dumps({"code": 0, "data": result}) elif ctype == "cmds": datas = data["data"] - result_list=[] + result_list = [] for d in datas: db = d["db"] cmd = d["cmd"] @@ -181,19 +230,28 @@ method = getattr(RedisUtils, cmd) args_ = [redis, key] if args is not None: - if type(args) == tuple or type(args) == list: + if builtins.type(args) == tuple or builtins.type(args) == list: args = list(args) - for a in args: - args_.append(a) + if cmd == "setex": + args_.append(args[0]) + if type(args[1]) == list: + args_.append(json.dumps(args[1])) + else: + args_.append(args[1]) + else: + for a in args: + args_.append(a) else: args_.append(args) args_ = tuple(args_) result = method(*args_) - if type(result) == set: + if builtins.type(result) == set: result = list(result) result_list.append(result) result_str = json.dumps({"code": 0, "data": result_list}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, f"redis:{len(result_bytes)}") except Exception as e: logger_debug.exception(e) logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}") @@ -207,18 +265,22 @@ db = data["db"] cmd = data["cmd"] args = data.get("args") - mysql = mysql_data.Mysqldb() + mysql_config = get_mysql_config(db) + mysql = mysql_data.Mysqldb(mysql_config) method = getattr(mysql, cmd) args_ = [] if args: - if type(args) == tuple or type(args) == list: + if builtins.type(args) == tuple or builtins.type(args) == list: args_ = list(args) else: args_.append(args) args_ = tuple(args_) result = method(*args_) result_str = json.dumps({"code": 0, "data": result}, default=str) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, + f"mysql:{len(result_bytes)}:#{str(data)[:100]}") except Exception as e: logging.exception(e) sk.sendall(socket_util.load_header( @@ -232,7 +294,9 @@ params = data.get("params") result = JueJinHttpApi.request(path_, params) result_str = json.dumps({"code": 0, "data": result}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, f"juejin:{len(result_bytes)}") except Exception as e: logging.exception(e) sk.sendall(socket_util.load_header( @@ -245,8 +309,21 @@ url = data["url"] data_ = data.get("data") result = kpl_api_util.request(url, data_) + if data_.find("a=ZhiShuStockList_W8") >= 0: + # 鎸囨暟涓嬮潰鐨勬垚鍒嗚偂 + result_obj = json.loads(result) + if "list" in result_obj: + fiexed_indexes = [0, 1, 2, 4, 6, 23, 24, 9, 40, 42] + for d in result_obj["list"]: + for i in range(len(d)): + if i in fiexed_indexes: + continue + d[i] = 0 + result = json.dumps(result_obj) result_str = json.dumps({"code": 0, "data": result}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, f"kpl:{len(result_bytes)}#{data_}") except Exception as e: logging.exception(e) sk.sendall(socket_util.load_header( @@ -260,13 +337,78 @@ result_str = json.dumps({"code": 0, "data": {}}) sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) pass + elif data_json["type"] == "push_msg": + data = data_json["data"]["data"] + _type = data["type"] + data = data.get("data") + logger_debug.info(f"鎺ㄩ�佹秷鎭細{data_json}") + push_msg_manager.push_msg(_type, data) + result_str = json.dumps({"code": 0, "data": {}}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + elif data_json["type"] == 'l1_data': + datas = data_json["data"] + L1DataManager().add_datas(datas) + break + elif data_json["type"] == 'get_l1_target_codes': + # 鑾峰彇鐩爣浠g爜 + codes = L1DataManager().get_target_codes() + result_str = json.dumps({"code": 0, "data": list(codes)}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + break + # 鑾峰彇涓夋柟鏉垮潡 + elif data_json["type"] == 'get_third_blocks': + data = data_json["data"] + data = data["data"] + source = data["source"] + code = data["code"] + result_str = json.dumps({"code": 1, "msg": "source涓嶅尮閰�"}) + if source == 2: + # 閫氳揪淇� + try: + blocks = block_web_api.get_tdx_blocks(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e: + result_str = json.dumps({"code": 1, "msg": str(e)}) + elif source == 3: + # 鍚岃姳椤� + try: + blocks = block_web_api.THSBlocksApi().get_ths_blocks(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e: + try: + block_web_api.THSBlocksApi.load_hexin_v() + blocks = block_web_api.THSBlocksApi().get_ths_blocks(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e1: + result_str = json.dumps({"code": 1, "msg": str(e1)}) + elif source == 4: + # 涓滄柟璐㈠瘜 + try: + blocks = block_web_api.get_eastmoney_block(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e: + result_str = json.dumps({"code": 1, "msg": str(e)}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + break + elif data_json["type"] == 'low_suction': + # TODO 浣庡惛閫氶亾 + datas = data_json["data"] + pass + + + + except Exception as e: + log.logger_tuoguan_request_debug.exception(e) finally: - logger_request_debug.info(f"middle_server 璇锋眰缁撴潫({thread_id})锛歿data_json.get('type')}") + if time.time() - __start_time > 2: + log.logger_tuoguan_request_debug.info( + f"鑰楁椂锛歿int(time.time() - __start_time)}s 鏁版嵁锛歿data_json}") else: # 鏂紑杩炴帴 break # sk.close() except Exception as e: + # log.logger_tuoguan_request_debug.exception(e) logging.exception(e) break @@ -297,12 +439,12 @@ pass -def run(): +def run(port=constant.MIDDLE_SERVER_PORT): print("create MiddleServer") t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) t1.start() - laddr = "0.0.0.0", constant.MIDDLE_SERVER_PORT + laddr = "0.0.0.0", port print("MiddleServer is at: http://%s:%d/" % (laddr)) tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle tcpserver.serve_forever() -- Gitblit v1.8.0