# NOTE: the original top of this file held residue from a web source viewer
# (not part of the module): a blame header reading
# "admin / 2025-03-26 8c51895ecab1de9c6faf664d389168473951f3bd"
# followed by the viewer's gutter line numbers 1-81.
import datetime
import hashlib
import logging
import os
import time
 
import constant
from log_module.log import printlog
from utils import tool
 
 
class LogUtil:
    """Helpers for slicing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of ``path`` that contains ``key`` into ``target_path``.

        :param key: value to look for; it is formatted to text before matching
        :param path: UTF-8 text file to read
        :param target_path: UTF-8 output file, created or truncated on every call
        :raises OSError: if either file cannot be opened
        """
        needle = "{}".format(key)
        # The target is opened first, so it is created/truncated even when the
        # source file turns out to be missing; both handles are closed by `with`.
        with open(target_path, mode='w', encoding="utf-8") as fw, \
                open(path, 'r', encoding="utf-8") as f:
            for line in f:
                # `in` also matches a key at column 0, which the previous
                # `find(...) > 0` comparison silently skipped.
                if needle in line:
                    fw.write(line)
 
 
def __get_log_time(line):
    """Get the log time from a log line.

    Takes the text before the first ``|``, picks its second space-separated
    field and drops everything from the first ``.`` onwards (presumably the
    fractional seconds of a ``<date> <time>.<millis> | ...`` prefix -- the
    exact log format is not visible here).

    :param line: one raw log line
    :return: the extracted time token
    :raises IndexError: if the prefix contains no space
    """
    prefix = line.split("|", 1)[0]
    fields = prefix.split(" ")
    clock = fields[1]
    return clock.split(".", 1)[0]
 
 
def __get_async_log_time(line):
    """Return the 8 characters that follow the first ``[`` of the log payload.

    The payload is the second ``" - "``-separated segment of ``line``. When
    the payload has no ``[``, ``find`` yields -1 and the first 8 characters
    of the payload are returned instead.

    :param line: one raw log line containing at least one ``" - "``
    :return: up to 8 characters (presumably an ``HH:MM:SS`` time -- confirm
        against the log writer)
    :raises IndexError: if ``line`` contains no ``" - "`` separator
    """
    payload = line.split(" - ")[1]
    start = payload.find("[") + 1
    return payload[start:start + 8]
 
 
# Cache of loaded files: md5 hex digest of the path -> (load timestamp, lines)
__log_file_contents = {}


def __load_file_content(path_str, expire_timespace=20):
    """Load the lines of a file, reusing a cached copy while it is fresh.

    :param path_str: path of the UTF-8 text file to read
    :param expire_timespace: maximum age, in seconds, of a reusable cached copy
    :return: list of lines with their line endings; an empty list (which is
        cached as well) when the file does not exist
    """
    cache_key = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(cache_key)
    if cached is not None:
        loaded_at, cached_lines = cached
        if time.time() - loaded_at < expire_timespace:
            return cached_lines
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            fresh_lines = f.readlines()
    else:
        fresh_lines = []
    __log_file_contents[cache_key] = (time.time(), fresh_lines)
    return fresh_lines
 
 
def load_stock_of_markets_plate(date=None):
    """
    Get the constituent stocks of the featured inflow plates.

    Reads ``stock_of_markets_plate.<date>.log`` under the configured path
    prefix and parses one record per line.

    :param date: date string used in the log file name; when omitted,
        ``tool.get_now_date_str()`` is called at call time
    :return: list of ``(time_str, data)`` tuples, one per parseable line;
        an empty list when the log file does not exist
    """
    if date is None:
        # Resolve per call: a default written in the signature is evaluated
        # once at import time and would go stale in a long-running process.
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/low_suction_log/gp/kpl/stock_of_markets_plate.{date}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                # Kept inside the try so a line without " - " is skipped
                # instead of aborting the whole load with an IndexError.
                time_str = __get_async_log_time(line)
                data = line.split(" - ")[1].strip()
                if data.startswith("["):
                    # Drop the leading "[...]" tag in front of the payload
                    data = data[data.find("]") + 1:].strip()
                # SECURITY: eval() executes whatever the log line contains.
                # Only safe while the log is written by trusted code; consider
                # ast.literal_eval if the payload is always a plain literal.
                data_dict = eval(data)
            except Exception:
                # Best effort: ignore lines that cannot be parsed
                continue
            fdatas.append((time_str, data_dict))
    return fdatas
 
# Script entry point: run the loader once with its default date (the result is discarded)
if __name__ == '__main__':
    load_stock_of_markets_plate()