Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""
历史K线管理
"""
import copy
import datetime
import os
import threading
 
import constant
from code_attribute import gpcode_manager
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_debug
from third_data import history_k_data_util
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool, init_data_util
 
 
def update_history_k_bars():
    """
    更新历史K线
    @return: 此次更新的数量
    """
 
    def update(codes_):
        for code in codes_:
            try:
                datas = init_data_util.get_volumns_by_code(code, 150)
                if datas:
                    HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
            except Exception as e:
                logger_debug.exception(e)
 
    latest_trading_date = history_k_data_util.get_k_bar_dead_date()
    # 刷新目标代码的自由流通量
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    codes = set()
    if codes_sh:
        for code_byte in codes_sh:
            codes.add(code_byte.decode())
        for code_byte in codes_sz:
            codes.add(code_byte.decode())
    # 获取已经更新的数据
    codes_record = HistoryKDataManager().get_history_bars_codes(latest_trading_date)
    codes = codes - codes_record
    threading.Thread(target=lambda: update(codes), daemon=True).start()
 
    return len(codes)
 
 
def re_set_price_pres(codes, force=False):
    # 通过历史数据缓存获取
    # 获取上一个交易日
    day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str())
    not_codes = []
    for code in codes:
        if not tool.is_can_buy_code(code):
            continue
        pre_close = HistoryKDataManager().get_pre_close(code, day)
        if pre_close is not None:
            gpcode_manager.CodePrePriceManager.set_price_pre(code, pre_close, force)
        else:
            not_codes.append(code)
    if not_codes:
        init_data_util.re_set_price_pres(not_codes, force)
 
 
class HistoryKDataManager:
    __instance = None
    __db = 0
    __history_k_day_datas = {}
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
 
    @classmethod
    def __load_data(cls):
        pass
 
    def __get_cache_dir(self):
        """
        获取缓存路径
        @return:
        """
        dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        return dir_path
 
    def __del_outdate_datas(self, code):
        dir_path = self.__get_cache_dir()
        datas = []
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(code) < 0:
                    continue
                path_ = os.path.join(root, file)
                day = file.split("_")[0]
                datas.append((day, path_))
        # 保存最新的一条数据
        if datas:
            datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
            datas = datas[1:]
            for d in datas:
                os.remove(d[1])
 
    def save_history_bars(self, code, day, datas, force=False):
        """
        保存历史K线
        @param code: 代码
        @param day: K线最新的日期(取datas最新一条数据的日期)
        @param datas: 数据
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if day not in self.__history_k_day_datas:
            self.__history_k_day_datas[day] = {}
        if datas:
            self.__history_k_day_datas[day][code] = datas
            # 将日期格式化
            fdatas = []
            for d in datas:
                dd = copy.deepcopy(d)
                for k in dd:
                    if type(dd[k]) == datetime.datetime:
                        dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S")
                fdatas.append(dd)
            with open(path_str, encoding="utf-8", mode='w') as f:
                f.write(f"{fdatas}")
        self.__del_outdate_datas(code)
 
    def get_history_bars(self, code, day):
        """
        获取历史K线
        @param code:
        @param day:
        @return:
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code]
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if not os.path.exists(path_str):
            return None
        with open(path_str, encoding="utf-8", mode='r') as f:
            line = f.readline()
            if line:
                datas = eval(line)
                # 将日期格式转为datetime
                for d in datas:
                    for k in d:
                        if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                            d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
                return datas
        return None
 
    def load_data(self):
        """
        加载数据
        @param day:
        @return:
        """
        day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str())
        cache_dir = self.__get_cache_dir()
        if not os.path.exists(cache_dir):
            return
        fs = os.listdir(cache_dir)
        for f in fs:
            if f.find(day) < 0:
                continue
            with open(os.path.join(cache_dir, f), mode='r', encoding='utf-8') as fs:
                line = fs.readline()
                if line:
                    datas = eval(line)
                    # 将日期格式转为datetime
                    for d in datas:
                        for k in d:
                            if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                                d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
                    if datas:
                        if day not in self.__history_k_day_datas:
                            self.__history_k_day_datas[day] = {}
                        self.__history_k_day_datas[day][datas[0]['sec_id']] = datas
 
    def get_pre_close(self, code, day):
        """
        获取之前的收盘价
        @param code:
        @param day:
        @return:
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code][0]["close"]
        return None
 
    def get_history_bars_codes(self, day):
        """
        获取某一天的历史K线的代码数据
        @param day:
        @return: 代码集合
        """
 
        dir_path = self.__get_cache_dir()
        codes = set()
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(day) >= 0:
                    codes.add(file.split("_")[1][:6])
        return codes
 
 
if __name__ == "__main__":
    print(HistoryKDataManager().get_history_bars_codes("2024-12-31"))