admin
2024-01-08  b4ff4ac715060ce313cdaee2345617b4b7a979ea
Bug fixes / logging additions

6 files changed, 97 lines modified:
  data_server.py        | 16
  log.py                | 50
  main.py               |  3
  middle_api_server.py  |  6
  middle_server.py      |  8
  utils/tool.py         | 14
data_server.py
@@ -6,6 +6,7 @@
 import dask
+import log
 from code_attribute import gpcode_manager
 from log import logger_request_debug
 from log_module import log_analyse, log_export
@@ -44,10 +45,10 @@
             KPLLimitUpDataRecordManager.load_total_datas()
             total_datas = KPLLimitUpDataRecordManager.total_datas
-        current_datas_results = hosting_api_util.common_request({"ctype":"get_kpl_limit_up_datas"})
+        current_datas_results = hosting_api_util.common_request({"ctype": "get_kpl_limit_up_datas"})
         if type(current_datas_results) == str:
             current_datas_results = json.loads(current_datas_results)
-        current_datas = current_datas_results.get("data")  #KPLLimitUpDataRecordManager.latest_origin_datas
+        current_datas = current_datas_results.get("data")  # KPLLimitUpDataRecordManager.latest_origin_datas
         current_block_codes = {}
         for c in current_datas:
             if c[5] not in current_block_codes:
@@ -272,8 +273,7 @@
     def do_GET(self):
         path = self.path
         url = urlparse.urlparse(path)
-        thread_id = random.randint(0, 1000000)
-        logger_request_debug.info(f"GET request started ({thread_id}): {url.path}")
+        log.request_info("DATA_SERVER_GET", f"GET request started: {url.path}")
         try:
             if url.path == "/kpl/get_limit_up_list":
                 response_data = self.__get_limit_up_list()
@@ -287,14 +287,12 @@
                 result = hosting_api_util.get_from_data_server(url.path, ps_dict)
                 self.__send_response(result)
         finally:
-            logger_request_debug.info(f"GET request ended ({thread_id}): {url.path}")
+            log.request_info("DATA_SERVER_GET", f"GET request ended")

     def do_POST(self):
-        thread_id = random.randint(0, 1000000)
         path = self.path
         url = urlparse.urlparse(path)
-        logger_request_debug.info(f"POST request started ({thread_id}): {url.path}")
+        log.request_info("DATA_SERVER_POST", f"POST request started: {url.path}")
         try:
             if url.path == "/upload_kpl_data":
                 # Receive KaiPanLa (开盘啦) data
@@ -302,7 +300,7 @@
                 result_str = self.__process_kpl_data(params)
                 self.__send_response(result_str)
         finally:
-            logger_request_debug.info(f"POST request ended ({thread_id}): {url.path}")
+            log.request_info("DATA_SERVER_POST", f"POST request ended")

     def __process_kpl_data(self, data):
         data = json.loads(json.dumps(data).replace("概念", ""))
log.py
@@ -1,9 +1,13 @@
 """
 Logging
 """
+import queue
+import sys
+import time

 from loguru import logger

 import constant
+from utils import tool

 class MyLogger:
@@ -79,3 +83,49 @@
 logger_profile = __mylogger.get_logger("profile")
 logger_request_debug = __mylogger.get_logger("request_debug")
+
+
+class AsyncLogManager:
+    __log_queue = queue.Queue()
+
+    def __add_log(self, logger, method, *args):
+        self.__log_queue.put_nowait((logger, time.time(), method, args))
+
+    def debug(self, logger, *args):
+        self.__add_log(logger, "debug", *args)
+
+    def info(self, logger, *args):
+        self.__add_log(logger, "info", *args)
+
+    def warning(self, logger, *args):
+        self.__add_log(logger, "warning", *args)
+
+    def error(self, logger, *args):
+        self.__add_log(logger, "error", *args)
+
+    def exception(self, logger, *args):
+        self.__add_log(logger, "exception", *args)
+
+    # Drain the queue and write the logs synchronously
+    def run_sync(self):
+        while True:
+            try:
+                val = self.__log_queue.get()
+                time_s = val[1]
+                cmd = val[2]
+                method = getattr(val[0], cmd)
+                d = list(val[3])
+                # Prefix the message with the enqueue time (ms precision)
+                d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:3]}] " + d[0]
+                d = tuple(d)
+                method(*d)
+            except:
+                pass
+
+
+async_log_util = AsyncLogManager()
+
+
+def request_info(type_name, content, thread_id=None):
+    if not thread_id:
+        thread_id = tool.get_thread_id()
+    async_log_util.info(logger_request_debug, f"【{thread_id}】【{type_name}】 {content}")
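Taken together, the new log.py pieces form a classic producer/consumer pipeline: request_info (and the other AsyncLogManager wrappers) enqueue a pre-timestamped record without blocking the request thread, while run_sync, started on a daemon thread in main.py below, drains the queue and performs the actual I/O. A minimal, self-contained sketch of the same pattern, using the stdlib logging module as a stand-in for the repo's loguru-based loggers (names here are illustrative, not from the repo):

import logging
import queue
import threading
import time

logging.basicConfig(level=logging.INFO, format="%(message)s")


class AsyncLog:
    """Producer/consumer logging: callers enqueue, one thread writes."""

    def __init__(self):
        self._queue = queue.Queue()

    def info(self, logger, msg):
        # Producer side: capture the logger, the enqueue time and the
        # message; put_nowait never blocks the request thread on I/O.
        self._queue.put_nowait((logger, time.time(), msg))

    def run_sync(self):
        # Consumer side: drain the queue forever, stamping each message
        # with the time it was enqueued, not the time it is written.
        while True:
            logger, ts, msg = self._queue.get()
            stamp = time.strftime("%H:%M:%S", time.localtime(ts))
            logger.info(f"[{stamp}] {msg}")


async_log = AsyncLog()

if __name__ == "__main__":
    log = logging.getLogger("request_debug")
    threading.Thread(target=async_log.run_sync, daemon=True).start()
    async_log.info(log, "GET request started: /kpl/get_limit_up_list")
    time.sleep(0.1)  # give the daemon thread a moment to drain the queue

The repo's run_sync additionally swallows all exceptions (bare except: pass), so one malformed record cannot kill the writer thread.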
main.py
@@ -2,6 +2,7 @@
 import constant
 import data_server
+import log
 import middle_api_server
 import middle_server
@@ -10,4 +11,6 @@
    t1.start()
    t1 = threading.Thread(target=lambda: data_server.run("0.0.0.0", constant.DATA_SERVER_PORT), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: log.async_log_util.run_sync(), daemon=True)
    t1.start()
    middle_server.run()
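Note the trade-off in running run_sync on a daemon thread: the interpreter exits without joining the log writer, so any records still queued at shutdown are silently dropped. For request-level debug logs that is usually acceptable; a sentinel value plus a join, or queue.join(), would be needed if every record had to be flushed before exit.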
middle_api_server.py
@@ -8,6 +8,7 @@
 import time

 import constant
+import log
 import socket_manager
 import trade_manager
 from db import mysql_data, redis_manager
@@ -65,9 +66,8 @@
                     # print("Received data ------", f"{data_str[:20]}......{data_str[-20:]}")
                     data_json = json.loads(data_str)
                     type_ = data_json['type']
-                    thread_id = random.randint(0, 1000000)
                     try:
-                        logger_request_debug.info(f"middle_api_server request started ({thread_id}): {type_}")
+                        log.request_info("middle_api_server", f"Request started: {type_}")
                         if type(type_) == int:
                             # Handle numeric TYPE values
                             return_str = self.process_num_type(sk, type_, data_str)
@@ -336,7 +336,7 @@
                             result = hosting_api_util.common_request(params)
                             return_str = json.dumps(result)
                     finally:
-                        logger_request_debug.info(f"middle_api_server request ended ({thread_id}): {type}")
+                        log.request_info("middle_api_server", f"Request ended: {type_}")
                 break
                 # sk.close()
             except Exception as e:
middle_server.py
@@ -9,6 +9,7 @@
 import time

 import constant
+import log
 import socket_manager
 from db import mysql_data
 from db.redis_manager import RedisUtils, RedisManager
@@ -93,8 +94,8 @@
                             {"code": 100, "msg": f"JSON parsing failed"}).encode(
                             encoding='utf-8')))
                         continue
-                    thread_id = random.randint(0, 1000000)
-                    logger_request_debug.info(f"middle_server request started ({thread_id}): {data_json.get('type')}")
+                    type_ = data_json["type"]
+                    log.request_info("middle_server", f"Request started: {type_}")
                     try:
                         if data_json["type"] == 'register':
                             client_type = data_json["data"]["client_type"]
@@ -261,7 +262,8 @@
                             sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                             pass
                     finally:
-                        logger_request_debug.info(f"middle_server request ended ({thread_id}): {data_json.get('type')}")
+                        log.request_info("middle_server", f"Request ended")
                 else:
                     # Disconnect
                     break
utils/tool.py
@@ -1,8 +1,10 @@
 """
 Common utilities
 """
+import ctypes
 import decimal
 import random
+import threading
 import time
 import time as t
 import datetime
@@ -234,3 +236,15 @@
         if code in cache_dict:
             return True, cache_dict.get(code)
         return False, None
+
+
+def get_thread_id():
+    try:
+        if constant.is_windows():
+            return threading.current_thread().ident
+        else:
+            thread_id = ctypes.CDLL('libc.so.6').syscall(186)  # Linux syscall number (SYS_gettid on x86-64)
+            return thread_id
+    except:
+        pass
+    return None
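An aside on get_thread_id: syscall number 186 is SYS_gettid on x86-64 Linux specifically; other architectures assign different numbers. On CPython 3.8+, threading.get_native_id() returns the same kernel thread id portably, so a sketch of an alternative without the ctypes dependency (an assumption about the runtime version, not code from this repo):

import sys
import threading


def get_thread_id():
    # threading.get_native_id() (Python 3.8+) returns the OS-level
    # thread id on Windows and Linux alike, with no libc lookup.
    return threading.get_native_id()


if __name__ == "__main__":
    print("native id:", get_thread_id())
    if sys.platform == "linux":
        import ctypes
        # 186 is SYS_gettid on x86-64 only; both calls print the same id.
        print("syscall(186):", ctypes.CDLL("libc.so.6").syscall(186))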