import json
import time

import constant
from code_attribute import global_data_loader
from l2 import code_price_manager
from log_module import async_log_util
from log_module.log import logger_debug
from third_data import kpl_data_manager, kpl_util, block_info, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from trade.buy_radical import new_block_processor
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager
from utils import tool, init_data_util


def block_run():
    """Save the current limit-up data and recompute limit-up block statistics.

    Fetches the LIMIT_UP data set from ``KPLDataManager``, saves it as a
    record under today's date string, hands it to
    ``LatestLimitUpBlockManager`` and triggers its statistics pass.
    """
    current_datas = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), current_datas)
    LatestLimitUpBlockManager().set_current_limit_up_data(tool.get_now_date_str(), current_datas)
    LatestLimitUpBlockManager().statistics_limit_up_block_infos()


def get_code_current_block():
    """Print the current limit-up sequence of a hard-coded code (debug helper).

    Saves the current LIMIT_UP data as today's record, feeds it to
    ``CodeLimitUpSequenceManager`` and, when a sequence exists for the code,
    prints it as ``<s[0]>-<s[1]>(<s[2]>&<s[2] - s[3]>)``.
    """
    # The return value is discarded, exactly as in the original call.
    kpl_data_manager.get_current_limit_up_data_records(10)
    current_datas = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), current_datas)
    CodeLimitUpSequenceManager().set_current_limit_up_datas(current_datas)
    code = "301136"
    limit_up_sequence = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code)
    if limit_up_sequence:
        print(
            f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})")


def test_block():
    """Run the radical-buy checks for a hard-coded list of codes and print the results.

    For every code: resets its previous price data, pushes the latest
    limit-up data into ``RadicalBuyBlockManager``, evaluates
    ``is_radical_buy`` and, when its first element is truthy, also
    evaluates ``RadicalBuyDataManager.is_code_can_buy``.
    """
    codes_str = "002845"
    codes = codes_str.split(",")  # ["002889", "300337", "001298", "002771"]
    kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(
        tool.get_now_date_str(),
        kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP))

    for code in codes:
        # KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62,
        #                                        kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reasons())
        # block_info.init_code(code)
        # latest_current_limit_up_records = kpl_data_manager.get_latest_current_limit_up_records()
        init_data_util.re_set_price_pre(code, True)
        limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
        if limit_up_data:
            # NOTE(review): the converted time is not used afterwards; the call
            # is kept so the script behaves exactly as before.
            limit_up_time = tool.to_time_str(limit_up_data[2])
        RadicalBuyBlockManager.set_current_limit_up_datas(
            kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas)
        # CodePlateKeyBuyManager.update_can_buy_blocks(code,
        #                                              kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
        #                                              kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
        #                                              latest_current_limit_up_records,
        #                                              block_info.get_before_blocks_dict(),
        #                                              kpl_data_manager.KPLLimitUpDataRecordManager.get_current_limit_up_reason_codes_dict())
        # can_buy_result = CodePlateKeyBuyManager.can_buy(code)
        # print(can_buy_result)
        # if can_buy_result:
        #     if can_buy_result[0]:
        #         blocks = ",".join([f"{x[0]}-{x[1] + 1}({x[2]}&{x[3] - x[2]})" for x in can_buy_result[0]])
        #         print(blocks)

        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes is None:
            yesterday_codes = set()
        result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
        print(code, result)
        if result[0]:
            can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
            if can_buy_result[0]:
                print("可以买", code, result)
            else:
                print("不可以买", code, can_buy_result)
        # l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, None)


def update_history_limit_up_codes():
    """Fetch and store the historical limit-up records following a fixed date.

    Starting from the hard-coded day, advances to the next trading date once,
    requests that day's historical limit-up info, parses it with
    ``kpl_util.parseDaBanData`` and saves it with ``set_not_open=True``.
    """
    day = "2024-12-19"
    # Single iteration; raise the range to back-fill more trading days.
    for _ in range(1):
        day = HistoryKDatasUtils.get_next_trading_date(day)
        # if day == "2024-12-20":
        #     break
        results = kpl_api.getHistoryLimitUpInfo(day)
        result_list = kpl_util.parseDaBanData(json.dumps({"list": results, "errcode": 0}),
                                              kpl_util.DABAN_TYPE_LIMIT_UP)
        kpl_data_manager.KPLLimitUpDataRecordManager.save_record(day, result_list, set_not_open=True)


def do_limit_up(result_list_):
    """Detect new blocks (themes) in limit-up data and register their codes.

    Best-effort: any ``Exception`` raised while processing is logged through
    ``logger_debug`` and not propagated to the caller.

    @param result_list_: limit-up rows; index 0 and index 5 of every row are
        passed to ``new_block_processor.screen_new_blocks_with_limit_up_datas``.
        A falsy value makes the function a no-op.
    @return: None
    """

    def request_new_blocks_codes(blocks_info, all_new_blocks):
        """
        Request the component codes of the new blocks and register the qualifying ones.
        @param blocks_info: [(block name, block code)]
        @param all_new_blocks: forwarded unchanged to
            new_block_processor.process_new_block_by_component_codes
        @return: None
        """
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes is None:
            # Same guard as test_block(): without it the membership test
            # below raises TypeError when no data is available.
            yesterday_codes = set()
        for bi in blocks_info:
            result = json.loads(kpl_api.getCodesByPlate(bi[1]))
            code_info_list = []
            for d in result["list"]:
                if d[0] in yesterday_codes:
                    continue
                # Codes whose scaled rate is below 5 are skipped. d[6] is
                # divided by the code's limit-up rate factor.
                # NOTE(review): the divisor is 0 if round((rate - 1) * 10)
                # yields 0 - confirm get_limit_up_rate never returns such a value.
                rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10))
                if rate < 5:
                    continue
                # Format: (code, rate of increase)
                code_info_list.append((d[0], d[6]))
            if code_info_list:
                # Add the codes to the new block
                new_block_processor.process_new_block_by_component_codes(
                    bi[0], {x[0] for x in code_info_list}, all_new_blocks)

    try:
        if not result_list_:
            return
        # New blocks
        new_block_codes = new_block_processor.screen_new_blocks_with_limit_up_datas(
            [(x[0], x[5]) for x in result_list_])
        if not new_block_codes:
            return
        # Map filtered block name -> value at index 15 of each total record
        # (used below as the plate code); later records override earlier ones.
        block_plate_code_dict = {
            kpl_util.filter_block(x[2]): x[15] for x in KPLLimitUpDataRecordManager.total_datas
        }
        for b in new_block_codes:
            for c in new_block_codes[b]:
                new_block_processor.process_new_block_by_limit_up_list(c, b)
        # New blocks for which a plate code is known
        update_new_block_plates = [
            (r, block_plate_code_dict[r]) for r in new_block_codes if r in block_plate_code_dict
        ]
        if update_new_block_plates:
            # The codes under each block have to be requested
            request_new_blocks_codes(update_new_block_plates, new_block_codes.keys())
    except Exception as e:
        # Previously an inner bare ``except: pass`` hid every failure and made
        # this handler unreachable; failures are now logged.
        logger_debug.exception(e)


if __name__ == "__main__":
    try:
        global_data_loader.load_zyltgb()
        KPLLimitUpDataRecordManager.load_total_datas()
        # Compute the blocks/themes newly added today
        # NOTE(review): the file is opened with the platform default encoding,
        # as before - confirm it matches how the log was written.
        with open("D:/trade_cache/2025-04-10_limit_up.log") as f:
            # Only the first line is needed; avoid loading the whole log.
            first_line = f.readline()
        do_limit_up(json.loads(first_line))
        code = "002163"
        print(LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code))
        print(BlockSpecialCodesManager().get_code_blocks(code))
        print(BlockSpecialCodesManager().get_code_blocks_dict())
    except Exception as e:
        # Report failures instead of silently swallowing them.
        logger_debug.exception(e)