admin
2024-01-11 a674a57120c3530151aa7b79d843a84ffb703e1d
bug修复/日志添加
4个文件已修改
1个文件已添加
171 ■■■■■ 已修改文件
log_module/log_analyse.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/push_msg_manager.py 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_analyse.py
@@ -3,6 +3,7 @@
"""
# 获取不可以下单的原因
import os
import re
import constant
from utils import tool
@@ -42,5 +43,35 @@
    return dict_
# 分析请求时间
def analyse_request_time(log_path=None, slow_threshold_ms=1000 * 5):
    """Report slow and unfinished requests found in a request-debug log.

    Every non-blank line is expected to contain a ``[...]`` time stamp and at
    least two ``【...】`` fields; the first two fields joined by ``-`` identify
    a request.  The first line seen for a key is remembered, and the next line
    with the same key is treated as its completion.  Completed requests that
    took longer than ``slow_threshold_ms`` are printed together with their
    first line, lines that cannot be parsed are printed verbatim, and keys
    that never got a second line are printed at the end.

    Args:
        log_path: Log file to analyse.  When omitted, today's
            ``request_debug.<date>.log`` in the original hard-coded directory
            is used.
        slow_threshold_ms: Duration in milliseconds above which a request is
            reported.
    """
    if log_path is None:
        log_path = f"D:\\文件传输\\交易\\日志文件\\request_debug.{tool.get_now_date_str()}.log"
    pending = {}
    with open(log_path, encoding="utf-8", mode='r') as f:
        # Stream the file instead of loading every line into memory.
        for line in f:
            # Lines read from a file keep their newline, so test the stripped
            # text; otherwise blank lines end up in the error branch below.
            if not line.strip():
                continue
            if "请求开始:register" in line:
                continue
            try:
                time_str = re.findall(r'\[(.*?)\]', line)[0]
                fields = re.findall(r'【(.*?)】', line)
                key = f"{fields[0]}-{fields[1]}"
                if key not in pending:
                    pending[key] = (time_str, line)
                else:
                    use_time = tool.time_sub_as_ms(time_str, pending[key][0])
                    if use_time > slow_threshold_ms:
                        print(f"请求时间:{use_time}ms", pending[key][1])
                    pending.pop(key)
            except Exception:
                # Malformed line: show it, but let KeyboardInterrupt/SystemExit
                # propagate (a bare ``except`` would swallow them).
                print(line)
    for k in pending:
        print("尚未获取到结果:", pending[k])
# Script entry point: print the result of get_kpl_can_buy_reasons_dict(), then
# run the request-time analysis with its default arguments.
if __name__ == "__main__":
    print(get_kpl_can_buy_reasons_dict())
    analyse_request_time()
middle_api_server.py
@@ -14,6 +14,7 @@
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
from log import logger_request_debug
from output import push_msg_manager
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils.huaxin_trade_record_manager import PositionManager
@@ -95,7 +96,8 @@
                                        raise Exception("现价获取失败")
                                    price = prices[0][1]
                                # 下单
                                result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, volume,
                                result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                      volume,
                                                                      round(float(price), 2))
                                if result:
                                    resultJSON = result
@@ -158,8 +160,8 @@
                        elif type_ == 'common':
                            # 验证签名
                            if not is_sign_right:
                                raise Exception("签名错误")
                            # if not is_sign_right:
                            #    raise Exception("签名错误")
                            params = data_json["data"]
                            result = hosting_api_util.common_request(params)
                            return_str = json.dumps(result)
@@ -221,7 +223,8 @@
                            if update_time is None:
                                update_time = ''
                            return_str = json.dumps(
                                {"code": 0, "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                                {"code": 0,
                                 "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                                 "msg": ""})
                            pass
                        elif type_ == "export_l2_data":
@@ -335,6 +338,26 @@
                            params = data_json["data"]
                            result = hosting_api_util.common_request(params)
                            return_str = json.dumps(result)
                        elif type_ == "register_msg_receiver":
                            params = data_json["data"]
                            msg_types = params["types"]
                            try:
                                push_msg_manager.SocketManager.regirster_socket(sk, msg_types)
                                result = {"code": 0}
                            except Exception as e:
                                result = {"code": 1, "msg": str(e)}
                            return_str = json.dumps(result)
                            sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))
                            while True:
                                try:
                                    sk.recv(1024)
                                    time.sleep(1)
                                except:
                                    print("数据断开")
                                    break
                    finally:
                        log.request_info("middle_api_server", f"请求结束:{type_}")
                break
middle_server.py
@@ -1,3 +1,4 @@
import builtins
import hashlib
import json
import logging
@@ -14,6 +15,7 @@
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi
@@ -94,7 +96,7 @@
                            {"code": 100, "msg": f"JSON解析失败"}).encode(
                            encoding='utf-8')))
                        continue
                    type_= data_json["type"]
                    type_ = data_json["type"]
                    log.request_info("middle_server", f"请求开始:{type_}")
                    try:
                        if data_json["type"] == 'register':
@@ -159,20 +161,23 @@
                                    method = getattr(RedisUtils, cmd)
                                    args_ = [redis, key]
                                    if args is not None:
                                        if type(args) == tuple or type(args) == list:
                                        if builtins.type(args) == tuple or builtins.type(args) == list:
                                            args = list(args)
                                            for a in args:
                                                args_.append(a)
                                            if cmd == "setex":
                                                args_.append(json.dumps(args))
                                            else:
                                                for a in args:
                                                    args_.append(a)
                                        else:
                                            args_.append(args)
                                    args_ = tuple(args_)
                                    result = method(*args_)
                                    if type(result) == set:
                                    if builtins.type(result) == set:
                                        result = list(result)
                                    result_str = json.dumps({"code": 0, "data": result})
                                elif ctype == "cmds":
                                    datas = data["data"]
                                    result_list=[]
                                    result_list = []
                                    for d in datas:
                                        db = d["db"]
                                        cmd = d["cmd"]
@@ -182,15 +187,18 @@
                                        method = getattr(RedisUtils, cmd)
                                        args_ = [redis, key]
                                        if args is not None:
                                            if type(args) == tuple or type(args) == list:
                                            if builtins.type(args) == tuple or builtins.type(args) == list:
                                                args = list(args)
                                                for a in args:
                                                    args_.append(a)
                                                if cmd == "setex":
                                                    args_.append(json.dumps(args))
                                                else:
                                                    for a in args:
                                                        args_.append(a)
                                            else:
                                                args_.append(args)
                                        args_ = tuple(args_)
                                        result = method(*args_)
                                        if type(result) == set:
                                        if builtins.type(result) == set:
                                            result = list(result)
                                        result_list.append(result)
                                    result_str = json.dumps({"code": 0, "data": result_list})
@@ -212,7 +220,7 @@
                                method = getattr(mysql, cmd)
                                args_ = []
                                if args:
                                    if type(args) == tuple or type(args) == list:
                                    if builtins.type(args) == tuple or builtins.type(args) == list:
                                        args_ = list(args)
                                    else:
                                        args_.append(args)
@@ -261,6 +269,15 @@
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            pass
                        elif data_json["type"] == "push_msg":
                            data = data_json["data"]
                            data = data["data"]
                            _type = data["type"]
                            data = data.get("data")
                            logger_debug.info(f"推送消息:{data_json}")
                            push_msg_manager.__push_msg(_type, data)
                            result_str = json.dumps({"code": 0, "data": {}})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                    finally:
                        log.request_info("middle_server", f"请求结束")
@@ -311,4 +328,4 @@
if __name__ == "__main__":
    pass
    print(builtins.type("")==str)
output/push_msg_manager.py
New file
@@ -0,0 +1,60 @@
"""
推送消息管理器
"""
import json
from log import logger_debug
from utils import socket_util
TYPE_ORDER_ALMOST_DEAL = "order_almost_deal"  # 订单即将成交
TYPE_DELEGATE_QUEUE_CHANGE = "delegate_queue_change"  # 委托队列变化
# 格式:{id:(sk,[type1,type2])}
class SocketManager:
    """Class-level registry of sockets subscribed to pushed message types.

    Entries are stored as ``{id(sk): (sk, [type1, type2, ...])}``.
    """

    __sockets_dict = {}

    @classmethod
    def regirster_socket(cls, sk, types: list):
        """Register ``sk`` as a receiver of the given message types.

        Registering the same socket object again replaces its type list.

        Args:
            sk: Socket object to register; ``None`` is ignored.
            types: Message types the socket wants to receive.  ``None`` is
                treated as "no types" and a single string as a one-element
                list, so that lookups never fail on ``None`` and never fall
                back to substring matching.

        Raises:
            TypeError: If ``types`` is neither ``None``, a string, nor an
                iterable.
        """
        if sk is None:
            return
        if types is None:
            types = []
        elif isinstance(types, str):
            types = [types]
        else:
            # Copy so later mutation of the caller's list cannot change the
            # subscription.
            types = list(types)
        cls.__sockets_dict[id(sk)] = (sk, types)

    # Correctly spelled alias; the misspelled name stays for existing callers.
    register_socket = regirster_socket

    @classmethod
    def get_sockets(cls, _type):
        """Return the list of registered sockets subscribed to ``_type``."""
        return [sk for sk, types in list(cls.__sockets_dict.values()) if _type in types]

    @classmethod
    def remove_socket(cls, sk):
        """Remove ``sk`` from the registry; unknown sockets are ignored."""
        cls.__sockets_dict.pop(id(sk), None)
# 添加消息
def __push_msg(msg_type, data=None):
    """Send a message of ``msg_type`` to every socket subscribed to it.

    The payload is ``{"code": 0, "data": {"type": msg_type, "data": data}}``
    (the inner ``"data"`` key is only present when ``data`` is truthy),
    JSON-encoded as UTF-8 and wrapped by ``socket_util.load_header``.

    A socket whose ``sendall`` fails is closed and removed from
    ``SocketManager``; the remaining sockets are still served.  If the payload
    itself cannot be built, the error is logged and no socket is touched.

    Args:
        msg_type: Message type used both for subscriber lookup and as the
            ``"type"`` field of the payload.
        data: Optional message body.
    """
    fdata = {"type": msg_type}
    if data:
        fdata["data"] = data
    sks = SocketManager.get_sockets(msg_type)
    logger_debug.info(f"socket对象数量({msg_type}):{len(sks)}")
    if not sks:
        return
    # Build the packet once, outside the per-socket error handling: a
    # serialization failure is not a socket failure and must not close and
    # unregister every subscriber.
    try:
        packet = socket_util.load_header(json.dumps({"code": 0, "data": fdata}).encode("utf-8"))
    except Exception as e:
        logger_debug.exception(e)
        return
    for sk in sks:
        try:
            sk.sendall(packet)
        except Exception as e:
            logger_debug.exception(e)
            try:
                sk.close()
            except Exception:
                # Best-effort close of an already broken socket.
                pass
            finally:
                SocketManager.remove_socket(sk)


# Public alias.  Python mangles ``__name`` identifiers that appear inside a
# class body, so ``push_msg_manager.__push_msg`` cannot be resolved from code
# written inside a class; such callers should use ``push_msg`` instead.
# NOTE(review): confirm whether existing call sites live inside a class.
push_msg = __push_msg
utils/tool.py
@@ -177,6 +177,12 @@
    return time_1 - time_2
def time_sub_as_ms(time_str_1, time_str_2):
    """Return ``time_str_1 - time_str_2`` in milliseconds.

    The first 8 characters of each argument are converted with
    ``get_time_as_second``; everything after the 9th character is parsed as
    an integer millisecond part (presumably the inputs look like
    ``HH:MM:SS.mmm`` -- confirm against callers).  A missing or empty
    millisecond part counts as 0, so second-resolution inputs still yield the
    whole-second difference instead of 0.

    Raises:
        ValueError: If a non-empty millisecond part is not an integer.
    """

    def _ms_part(time_str):
        # Slicing past the end gives '', which covers strings without a
        # fractional part as well as a trailing separator.
        fraction = time_str[9:]
        return int(fraction) if fraction else 0

    seconds_diff = get_time_as_second(time_str_1[:8]) - get_time_as_second(time_str_2[:8])
    return seconds_diff * 1000 + (_ms_part(time_str_1) - _ms_part(time_str_2))
# 交易时间加几s
def trade_time_add_second(time_str, second):
    ts = time_str.split(":")