| | |
| | | """ |
| | | 历史K线管理 |
| | | """ |
| | | import copy |
| | | import datetime |
| | | import os |
| | | import threading |
| | | |
| | | import constant |
| | | from code_attribute import gpcode_manager |
| | | from huaxin_client import l1_subscript_codes_manager |
| | | from log_module.log import logger_debug |
| | | from third_data import history_k_data_util |
| | | from third_data.history_k_data_util import HistoryKDatasUtils |
| | | from utils import tool, init_data_util |
| | | |
| | | |
| | |
| | | 更新历史K线 |
| | | @return: 此次更新的数量 |
| | | """ |
| | | |
| | | def update(codes_): |
| | | for code in codes_: |
| | | try: |
| | | datas = init_data_util.get_volumns_by_code(code) |
| | | HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas) |
| | | datas = init_data_util.get_volumns_by_code(code, 150) |
| | | if datas: |
| | | HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas) |
| | | except Exception as e: |
| | | logger_debug.exception(e) |
| | | previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str()) |
| | | if previous_trading_date is None: |
| | | raise Exception("上一个交易日获取失败") |
| | | |
| | | latest_trading_date = history_k_data_util.get_k_bar_dead_date() |
| | | # 刷新目标代码的自由流通量 |
| | | codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False) |
| | | codes = set() |
| | |
| | | for code_byte in codes_sz: |
| | | codes.add(code_byte.decode()) |
| | | # 获取已经更新的数据 |
| | | codes_record = HistoryKDataManager().get_history_bars_codes(previous_trading_date) |
| | | codes_record = HistoryKDataManager().get_history_bars_codes(latest_trading_date) |
| | | codes = codes - codes_record |
| | | threading.Thread(target=lambda: update(codes), daemon=True).start() |
| | | |
| | | return len(codes) |
| | | |
| | | |
| | | def re_set_price_pres(codes, force=False): |
| | | # 通过历史数据缓存获取 |
| | | # 获取上一个交易日 |
| | | day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str()) |
| | | not_codes = [] |
| | | for code in codes: |
| | | if not tool.is_can_buy_code(code): |
| | | continue |
| | | pre_close = HistoryKDataManager().get_pre_close(code, day) |
| | | if pre_close is not None: |
| | | gpcode_manager.CodePrePriceManager.set_price_pre(code, pre_close, force) |
| | | else: |
| | | not_codes.append(code) |
| | | if not_codes: |
| | | init_data_util.re_set_price_pres(not_codes, force) |
| | | |
| | | |
| | | class HistoryKDataManager: |
| | | __instance = None |
| | | __db = 0 |
| | | __history_k_day_datas = {} |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls.__instance: |
| | |
| | | path_str = f"{cache_dir}/{file_name}" |
| | | if os.path.exists(path_str) and not force: |
| | | return |
| | | if day not in self.__history_k_day_datas: |
| | | self.__history_k_day_datas[day] = {} |
| | | if datas: |
| | | self.__history_k_day_datas[day][code] = datas |
| | | # 将日期格式化 |
| | | fdatas = [] |
| | | for d in datas: |
| | | for k in d: |
| | | if type(d[k]) == datetime.datetime: |
| | | d[k] = d[k].strftime("%Y-%m-%d %H:%M:%S") |
| | | |
| | | with open(path_str, encoding="utf-8", mode='w') as f: |
| | | f.write(f"{datas}") |
| | | dd = copy.deepcopy(d) |
| | | for k in dd: |
| | | if type(dd[k]) == datetime.datetime: |
| | | dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S") |
| | | fdatas.append(dd) |
| | | with open(path_str, encoding="utf-8", mode='w') as f: |
| | | f.write(f"{fdatas}") |
| | | self.__del_outdate_datas(code) |
| | | |
| | | def get_history_bars(self, code, day): |
| | |
| | | @param day: |
| | | @return: |
| | | """ |
| | | if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]: |
| | | return self.__history_k_day_datas[day][code] |
| | | cache_dir = self.__get_cache_dir() |
| | | file_name = f"{day}_{code}.txt" |
| | | path_str = f"{cache_dir}/{file_name}" |
| | |
| | | return datas |
| | | return None |
| | | |
| | | def load_data(self): |
| | | """ |
| | | 加载数据 |
| | | @param day: |
| | | @return: |
| | | """ |
| | | day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str()) |
| | | cache_dir = self.__get_cache_dir() |
| | | if not os.path.exists(cache_dir): |
| | | return |
| | | fs = os.listdir(cache_dir) |
| | | for f in fs: |
| | | if f.find(day) < 0: |
| | | continue |
| | | with open(os.path.join(cache_dir, f), mode='r', encoding='utf-8') as fs: |
| | | line = fs.readline() |
| | | if line: |
| | | datas = eval(line) |
| | | # 将日期格式转为datetime |
| | | for d in datas: |
| | | for k in d: |
| | | if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0: |
| | | d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S") |
| | | if datas: |
| | | if day not in self.__history_k_day_datas: |
| | | self.__history_k_day_datas[day] = {} |
| | | self.__history_k_day_datas[day][datas[0]['sec_id']] = datas |
| | | |
| | | def get_pre_close(self, code, day): |
| | | """ |
| | | 获取之前的收盘价 |
| | | @param code: |
| | | @param day: |
| | | @return: |
| | | """ |
| | | if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]: |
| | | return self.__history_k_day_datas[day][code][0]["close"] |
| | | return None |
| | | |
| | | def get_history_bars_codes(self, day): |
| | | """ |
| | | 获取某一天的历史K线的代码数据 |
| | |
| | | if file.find(day) >= 0: |
| | | codes.add(file.split("_")[1][:6]) |
| | | return codes |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(HistoryKDataManager().get_history_bars_codes("2024-12-31")) |