admin
2025-06-10 568c763084b926a6f2d632b7ac65b9ec8280752f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import sys
 
from loguru import logger
 
from utils import tool
 
 
class MyLogger:
    """Configure the shared loguru ``logger`` with a daily "l1" file sink.

    Creating an instance calls ``logger.remove()`` and then registers one
    file sink under ``<program directory>/datas`` that only accepts records
    whose bound ``extra["name"]`` equals ``"l1"``.
    """

    def __init__(self):
        logger.remove()
        # One log file per day (rotation at 00:00); rotated files are
        # zip-compressed, and records are written through a queue
        # (enqueue=True) instead of directly by the calling thread.
        logger.add(self.get_path("datas", "l1"),
                   filter=lambda record: record["extra"].get("name") == "l1",
                   rotation="00:00", compression="zip", enqueue=True)

    def get_base_path(self):
        """Return the absolute directory of the running program (``sys.argv[0]``)."""
        return os.path.dirname(os.path.abspath(sys.argv[0]))

    def get_path(self, dir_name, log_name):
        """Return the sink path template for ``log_name`` inside ``dir_name``.

        The result keeps the literal ``{time:YYYY-MM-DD}`` placeholder; it is
        passed to ``logger.add`` unexpanded, so the date is filled in by
        loguru rather than here.
        """
        prefix = "{}/{}/{}".format(self.get_base_path(), dir_name, log_name)
        return prefix + ".{time:YYYY-MM-DD}.log"

    def get_logger(self, log_name):
        """Return a logger bound with ``name=log_name`` for the sink filters to match."""
        return logger.bind(name=log_name)
 
 
# Module-level setup executed at import time: constructing MyLogger removes
# the existing loguru handlers and registers the "l1" file sink.
__mylogger = MyLogger()

# Logger bound with name="l1" so that its records pass the "l1" sink filter.
logger_l1 = __mylogger.get_logger("l1")
 
 
def save_l1_data(code, data):
    """Write one record to the "l1" log as ``<code>#<data>`` at INFO level."""
    message = f"{code}#{data}"
    logger_l1.info(message)
 
 
def get_l1_datas(code):
    """Return the records logged today for ``code`` in the "l1" log file.

    Scans ``<program directory>/datas/l1.<today>.log`` and evaluates the
    text following ``"<code>#"`` on every line that contains that marker.

    Args:
        code: Record prefix to look for (the part written before ``#``).

    Returns:
        The evaluated payloads in file order, skipping any whose
        ``abs(payload["rate"])`` is greater than 0.3. An empty list when
        today's log file does not exist.
    """
    # Build the path with os.path.join instead of hard-coded backslashes so
    # the reader looks in the same place on every platform.
    # NOTE(review): presumably tool.get_now_date_str() yields YYYY-MM-DD to
    # match the sink's "{time:YYYY-MM-DD}" file name - confirm.
    file_name = f"l1.{tool.get_now_date_str()}.log"
    path_str = os.path.join(__mylogger.get_base_path(), "datas", file_name)
    if not os.path.exists(path_str):
        return []

    marker = f"{code}#"
    fdatas = []
    # Read as UTF-8 explicitly: loguru file sinks write UTF-8 by default,
    # while open() would otherwise fall back to the locale encoding.
    with open(path_str, 'r', encoding="utf-8") as f:
        for line in f:
            # Take everything after the marker, so a payload that itself
            # contains "#" is not truncated.
            _, found, payload = line.partition(marker)
            if not found:
                continue
            # NOTE(review): eval() executes whatever text is in the log file.
            # It is kept so existing payloads keep parsing; consider
            # ast.literal_eval if payloads are always plain literals.
            data = eval(payload)
            if abs(data["rate"]) > 0.3:
                continue
            fdatas.append(data)
    return fdatas