""" 历史K线管理 """ import copy import datetime import os import threading import constant from code_attribute import gpcode_manager from huaxin_client import l1_subscript_codes_manager from log_module.log import logger_debug from third_data import history_k_data_util from third_data.history_k_data_util import HistoryKDatasUtils from utils import tool, init_data_util def update_history_k_bars(): """ 更新历史K线 @return: 此次更新的数量 """ def update(codes_): for code in codes_: try: datas = init_data_util.get_volumns_by_code(code, 150) if datas: HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas) except Exception as e: logger_debug.exception(e) latest_trading_date = history_k_data_util.get_k_bar_dead_date() # 刷新目标代码的自由流通量 codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False) codes = set() if codes_sh: for code_byte in codes_sh: codes.add(code_byte.decode()) for code_byte in codes_sz: codes.add(code_byte.decode()) # 获取已经更新的数据 codes_record = HistoryKDataManager().get_history_bars_codes(latest_trading_date) codes = codes - codes_record threading.Thread(target=lambda: update(codes), daemon=True).start() return len(codes) def re_set_price_pres(codes, force=False): # 通过历史数据缓存获取 # 获取上一个交易日 day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str()) not_codes = [] for code in codes: if not tool.is_can_buy_code(code): continue pre_close = HistoryKDataManager().get_pre_close(code, day) if pre_close is not None: gpcode_manager.CodePrePriceManager.set_price_pre(code, pre_close, force) else: not_codes.append(code) if not_codes: init_data_util.re_set_price_pres(not_codes, force) class HistoryKDataManager: __instance = None __db = 0 __history_k_day_datas = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): pass def __get_cache_dir(self): """ 获取缓存路径 @return: """ dir_path = f"{constant.get_path_prefix()}/datas/k_bars" if not os.path.exists(dir_path): os.makedirs(dir_path) return dir_path def __del_outdate_datas(self, code): dir_path = self.__get_cache_dir() datas = [] for root, dirs, files in os.walk(dir_path): for file in files: # 输出文件的绝对路径 if file.find(code) < 0: continue path_ = os.path.join(root, file) day = file.split("_")[0] datas.append((day, path_)) # 保存最新的一条数据 if datas: datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True) datas = datas[1:] for d in datas: os.remove(d[1]) def save_history_bars(self, code, day, datas, force=False): """ 保存历史K线 @param code: 代码 @param day: K线最新的日期(取datas最新一条数据的日期) @param datas: 数据 @return: """ cache_dir = self.__get_cache_dir() file_name = f"{day}_{code}.txt" path_str = f"{cache_dir}/{file_name}" if os.path.exists(path_str) and not force: return if day not in self.__history_k_day_datas: self.__history_k_day_datas[day] = {} if datas: self.__history_k_day_datas[day][code] = datas # 将日期格式化 fdatas = [] for d in datas: dd = copy.deepcopy(d) for k in dd: if type(dd[k]) == datetime.datetime: dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S") fdatas.append(dd) with open(path_str, encoding="utf-8", mode='w') as f: f.write(f"{fdatas}") self.__del_outdate_datas(code) def get_history_bars(self, code, day): """ 获取历史K线 @param code: @param day: @return: """ if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]: return self.__history_k_day_datas[day][code] cache_dir = self.__get_cache_dir() file_name = f"{day}_{code}.txt" path_str = f"{cache_dir}/{file_name}" if not os.path.exists(path_str): return None with open(path_str, encoding="utf-8", mode='r') as f: line = f.readline() if line: datas = eval(line) # 将日期格式转为datetime for d in datas: for k in d: if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0: d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S") return datas return None def load_data(self): """ 加载数据 @param day: @return: """ day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str()) cache_dir = self.__get_cache_dir() if not os.path.exists(cache_dir): return fs = os.listdir(cache_dir) for f in fs: if f.find(day) < 0: continue with open(os.path.join(cache_dir, f), mode='r', encoding='utf-8') as fs: line = fs.readline() if line: datas = eval(line) # 将日期格式转为datetime for d in datas: for k in d: if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0: d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S") if datas: if day not in self.__history_k_day_datas: self.__history_k_day_datas[day] = {} self.__history_k_day_datas[day][datas[0]['sec_id']] = datas def get_pre_close(self, code, day): """ 获取之前的收盘价 @param code: @param day: @return: """ if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]: return self.__history_k_day_datas[day][code][0]["close"] return None def get_history_bars_codes(self, day): """ 获取某一天的历史K线的代码数据 @param day: @return: 代码集合 """ dir_path = self.__get_cache_dir() codes = set() for root, dirs, files in os.walk(dir_path): for file in files: # 输出文件的绝对路径 if file.find(day) >= 0: codes.add(file.split("_")[1][:6]) return codes if __name__ == "__main__": print(HistoryKDataManager().get_history_bars_codes("2024-12-31"))