Administrator
2024-11-13 d2d5ca80907183f88a5e78aa28c085a746868d6d
third_data/kpl_data_manager.py
@@ -349,7 +349,7 @@
    @classmethod
    # 获取最近几天的数据,根据日期倒序返回
    def get_latest_from_file(cls, type, count):
    def get_latest_from_file(cls, type, count, max_day=tool.get_now_date_str()):
        files = os.listdir(constant.CACHE_PATH)
        file_name_list = []
        for f in files:
@@ -357,7 +357,6 @@
                file_name_list.append((f.split("_")[0], f))
        file_name_list.sort(key=lambda x: x[0], reverse=True)
        file_name_list = file_name_list[:count]
        fresults = []
        for file in file_name_list:
            path = f"{constant.CACHE_PATH}/{file[1]}"
@@ -366,7 +365,10 @@
            with open(path, 'r') as f:
                lines = f.readlines()
                if lines:
                    if int(file[0].replace("-", "")) <= int(max_day.replace("-", "")):
                    fresults.append((file[0], json.loads(lines[0])))
                if len(fresults) >=count:
                    break
        return fresults
@@ -411,18 +413,24 @@
# 获取最近几天的实时涨停信息
# 返回格式([日期,数据])
def get_current_limit_up_data_records(count):
def get_current_limit_up_data_records(count, day=tool.get_now_date_str()):
    fresults = []
    day = tool.get_now_date_str()
    datas = []
    if day in __limit_up_list_records_dict:
        datas = __limit_up_list_records_dict[day]
    else:
        logger_debug.info("从文件中获取前几天的实时涨停数据")
        datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, count + 2)
        if datas:
        datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, count + 2, max_day=day)
        # 移除比今天还大的数据
        fdatas = []
        for d in datas:
            if int(d[0].replace("-", "")) > int(day.replace("-", "")):
                continue
            fdatas.append(d)
        if fdatas:
            # 保存数据
            __limit_up_list_records_dict[day] = datas
            __limit_up_list_records_dict[day] = fdatas
    datas = __limit_up_list_records_dict[day]
    for i in range(len(datas)):
        if datas[i][0] == day:
            continue
@@ -447,10 +455,9 @@
__latest_current_limit_up_records = {}
def get_latest_current_limit_up_records():
    day = tool.get_now_date_str()
def get_latest_current_limit_up_records(day=tool.get_now_date_str(), max_day_count=15):
    if day not in __latest_current_limit_up_records:
        fdatas = get_current_limit_up_data_records(15)
        fdatas = get_current_limit_up_data_records(max_day_count)
        __latest_current_limit_up_records[day] = fdatas
    return __latest_current_limit_up_records.get(day)
@@ -599,6 +606,7 @@
        threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
@tool.singleton
class CodeHighLevel:
    """
    代码高度管理
@@ -608,15 +616,13 @@
    __code_level_dict = {}
    __codes = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodeHighLevel, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    def __init__(self, day=tool.get_now_date_str()):
        self.__day = day
        self.__load_data(day)
    @classmethod
    def __load_data(cls):
        fdatas = get_current_limit_up_data_records(15)
    def __load_data(cls, day):
        fdatas = get_current_limit_up_data_records(15, day=day)
        temp_dict = {d[0]: 2 for d in fdatas[0][1]}
        break_codes = set()
        for i in range(1, len(fdatas)):
@@ -642,5 +648,5 @@
if __name__ == "__main__":
    print(CodeHighLevel().get_high_level("000833"))
    print(CodeHighLevel("2024-11-11").get_high_level("000833"))
    input()