import datetime
|
import hashlib
|
import logging
|
import os
|
import time
|
|
import constant
|
from log_module.log import printlog
|
from utils import tool
|
|
|
class LogUtil:
    """Helpers for post-processing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        :param key: value to search for; it is rendered with
            ``"{}".format(key)`` before matching, so non-string keys work.
        :param path: UTF-8 log file to read.
        :param target_path: UTF-8 file that is overwritten with the matches.
        :raises OSError: if either file cannot be opened.
        """
        needle = "{}".format(key)
        # Open the source first so that a missing source file does not leave
        # an empty, truncated target file behind.
        with open(path, 'r', encoding="utf-8") as src, \
                open(target_path, mode='w', encoding="utf-8") as dst:
            # Stream line by line instead of loading the whole log in memory.
            # ``in`` also matches a key at the very start of a line, which
            # the former ``find(...) > 0`` test silently skipped.
            dst.writelines(line for line in src if needle in line)
|
|
|
# Extract the time field of a log line
def __get_log_time(line):
    """Return the time token of a ``|``-separated log line, without fractions.

    The text before the first ``|`` is split on single spaces; the second
    token is taken and everything from its first ``.`` onwards is dropped.
    Assumes the line starts with something like ``<date> <HH:MM:SS.fff>|``
    (TODO confirm against the log writer).

    :param line: one raw log line.
    :return: the truncated time token.
    :raises IndexError: if the prefix before ``|`` contains no space.
    """
    header = line.partition("|")[0]
    clock = header.split(" ")[1]
    return clock.partition(".")[0]
|
|
|
def __get_async_log_time(line):
    """Return the 8 characters following the first ``[`` of the message part.

    The message part is the second field of ``line`` when split on ``" - "``.
    If that part contains no ``[``, its first 8 characters are returned.

    :param line: one raw async-log line.
    :return: a string of at most 8 characters (presumably ``HH:MM:SS`` --
        verify against the log writer).
    :raises IndexError: if ``line`` contains no ``" - "`` separator.
    """
    payload = line.split(" - ")[1]
    # find() yields -1 when "[" is absent, which makes the slice start at 0.
    start = payload.find("[") + 1
    return payload[start:start + 8]
|
|
|
# In-memory cache of loaded files: md5(path) -> (load timestamp, list of lines)
__log_file_contents = {}


# Load file contents
def __load_file_content(path_str, expire_timespace=20):
    """Return the lines of a UTF-8 text file, using a short-lived cache.

    :param path_str: path of the file to read.
    :param expire_timespace: maximum age in seconds for which a cached copy
        is reused.
    :return: list of lines including their line endings; an empty list when
        the file does not exist. A cached result is returned as the same
        list object on every hit.
    """
    cache_key = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(cache_key)
    if cached is not None and time.time() - cached[0] < expire_timespace:
        return cached[1]
    if not os.path.exists(path_str):
        return []
    with open(path_str, 'r', encoding="utf-8") as f:
        contents = f.readlines()
    __log_file_contents[cache_key] = (time.time(), contents)
    return contents
|
|
|
def load_stock_of_markets_plate(date=None):
    """
    Load the constituent-stock records of the featured-inflow market plates
    from the per-day log file.

    :param date: date string embedded in the log file name; when omitted,
        ``tool.get_now_date_str()`` is evaluated at call time.
    :return: list of ``(time_str, data)`` tuples in file order, where
        ``time_str`` comes from ``__get_async_log_time`` and ``data`` is the
        evaluated payload of the line; empty when the file does not exist.
    """
    if date is None:
        # Resolve "today" on every call: a default computed in the signature
        # is frozen at import time and goes stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/stock_of_markets_plate.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                # Kept inside the try so a line without " - " is skipped
                # like any other malformed line instead of aborting the load.
                time_str = __get_async_log_time(line)
                data = line.split(" - ")[1].strip()
                if data.startswith("["):
                    # Drop the leading "[...]" prefix before the payload.
                    data = data[data.find("]") + 1:].strip()
                # NOTE(review): eval() executes arbitrary expressions found in
                # the log file; only safe while the file is written by trusted
                # code. Consider ast.literal_eval if payloads are plain
                # literals.
                data_dict = eval(data)
            except Exception:
                # Best-effort parsing: skip the bad line but leave a trace.
                logging.getLogger(__name__).debug(
                    "skip unparsable line in %s: %r", path, line)
                continue
            fdatas.append((time_str, data_dict))
    return fdatas
|
|
# Manual smoke run: load the plate log with the default date when this
# module is executed as a script.
if __name__ == "__main__":
    load_stock_of_markets_plate()
|