Administrator
2024-07-03 7ee37fb1e32d3d5dc527e90c4c0c87c5313f6a56
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
历史K线管理
"""
import datetime
import os
 
import constant
 
 
class HistoryKDataManager:
    __instance = None
    __db = 0
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
 
    @classmethod
    def __load_data(cls):
        pass
 
    def __get_cache_dir(self):
        """
        获取缓存路径
        @return:
        """
        dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        return dir_path
 
    def __del_outdate_datas(self, code):
        dir_path = self.__get_cache_dir()
        datas = []
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(code) < 0:
                    continue
                path_ = os.path.join(root, file)
                day = file.split("_")[0]
                datas.append((day, path_))
        # 保存最新的一条数据
        if datas:
            datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
            datas = datas[1:]
            for d in datas:
                os.remove(d[1])
 
    def save_history_bars(self, code, day, datas, force=False):
        """
        保存历史K线
        @param code: 代码
        @param day: K线最新的日期(取datas最新一条数据的日期)
        @param datas: 数据
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if datas:
            # 将日期格式化
            for d in datas:
                for k in d:
                    if type(d[k]) == datetime.datetime:
                        d[k] = d[k].strftime("%Y-%m-%d %H:%M:%S")
 
        with open(path_str, encoding="utf-8", mode='w') as f:
            f.write(f"{datas}")
        self.__del_outdate_datas(code)
 
    def get_history_bars(self, code, day):
        """
        获取历史K线
        @param code:
        @param day:
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if not os.path.exists(path_str):
            return None
        with open(path_str, encoding="utf-8", mode='r') as f:
            line = f.readline()
            if line:
                datas = eval(line)
                # 将日期格式转为datetime
                for d in datas:
                    for k in d:
                        if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                            d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
                return datas
        return None
 
    def get_history_bars_code_count(self, day):
        """
        获取某一天的历史K线的数量
        @param day:
        @return:
        """
        dir_path = self.__get_cache_dir()
        count = 0
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(day) >= 0:
                    count += 1
        return count