admin
2025-06-04 287c506725b2d970f721f80169f83c2418cb0991
middle_server.py
@@ -1,4 +1,5 @@
import builtins
import copy
import hashlib
import json
import logging
@@ -10,16 +11,34 @@
import time
import constant
import log
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from log_module import log
from log_module.log import logger_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \
    block_web_api
from utils.juejin_util import JueJinHttpApi
trade_data_request_queue = queue.Queue()
__mysql_config_dict = {}
def get_mysql_config(db_name):
    """
    获取mysql的配置
    :param db_name:
    :return:
    """
    if db_name in __mysql_config_dict:
        return __mysql_config_dict.get(db_name)
    config = copy.deepcopy(constant.MYSQL_CONFIG)
    config["database"] = db_name
    __mysql_config_dict[db_name] = config
    return config
class MyTCPServer(socketserver.TCPServer):
@@ -97,7 +116,7 @@
                            encoding='utf-8')))
                        continue
                    type_ = data_json["type"]
                    log.request_info("middle_server", f"请求开始:{type_}")
                    __start_time = time.time()
                    try:
                        if data_json["type"] == 'register':
                            client_type = data_json["data"]["client_type"]
@@ -137,9 +156,28 @@
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                print("l2_subscript_codes", data_json)
                                # print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes = datas
                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_subscript_codes_rate":
                            # 设置订阅的代码的涨幅
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                # print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes_rate = datas
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_position_subscript_codes":
                            # 设置订阅的代码
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                print("l2_position_subscript_codes", data_json)
                                global_data_cache_util.huaxin_position_subscript_codes = datas
                                global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "redis":
@@ -164,7 +202,11 @@
                                        if builtins.type(args) == tuple or builtins.type(args) == list:
                                            args = list(args)
                                            if cmd == "setex":
                                                args_.append(json.dumps(args))
                                                args_.append(args[0])
                                                if type(args[1]) == list:
                                                    args_.append(json.dumps(args[1]))
                                                else:
                                                    args_.append(args[1])
                                            else:
                                                for a in args:
                                                    args_.append(a)
@@ -190,7 +232,11 @@
                                            if builtins.type(args) == tuple or builtins.type(args) == list:
                                                args = list(args)
                                                if cmd == "setex":
                                                    args_.append(json.dumps(args))
                                                    args_.append(args[0])
                                                    if type(args[1]) == list:
                                                        args_.append(json.dumps(args[1]))
                                                    else:
                                                        args_.append(args[1])
                                                else:
                                                    for a in args:
                                                        args_.append(a)
@@ -216,7 +262,8 @@
                                db = data["db"]
                                cmd = data["cmd"]
                                args = data.get("args")
                                mysql = mysql_data.Mysqldb()
                                mysql_config = get_mysql_config(db)
                                mysql = mysql_data.Mysqldb(mysql_config)
                                method = getattr(mysql, cmd)
                                args_ = []
                                if args:
@@ -270,22 +317,77 @@
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            pass
                        elif data_json["type"] == "push_msg":
                            data = data_json["data"]
                            data = data["data"]
                            data = data_json["data"]["data"]
                            _type = data["type"]
                            data = data.get("data")
                            logger_debug.info(f"推送消息:{data_json}")
                            push_msg_manager.__push_msg(_type, data)
                            push_msg_manager.push_msg(_type, data)
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                    finally:
                        log.request_info("middle_server", f"请求结束")
                        elif data_json["type"] == 'l1_data':
                            datas = data_json["data"]
                            L1DataManager().add_datas(datas)
                            break
                        elif data_json["type"] == 'get_l1_target_codes':
                            # 获取目标代码
                            codes = L1DataManager().get_target_codes()
                            result_str = json.dumps({"code": 0, "data": list(codes)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            break
                        # 获取三方板块
                        elif data_json["type"] == 'get_third_blocks':
                            data = data_json["data"]
                            data = data["data"]
                            source = data["source"]
                            code = data["code"]
                            result_str = json.dumps({"code": 1, "msg": "source不匹配"})
                            if source == 2:
                                # 通达信
                                try:
                                    blocks = block_web_api.get_tdx_blocks(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            elif source == 3:
                                # 同花顺
                                try:
                                    blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    try:
                                        block_web_api.THSBlocksApi.load_hexin_v()
                                        blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
                                        result_str = json.dumps({"code": 0, "data": list(blocks)})
                                    except Exception as e1:
                                        result_str = json.dumps({"code": 1, "msg": str(e1)})
                            elif source == 4:
                                # 东方财富
                                try:
                                    blocks = block_web_api.get_eastmoney_block(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            break
                        elif data_json["type"] == 'low_suction':
                            # TODO 低吸通道
                            datas = data_json["data"]
                            pass
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
                        if time.time() - __start_time > 2:
                            log.logger_tuoguan_request_debug.info(
                                f"耗时:{int(time.time() - __start_time)}s  数据:{data_json}")
                else:
                    # 断开连接
                    break
                # sk.close()
            except Exception as e:
                # log.logger_tuoguan_request_debug.exception(e)
                logging.exception(e)
                break
@@ -316,16 +418,16 @@
                pass
def run():
def run(port=constant.MIDDLE_SERVER_PORT):
    print("create MiddleServer")
    t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
    t1.start()
    laddr = "0.0.0.0", constant.MIDDLE_SERVER_PORT
    laddr = "0.0.0.0", port
    print("MiddleServer is at: http://%s:%d/" % (laddr))
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
    tcpserver.serve_forever()
if __name__ == "__main__":
    print(builtins.type("")==str)
    pass