"""Manual scratch script exercising the KPL limit-up data managers."""
import copy
import time

from db import redis_manager
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_data_manager, kpl_util, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager, KPLCodeJXBlockManager
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from third_data.third_blocks_manager import BlockMapManager, CodeThirdBlocksManager, SOURCE_TYPE_KPL
from utils import tool
from utils.kpl_data_db_util import KPLLimitUpDataUtil


def block_run():
    """Record the current limit-up data and refresh the block statistics.

    Reads the ``LIMIT_UP`` data set from ``KPLDataManager``, stores it through
    ``KPLLimitUpDataRecordManager.save_record`` under the current date string,
    passes the same data to ``LatestLimitUpBlockManager`` and finally asks that
    manager to run ``statistics_limit_up_block_infos``. Returns nothing.
    """
    limit_up_rows = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)

    record_manager = kpl_data_manager.KPLLimitUpDataRecordManager
    record_manager.save_record(tool.get_now_date_str(), limit_up_rows)

    # The manager is instantiated separately for each call, as in the
    # original script.
    LatestLimitUpBlockManager().set_current_limit_up_data(tool.get_now_date_str(), limit_up_rows)
    LatestLimitUpBlockManager().statistics_limit_up_block_infos()


def get_code_current_block():
    """Print the current limit-up sequence entry for the hard-coded code "301136".

    Calls ``get_current_limit_up_data_records(10)`` (the result is not used
    here), stores the current ``LIMIT_UP`` data as a record, loads it into
    ``CodeLimitUpSequenceManager`` and then prints the sequence returned for
    the code, if any. Returns nothing.
    """
    # NOTE(review): the return value is discarded; presumably the call is made
    # for a caching side effect - confirm against kpl_data_manager.
    kpl_data_manager.get_current_limit_up_data_records(10)

    limit_up_rows = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    record_manager = kpl_data_manager.KPLLimitUpDataRecordManager
    record_manager.save_record(tool.get_now_date_str(), limit_up_rows)
    CodeLimitUpSequenceManager().set_current_limit_up_datas(limit_up_rows)

    code = "301136"
    sequence = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code)
    if not sequence:
        return

    # Output layout: "<item0>-<item1>(<item2>&<item2 - item3>)".
    first, second, third, fourth = sequence[0], sequence[1], sequence[2], sequence[3]
    print(f"{first}-{second}({third}&{third - fourth})")


if __name__ == "__main__":
    clock_format = '%H:%M:%S'

    limit_up_timestamp = time.time()
    current_before_codes_info = [("000333", time.time())]

    limit_up_clock = tool.timestamp_format(limit_up_timestamp, clock_format)
    previous_clock = tool.timestamp_format(current_before_codes_info[-1][1], clock_format)
    elapsed = tool.trade_time_sub(limit_up_clock, previous_clock)
    # Threshold is 10 * 60 (presumably seconds, i.e. ten minutes - confirm
    # against tool.trade_time_sub).
    if elapsed >= 10 * 60:
        print("123123")

    # Disabled scratch calls kept for reference:
    # code = "603825"
    # k3 = CodesHisReasonAndBlocksManager().get_history_blocks(code)
    # print(k3)
    # k3 = CodesHisReasonAndBlocksManager().get_history_blocks(code)
    # print(k3)
    #
    print(KPLLimitUpDataUtil.get_latest_block_infos(code="000561"))

    # Disabled scratch calls kept for reference:
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # code_plate_key_manager.ForbiddenBlockManager().add("测试2")
    # code_plate_key_manager.ForbiddenBlockManager().add("测试3")
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # print( code_plate_key_manager.ForbiddenBlockManager().is_in("测试"))
    # print(code_plate_key_manager.ForbiddenBlockManager().is_in("测试1"))
    # RedisUtils.run_loop()