| | |
| | | import json |
| | | import os |
| | | import time |
| | | |
| | | import constant |
| | | from utils import tool |
| | |
| | | |
| | | class KPLDataManager: |
| | | __latest_datas = {} |
| | | kpl_data_update_info = {} |
| | | |
| | | def __save_in_file(self, key, datas): |
| | | @classmethod |
| | | def __save_in_file(cls, key, datas): |
| | | name = f"{tool.get_now_date_str()}_{key}.log" |
| | | path = f"{constant.CACHE_PATH}/{name}" |
| | | with open(path, 'w') as f: |
| | | f.write(json.dumps(datas)) |
| | | |
| | | def __get_from_file(self, key, day=tool.get_now_date_str()): |
| | | @classmethod |
| | | def __get_from_file(cls, key, day=tool.get_now_date_str()): |
| | | name = f"{day}_{key}.log" |
| | | path = f"{constant.CACHE_PATH}/{name}" |
| | | if not os.path.exists(path): |
| | |
| | | return json.loads(lines[0]) |
| | | return None |
| | | |
| | | def get_from_file(self, type, day): |
| | | @classmethod |
| | | def get_from_file(cls, type, day): |
| | | name = f"{day}_{type.value}.log" |
| | | path = f"{constant.CACHE_PATH}/{name}" |
| | | if not os.path.exists(path): |
| | |
| | | return json.loads(lines[0]) |
| | | return None |
| | | |
| | | @classmethod |
| | | # 获取最近几天的数据,根据日期倒序返回 |
| | | def get_latest_from_file(self, type, count): |
| | | def get_latest_from_file(cls, type, count): |
| | | files = os.listdir(constant.CACHE_PATH) |
| | | file_name_list = [] |
| | | for f in files: |
| | |
| | | |
| | | return fresults |
| | | |
| | | def save_data(self, type, datas): |
| | | self.__latest_datas[type] = datas |
| | | self.__save_in_file(type, datas) |
| | | @classmethod |
| | | def save_data(cls, type, datas): |
| | | cls.kpl_data_update_info[type] = (tool.get_now_time_str(), len(datas)) |
| | | cls.__latest_datas[type] = datas |
| | | cls.__save_in_file(type, datas) |
| | | |
| | | def get_data(self, type): |
| | | @classmethod |
| | | def get_data(cls, type): |
| | | type = type.value |
| | | if type in self.__latest_datas: |
| | | return self.__latest_datas[type] |
| | | result = self.__get_from_file(type) |
| | | if type in cls.__latest_datas: |
| | | return cls.__latest_datas[type] |
| | | result = cls.__get_from_file(type) |
| | | if result is not None: |
| | | self.__latest_datas[type] = result |
| | | cls.__latest_datas[type] = result |
| | | return result |
| | | |
| | | |