import hashlib
|
import os
|
import time
|
|
from local_api.util import tool, constant
|
|
|
class LogUtil:
|
@classmethod
|
def extract_log_from_key(cls, key, path, target_path):
|
fw = open(target_path, mode='w', encoding="utf-8")
|
try:
|
with open(path, 'r', encoding="utf-8") as f:
|
lines = f.readlines()
|
for line in lines:
|
if line.find("{}".format(key)) > 0:
|
fw.write(line)
|
finally:
|
fw.close()
|
|
|
# 获取日志时间
|
def __get_log_time(line):
|
time_ = line.split("|")[0].split(" ")[1].split(".")[0]
|
return time_
|
|
|
def __get_async_log_time(line):
|
line = line.split(" - ")[1]
|
time_str = line[line.find("[") + 1:line.find("[") + 9]
|
return time_str
|
|
|
__log_file_contents = {}
|
|
|
# 加载文件内容
|
def __load_file_content(path_str, expire_timespace=20):
|
md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
|
if md5 in __log_file_contents and time.time() - __log_file_contents[md5][0] < expire_timespace:
|
return __log_file_contents[md5][1]
|
contents = []
|
if os.path.exists(path_str):
|
with open(path_str, 'r', encoding="utf-8") as f:
|
lines = f.readlines()
|
for line in lines:
|
contents.append(line)
|
__log_file_contents[md5] = (time.time(), contents)
|
return contents
|
|
|
def load_system_logs(date=tool.get_now_date_str()):
|
path = f"{constant.LOG_DIR}/system/system.{date}.log"
|
fdatas = []
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
content = line[line.find(' - ') + 3:]
|
fdatas.append((__get_log_time(line), content.strip()))
|
return fdatas
|
|
|
def load_trade_logs(date=tool.get_now_date_str()):
|
path = f"{constant.LOG_DIR}/trade/trade.{date}.log"
|
fdatas = []
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
content = line[line.find(' - ') + 3:]
|
fdatas.append((__get_log_time(line), content.strip()))
|
return fdatas
|
|
|
def load_kpl_blocks(date=tool.get_now_date_str()):
|
fdatas = {}
|
path = f"{constant.LOG_DIR}/kpl/block.{date}.log"
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
content = line[line.find(']') + 1:]
|
ds = content.split("#")
|
code = ds[0].strip()
|
data_str = ds[1].strip()
|
blocks = eval(data_str)
|
fdatas[code] = blocks
|
return fdatas
|
|
|
def load_limit_up(date=tool.get_now_date_str()):
|
fdatas = []
|
path = f"{constant.LOG_DIR}/kpl/limit_up.{date}.log"
|
with open(path, 'r', encoding='utf-8') as file:
|
lines = file.readlines()
|
for line in lines:
|
time_str = __get_log_time(line)
|
content = line[line.find(' - ') + 3:]
|
limit_up_list = eval(content.strip())
|
fdatas.append((time_str, limit_up_list))
|
return fdatas
|
|
|
if __name__ == "__main__":
|
fdatas = load_system_logs()
|
print(fdatas)
|
|
pass
|