import os
|
import sys
|
|
from loguru import logger
|
|
from utils import tool
|
|
|
class MyLogger:
|
def __init__(self):
|
logger.remove()
|
program_path = sys.argv[0]
|
absolute_path = os.path.abspath(program_path)
|
# 每一天生成一个日志文件,历史日志文件采用zip压缩,异步写入日志
|
logger.add(self.get_path("datas", "l1"),
|
filter=lambda record: record["extra"].get("name") == "l1",
|
rotation="00:00", compression="zip", enqueue=True)
|
|
def get_base_path(self):
|
program_path = sys.argv[0]
|
absolute_path = os.path.abspath(program_path)
|
directory_path = os.path.dirname(absolute_path)
|
return directory_path
|
|
def get_path(self, dir_name, log_name):
|
path_str = "{}/{}/{}".format(self.get_base_path(), dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
|
return path_str
|
|
def get_logger(self, log_name):
|
return logger.bind(name=log_name)
|
|
|
__mylogger = MyLogger()
|
|
logger_l1 = __mylogger.get_logger("l1")
|
|
|
def save_l1_data(code, data):
|
logger_l1.info(f"{code}#{data}")
|
|
|
def get_l1_datas(code):
|
fdatas = []
|
path_str = "{}\\{}\\{}".format(__mylogger.get_base_path(), "datas", "l1") + f".{tool.get_now_date_str()}.log"
|
if os.path.exists(path_str):
|
with open(path_str, 'r') as f:
|
while True:
|
line = f.readline()
|
if not line:
|
break
|
if line.find(f"{code}#") < 0:
|
continue
|
data = line.split("#")[1]
|
data = eval(data)
|
if abs(data["rate"]) > 0.3:
|
continue
|
fdatas.append(data)
|
return fdatas
|