Administrator
4 days ago  48fb7a00951f91bdc707e5dd2d196e5bccb752c3
log_module/async_log_util.py
@@ -1,7 +1,9 @@
"""
异步日志管理器
"""
import logging
import queue
import threading
import time
from log_module.log import logger_debug, logger_system
@@ -9,10 +11,21 @@
 class AsyncLogManager:
-    __log_queue = queue.Queue()
+    def __init__(self):
+        self.__log_queue = queue.Queue(maxsize=10240)
     def __add_log(self, logger, method, *args):
-        self.__log_queue.put_nowait((logger, time.time(), method, args))
+        try:
+            self.__log_queue.put_nowait((logger, time.time(), method, args))
+        except Exception:
+            pass
+    def add_log(self, data):
+        try:
+            self.__log_queue.put_nowait(data)
+        except Exception:
+            pass
     def debug(self, logger, *args):
         self.__add_log(logger, "debug", *args)
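
The hunk above gives each manager a bounded queue and keeps enqueueing non-blocking: `queue.Queue.put_nowait` raises `queue.Full` once `maxsize` records are waiting, and the new `try/except` drops the record rather than letting that exception reach the caller. A minimal stdlib-only sketch of this drop-on-full behaviour (names are illustrative, not part of the module):

    import queue

    q = queue.Queue(maxsize=2)

    def try_enqueue(item):
        # Enqueue without blocking; drop the item when the queue is already full.
        try:
            q.put_nowait(item)
            return True
        except queue.Full:
            # Same policy as the new __add_log/add_log: losing a record is
            # preferred over blocking or raising into the caller.
            return False

    print(try_enqueue("a"))  # True
    print(try_enqueue("b"))  # True
    print(try_enqueue("c"))  # False: maxsize reached, record dropped
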
@@ -29,79 +42,69 @@
     def exception(self, logger, *args):
         self.__add_log(logger, "exception", *args)
     def get_queue_size(self):
         """
         Get the current size of the log queue.
         @return: current queue size
         """
         return self.__log_queue.qsize()
     # Run the synchronous log writer for this instance
-    def run_sync(self):
+    def run_sync(self, add_to_common_log=False):
+        # print("run_sync", add_to_common_log)
         logger_system.info(f"run_sync thread ID: {tool.get_thread_id()}")
         while True:
             try:
                 val = self.__log_queue.get()
-                time_s = val[1]
-                cmd = val[2]
-                method = getattr(val[0], cmd)
-                d = list(val[3])
-                d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0]
-                d = tuple(d)
-                method(*d)
-            except:
-                pass
+                if not add_to_common_log:
+                    time_s = val[1]
+                    cmd = val[2]
+                    method = getattr(val[0], cmd)
+                    d = list(val[3])
+                    d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0]
+                    d = tuple(d)
+                    method(*d)
+                else:
+                    _common_log.add_log(val)
+            except Exception as e:
+                logging.exception(e)
 l2_data_log = AsyncLogManager()
 huaxin_l2_log = AsyncLogManager()
-log_queue = queue.Queue()
-def __add_log(logger, time_out_log, method, *args):
-    start_time = time.time()
-    log_queue.put_nowait((logger, start_time, method, args))
-    if time_out_log:
-        end_time = time.time()
-        sub_time = end_time - start_time
-        if sub_time > 0.01:
-            # Log that saving to the log queue was slow
-            __add_log(logger_debug, False, f"Time taken to enqueue log: {sub_time}s")
+_common_log = AsyncLogManager()
 def debug(logger, *args):
-    __add_log(logger, True, "debug", *args)
+    _common_log.debug(logger, *args)
 def info(logger, *args):
-    __add_log(logger, True, "info", *args)
+    _common_log.info(logger, *args)
 def warning(logger, *args):
-    __add_log(logger, True, "warning", *args)
+    _common_log.warning(logger, *args)
 def error(logger, *args):
-    __add_log(logger, True, "error", *args)
+    _common_log.error(logger, *args)
 def exception(logger, *args):
-    __add_log(logger, True, "exception", *args)
+    _common_log.exception(logger, *args)
 # Run the synchronous log writer
 def run_sync():
-    logger_system.info(f"async_log thread ID: {tool.get_thread_id()}")
-    while True:
-        try:
-            val = log_queue.get()
-            time_s = val[1]
-            cmd = val[2]
-            method = getattr(val[0], cmd)
-            d = list(val[3])
-            d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:3]}] " + d[0]
-            d = tuple(d)
-            method(*d)
-        except:
-            pass
+    _common_log.run_sync()
 if __name__ == "__main__":
     # info(logger_debug, "*-{}", "test")
     info(logger_debug, "002375")
-    run_sync()
+    _queue = queue.Queue(maxsize=102400)
+    for i in range(200):
+        _queue.put_nowait("1")
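
After this commit, the module-level helpers (`debug`, `info`, `warning`, `error`, `exception`) all enqueue onto the shared `_common_log` manager, a single background thread drains it via `run_sync()`, and a dedicated manager such as `l2_data_log` can instead forward its records into the common queue by running `run_sync(add_to_common_log=True)`. A self-contained sketch of that producer/consumer pattern, using only the standard library (the names `async_info`, `demo_logger` and the timestamp formatting are illustrative stand-ins, not the project's `tool` helpers):

    import logging
    import queue
    import threading
    import time

    logging.basicConfig(level=logging.DEBUG, format="%(message)s")
    demo_logger = logging.getLogger("demo")          # stands in for logger_debug

    log_queue = queue.Queue(maxsize=10240)           # bounded, like AsyncLogManager

    def async_info(logger, msg):
        # Producer side: enqueue (logger, enqueue time, method name, args) and return at once.
        try:
            log_queue.put_nowait((logger, time.time(), "info", (msg,)))
        except queue.Full:
            pass                                     # drop on overflow, as the manager does

    def run_sync():
        # Consumer side: drain the queue forever and write each record synchronously,
        # prefixing the message with the time it was enqueued (the real module uses
        # tool.to_time_str() for this).
        while True:
            logger, time_s, cmd, args = log_queue.get()
            stamp = time.strftime("%H:%M:%S", time.localtime(time_s))
            getattr(logger, cmd)(f"[{stamp}.{str(time_s).split('.')[1][:6]}] {args[0]}")

    threading.Thread(target=run_sync, daemon=True).start()
    async_info(demo_logger, "002375")
    time.sleep(0.1)                                  # let the consumer run before the demo exits

Because producers only ever call `put_nowait`, a caller on a hot path never blocks on log I/O; the worst case under sustained overload is that records are silently dropped.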