From 95e18d831b6e1e3509e24e1fe3eed9f1d0b70f6d Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期四, 31 七月 2025 14:13:27 +0800
Subject: [PATCH] 添加推送日志

---
 middle_server.py |  124 +++++++++++++++++++++++++++++++++++++----
 1 files changed, 111 insertions(+), 13 deletions(-)

diff --git a/middle_server.py b/middle_server.py
index 4e39e59..14393c2 100644
--- a/middle_server.py
+++ b/middle_server.py
@@ -1,4 +1,5 @@
 import builtins
+import copy
 import hashlib
 import json
 import logging
@@ -10,17 +11,34 @@
 import time
 
 import constant
-import log
 import socket_manager
 from db import mysql_data
 from db.redis_manager import RedisUtils, RedisManager
-from log import logger_debug, logger_request_debug
+from log_module import log, async_log_util
+from log_module.log import logger_debug, logger_response_size
 from middle_l1_data_server import L1DataManager
 from output import push_msg_manager
-from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
+from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \
+    block_web_api
 from utils.juejin_util import JueJinHttpApi
 
 trade_data_request_queue = queue.Queue()
+
+__mysql_config_dict = {}
+
+
+def get_mysql_config(db_name):
+    """
+    鑾峰彇mysql鐨勯厤缃�
+    :param db_name:
+    :return:
+    """
+    if db_name in __mysql_config_dict:
+        return __mysql_config_dict.get(db_name)
+    config = copy.deepcopy(constant.MYSQL_CONFIG)
+    config["database"] = db_name
+    __mysql_config_dict[db_name] = config
+    return config
 
 
 class MyTCPServer(socketserver.TCPServer):
@@ -105,6 +123,7 @@
                             rid = data_json["rid"]
                             socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
                             sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
+                            logger_debug.info(f"register:  {self.request.getpeername()}")
                             try:
                                 # print("瀹㈡埛绔�", ClientSocketManager.socket_client_dict)
                                 while True:
@@ -138,9 +157,18 @@
                             try:
                                 data = data_json["data"]
                                 datas = data["data"]
-                                print("l2_subscript_codes", data_json)
+                                # print("l2_subscript_codes", data_json)
                                 global_data_cache_util.huaxin_subscript_codes = datas
                                 global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
+                            finally:
+                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
+                        elif data_json["type"] == "l2_subscript_codes_rate":
+                            # 璁剧疆璁㈤槄鐨勪唬鐮佺殑娑ㄥ箙
+                            try:
+                                data = data_json["data"]
+                                datas = data["data"]
+                                # print("l2_subscript_codes", data_json)
+                                global_data_cache_util.huaxin_subscript_codes_rate = datas
                             finally:
                                 sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                         elif data_json["type"] == "l2_position_subscript_codes":
@@ -221,7 +249,9 @@
                                             result = list(result)
                                         result_list.append(result)
                                     result_str = json.dumps({"code": 0, "data": result_list})
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"redis:{len(result_bytes)}")
                             except Exception as e:
                                 logger_debug.exception(e)
                                 logger_debug.info(f"Redis鎿嶄綔鍑洪敊锛歞ata_json锛歿data_json}")
@@ -235,7 +265,8 @@
                                 db = data["db"]
                                 cmd = data["cmd"]
                                 args = data.get("args")
-                                mysql = mysql_data.Mysqldb()
+                                mysql_config = get_mysql_config(db)
+                                mysql = mysql_data.Mysqldb(mysql_config)
                                 method = getattr(mysql, cmd)
                                 args_ = []
                                 if args:
@@ -246,7 +277,10 @@
                                 args_ = tuple(args_)
                                 result = method(*args_)
                                 result_str = json.dumps({"code": 0, "data": result}, default=str)
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size,
+                                                    f"mysql:{len(result_bytes)}:#{str(data)[:100]}")
                             except Exception as e:
                                 logging.exception(e)
                                 sk.sendall(socket_util.load_header(
@@ -260,7 +294,9 @@
                                 params = data.get("params")
                                 result = JueJinHttpApi.request(path_, params)
                                 result_str = json.dumps({"code": 0, "data": result})
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"juejin:{len(result_bytes)}")
                             except Exception as e:
                                 logging.exception(e)
                                 sk.sendall(socket_util.load_header(
@@ -273,8 +309,21 @@
                                 url = data["url"]
                                 data_ = data.get("data")
                                 result = kpl_api_util.request(url, data_)
+                                if data_.find("a=ZhiShuStockList_W8") >= 0:
+                                    # 鎸囨暟涓嬮潰鐨勬垚鍒嗚偂
+                                    result_obj = json.loads(result)
+                                    if "list" in result_obj:
+                                        fiexed_indexes = [0, 1, 2, 4, 6, 23, 24, 9, 40, 42]
+                                        for d in result_obj["list"]:
+                                            for i in range(len(d)):
+                                                if i in fiexed_indexes:
+                                                    continue
+                                                d[i] = 0
+                                        result = json.dumps(result_obj)
                                 result_str = json.dumps({"code": 0, "data": result})
-                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                                result_bytes = socket_util.load_header(result_str.encode(encoding='utf-8'))
+                                sk.sendall(result_bytes)
+                                async_log_util.info(logger_response_size, f"kpl:{len(result_bytes)}#{data_}")
                             except Exception as e:
                                 logging.exception(e)
                                 sk.sendall(socket_util.load_header(
@@ -298,13 +347,62 @@
                             sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                         elif data_json["type"] == 'l1_data':
                             datas = data_json["data"]
-                            L1DataManager.add_datas(datas)
+                            L1DataManager().add_datas(datas)
                             break
+                        elif data_json["type"] == 'get_l1_target_codes':
+                            # 鑾峰彇鐩爣浠g爜
+                            codes = L1DataManager().get_target_codes()
+                            result_str = json.dumps({"code": 0, "data": list(codes)})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                            break
+                        # 鑾峰彇涓夋柟鏉垮潡
+                        elif data_json["type"] == 'get_third_blocks':
+                            data = data_json["data"]
+                            data = data["data"]
+                            source = data["source"]
+                            code = data["code"]
+                            result_str = json.dumps({"code": 1, "msg": "source涓嶅尮閰�"})
+                            if source == 2:
+                                # 閫氳揪淇�
+                                try:
+                                    blocks = block_web_api.get_tdx_blocks(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    result_str = json.dumps({"code": 1, "msg": str(e)})
+                            elif source == 3:
+                                # 鍚岃姳椤�
+                                try:
+                                    blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    try:
+                                        block_web_api.THSBlocksApi.load_hexin_v()
+                                        blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
+                                        result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                    except Exception as e1:
+                                        result_str = json.dumps({"code": 1, "msg": str(e1)})
+                            elif source == 4:
+                                # 涓滄柟璐㈠瘜
+                                try:
+                                    blocks = block_web_api.get_eastmoney_block(code)
+                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
+                                except Exception as e:
+                                    result_str = json.dumps({"code": 1, "msg": str(e)})
+                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
+                            break
+                        elif data_json["type"] == 'low_suction':
+                            # TODO 浣庡惛閫氶亾
+                            datas = data_json["data"]
+                            pass
+
+
+
                     except Exception as e:
                         log.logger_tuoguan_request_debug.exception(e)
                     finally:
                         if time.time() - __start_time > 2:
-                            log.logger_tuoguan_request_debug.info(f"鑰楁椂锛歿int(time.time() - __start_time)}s  鏁版嵁锛歿data_json}")
+                            log.logger_tuoguan_request_debug.info(
+                                f"鑰楁椂锛歿int(time.time() - __start_time)}s  鏁版嵁锛歿data_json}")
                 else:
                     # 鏂紑杩炴帴
                     break
@@ -341,7 +439,7 @@
                 pass
 
 
-def run(port =  constant.MIDDLE_SERVER_PORT):
+def run(port=constant.MIDDLE_SERVER_PORT):
     print("create MiddleServer")
     t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
     t1.start()
@@ -353,4 +451,4 @@
 
 
 if __name__ == "__main__":
-    print(builtins.type("")==str)
+    pass

--
Gitblit v1.8.0