From 95e18d831b6e1e3509e24e1fe3eed9f1d0b70f6d Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期四, 31 七月 2025 14:13:27 +0800 Subject: [PATCH] 添加推送日志 --- middle_server.py | 425 +++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 301 insertions(+), 124 deletions(-) diff --git a/middle_server.py b/middle_server.py index e81f66c..14393c2 100644 --- a/middle_server.py +++ b/middle_server.py @@ -1,6 +1,6 @@ -import datetime +import builtins +import copy import hashlib -import io import json import logging import queue @@ -10,13 +10,35 @@ import threading import time +import constant import socket_manager from db import mysql_data from db.redis_manager import RedisUtils, RedisManager -from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util +from log_module import log, async_log_util +from log_module.log import logger_debug, logger_response_size +from middle_l1_data_server import L1DataManager +from output import push_msg_manager +from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \ + block_web_api from utils.juejin_util import JueJinHttpApi trade_data_request_queue = queue.Queue() + +__mysql_config_dict = {} + + +def get_mysql_config(db_name): + """ + 鑾峰彇mysql鐨勯厤缃� + :param db_name: + :return: + """ + if db_name in __mysql_config_dict: + return __mysql_config_dict.get(db_name) + config = copy.deepcopy(constant.MYSQL_CONFIG) + config["database"] = db_name + __mysql_config_dict[db_name] = config + return config class MyTCPServer(socketserver.TCPServer): @@ -93,146 +115,300 @@ {"code": 100, "msg": f"JSON瑙f瀽澶辫触"}).encode( encoding='utf-8'))) continue - print(data_json["type"]) - if data_json["type"] == 'register': - client_type = data_json["data"]["client_type"] - rid = data_json["rid"] - socket_manager.ClientSocketManager.add_client(client_type, rid, sk) - sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) - try: - # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict) - while True: - result, header = self.getRecvData(sk) - try: - resultJSON = json.loads(result) - if resultJSON["type"] == 'heart': - # 璁板綍娲昏穬瀹㈡埛绔� - socket_manager.ClientSocketManager.heart(resultJSON['client_id']) - except json.decoder.JSONDecodeError as e: - if not result: - sk.close() - print("JSON瑙f瀽鍑洪敊", result, header) - time.sleep(1) - except ConnectionResetError as ee: - socket_manager.ClientSocketManager.del_client(rid) - except Exception as e: - logging.exception(e) + type_ = data_json["type"] + __start_time = time.time() + try: + if data_json["type"] == 'register': + client_type = data_json["data"]["client_type"] + rid = data_json["rid"] + socket_manager.ClientSocketManager.add_client(client_type, rid, sk) + sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) + logger_debug.info(f"register: {self.request.getpeername()}") + try: + # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict) + while True: + result, header = self.getRecvData(sk) + try: + resultJSON = json.loads(result) + if resultJSON["type"] == 'heart': + # 璁板綍娲昏穬瀹㈡埛绔� + socket_manager.ClientSocketManager.heart(resultJSON['client_id']) + except json.decoder.JSONDecodeError as e: + print("JSON瑙f瀽鍑洪敊", result, header) + if not result: + sk.close() + break + time.sleep(1) + except ConnectionResetError as ee: + socket_manager.ClientSocketManager.del_client(rid) + except Exception as e: + logging.exception(e) + elif data_json["type"] == "response": + # 涓诲姩瑙﹀彂鐨勫搷搴� + try: + client_id = data_json["client_id"] + # hx_logger_trade_callback.info(f"response锛歳equest_id-{data_json['request_id']}") + # # 璁剧疆鍝嶅簲鍐呭 + hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data']) + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_subscript_codes": + # 璁剧疆璁㈤槄鐨勪唬鐮� + try: + data = data_json["data"] + datas = data["data"] + # print("l2_subscript_codes", data_json) + global_data_cache_util.huaxin_subscript_codes = datas + global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str() + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_subscript_codes_rate": + # 璁剧疆璁㈤槄鐨勪唬鐮佺殑娑ㄥ箙 + try: + data = data_json["data"] + datas = data["data"] + # print("l2_subscript_codes", data_json) + global_data_cache_util.huaxin_subscript_codes_rate = datas + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_position_subscript_codes": + # 璁剧疆璁㈤槄鐨勪唬鐮� + try: + data = data_json["data"] + datas = data["data"] + print("l2_position_subscript_codes", data_json) + global_data_cache_util.huaxin_position_subscript_codes = datas + global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str() + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "redis": + try: + data = data_json["data"] + ctype = data["ctype"] - elif data_json["type"] == "response": - # 涓诲姩瑙﹀彂鐨勫搷搴� - try: - client_id = data_json["client_id"] - # hx_logger_trade_callback.info(f"response锛歳equest_id-{data_json['request_id']}") - # # 璁剧疆鍝嶅簲鍐呭 - hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data']) - finally: - sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) - elif data_json["type"] == "l2_subscript_codes": - # 璁剧疆璁㈤槄鐨勪唬鐮� - try: - data = data_json["data"] - datas = data["data"] - print("l2_subscript_codes", data_json) - global_data_cache_util.huaxin_subscript_codes = datas - finally: - sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) - elif data_json["type"] == "redis": - try: - data = data_json["data"] - ctype = data["ctype"] - data = data["data"] - result_str = '' - if ctype == "queue_size": - # TODO 璁剧疆闃熷垪澶у皬 - result_str = json.dumps({"code": 0}) - elif ctype == "cmd": + result_str = '' + if ctype == "queue_size": + # TODO 璁剧疆闃熷垪澶у皬 + result_str = json.dumps({"code": 0}) + elif ctype == "cmd": + data = data["data"] + db = data["db"] + cmd = data["cmd"] + key = data["key"] + args = data.get("args") + redis = RedisManager(db).getRedis() + method = getattr(RedisUtils, cmd) + args_ = [redis, key] + if args is not None: + if builtins.type(args) == tuple or builtins.type(args) == list: + args = list(args) + if cmd == "setex": + args_.append(args[0]) + if type(args[1]) == list: + args_.append(json.dumps(args[1])) + else: + args_.append(args[1]) + else: + for a in args: + args_.append(a) + else: + args_.append(args) + args_ = tuple(args_) + result = method(*args_) + if builtins.type(result) == set: + result = list(result) + result_str = json.dumps({"code": 0, "data": result}) + elif ctype == "cmds": + datas = data["data"] + result_list = [] + for d in datas: + db = d["db"] + cmd = d["cmd"] + key = d["key"] + args = d.get("args") + redis = RedisManager(db).getRedis() + method = getattr(RedisUtils, cmd) + args_ = [redis, key] + if args is not None: + if builtins.type(args) == tuple or builtins.type(args) == list: + args = list(args) + if cmd == "setex": + args_.append(args[0]) + if type(args[1]) == list: + args_.append(json.dumps(args[1])) + else: + args_.append(args[1]) + else: + for a in args: + args_.append(a) + else: + args_.append(args) + args_ = tuple(args_) + result = method(*args_) + if builtins.type(result) == set: + result = list(result) + result_list.append(result) + result_str = json.dumps({"code": 0, "data": result_list}) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, f"redis:{len(result_bytes)}") + except Exception as e: + logger_debug.exception(e) + logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}") + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "mysql": + try: + data = data_json["data"] + data = data["data"] db = data["db"] cmd = data["cmd"] - key = data["key"] args = data.get("args") - redis = RedisManager(db).getRedis() - method = getattr(RedisUtils, cmd) - args_ = [redis, key] + mysql_config = get_mysql_config(db) + mysql = mysql_data.Mysqldb(mysql_config) + method = getattr(mysql, cmd) + args_ = [] if args: - if type(args) == tuple or type(args) == list: - args = list(args) - for a in args: - args_.append(a) + if builtins.type(args) == tuple or builtins.type(args) == list: + args_ = list(args) else: args_.append(args) args_ = tuple(args_) result = method(*args_) + result_str = json.dumps({"code": 0, "data": result}, default=str) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, + f"mysql:{len(result_bytes)}:#{str(data)[:100]}") + except Exception as e: + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "juejin": + # 鎺橀噾璇锋眰 + try: + data = data_json["data"] + data = data["data"] + path_ = data["path"] + params = data.get("params") + result = JueJinHttpApi.request(path_, params) result_str = json.dumps({"code": 0, "data": result}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "mysql": - try: + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, f"juejin:{len(result_bytes)}") + except Exception as e: + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "kpl": + # 寮�鐩樺暒璇锋眰 + try: + data = data_json["data"] + data = data["data"] + url = data["url"] + data_ = data.get("data") + result = kpl_api_util.request(url, data_) + if data_.find("a=ZhiShuStockList_W8") >= 0: + # 鎸囨暟涓嬮潰鐨勬垚鍒嗚偂 + result_obj = json.loads(result) + if "list" in result_obj: + fiexed_indexes = [0, 1, 2, 4, 6, 23, 24, 9, 40, 42] + for d in result_obj["list"]: + for i in range(len(d)): + if i in fiexed_indexes: + continue + d[i] = 0 + result = json.dumps(result_obj) + result_str = json.dumps({"code": 0, "data": result}) + result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8')) + sk.sendall(result_bytes) + async_log_util.info(logger_response_size, f"kpl:{len(result_bytes)}#{data_}") + except Exception as e: + logging.exception(e) + sk.sendall(socket_util.load_header( + json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) + elif data_json["type"] == "kp_msg": + # 鐪嬬洏娑堟伅 data = data_json["data"] data = data["data"] - db = data["db"] - cmd = data["cmd"] - args = data.get("args") - mysql = mysql_data.Mysqldb() - method = getattr(mysql, cmd) - args_ = [] - if args: - if type(args) == tuple or type(args) == list: - args_ = list(args) - else: - args_.append(args) - args_ = tuple(args_) - result = method(*args_) - result_str = json.dumps({"code": 0, "data": result}) + msg = data["msg"] + kp_client_msg_manager.add_msg(msg) + result_str = json.dumps({"code": 0, "data": {}}) sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "juejin": - # 鎺橀噾璇锋眰 - try: + pass + elif data_json["type"] == "push_msg": + data = data_json["data"]["data"] + _type = data["type"] + data = data.get("data") + logger_debug.info(f"鎺ㄩ�佹秷鎭細{data_json}") + push_msg_manager.push_msg(_type, data) + result_str = json.dumps({"code": 0, "data": {}}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + elif data_json["type"] == 'l1_data': + datas = data_json["data"] + L1DataManager().add_datas(datas) + break + elif data_json["type"] == 'get_l1_target_codes': + # 鑾峰彇鐩爣浠g爜 + codes = L1DataManager().get_target_codes() + result_str = json.dumps({"code": 0, "data": list(codes)}) + sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) + break + # 鑾峰彇涓夋柟鏉垮潡 + elif data_json["type"] == 'get_third_blocks': data = data_json["data"] data = data["data"] - path_ = data["path"] - params = data.get("params") - result = JueJinHttpApi.request(path_, params) - result_str = json.dumps({"code": 0, "data": result}) + source = data["source"] + code = data["code"] + result_str = json.dumps({"code": 1, "msg": "source涓嶅尮閰�"}) + if source == 2: + # 閫氳揪淇� + try: + blocks = block_web_api.get_tdx_blocks(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e: + result_str = json.dumps({"code": 1, "msg": str(e)}) + elif source == 3: + # 鍚岃姳椤� + try: + blocks = block_web_api.THSBlocksApi().get_ths_blocks(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e: + try: + block_web_api.THSBlocksApi.load_hexin_v() + blocks = block_web_api.THSBlocksApi().get_ths_blocks(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e1: + result_str = json.dumps({"code": 1, "msg": str(e1)}) + elif source == 4: + # 涓滄柟璐㈠瘜 + try: + blocks = block_web_api.get_eastmoney_block(code) + result_str = json.dumps({"code": 0, "data": list(blocks)}) + except Exception as e: + result_str = json.dumps({"code": 1, "msg": str(e)}) sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "kpl": - # 寮�鐩樺暒璇锋眰 - try: - data = data_json["data"] - data = data["data"] - url = data["url"] - data_ = data.get("data") - result = kpl_api_util.request(url, data_) - result_str = json.dumps({"code": 0, "data": result}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - except Exception as e: - logging.exception(e) - sk.sendall(socket_util.load_header( - json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8'))) - elif data_json["type"] == "kp_msg": - # 鐪嬬洏娑堟伅 - data = data_json["data"] - data = data["data"] - msg = data["msg"] - kp_client_msg_manager.add_msg(msg) - result_str = json.dumps({"code": 0, "data": {}}) - sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) - pass + break + elif data_json["type"] == 'low_suction': + # TODO 浣庡惛閫氶亾 + datas = data_json["data"] + pass + + + + except Exception as e: + log.logger_tuoguan_request_debug.exception(e) + finally: + if time.time() - __start_time > 2: + log.logger_tuoguan_request_debug.info( + f"鑰楁椂锛歿int(time.time() - __start_time)}s 鏁版嵁锛歿data_json}") else: # 鏂紑杩炴帴 break # sk.close() except Exception as e: + # log.logger_tuoguan_request_debug.exception(e) logging.exception(e) break @@ -263,12 +439,13 @@ pass -def run(): +def run(port=constant.MIDDLE_SERVER_PORT): print("create MiddleServer") t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) t1.start() - laddr = "0.0.0.0", 10008 + laddr = "0.0.0.0", port + print("MiddleServer is at: http://%s:%d/" % (laddr)) tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle tcpserver.serve_forever() -- Gitblit v1.8.0