admin
2025-06-04 287c506725b2d970f721f80169f83c2418cb0991
middle_server.py
@@ -1,6 +1,6 @@
import datetime
import builtins
import copy
import hashlib
import io
import json
import logging
import queue
@@ -10,14 +10,35 @@
import threading
import time
import constant
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util
from log_module import log
from log_module.log import logger_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \
    block_web_api
from utils.juejin_util import JueJinHttpApi
# Module-level FIFO queue. NOTE(review): presumably carries pending trade-data
# requests (inferred from the name); its producers/consumers are not visible here.
trade_data_request_queue = queue.Queue()
# Cache of MySQL configs keyed by database name; filled lazily by get_mysql_config().
__mysql_config_dict = {}
def get_mysql_config(db_name):
    """
    Return the MySQL config for the given database name.

    On the first request for a name, ``constant.MYSQL_CONFIG`` is deep-copied,
    its ``"database"`` entry is set to ``db_name`` and the result is stored in
    the module-level cache. Later requests for the same name return that same
    cached object.

    :param db_name: database name to place in the config
    :return: the (cached) config for ``db_name``
    """
    try:
        # Fast path: a config for this database has already been built.
        return __mysql_config_dict[db_name]
    except KeyError:
        pass
    # Deep copy so the shared template in ``constant`` is never modified.
    db_config = copy.deepcopy(constant.MYSQL_CONFIG)
    db_config["database"] = db_name
    __mysql_config_dict[db_name] = db_config
    return db_config
class MyTCPServer(socketserver.TCPServer):
@@ -94,151 +115,279 @@
                            {"code": 100, "msg": f"JSON解析失败"}).encode(
                            encoding='utf-8')))
                        continue
                    if data_json["type"] == 'register':
                        client_type = data_json["data"]["client_type"]
                        rid = data_json["rid"]
                        socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
                        sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                        try:
                            # print("客户端", ClientSocketManager.socket_client_dict)
                            while True:
                                result, header = self.getRecvData(sk)
                                try:
                                    resultJSON = json.loads(result)
                                    if resultJSON["type"] == 'heart':
                                        # 记录活跃客户端
                                        socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
                                except json.decoder.JSONDecodeError as e:
                                    print("JSON解析出错", result, header)
                                    if not result:
                                        sk.close()
                                        break
                                time.sleep(1)
                        except ConnectionResetError as ee:
                            socket_manager.ClientSocketManager.del_client(rid)
                        except Exception as e:
                            logging.exception(e)
                    type_ = data_json["type"]
                    __start_time = time.time()
                    try:
                        if data_json["type"] == 'register':
                            client_type = data_json["data"]["client_type"]
                            rid = data_json["rid"]
                            socket_manager.ClientSocketManager.add_client(client_type, rid, sk)
                            sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                            try:
                                # print("客户端", ClientSocketManager.socket_client_dict)
                                while True:
                                    result, header = self.getRecvData(sk)
                                    try:
                                        resultJSON = json.loads(result)
                                        if resultJSON["type"] == 'heart':
                                            # 记录活跃客户端
                                            socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
                                    except json.decoder.JSONDecodeError as e:
                                        print("JSON解析出错", result, header)
                                        if not result:
                                            sk.close()
                                            break
                                    time.sleep(1)
                            except ConnectionResetError as ee:
                                socket_manager.ClientSocketManager.del_client(rid)
                            except Exception as e:
                                logging.exception(e)
                        elif data_json["type"] == "response":
                            # 主动触发的响应
                            try:
                                client_id = data_json["client_id"]
                                # hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
                                # # 设置响应内容
                                hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data'])
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_subscript_codes":
                            # 设置订阅的代码
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                # print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes = datas
                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_subscript_codes_rate":
                            # 设置订阅的代码的涨幅
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                # print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes_rate = datas
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "l2_position_subscript_codes":
                            # 设置订阅的代码
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                print("l2_position_subscript_codes", data_json)
                                global_data_cache_util.huaxin_position_subscript_codes = datas
                                global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
                                sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                        elif data_json["type"] == "redis":
                            try:
                                data = data_json["data"]
                                ctype = data["ctype"]
                    elif data_json["type"] == "response":
                        # 主动触发的响应
                        try:
                            client_id = data_json["client_id"]
                            # hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
                            # # 设置响应内容
                            hosting_api_util.set_response(client_id, data_json["request_id"], data_json['data'])
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_subscript_codes":
                        # 设置订阅的代码
                        try:
                            data = data_json["data"]
                            datas = data["data"]
                            print("l2_subscript_codes", data_json)
                            global_data_cache_util.huaxin_subscript_codes = datas
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "redis":
                        try:
                            data = data_json["data"]
                            ctype = data["ctype"]
                            data = data["data"]
                            result_str = ''
                            if ctype == "queue_size":
                                # TODO 设置队列大小
                                result_str = json.dumps({"code": 0})
                            elif ctype == "cmd":
                                result_str = ''
                                if ctype == "queue_size":
                                    # TODO 设置队列大小
                                    result_str = json.dumps({"code": 0})
                                elif ctype == "cmd":
                                    data = data["data"]
                                    db = data["db"]
                                    cmd = data["cmd"]
                                    key = data["key"]
                                    args = data.get("args")
                                    redis = RedisManager(db).getRedis()
                                    method = getattr(RedisUtils, cmd)
                                    args_ = [redis, key]
                                    if args is not None:
                                        if builtins.type(args) == tuple or builtins.type(args) == list:
                                            args = list(args)
                                            if cmd == "setex":
                                                args_.append(args[0])
                                                if type(args[1]) == list:
                                                    args_.append(json.dumps(args[1]))
                                                else:
                                                    args_.append(args[1])
                                            else:
                                                for a in args:
                                                    args_.append(a)
                                        else:
                                            args_.append(args)
                                    args_ = tuple(args_)
                                    result = method(*args_)
                                    if builtins.type(result) == set:
                                        result = list(result)
                                    result_str = json.dumps({"code": 0, "data": result})
                                elif ctype == "cmds":
                                    datas = data["data"]
                                    result_list = []
                                    for d in datas:
                                        db = d["db"]
                                        cmd = d["cmd"]
                                        key = d["key"]
                                        args = d.get("args")
                                        redis = RedisManager(db).getRedis()
                                        method = getattr(RedisUtils, cmd)
                                        args_ = [redis, key]
                                        if args is not None:
                                            if builtins.type(args) == tuple or builtins.type(args) == list:
                                                args = list(args)
                                                if cmd == "setex":
                                                    args_.append(args[0])
                                                    if type(args[1]) == list:
                                                        args_.append(json.dumps(args[1]))
                                                    else:
                                                        args_.append(args[1])
                                                else:
                                                    for a in args:
                                                        args_.append(a)
                                            else:
                                                args_.append(args)
                                        args_ = tuple(args_)
                                        result = method(*args_)
                                        if builtins.type(result) == set:
                                            result = list(result)
                                        result_list.append(result)
                                    result_str = json.dumps({"code": 0, "data": result_list})
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logger_debug.exception(e)
                                logger_debug.info(f"Redis操作出错:data_json:{data_json}")
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "mysql":
                            try:
                                data = data_json["data"]
                                data = data["data"]
                                db = data["db"]
                                cmd = data["cmd"]
                                key = data["key"]
                                args = data.get("args")
                                redis = RedisManager(db).getRedis()
                                method = getattr(RedisUtils, cmd)
                                args_ = [redis, key]
                                if args is not None:
                                    if type(args) == tuple or type(args) == list:
                                        args = list(args)
                                        for a in args:
                                            args_.append(a)
                                mysql_config = get_mysql_config(db)
                                mysql = mysql_data.Mysqldb(mysql_config)
                                method = getattr(mysql, cmd)
                                args_ = []
                                if args:
                                    if builtins.type(args) == tuple or builtins.type(args) == list:
                                        args_ = list(args)
                                    else:
                                        args_.append(args)
                                args_ = tuple(args_)
                                result = method(*args_)
                                if type(result) == set:
                                    result = list(result)
                                result_str = json.dumps({"code": 0, "data": result}, default=str)
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "juejin":
                            # 掘金请求
                            try:
                                data = data_json["data"]
                                data = data["data"]
                                path_ = data["path"]
                                params = data.get("params")
                                result = JueJinHttpApi.request(path_, params)
                                result_str = json.dumps({"code": 0, "data": result})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logger_debug.exception(e)
                            logger_debug.info(f"Redis操作出错:data_json:{data_json}")
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "mysql":
                        try:
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "kpl":
                            # 开盘啦请求
                            try:
                                data = data_json["data"]
                                data = data["data"]
                                url = data["url"]
                                data_ = data.get("data")
                                result = kpl_api_util.request(url, data_)
                                result_str = json.dumps({"code": 0, "data": result})
                                sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            except Exception as e:
                                logging.exception(e)
                                sk.sendall(socket_util.load_header(
                                    json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                        elif data_json["type"] == "kp_msg":
                            # 看盘消息
                            data = data_json["data"]
                            data = data["data"]
                            db = data["db"]
                            cmd = data["cmd"]
                            args = data.get("args")
                            mysql = mysql_data.Mysqldb()
                            method = getattr(mysql, cmd)
                            args_ = []
                            if args:
                                if type(args) == tuple or type(args) == list:
                                    args_ = list(args)
                                else:
                                    args_.append(args)
                            args_ = tuple(args_)
                            result = method(*args_)
                            result_str = json.dumps({"code": 0, "data": result}, default=str)
                            msg = data["msg"]
                            kp_client_msg_manager.add_msg(msg)
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "juejin":
                        # 掘金请求
                        try:
                            pass
                        elif data_json["type"] == "push_msg":
                            data = data_json["data"]["data"]
                            _type = data["type"]
                            data = data.get("data")
                            logger_debug.info(f"推送消息:{data_json}")
                            push_msg_manager.push_msg(_type, data)
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        elif data_json["type"] == 'l1_data':
                            datas = data_json["data"]
                            L1DataManager().add_datas(datas)
                            break
                        elif data_json["type"] == 'get_l1_target_codes':
                            # 获取目标代码
                            codes = L1DataManager().get_target_codes()
                            result_str = json.dumps({"code": 0, "data": list(codes)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            break
                        # 获取三方板块
                        elif data_json["type"] == 'get_third_blocks':
                            data = data_json["data"]
                            data = data["data"]
                            path_ = data["path"]
                            params = data.get("params")
                            result = JueJinHttpApi.request(path_, params)
                            result_str = json.dumps({"code": 0, "data": result})
                            source = data["source"]
                            code = data["code"]
                            result_str = json.dumps({"code": 1, "msg": "source不匹配"})
                            if source == 2:
                                # 通达信
                                try:
                                    blocks = block_web_api.get_tdx_blocks(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            elif source == 3:
                                # 同花顺
                                try:
                                    blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    try:
                                        block_web_api.THSBlocksApi.load_hexin_v()
                                        blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
                                        result_str = json.dumps({"code": 0, "data": list(blocks)})
                                    except Exception as e1:
                                        result_str = json.dumps({"code": 1, "msg": str(e1)})
                            elif source == 4:
                                # 东方财富
                                try:
                                    blocks = block_web_api.get_eastmoney_block(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "kpl":
                        # 开盘啦请求
                        try:
                            data = data_json["data"]
                            data = data["data"]
                            url = data["url"]
                            data_ = data.get("data")
                            result = kpl_api_util.request(url, data_)
                            result_str = json.dumps({"code": 0, "data": result})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
                    elif data_json["type"] == "kp_msg":
                        # 看盘消息
                        data = data_json["data"]
                        data = data["data"]
                        msg = data["msg"]
                        kp_client_msg_manager.add_msg(msg)
                        result_str = json.dumps({"code": 0, "data": {}})
                        sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        pass
                            break
                        elif data_json["type"] == 'low_suction':
                            # TODO 低吸通道
                            datas = data_json["data"]
                            pass
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
                        if time.time() - __start_time > 2:
                            log.logger_tuoguan_request_debug.info(
                                f"耗时:{int(time.time() - __start_time)}s  数据:{data_json}")
                else:
                    # 断开连接
                    break
                # sk.close()
            except Exception as e:
                # log.logger_tuoguan_request_debug.exception(e)
                logging.exception(e)
                break
@@ -269,12 +418,12 @@
                pass
def run(port=constant.MIDDLE_SERVER_PORT):
    """
    Start the middle server and serve requests.

    Launches a daemon thread that runs ``clear_invalid_client`` and then
    creates a ``MyThreadingTCPServer`` bound to all interfaces ("0.0.0.0") on
    ``port``, handling requests with ``MyBaseRequestHandle``. The call ends in
    ``serve_forever()``, so it is not expected to return under normal operation.

    :param port: TCP port to listen on; defaults to ``constant.MIDDLE_SERVER_PORT``
    """
    print("create MiddleServer")
    # Daemon thread: it must not keep the process alive once the main thread exits.
    cleaner = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
    cleaner.start()
    # Listen on every interface at the requested port.
    laddr = "0.0.0.0", port
    print("MiddleServer is at: http://%s:%d/" % (laddr))
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # note: the handler class is MyBaseRequestHandle
    tcpserver.serve_forever()