Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
third_data/history_k_data_manager.py
@@ -1,14 +1,17 @@
"""
历史K线管理
"""
import copy
import datetime
import os
import threading
import constant
from code_attribute import gpcode_manager
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_debug
from third_data import history_k_data_util
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool, init_data_util
@@ -17,16 +20,17 @@
    更新历史K线
    @return: 此次更新的数量
    """
    def update(codes_):
        for code in codes_:
            try:
                datas = init_data_util.get_volumns_by_code(code)
                HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
                datas = init_data_util.get_volumns_by_code(code, 150)
                if datas:
                    HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
            except Exception as e:
                logger_debug.exception(e)
    previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str())
    if previous_trading_date is None:
        raise Exception("上一个交易日获取失败")
    latest_trading_date = history_k_data_util.get_k_bar_dead_date()
    # 刷新目标代码的自由流通量
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    codes = set()
@@ -36,17 +40,34 @@
        for code_byte in codes_sz:
            codes.add(code_byte.decode())
    # 获取已经更新的数据
    codes_record = HistoryKDataManager().get_history_bars_codes(previous_trading_date)
    codes_record = HistoryKDataManager().get_history_bars_codes(latest_trading_date)
    codes = codes - codes_record
    threading.Thread(target=lambda: update(codes), daemon=True).start()
    return len(codes)
def re_set_price_pres(codes, force=False):
    """
    Set the previous-close price of the given codes, preferring the in-memory history K-line cache.

    For every code accepted by tool.is_can_buy_code, the close price is looked up
    through HistoryKDataManager.get_pre_close for the previous trading day. Codes
    with a hit are written via gpcode_manager.CodePrePriceManager.set_price_pre;
    codes without a hit are collected and handed to init_data_util.re_set_price_pres.

    @param codes: iterable of stock codes
    @param force: passed through unchanged to both price setters
    @return: None
    """
    # Previous trading day relative to today's date string
    prev_day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str())
    uncached_codes = []
    for code in codes:
        if tool.is_can_buy_code(code):
            cached_close = HistoryKDataManager().get_pre_close(code, prev_day)
            if cached_close is None:
                # No cached bar for this code; resolve it through the fallback below
                uncached_codes.append(code)
            else:
                gpcode_manager.CodePrePriceManager.set_price_pre(code, cached_close, force)
    # Fallback path for everything the cache could not answer
    if uncached_codes:
        init_data_util.re_set_price_pres(uncached_codes, force)
class HistoryKDataManager:
    __instance = None
    __db = 0
    __history_k_day_datas = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -99,15 +120,20 @@
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if day not in self.__history_k_day_datas:
            self.__history_k_day_datas[day] = {}
        if datas:
            self.__history_k_day_datas[day][code] = datas
            # 将日期格式化
            fdatas = []
            for d in datas:
                for k in d:
                    if type(d[k]) == datetime.datetime:
                        d[k] = d[k].strftime("%Y-%m-%d %H:%M:%S")
        with open(path_str, encoding="utf-8", mode='w') as f:
            f.write(f"{datas}")
                dd = copy.deepcopy(d)
                for k in dd:
                    if type(dd[k]) == datetime.datetime:
                        dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S")
                fdatas.append(dd)
            with open(path_str, encoding="utf-8", mode='w') as f:
                f.write(f"{fdatas}")
        self.__del_outdate_datas(code)
    def get_history_bars(self, code, day):
@@ -117,6 +143,8 @@
        @param day:
        @return:
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code]
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
@@ -134,6 +162,45 @@
                return datas
        return None
    def load_data(self):
        """
        加载数据
        @param day:
        @return:
        """
        day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str())
        cache_dir = self.__get_cache_dir()
        if not os.path.exists(cache_dir):
            return
        fs = os.listdir(cache_dir)
        for f in fs:
            if f.find(day) < 0:
                continue
            with open(os.path.join(cache_dir, f), mode='r', encoding='utf-8') as fs:
                line = fs.readline()
                if line:
                    datas = eval(line)
                    # 将日期格式转为datetime
                    for d in datas:
                        for k in d:
                            if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                                d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
                    if datas:
                        if day not in self.__history_k_day_datas:
                            self.__history_k_day_datas[day] = {}
                        self.__history_k_day_datas[day][datas[0]['sec_id']] = datas
    def get_pre_close(self, code, day):
        """
        获取之前的收盘价
        @param code:
        @param day:
        @return:
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code][0]["close"]
        return None
    def get_history_bars_codes(self, day):
        """
        获取某一天的历史K线的代码数据
@@ -149,3 +216,7 @@
                if file.find(day) >= 0:
                    codes.add(file.split("_")[1][:6])
        return codes
if __name__ == "__main__":
    # Manual check: print the codes returned by get_history_bars_codes for a fixed day
    manager = HistoryKDataManager()
    cached_codes = manager.get_history_bars_codes("2024-12-31")
    print(cached_codes)