Administrator
2024-07-03 7ee37fb1e32d3d5dc527e90c4c0c87c5313f6a56
历史K线管理
1个文件已修改
1个文件已添加
114 ■■■■■ 已修改文件
third_data/history_k_data_manager.py 113 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_manager.py
New file
@@ -0,0 +1,113 @@
"""
历史K线管理
"""
import datetime
import os
import constant
class HistoryKDataManager:
    __instance = None
    __db = 0
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    @classmethod
    def __load_data(cls):
        pass
    def __get_cache_dir(self):
        """
        获取缓存路径
        @return:
        """
        dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        return dir_path
    def __del_outdate_datas(self, code):
        dir_path = self.__get_cache_dir()
        datas = []
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(code) < 0:
                    continue
                path_ = os.path.join(root, file)
                day = file.split("_")[0]
                datas.append((day, path_))
        # 保存最新的一条数据
        if datas:
            datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
            datas = datas[1:]
            for d in datas:
                os.remove(d[1])
    def save_history_bars(self, code, day, datas, force=False):
        """
        保存历史K线
        @param code: 代码
        @param day: K线最新的日期(取datas最新一条数据的日期)
        @param datas: 数据
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if datas:
            # 将日期格式化
            for d in datas:
                for k in d:
                    if type(d[k]) == datetime.datetime:
                        d[k] = d[k].strftime("%Y-%m-%d %H:%M:%S")
        with open(path_str, encoding="utf-8", mode='w') as f:
            f.write(f"{datas}")
        self.__del_outdate_datas(code)
    def get_history_bars(self, code, day):
        """
        获取历史K线
        @param code:
        @param day:
        @return:
        """
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
        if not os.path.exists(path_str):
            return None
        with open(path_str, encoding="utf-8", mode='r') as f:
            line = f.readline()
            if line:
                datas = eval(line)
                # 将日期格式转为datetime
                for d in datas:
                    for k in d:
                        if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
                            d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
                return datas
        return None
    def get_history_bars_code_count(self, day):
        """
        获取某一天的历史K线的数量
        @param day:
        @return:
        """
        dir_path = self.__get_cache_dir()
        count = 0
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                # 输出文件的绝对路径
                if file.find(day) >= 0:
                    count += 1
        return count
utils/tool.py
@@ -405,6 +405,7 @@
    """
    return get_market_type(code) == MARKET_TYPE_SZSE
def is_ge_code(code):
    """
    是否是创业板