import datetime
import hashlib
import logging
import os
import time

import constant
from log_module.log import printlog
from utils import tool

# Named logger (not the root-level ``logging.debug``) so that reporting a
# skipped line never triggers an implicit ``logging.basicConfig()``.
_logger = logging.getLogger(__name__)


class LogUtil:
    """Utilities for slicing raw log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        :param key: value to search for; it is formatted to text before matching
        :param path: source log file (read as UTF-8)
        :param target_path: destination file; created or truncated (UTF-8)
        :raises OSError: if either file cannot be opened
        """
        needle = f"{key}"
        # The target is opened first, as before, so it is created/truncated
        # even when the source file turns out to be unreadable.
        with open(target_path, mode='w', encoding="utf-8") as fw:
            with open(path, 'r', encoding="utf-8") as f:
                # ``in`` also matches a key at column 0, which the previous
                # ``find(...) > 0`` test silently dropped.
                fw.writelines(line for line in f if needle in line)


# Extract the log time
def __get_log_time(line):
    """Return the time portion of a ``|``-separated log line.

    Takes the text before the first ``|``, picks its second space-separated
    token and drops everything from the first ``.`` on
    (e.g. ``"2024-01-02 09:30:01.123 | ..."`` -> ``"09:30:01"``).

    :raises IndexError: if the header has no second space-separated token
    """
    header = line.split("|")[0]
    return header.split(" ")[1].split(".")[0]


def __get_async_log_time(line):
    """Return the 8 characters that follow the first ``[`` of the message part.

    The message part is the second `` - ``-separated field of ``line``
    (presumably ``[HH:MM:SS] payload`` -- confirm against the log writer).

    :raises IndexError: if ``line`` contains no `` - `` separator
    """
    message = line.split(" - ")[1]
    start = message.find("[") + 1
    return message[start:start + 8]


def _get_async_log_payload(line):
    """Return the message part of ``line`` without a leading ``[...]`` prefix.

    :raises IndexError: if ``line`` contains no `` - `` separator
    """
    # maxsplit=1 keeps payloads that themselves contain " - " intact.
    payload = line.split(" - ", 1)[1].strip()
    if payload.startswith("["):
        payload = payload[payload.find("]") + 1:].strip()
    return payload


# Cache: md5(path) -> (load timestamp, list of lines)
__log_file_contents = {}


# Load file contents
def __load_file_content(path_str, expire_timespace=20):
    """Return the lines of ``path_str``, cached for ``expire_timespace`` seconds.

    :param path_str: file to read (UTF-8)
    :param expire_timespace: maximum age in seconds of a cached result
    :return: list of lines (with line endings); empty if the file is missing
    """
    md5 = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(md5)
    if cached is not None and time.time() - cached[0] < expire_timespace:
        return cached[1]
    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            contents = f.readlines()
    __log_file_contents[md5] = (time.time(), contents)
    return contents


def _read_lines(path):
    """Return all lines of ``path`` (UTF-8), or ``[]`` if it does not exist."""
    if not os.path.exists(path):
        return []
    with open(path, 'r', encoding="utf-8") as f:
        return f.readlines()


def _collect_records(path, parse_line):
    """Apply ``parse_line`` to every non-blank line of ``path``.

    Lines that fail to parse are skipped (best effort, as before) and
    reported at DEBUG level instead of being swallowed silently.

    :param path: log file to read
    :param parse_line: callable ``line -> record``; may raise on bad input
    :return: list of parsed records in file order
    """
    records = []
    for line in _read_lines(path):
        if not line.strip():
            continue
        try:
            records.append(parse_line(line))
        except Exception:
            # Broad on purpose: eval() in the parsers can raise anything.
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            _logger.debug("Skipping unparsable line in %s: %r", path, line)
    return records


def _resolve_date(date):
    """Return ``date``, or today's date string when ``date`` is ``None``."""
    # Resolved per call: a default evaluated in the signature would be frozen
    # at import time and go stale in a long-running process.
    return tool.get_now_date_str() if date is None else date


def load_market_sift_plate(date=None):
    """
    Get the constituent stocks of the selected-inflow plates.

    Reads ``low_suction_log/gp/kpl/market_sift_plate.{date}.log``.

    :param date: date string used in the file name; defaults to today
    :return: list of ``(time_str, data)`` tuples, one per parsable line
    """
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/market_sift_plate.{_resolve_date(date)}.log"

    def parse(line):
        payload = line.split(" - ", 1)[1].strip()
        # NOTE(review): eval() executes arbitrary expressions read from the
        # log file. If payloads are always plain literals, ast.literal_eval
        # would be safer -- confirm the log format before switching.
        return __get_log_time(line), eval(payload)

    return _collect_records(path, parse)


def load_kpl_market_stock_heat(date=None):
    """
    Load the records of ``stock_of_markets_plate.{date}.log``.

    (The original note described this as the constituent stocks of the
    selected inflow, same as ``load_market_sift_plate`` -- confirm.)

    :param date: date string used in the file name; defaults to today
    :return: list of ``(time_str, data)`` tuples, one per parsable line
    """
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/stock_of_markets_plate.{_resolve_date(date)}.log"

    def parse(line):
        # NOTE(review): eval() executes arbitrary expressions read from the
        # log file. If payloads are always plain literals, ast.literal_eval
        # would be safer -- confirm the log format before switching.
        return __get_async_log_time(line), eval(_get_async_log_payload(line))

    return _collect_records(path, parse)


def load_kpl_market_strong(date=None):
    """
    Get the KaiPanLa historical market strength.

    Reads ``low_suction_log/gp/kpl/Overall_market_strength_score.{date}.log``.

    :param date: date string used in the file name; defaults to today
    :return: [("time", score)] with the score converted to ``int``
    """
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/Overall_market_strength_score.{_resolve_date(date)}.log"
    fdatas = []
    for line in _read_lines(path):
        if not line.strip():
            continue
        try:
            fdatas.append((__get_async_log_time(line), int(_get_async_log_payload(line))))
        except (IndexError, ValueError):
            # Missing " - " separator or a non-integer score: skip the line.
            _logger.debug("Skipping unparsable line in %s: %r", path, line)
    return fdatas


if __name__ == '__main__':
    datas = load_market_sift_plate()
    fdatas = []
    for data in datas:
        # (seconds since 09:15:00, time, strength)
        if "11:30:00" <= data[0] <= "13:00:00":
            continue
        fdatas.append([tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]])
    print(fdatas)