From 95e18d831b6e1e3509e24e1fe3eed9f1d0b70f6d Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期四, 31 七月 2025 14:13:27 +0800
Subject: [PATCH] 添加推送日志

---
 middle_server.py |  188 +++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 165 insertions(+), 23 deletions(-)

diff --git a/middle_server.py b/middle_server.py
index f757e7f..14393c2 100644
--- a/middle_server.py
+++ b/middle_server.py
@@ -1,3 +1,5 @@
+import builtins
+import copy
 import hashlib
 import json
 import logging
@@ -12,11 +14,31 @@
 import socket_manager
 from db import mysql_data
 from db.redis_manager import RedisUtils, RedisManager
-from log import logger_debug, logger_request_debug
-from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
+from log_module import log, async_log_util
+from log_module.log import logger_debug, logger_response_size
+from middle_l1_data_server import L1DataManager
+from output import push_msg_manager
+from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \
+    block_web_api
 from utils.juejin_util import JueJinHttpApi
 
 trade_data_request_queue = queue.Queue()
+
+__mysql_config_dict = {}
+
+
+def get_mysql_config(db_name):
+    """
+    鑾峰彇mysql鐨勯厤缃�
+    :param db_name:
+    :return:
+    """
+    if db_name in __mysql_config_dict:
+        return __mysql_config_dict.get(db_name)
+    config = copy.deepcopy(constant.MYSQL_CONFIG)
+    config["database"] = db_name
+    __mysql_config_dict[db_name] = config
+    return config
 
 
 class MyTCPServer(socketserver.TCPServer):
@@ -93,14 +115,15 @@
                             {"code": 100, "msg": f"JSON瑙f瀽澶辫触"}).encode(
                             encoding='utf-8')))
                         continue
-                    thread_id = random.randint(0, 1000000)
-                    logger_request_debug.info(f"middle_server 璇锋眰寮�濮�({thread_id})锛歿data_json.get('type')}")
+                    type_ = data_json["type"]
+                    __start_time = time.time()
                     try:
                         if data_json["type"] == 'register':
                             client_type = data_json["data"]["client_type"]
                             rid = data_json["rid"]
                             socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
                             sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
+                            logger_debug.info(f"register:  {self.request.getpeername()}")
                             try:
                                 # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict)
                                 while True:
@@ -134,9 +157,28 @@
                             try:
                                 data = data_json["data"]
                                 datas = data["data"]
-                                print("l2_subscript_codes", data_json)
+                                # print("l2_subscript_codes", data_json)
                                 global_data_cache_util.huaxin_subscript_codes = datas
                                 global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "l2_subscript_codes_rate":
+                            # 璁剧疆璁㈤槄鐨勪唬鐮佺殑娑ㄥ箙
+                            try:
+                                data = data_json["data"]
+                                datas = data["data"]
+                                # print("l2_subscript_codes", data_json)
+                                global_data_cache_util.huaxin_subscript_codes_rate = datas
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "l2_position_subscript_codes":
+                            # 璁剧疆璁㈤槄鐨勪唬鐮�
+                            try:
+                                data = data_json["data"]
+                                datas = data["data"]
+                                print("l2_position_subscript_codes", data_json)
+                                global_data_cache_util.huaxin_position_subscript_codes = datas
+                                global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str()
                             finally:
                                 sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                         elif data_json["type"] == "redis":
@@ -158,20 +200,27 @@
                                     method = getattr(RedisUtils, cmd)
                                     args_ = [redis, key]
                                     if args is not None:
-                                        if type(args) == tuple or type(args) == list:
+                                        if builtins.type(args) == tuple or builtins.type(args) == list:
                                             args = list(args)
-                                            for a in args:
-                                                args_.append(a)
+                                            if cmd == "setex":
+                                                args_.append(args[0])
+                                                if type(args[1]) == list:
+                                                    args_.append(json.dumps(args[1]))
+                                                else:
+                                                    args_.append(args[1])
+                                            else:
+                                                for a in args:
+                                                    args_.append(a)
                                         else:
                                             args_.append(args)
                                     args_ = tuple(args_)
                                     result = method(*args_)
-                                    if type(result) == set:
+                                    if builtins.type(result) == set:
                                         result = list(result)
                                     result_str = json.dumps({"code": 0, "data": result})
                                 elif ctype == "cmds":
                                     datas = data["data"]
-                                    result_list=[]
+                                    result_list = []
                                     for d in datas:
                                         db = d["db"]
                                         cmd = d["cmd"]
@@ -181,19 +230,28 @@
                                         method = getattr(RedisUtils, cmd)
                                         args_ = [redis, key]
                                         if args is not None:
-                                            if type(args) == tuple or type(args) == list:
+                                            if builtins.type(args) == tuple or builtins.type(args) == list:
                                                 args = list(args)
-                                                for a in args:
-                                                    args_.append(a)
+                                                if cmd == "setex":
+                                                    args_.append(args[0])
+                                                    if type(args[1]) == list:
+                                                        args_.append(json.dumps(args[1]))
+                                                    else:
+                                                        args_.append(args[1])
+                                                else:
+                                                    for a in args:
+                                                        args_.append(a)
                                             else:
                                                 args_.append(args)
                                         args_ = tuple(args_)
                                         result = method(*args_)
-                                        if type(result) == set:
+                                        if builtins.type(result) == set:
                                             result = list(result)
                                         result_list.append(result)
                                     result_str = json.dumps({"code": 0, "data": result_list})
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"redis:{len(result_bytes)}")
                             except Exception as e:
                                 logger_debug.exception(e)
                                 logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}")
@@ -207,18 +265,22 @@
                                 db = data["db"]
                                 cmd = data["cmd"]
                                 args = data.get("args")
-                                mysql = mysql_data.Mysqldb()
+                                mysql_config = get_mysql_config(db)
+                                mysql = mysql_data.Mysqldb(mysql_config)
                                 method = getattr(mysql, cmd)
                                 args_ = []
                                 if args:
-                                    if type(args) == tuple or type(args) == list:
+                                    if builtins.type(args) == tuple or builtins.type(args) == list:
                                         args_ = list(args)
                                     else:
                                         args_.append(args)
                                 args_ = tuple(args_)
                                 result = method(*args_)
                                 result_str = json.dumps({"code": 0, "data": result}, default=str)
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size,
+                                                    f"mysql:{len(result_bytes)}:#{str(data)[:100]}")
                             except Exception as e:
                                 logging.exception(e)
                                 sk.sendall(socket_util.load_header(
@@ -232,7 +294,9 @@
                                 params = data.get("params")
                                 result = JueJinHttpApi.request(path_, params)
                                 result_str = json.dumps({"code": 0, "data": result})
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"juejin:{len(result_bytes)}")
                             except Exception as e:
                                 logging.exception(e)
                                 sk.sendall(socket_util.load_header(
@@ -245,8 +309,21 @@
                                 url = data["url"]
                                 data_ = data.get("data")
                                 result = kpl_api_util.request(url, data_)
+                                if data_.find("a=ZhiShuStockList_W8") >= 0:
+                                    # 鎸囨暟涓嬮潰鐨勬垚鍒嗚偂
+                                    result_obj = json.loads(result)
+                                    if "list" in result_obj:
+                                        fiexed_indexes = [0, 1, 2, 4, 6, 23, 24, 9, 40, 42]
+                                        for d in result_obj["list"]:
+                                            for i in range(len(d)):
+                                                if i in fiexed_indexes:
+                                                    continue
+                                                d[i] = 0
+                                        result = json.dumps(result_obj)
                                 result_str = json.dumps({"code": 0, "data": result})
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"kpl:{len(result_bytes)}#{data_}")
                             except Exception as e:
                                 logging.exception(e)
                                 sk.sendall(socket_util.load_header(
@@ -260,13 +337,78 @@
                             result_str = json.dumps({"code": 0, "data": {}})
                             sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                             pass
+                        elif data_json["type"] == "push_msg":
+                            data = data_json["data"]["data"]
+                            _type = data["type"]
+                            data = data.get("data")
+                            logger_debug.info(f"鎺ㄩ�佹秷鎭細{data_json}")
+                            push_msg_manager.push_msg(_type, data)
+                            result_str = json.dumps({"code": 0, "data": {}})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                        elif data_json["type"] == 'l1_data':
+                            datas = data_json["data"]
+                            L1DataManager().add_datas(datas)
+                            break
+                        elif data_json["type"] == 'get_l1_target_codes':
+                            # 鑾峰彇鐩爣浠g爜
+                            codes = L1DataManager().get_target_codes()
+                            result_str = json.dumps({"code": 0, "data": list(codes)})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                            break
+                        # 鑾峰彇涓夋柟鏉垮潡
+                        elif data_json["type"] == 'get_third_blocks':
+                            data = data_json["data"]
+                            data = data["data"]
+                            source = data["source"]
+                            code = data["code"]
+                            result_str = json.dumps({"code": 1, "msg": "source涓嶅尮閰�"})
+                            if source == 2:
+                                # 閫氳揪淇�
+                                try:
+                                    blocks = block_web_api.get_tdx_blocks(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    result_str = json.dumps({"code": 1, "msg": str(e)})
+                            elif source == 3:
+                                # 鍚岃姳椤�
+                                try:
+                                    blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    try:
+                                        block_web_api.THSBlocksApi.load_hexin_v()
+                                        blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
+                                        result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                    except Exception as e1:
+                                        result_str = json.dumps({"code": 1, "msg": str(e1)})
+                            elif source == 4:
+                                # 涓滄柟璐㈠瘜
+                                try:
+                                    blocks = block_web_api.get_eastmoney_block(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    result_str = json.dumps({"code": 1, "msg": str(e)})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                            break
+                        elif data_json["type"] == 'low_suction':
+                            # TODO 浣庡惛閫氶亾
+                            datas = data_json["data"]
+                            pass
+
+
+
+                    except Exception as e:
+                        log.logger_tuoguan_request_debug.exception(e)
                     finally:
-                        logger_request_debug.info(f"middle_server 璇锋眰缁撴潫({thread_id})锛歿data_json.get('type')}")
+                        if time.time() - __start_time > 2:
+                            log.logger_tuoguan_request_debug.info(
+                                f"鑰楁椂锛歿int(time.time() - __start_time)}s  鏁版嵁锛歿data_json}")
                 else:
                     # 鏂紑杩炴帴
                     break
                 # sk.close()
             except Exception as e:
+                # log.logger_tuoguan_request_debug.exception(e)
                 logging.exception(e)
                 break
 
@@ -297,12 +439,12 @@
                 pass
 
 
-def run():
+def run(port=constant.MIDDLE_SERVER_PORT):
     print("create MiddleServer")
     t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
     t1.start()
 
-    laddr = "0.0.0.0", constant.MIDDLE_SERVER_PORT
+    laddr = "0.0.0.0", port
     print("MiddleServer is at: http://%s:%d/" % (laddr))
     tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle
     tcpserver.serve_forever()

--
Gitblit v1.8.0