# 开盘啦涨停数据管理 import json import os import threading import time from utils import tool from kpl import kpl_util from kpl import kpl_api class KPLLimitUpDataManager: __instance = None limit_up_records_dict = {} limit_up_current_dict = {} __limit_up_history_datas_cache = {} __limit_up_current_datas_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(KPLLimitUpDataManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls, day=tool.get_now_date_str()): record_apth = cls.__get_record_path(day) current_path = cls.__get_current_path(day) limit_up_records_dict = {} limit_up_current_dict = {} # 获取历史涨停 if os.path.exists(record_apth): children = os.listdir(record_apth) for c in children: with open(f"{record_apth}\\{c}", mode="r") as f: line = f.readline().strip() if line: try: data = json.loads(line) limit_up_records_dict[data[0]] = data except: pass # 获取实时涨停 if os.path.exists(current_path): with open(current_path, mode="r") as f: line = f.readline().strip() if line: try: datas = json.loads(line) for d in datas: limit_up_current_dict[d[0]] = d except: pass if day == tool.get_now_date_str(): cls.limit_up_records_dict = limit_up_records_dict cls.limit_up_current_dict = limit_up_current_dict return limit_up_records_dict, limit_up_current_dict def __save_data(self, limit_up_datas): record_path = self.__get_record_path() current_path = self.__get_current_path() if not os.path.exists(record_path): os.makedirs(record_path) for d in limit_up_datas: code = d[0] # 保存记录数据 with open(f"{record_path}\\{code}.log", mode='w') as f: f.write(json.dumps(d)) with open(current_path, mode='w') as f: f.write(json.dumps(limit_up_datas)) def add_data(self, limit_up_datas): temp_limit_up_dict = {} for d in limit_up_datas: temp_limit_up_dict[d[0]] = d self.limit_up_records_dict[d[0]] = d self.limit_up_current_dict = temp_limit_up_dict # 保存文件 threading.Thread(target=self.__save_data, args=(limit_up_datas,), daemon=True).start() def get_limit_up_current_datas(self, day=tool.get_now_date_str()): if day == tool.get_now_date_str(): temps = [self.limit_up_current_dict[k] for k in self.limit_up_current_dict] else: records, current = self.__load_data(day) temps = [current[k] for k in current] temps.sort(key=lambda x: x[2], reverse=True) return temps def get_limit_up_history_datas(self, day=tool.get_now_date_str()): if day == tool.get_now_date_str(): temps = [self.limit_up_records_dict[k] for k in self.limit_up_records_dict] else: records, current = self.__load_data(day) temps = [records[k] for k in records] temps.sort(key=lambda x: x[2], reverse=True) return temps # 从缓存文件获取某天的涨停数据 def get_limit_up_current_datas_from_file(self, day): current_path = self.__get_current_path(day) # 获取实时涨停 if os.path.exists(current_path): with open(current_path, mode="r") as f: line = f.readline().strip() if line: datas = json.loads(line) return datas return None # 从缓存文件获取某天的历史涨停数据 def get_limit_up_history_datas__from_file(self, day): record_apth = self.__get_record_path(day) if os.path.exists(record_apth): fdatas = [] children = os.listdir(record_apth) for c in children: with open(f"{record_apth}\\{c}", mode="r") as f: line = f.readline().strip() if line: data = json.loads(line) fdatas.append(data) return fdatas return None def get_limit_up_history_datas_cache(self, day): if day not in self.__limit_up_history_datas_cache: results = self.get_limit_up_history_datas__from_file(day) if results: self.__limit_up_history_datas_cache[day] = results return self.__limit_up_history_datas_cache.get(day) def get_limit_up_current_datas_cache(self, day): if day not in self.__limit_up_current_datas_cache: results = self.get_limit_up_current_datas_from_file(day) if results: self.__limit_up_current_datas_cache[day] = results return self.__limit_up_current_datas_cache.get(day) @classmethod def __get_record_path(cls, day=None): base_path = cls.__get_base_path(day) return f"{base_path}\\record" @classmethod def __get_current_path(cls, day=None): base_path = cls.__get_base_path(day) return f"{base_path}\\limit_up.log" @classmethod def __get_base_path(cls, day=None): if not day: day = tool.get_now_date_str() return f"{os.getcwd()}\\datas\\{day}" def run(self): while True: try: if not tool.is_trade_time(tool.get_now_time_str()): continue result = kpl_api.getLimitUpInfoNew() limit_up_datas = kpl_util.parseLimitUpData(result) KPLLimitUpDataManager().add_data(limit_up_datas) except: pass finally: time.sleep(3) if __name__ == '__main__': print(KPLLimitUpDataManager().get_limit_up_current_datas()) print(KPLLimitUpDataManager().get_limit_up_history_datas()) input()