""" 涨停数据管理 """ import threading import time from local_api.util import kpl_api, kpl_util, tool from local_api.log_module import log, log_export class KPLLimitUpDataManager: __update_time = 0 """ 开盘啦涨停数据管理 """ __current_limit_up_info = None __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(KPLLimitUpDataManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): lists = log_export.load_limit_up() if lists: cls.__current_limit_up_info = lists[-1] def get_current_limitup_info(self): return self.__current_limit_up_info def request_limit_up_list(self): """ 请求涨停数据 :return: [(代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额, 涨停原因代码, 涨停原因代码数量)] """ results = kpl_api.getLimitUpInfo() return kpl_util.parseLimitUpData(results) def start_task(self): def update(): while True: try: datas = self.request_limit_up_list() self.__current_limit_up_info = (tool.get_now_time_str(), datas) # 将数据保存到日志 log.logger_kpl_limit_up.info(f"{datas}") except: pass finally: time.sleep(3) self.__update_time = time.time() threading.Thread(target=update, daemon=True).start() def repaire_task(self): """ 修复任务 :return: """ if time.time() - self.__update_time < 20: return self.start_task() if __name__ == "__main__": datas = KPLLimitUpDataManager().get_current_limitup_info() print(datas)