From 95e18d831b6e1e3509e24e1fe3eed9f1d0b70f6d Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期四, 31 七月 2025 14:13:27 +0800
Subject: [PATCH] 添加推送日志

---
 middle_server.py |  425 +++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 301 insertions(+), 124 deletions(-)

diff --git a/middle_server.py b/middle_server.py
index e81f66c..14393c2 100644
--- a/middle_server.py
+++ b/middle_server.py
@@ -1,6 +1,6 @@
-import datetime
+import builtins
+import copy
 import hashlib
-import io
 import json
 import logging
 import queue
@@ -10,13 +10,35 @@
 import threading
 import time
 
+import constant
 import socket_manager
 from db import mysql_data
 from db.redis_manager import RedisUtils, RedisManager
-from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util
+from log_module import log, async_log_util
+from log_module.log import logger_debug, logger_response_size
+from middle_l1_data_server import L1DataManager
+from output import push_msg_manager
+from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \
+    block_web_api
 from utils.juejin_util import JueJinHttpApi
 
 trade_data_request_queue = queue.Queue()
+
+__mysql_config_dict = {}
+
+
+def get_mysql_config(db_name):
+    """
+    鑾峰彇mysql鐨勯厤缃�
+    :param db_name:
+    :return:
+    """
+    if db_name in __mysql_config_dict:
+        return __mysql_config_dict.get(db_name)
+    config = copy.deepcopy(constant.MYSQL_CONFIG)
+    config["database"] = db_name
+    __mysql_config_dict[db_name] = config
+    return config
 
 
 class MyTCPServer(socketserver.TCPServer):
@@ -93,146 +115,300 @@
                             {"code": 100, "msg": f"JSON瑙f瀽澶辫触"}).encode(
                             encoding='utf-8')))
                         continue
-                    print(data_json["type"])
-                    if data_json["type"] == 'register':
-                        client_type = data_json["data"]["client_type"]
-                        rid = data_json["rid"]
-                        socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
-                        sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
-                        try:
-                            # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict)
-                            while True:
-                                result, header = self.getRecvData(sk)
-                                try:
-                                    resultJSON = json.loads(result)
-                                    if resultJSON["type"] == 'heart':
-                                        # 璁板綍娲昏穬瀹㈡埛绔�
-                                        socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
-                                except json.decoder.JSONDecodeError as e:
-                                    if not result:
-                                        sk.close()
-                                    print("JSON瑙f瀽鍑洪敊", result, header)
-                                time.sleep(1)
-                        except ConnectionResetError as ee:
-                            socket_manager.ClientSocketManager.del_client(rid)
-                        except Exception as e:
-                            logging.exception(e)
+                    type_ = data_json["type"]
+                    __start_time = time.time()
+                    try:
+                        if data_json["type"] == 'register':
+                            client_type = data_json["data"]["client_type"]
+                            rid = data_json["rid"]
+                            socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
+                            sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
+                            logger_debug.info(f"register:  {self.request.getpeername()}")
+                            try:
+                                # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict)
+                                while True:
+                                    result, header = self.getRecvData(sk)
+                                    try:
+                                        resultJSON = json.loads(result)
+                                        if resultJSON["type"] == 'heart':
+                                            # 璁板綍娲昏穬瀹㈡埛绔�
+                                            socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
+                                    except json.decoder.JSONDecodeError as e:
+                                        print("JSON瑙f瀽鍑洪敊", result, header)
+                                        if not result:
+                                            sk.close()
+                                            break
+                                    time.sleep(1)
+                            except ConnectionResetError as ee:
+                                socket_manager.ClientSocketManager.del_client(rid)
+                            except Exception as e:
+                                logging.exception(e)
+                        elif data_json["type"] == "response":
+                            # 涓诲姩瑙﹀彂鐨勫搷搴�
+                            try:
+                                client_id = data_json["client_id"]
+                                # hx_logger_trade_callback.info(f"response锛歳equest_id-{data_json['request_id']}")
+                                # # 璁剧疆鍝嶅簲鍐呭
+                                hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data'])
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "l2_subscript_codes":
+                            # 璁剧疆璁㈤槄鐨勪唬鐮�
+                            try:
+                                data = data_json["data"]
+                                datas = data["data"]
+                                # print("l2_subscript_codes", data_json)
+                                global_data_cache_util.huaxin_subscript_codes = datas
+                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "l2_subscript_codes_rate":
+                            # 璁剧疆璁㈤槄鐨勪唬鐮佺殑娑ㄥ箙
+                            try:
+                                data = data_json["data"]
+                                datas = data["data"]
+                                # print("l2_subscript_codes", data_json)
+                                global_data_cache_util.huaxin_subscript_codes_rate = datas
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "l2_position_subscript_codes":
+                            # 璁剧疆璁㈤槄鐨勪唬鐮�
+                            try:
+                                data = data_json["data"]
+                                datas = data["data"]
+                                print("l2_position_subscript_codes", data_json)
+                                global_data_cache_util.huaxin_position_subscript_codes = datas
+                                global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str()
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "redis":
+                            try:
+                                data = data_json["data"]
+                                ctype = data["ctype"]
 
-                    elif data_json["type"] == "response":
-                        # 涓诲姩瑙﹀彂鐨勫搷搴�
-                        try:
-                            client_id = data_json["client_id"]
-                            # hx_logger_trade_callback.info(f"response锛歳equest_id-{data_json['request_id']}")
-                            # # 璁剧疆鍝嶅簲鍐呭
-                            hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data'])
-                        finally:
-                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
-                    elif data_json["type"] == "l2_subscript_codes":
-                        # 璁剧疆璁㈤槄鐨勪唬鐮�
-                        try:
-                            data = data_json["data"]
-                            datas = data["data"]
-                            print("l2_subscript_codes", data_json)
-                            global_data_cache_util.huaxin_subscript_codes = datas
-                        finally:
-                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
-                    elif data_json["type"] == "redis":
-                        try:
-                            data = data_json["data"]
-                            ctype = data["ctype"]
-                            data = data["data"]
-                            result_str = ''
-                            if ctype == "queue_size":
-                                # TODO 璁剧疆闃熷垪澶у皬
-                                result_str = json.dumps({"code": 0})
-                            elif ctype == "cmd":
+                                result_str = ''
+                                if ctype == "queue_size":
+                                    # TODO 璁剧疆闃熷垪澶у皬
+                                    result_str = json.dumps({"code": 0})
+                                elif ctype == "cmd":
+                                    data = data["data"]
+                                    db = data["db"]
+                                    cmd = data["cmd"]
+                                    key = data["key"]
+                                    args = data.get("args")
+                                    redis = RedisManager(db).getRedis()
+                                    method = getattr(RedisUtils, cmd)
+                                    args_ = [redis, key]
+                                    if args is not None:
+                                        if builtins.type(args) == tuple or builtins.type(args) == list:
+                                            args = list(args)
+                                            if cmd == "setex":
+                                                args_.append(args[0])
+                                                if type(args[1]) == list:
+                                                    args_.append(json.dumps(args[1]))
+                                                else:
+                                                    args_.append(args[1])
+                                            else:
+                                                for a in args:
+                                                    args_.append(a)
+                                        else:
+                                            args_.append(args)
+                                    args_ = tuple(args_)
+                                    result = method(*args_)
+                                    if builtins.type(result) == set:
+                                        result = list(result)
+                                    result_str = json.dumps({"code": 0, "data": result})
+                                elif ctype == "cmds":
+                                    datas = data["data"]
+                                    result_list = []
+                                    for d in datas:
+                                        db = d["db"]
+                                        cmd = d["cmd"]
+                                        key = d["key"]
+                                        args = d.get("args")
+                                        redis = RedisManager(db).getRedis()
+                                        method = getattr(RedisUtils, cmd)
+                                        args_ = [redis, key]
+                                        if args is not None:
+                                            if builtins.type(args) == tuple or builtins.type(args) == list:
+                                                args = list(args)
+                                                if cmd == "setex":
+                                                    args_.append(args[0])
+                                                    if type(args[1]) == list:
+                                                        args_.append(json.dumps(args[1]))
+                                                    else:
+                                                        args_.append(args[1])
+                                                else:
+                                                    for a in args:
+                                                        args_.append(a)
+                                            else:
+                                                args_.append(args)
+                                        args_ = tuple(args_)
+                                        result = method(*args_)
+                                        if builtins.type(result) == set:
+                                            result = list(result)
+                                        result_list.append(result)
+                                    result_str = json.dumps({"code": 0, "data": result_list})
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"redis:{len(result_bytes)}")
+                            except Exception as e:
+                                logger_debug.exception(e)
+                                logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}")
+                                logging.exception(e)
+                                sk.sendall(socket_util.load_header(
+                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "mysql":
+                            try:
+                                data = data_json["data"]
+                                data = data["data"]
                                 db = data["db"]
                                 cmd = data["cmd"]
-                                key = data["key"]
                                 args = data.get("args")
-                                redis = RedisManager(db).getRedis()
-                                method = getattr(RedisUtils, cmd)
-                                args_ = [redis, key]
+                                mysql_config = get_mysql_config(db)
+                                mysql = mysql_data.Mysqldb(mysql_config)
+                                method = getattr(mysql, cmd)
+                                args_ = []
                                 if args:
-                                    if type(args) == tuple or type(args) == list:
-                                        args = list(args)
-                                        for a in args:
-                                            args_.append(a)
+                                    if builtins.type(args) == tuple or builtins.type(args) == list:
+                                        args_ = list(args)
                                     else:
                                         args_.append(args)
                                 args_ = tuple(args_)
                                 result = method(*args_)
+                                result_str = json.dumps({"code": 0, "data": result}, default=str)
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size,
+                                                    f"mysql:{len(result_bytes)}:#{str(data)[:100]}")
+                            except Exception as e:
+                                logging.exception(e)
+                                sk.sendall(socket_util.load_header(
+                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "juejin":
+                            # 鎺橀噾璇锋眰
+                            try:
+                                data = data_json["data"]
+                                data = data["data"]
+                                path_ = data["path"]
+                                params = data.get("params")
+                                result = JueJinHttpApi.request(path_, params)
                                 result_str = json.dumps({"code": 0, "data": result})
-                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
-                        except Exception as e:
-                            logging.exception(e)
-                            sk.sendall(socket_util.load_header(
-                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
-                    elif data_json["type"] == "mysql":
-                        try:
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"juejin:{len(result_bytes)}")
+                            except Exception as e:
+                                logging.exception(e)
+                                sk.sendall(socket_util.load_header(
+                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "kpl":
+                            # 寮�鐩樺暒璇锋眰
+                            try:
+                                data = data_json["data"]
+                                data = data["data"]
+                                url = data["url"]
+                                data_ = data.get("data")
+                                result = kpl_api_util.request(url, data_)
+                                if data_.find("a=ZhiShuStockList_W8") >= 0:
+                                    # 鎸囨暟涓嬮潰鐨勬垚鍒嗚偂
+                                    result_obj = json.loads(result)
+                                    if "list" in result_obj:
+                                        fiexed_indexes = [0, 1, 2, 4, 6, 23, 24, 9, 40, 42]
+                                        for d in result_obj["list"]:
+                                            for i in range(len(d)):
+                                                if i in fiexed_indexes:
+                                                    continue
+                                                d[i] = 0
+                                        result = json.dumps(result_obj)
+                                result_str = json.dumps({"code": 0, "data": result})
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"kpl:{len(result_bytes)}#{data_}")
+                            except Exception as e:
+                                logging.exception(e)
+                                sk.sendall(socket_util.load_header(
+                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "kp_msg":
+                            # 鐪嬬洏娑堟伅
                             data = data_json["data"]
                             data = data["data"]
-                            db = data["db"]
-                            cmd = data["cmd"]
-                            args = data.get("args")
-                            mysql = mysql_data.Mysqldb()
-                            method = getattr(mysql, cmd)
-                            args_ = []
-                            if args:
-                                if type(args) == tuple or type(args) == list:
-                                    args_ = list(args)
-                                else:
-                                    args_.append(args)
-                            args_ = tuple(args_)
-                            result = method(*args_)
-                            result_str = json.dumps({"code": 0, "data": result})
+                            msg = data["msg"]
+                            kp_client_msg_manager.add_msg(msg)
+                            result_str = json.dumps({"code": 0, "data": {}})
                             sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
-                        except Exception as e:
-                            logging.exception(e)
-                            sk.sendall(socket_util.load_header(
-                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
-                    elif data_json["type"] == "juejin":
-                        # 鎺橀噾璇锋眰
-                        try:
+                            pass
+                        elif data_json["type"] == "push_msg":
+                            data = data_json["data"]["data"]
+                            _type = data["type"]
+                            data = data.get("data")
+                            logger_debug.info(f"鎺ㄩ�佹秷鎭細{data_json}")
+                            push_msg_manager.push_msg(_type, data)
+                            result_str = json.dumps({"code": 0, "data": {}})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                        elif data_json["type"] == 'l1_data':
+                            datas = data_json["data"]
+                            L1DataManager().add_datas(datas)
+                            break
+                        elif data_json["type"] == 'get_l1_target_codes':
+                            # 鑾峰彇鐩爣浠g爜
+                            codes = L1DataManager().get_target_codes()
+                            result_str = json.dumps({"code": 0, "data": list(codes)})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                            break
+                        # 鑾峰彇涓夋柟鏉垮潡
+                        elif data_json["type"] == 'get_third_blocks':
                             data = data_json["data"]
                             data = data["data"]
-                            path_ = data["path"]
-                            params = data.get("params")
-                            result = JueJinHttpApi.request(path_, params)
-                            result_str = json.dumps({"code": 0, "data": result})
+                            source = data["source"]
+                            code = data["code"]
+                            result_str = json.dumps({"code": 1, "msg": "source涓嶅尮閰�"})
+                            if source == 2:
+                                # 閫氳揪淇�
+                                try:
+                                    blocks = block_web_api.get_tdx_blocks(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    result_str = json.dumps({"code": 1, "msg": str(e)})
+                            elif source == 3:
+                                # 鍚岃姳椤�
+                                try:
+                                    blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    try:
+                                        block_web_api.THSBlocksApi.load_hexin_v()
+                                        blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
+                                        result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                    except Exception as e1:
+                                        result_str = json.dumps({"code": 1, "msg": str(e1)})
+                            elif source == 4:
+                                # 涓滄柟璐㈠瘜
+                                try:
+                                    blocks = block_web_api.get_eastmoney_block(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                             sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
-                        except Exception as e:
-                            logging.exception(e)
-                            sk.sendall(socket_util.load_header(
-                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
-                    elif data_json["type"] == "kpl":
-                        # 寮�鐩樺暒璇锋眰
-                        try:
-                            data = data_json["data"]
-                            data = data["data"]
-                            url = data["url"]
-                            data_ = data.get("data")
-                            result = kpl_api_util.request(url, data_)
-                            result_str = json.dumps({"code": 0, "data": result})
-                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
-                        except Exception as e:
-                            logging.exception(e)
-                            sk.sendall(socket_util.load_header(
-                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
-                    elif data_json["type"] == "kp_msg":
-                        # 鐪嬬洏娑堟伅
-                        data = data_json["data"]
-                        data = data["data"]
-                        msg = data["msg"]
-                        kp_client_msg_manager.add_msg(msg)
-                        result_str = json.dumps({"code": 0, "data": {}})
-                        sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
-                        pass
+                            break
+                        elif data_json["type"] == 'low_suction':
+                            # TODO 浣庡惛閫氶亾
+                            datas = data_json["data"]
+                            pass
+
+
+
+                    except Exception as e:
+                        log.logger_tuoguan_request_debug.exception(e)
+                    finally:
+                        if time.time() - __start_time > 2:
+                            log.logger_tuoguan_request_debug.info(
+                                f"鑰楁椂锛歿int(time.time() - __start_time)}s  鏁版嵁锛歿data_json}")
                 else:
                     # 鏂紑杩炴帴
                     break
                 # sk.close()
             except Exception as e:
+                # log.logger_tuoguan_request_debug.exception(e)
                 logging.exception(e)
                 break
 
@@ -263,12 +439,13 @@
                 pass
 
 
-def run():
+def run(port=constant.MIDDLE_SERVER_PORT):
     print("create MiddleServer")
     t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
     t1.start()
 
-    laddr = "0.0.0.0", 10008
+    laddr = "0.0.0.0", port
+    print("MiddleServer is at: http://%s:%d/" % (laddr))
     tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle
     tcpserver.serve_forever()
 

--
Gitblit v1.8.0