import datetime
import fileinput
import hashlib
import json
import locale
import logging
import os
import shutil
import time

from code_attribute import gpcode_manager
from utils import tool, constant


class LogUtil:
    """Utilities for slicing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        ``target_path`` is created/truncated before ``path`` is opened, so it
        exists (possibly empty) even when reading ``path`` fails.

        Args:
            key: Value to look for; it is formatted with ``str.format`` first,
                so non-string keys are accepted.
            path: Log file to scan (read as UTF-8).
            target_path: Output file (written as UTF-8).

        Raises:
            OSError: If either file cannot be opened.
        """
        needle = "{}".format(key)
        with open(target_path, mode='w', encoding="utf-8") as fw:
            with open(path, 'r', encoding="utf-8") as f:
                # Stream the file instead of loading it whole; logs can be large.
                for line in f:
                    # A containment test also matches a key located at the very
                    # start of the line, which `find(...) > 0` used to skip.
                    if needle in line:
                        fw.write(line)


# Get the log time
def __get_log_time(line):
    """Return the time token of a "|"-separated log line.

    Takes the text before the first "|", picks its second space-separated
    token and drops everything from the first "." onwards.
    Presumably the line starts with "YYYY-MM-DD HH:MM:SS.mmm | ..." - confirm
    against the logger configuration.
    """
    prefix = line.split("|")[0]
    return prefix.split(" ")[1].split(".")[0]


def __get_async_log_time(line):
    """Return the 8 characters following the first "[" of the second " - " field.

    Presumably that is an "HH:MM:SS" stamp - confirm against the log format.
    """
    field = line.split(" - ")[1]
    start = field.find("[") + 1
    return field[start:start + 8]


# Cache of file contents: md5(path) -> (load timestamp, list of lines)
__log_file_contents = {}


# Load file content
def __load_file_content(path_str, expire_timespace=20):
    """Return the lines of ``path_str``, cached for ``expire_timespace`` seconds.

    A missing file yields (and caches) an empty list. The returned list is the
    cached object itself, so callers should not mutate it.

    Args:
        path_str: Path of the file to read (UTF-8).
        expire_timespace: Maximum age in seconds of a cache entry.

    Returns:
        list[str]: The file's lines, including their line terminators.
    """
    md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(md5)
    if cached is not None and time.time() - cached[0] < expire_timespace:
        return cached[1]
    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            contents = f.readlines()
    __log_file_contents[md5] = (time.time(), contents)
    return contents


def __read_tail_lines(path_str, max_lines, block_size=64 * 1024):
    """Return up to ``max_lines`` trailing lines of ``path_str``, oldest first.

    The file is read backwards in binary blocks, so only the tail is loaded.
    Lines are decoded with the platform default text encoding (the one a
    plain ``open(path, 'r')`` would use) and returned without their "\\n".

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If a line is not valid in the default encoding.
    """
    if max_lines <= 0:
        return []
    chunks = []
    newline_count = 0
    with open(path_str, 'rb') as f:
        f.seek(0, os.SEEK_END)
        position = f.tell()
        # One newline more than max_lines is needed: the first piece of the
        # window may be a partial line and the last one an empty tail.
        while position > 0 and newline_count <= max_lines:
            step = min(block_size, position)
            position -= step
            f.seek(position)
            chunk = f.read(step)
            chunks.append(chunk)
            newline_count += chunk.count(b"\n")
    pieces = b"".join(reversed(chunks)).split(b"\n")
    if position > 0:
        # The window starts mid-file, so its first piece is a truncated line.
        pieces = pieces[1:]
    if pieces and not pieces[-1]:
        # Artifact of the newline that terminates the file.
        pieces.pop()
    encoding = locale.getpreferredencoding(False)
    return [piece.decode(encoding) for piece in pieces[-max_lines:]]


# Load the latest market info records
def load_latest_market_info(date=None):
    """Load the most recent market records from the L2 market log of ``date``.

    Only the tail of the file (at most ``MAX_LINE + 1`` lines) is inspected.
    For each line the text after the first "]" is evaluated as a Python
    expression and stored under its element ``0``; later lines overwrite
    earlier ones with the same key. Blank lines are ignored.

    Args:
        date: Date string used in the log file name. Defaults to
            ``tool.get_now_date_str()`` evaluated at call time, so a
            long-running process does not keep using its start-up date.

    Returns:
        dict: Mapping of ``record[0]`` to the full record.

    Raises:
        OSError: If the log file cannot be opened.
    """
    if date is None:
        date = tool.get_now_date_str()
    path = (
        f"{constant.get_path_prefix()}/{constant.LOG_DIR}"
        f"/huaxin_local/l2/market.{date}.log"
    )
    fdatas = {}
    MAX_LINE = 5000
    # The previous implementation collected MAX_LINE + 1 lines; keep the same
    # window so no record that used to be visible is dropped.
    for line in __read_tail_lines(path, MAX_LINE + 1):
        payload = line[line.find("]") + 1:].strip()
        if not payload:
            continue
        # NOTE(review): eval() executes arbitrary expressions found in the log
        # file. Acceptable only while the log is written by trusted code; if
        # records are plain literals, ast.literal_eval would be the safer choice.
        data = eval(payload)
        fdatas[data[0]] = data
    return fdatas


if __name__ == "__main__":
    fdatas = load_latest_market_info()
    print(fdatas)