admin
2024-06-21 e15f8d3a986b9e8ccea31d28e0f7e26ac73d85b5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import hashlib
import os
import time
 
from local_api.util import tool, constant
 
 
class LogUtil:
    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        fw = open(target_path, mode='w', encoding="utf-8")
        try:
            with open(path, 'r', encoding="utf-8") as f:
                lines = f.readlines()
                for line in lines:
                    if line.find("{}".format(key)) > 0:
                        fw.write(line)
        finally:
            fw.close()
 
 
# 获取日志时间
def __get_log_time(line):
    time_ = line.split("|")[0].split(" ")[1].split(".")[0]
    return time_
 
 
def __get_async_log_time(line):
    line = line.split(" - ")[1]
    time_str = line[line.find("[") + 1:line.find("[") + 9]
    return time_str
 
 
__log_file_contents = {}
 
 
# 加载文件内容
def __load_file_content(path_str, expire_timespace=20):
    md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    if md5 in __log_file_contents and time.time() - __log_file_contents[md5][0] < expire_timespace:
        return __log_file_contents[md5][1]
    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                contents.append(line)
    __log_file_contents[md5] = (time.time(), contents)
    return contents
 
 
def load_system_logs(date=tool.get_now_date_str()):
    path = f"{constant.LOG_DIR}/system/system.{date}.log"
    fdatas = []
    with open(path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        for line in lines:
            content = line[line.find(' - ') + 3:]
            fdatas.append((__get_log_time(line), content.strip()))
    return fdatas
 
 
def load_trade_logs(date=tool.get_now_date_str()):
    path = f"{constant.LOG_DIR}/trade/trade.{date}.log"
    fdatas = []
    with open(path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        for line in lines:
            content = line[line.find(' - ') + 3:]
            fdatas.append((__get_log_time(line), content.strip()))
    return fdatas
 
 
def load_kpl_blocks(date=tool.get_now_date_str()):
    fdatas = {}
    path = f"{constant.LOG_DIR}/kpl/block.{date}.log"
    with open(path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        for line in lines:
            content = line[line.find(']') + 1:]
            ds = content.split("#")
            code = ds[0].strip()
            data_str = ds[1].strip()
            blocks = eval(data_str)
            fdatas[code] = blocks
    return fdatas
 
 
def load_limit_up(date=tool.get_now_date_str()):
    fdatas = []
    path = f"{constant.LOG_DIR}/kpl/limit_up.{date}.log"
    with open(path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        for line in lines:
            time_str = __get_log_time(line)
            content = line[line.find(' - ') + 3:]
            limit_up_list = eval(content.strip())
            fdatas.append((time_str, limit_up_list))
    return fdatas
 
 
if __name__ == "__main__":
    fdatas = load_system_logs()
    print(fdatas)
 
    pass