Administrator
2024-07-16 cf551c3d66c1410bfdfd9f808e98cba77dec5cd1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
历史K线管理
"""
import datetime
import os
import threading
 
import constant
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_debug
from third_data import history_k_data_util
from utils import tool, init_data_util
 
 
def update_history_k_bars():
    """
    更新历史K线
    @return: 此次更新的数量
    """
 
    def update(codes_):
        for code in codes_:
            try:
                datas = init_data_util.get_volumns_by_code(code, 150)
                HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
            except Exception as e:
                logger_debug.exception(e)
 
    previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str())
    if previous_trading_date is None:
        raise Exception("上一个交易日获取失败")
    # 刷新目标代码的自由流通量
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    codes = set()
    if codes_sh:
        for code_byte in codes_sh:
            codes.add(code_byte.decode())
        for code_byte in codes_sz:
            codes.add(code_byte.decode())
    # 获取已经更新的数据
    codes_record = HistoryKDataManager().get_history_bars_codes(previous_trading_date)
    codes = codes - codes_record
    threading.Thread(target=lambda: update(codes), daemon=True).start()
 
    return len(codes)
 
 
class HistoryKDataManager:
    __instance = None
    __db = 0
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
 
    @classmethod
    def __load_data(cls):
        pass
 
    def __get_cache_dir(self):
        """
        获取缓存路径
        @return:
        """
        dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        return dir_path
 
    def __del_outdate_datas(self, code):
        dir_path = self.__get_cache_dir()
        datas = []
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(code) < 0:
                    continue
                path_ = os.path.join(root, file)
                day = file.split("_")[0]
                datas.append((day, path_))
        # 保存最新的一条数据
        if datas:
            datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
            datas = datas[1:]
            for d in datas:
                os.remove(d[1])
 
    def save_history_bars(self, code, day, datas, force=False):
        """
        保存历史K线
        @param code: 代码
        @param day: K线最新的日期(取datas最新一条数据的日期)
        @param datas: 数据
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if datas:
            # 将日期格式化
            for d in datas:
                for k in d:
                    if type(d[k]) == datetime.datetime:
                        d[k] = d[k].strftime("%Y-%m-%d %H:%M:%S")
 
        with open(path_str, encoding="utf-8", mode='w') as f:
            f.write(f"{datas}")
        self.__del_outdate_datas(code)
 
    def get_history_bars(self, code, day):
        """
        获取历史K线
        @param code:
        @param day:
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if not os.path.exists(path_str):
            return None
        with open(path_str, encoding="utf-8", mode='r') as f:
            line = f.readline()
            if line:
                datas = eval(line)
                # 将日期格式转为datetime
                for d in datas:
                    for k in d:
                        if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                            d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
                return datas
        return None
 
    def get_history_bars_codes(self, day):
        """
        获取某一天的历史K线的代码数据
        @param day:
        @return: 代码集合
        """
 
        dir_path = self.__get_cache_dir()
        codes = set()
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(day) >= 0:
                    codes.add(file.split("_")[1][:6])
        return codes