import json import logging import os import datetime import pickle import re from dateutil.tz import tzfile, gettz from code_attribute import global_data_loader from log_module.log import logger_debug from strategy.data_analyzer import KTickLineAnalyzer from strategy.data_downloader import DataDownloader from strategy.low_suction_strategy import LowSuctionOriginDataExportManager from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import JueJinLocalApi, HistoryKDatasUtils from utils import global_util, tool, huaxin_util def __get_special_codes(): """ 获取特殊的代码,需要订阅300w以上的大单 @return: 代码集合, 收盘价字典:{"code":收盘价} """ try: pre_close_price_dict = {} zylt_volume_map = global_util.zylt_volume_map codes = set() # TODO 测试 last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0] for code in zylt_volume_map: volume = zylt_volume_map.get(code) # 今日涨停价要突破昨日最高价 k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: # 自由流通市值在30亿-300亿以上 limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) if limit_up_price > k_bars[0]["high"]: # 今日涨停价要突破昨日最高价 codes.add(code) pre_close_price_dict[code] = k_bars[0]["close"] return codes, pre_close_price_dict except Exception as e: logging.exception(e) return set(), None from dateutil.parser import parse code_ticks_dict = {} code_time_ticks_dict = {} code_minutes_dict = {} def __load_today_tick_info(code): now_day = tool.get_now_date_str() tick_path = f"D:\\datas\\ticks\\{now_day}_{code}.txt" if code not in code_ticks_dict: with open(tick_path, mode='r') as f: lines = f.readlines() line = lines[0] line = line.replace("datetime.datetime(", "\"datetime.datetime(").replace("('PRC'))", "('PRC'))\"") line = line.replace("'", "\"").replace("\"PRC\"", "'PRC'") ticks = json.loads(line) for t in ticks: created_at_line = t["created_at"].replace("datetime.datetime(", "") sts = created_at_line.split(",") year, month, day, hour, minute, second = sts[0].strip(), sts[1].strip(), sts[2].strip(), sts[3].strip(), \ sts[4].strip(), ( sts[5].strip() if len(sts) >= 7 else 0) tz = 'PRC' # 获取时区(如 'PRC' -> Asia/Shanghai) dt = datetime.datetime(int(year), int(month), int(day), int(hour), int(minute), int(second)) t["created_at"] = dt if code not in code_time_ticks_dict: code_time_ticks_dict[code] = {} code_time_ticks_dict[code][t["created_at"].strftime("%H:%M:%S")] = t # 整理为分钟K线 code_ticks_dict[code] = ticks if code not in code_minutes_dict: ticks = code_ticks_dict.get(code) bars = [] for tick in ticks: if not bars: bars.append(tick) temp_time_str = tick["created_at"].strftime("%H:%M:%S") if tick["created_at"].strftime("%H:%M") == bars[-1]["created_at"].strftime("%H:%M"): bars[-1] = tick else: bars.append(tick) code_minutes_dict[code] = bars def is_over_today_high_price(code, price, time_str): bars = code_minutes_dict.get(code) max_price = max([b["price"] for b in bars if b["created_at"].strftime("%H:%M") < time_str[:5]]) if price >= max_price: return True return False def is_over_yesterday_high_price(code, price, last_trade_day): k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) # 昨天是否涨停 limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["pre_close"], 2) if k_bars[0]["high"] >= limit_up_price: # 昨日涨停或炸板 return False return price > k_bars[0]["high"] def is_over_5days_high_price(code, price, last_trade_day): k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) # 昨天是否涨停 max_price = max(k_bars[:5], key=lambda x: x["high"])["high"] return price > max_price def get_limit_up_list_by_time(limit_up_list, now_time_str): for d in limit_up_list: if now_time_str > d[0][:8]: return d[1] def get_block_limit_up_plate_code(limit_up_list, code_plates_dict): """ 根据涨停列表与代码的板块字典获取板块里面有哪些代码 @param limit_up_list: @param code_plates_dict: @return: """ plate_codes_dict = {} if limit_up_list: for d in limit_up_list: code = d[0] plates = code_plates_dict.get(code) if plates: for p in plates: if p not in plate_codes_dict: plate_codes_dict[p] = set() plate_codes_dict[p].add(code) return plate_codes_dict def __download_tick_data(code, data_downloader: DataDownloader): data_downloader.download_tick_data([code]) def __load_record_datas(codes, day): data_loader = DataLoader(day) kline_1d_dict = data_loader.load_kline_data() limit_up_list_dict = {} limit_up_list = data_loader.load_limit_up_data() for r in limit_up_list: code = r[0] if code not in limit_up_list_dict: limit_up_list_dict[code] = [] limit_up_list_dict[code].append(r) for code in codes: stockVariables = StrategyVariableFactory.create_from_history_data(kline_data_1d=kline_1d_dict.get(code), kline_data_60s_dict=None, limit_up_data_records=limit_up_list_dict.get( code), trade_days=data_loader.trade_days) print(stockVariables) if True: codes = {'002779', '300687', '600986', '002688', '600734', '002312', '300378', '002057', '002152', '002851', '600006', '300840', '300857', '002006', '002354', '002639', '300353', '000966', '603979', '603667', '603169', '300567', '300571', '300758', '002536', '603887', '600198', '300042', '000935', '603009', '002279', '002436', '002126', '600498', '300109', '600203', '600866', '600509', '301368', '605128', '002896', '600126', '300100', '000880', '002036', '000582', '600732', '600602', '000962', '603486', '301261', '300170', '300666', '002195', '300984', '600366', '000958', '600363', '000967', '002938', '603881', '600143', '002611', '002472', '002283', '605117', '603915', '002093', '000676', '002402', '600624', '300184', '300603', '301160', '300031', '600889', '603882', '002559', '601611', '600633', '002570', '002681', '002115', '600575', '002434', '300466', '600480', '603990', '002164', '002062', '000723', '600571', '000997', '300223', '300493', '002065', '000536', '000597', '600114', '600789', '000886', '300918', '002335', '600590', '300166', '600101', '002553', '600105', '605319', '300007', '000949', '001339', '600979', '300660', '000661', '603166', '002196', '601789', '600383', '300458', '600808', '002414', '002697', '300879', '000816', '002139', '603918', '300846', '300122', '600663', '603119', '300718', '000837', '600552', '000678', '603078', '601177', '002255', '300611', '000555', '000822', '002229', '002841', '300454', '600536', '002222', '300102', '603883', '002530', '603629', '300229', '300766', '002249', '002315', '300316', '300290', '300520', '603081', '300607', '000766', '002657', '601155', '002123', '000422', '600300', '002008', '603127', '300346', '300212', '603121', '000034', '000032', '002929', '002469', '002010', '301171', '000887', '605020', '003021', '002031', '000560', '605589', '300024', '603507', '301396', '603960', '600835', '301308', '603688', '002459', '002838', '002009', '300403', '002837', '603128', '600728', '603583', '300139', '001309', '603322', '600186', '603110', '300505', '000777', '600895', '603809', '300641', '002104', '002640', '002265', '300127', '600673', '002276', '601100', '000969', '600933', '002685', '002272', '600619', '000710', '000815', '002526', '600171', '300724', '002467', '603072', '002430', '300115', '002600', '000831', '600592', '000670', '605555', '600598', '603063', '300727', '301550', '605133', '002542', '002703', '603005', '300674', '000903', '002965', '600310', '300547', '002332', '002979', '603236', '603300', '300075', '002175', '002765', '002046', '002655', '603686', '603319', '002145', '002410', '300382', '002444', '300432', '600649', '603067', '002988', '600578', '002920', '000681', '002527', '603496', '603728', '000948', '002165', '600353', '300224', '603662', '300748', '603129', '600730', '002584', '301269', '603985', '000876', '000503', '000795', '605358', '300373', '603306', '000875', '600868', '002575', '002048', '600611', '600797', '300953', '002131', '300328', '300623', '600812', '002292', '300276', '002366', '600325', '600580', '301000', '603920', '603466', '000970', '603816', '600589', '000062', '600610'} __load_record_datas(codes, "2025-05-06") def __back_test1(): day = "2025-05-07" data_loader = DataLoader(day) trade_days = data_loader.load_trade_days() __DataDownloader = DataDownloader(day, trade_days) last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0] global_data_loader.load_zyltgb_volume_from_db() special_codes, pre_close_dict = DataLoader(day).load_target_codes() __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(day) big_order_deals = __LowSuctionOriginDataExportManager.export_big_order_deal_by() codes = big_order_deals.keys() codes = codes - special_codes for code in codes: big_order_deals.pop(code) codes = big_order_deals.keys() # ===========涨停原因分析=========== code_blocks = __LowSuctionOriginDataExportManager.export_code_plates() all_limit_up_list = __LowSuctionOriginDataExportManager.export_limit_up_list() all_limit_up_list.reverse() back_test_results = [] tick_codes = set() for code in codes: # if code != "002265": # continue if tool.is_ge_code(code): continue big_orders = big_order_deals.get(code) limit_up_price = round(pre_close_dict[code] * tool.get_limit_up_rate(code), 2) # 判断价格是否过前高 for order in big_orders: # 涨停价成交的不算 if order[4] >= limit_up_price: continue rate = (order[4] - pre_close_dict[code]) / pre_close_dict[code] # 涨得过高或过低得不买 threshold_rate = int((tool.get_limit_up_rate(code) - 1) * 10) if rate >= 0.06 * threshold_rate or rate <= -0.06 * threshold_rate: continue order_time = huaxin_util.convert_time(order[3]) if order_time < "09:30:00": continue can_buy_infos = [] limit_up_list = get_limit_up_list_by_time(all_limit_up_list, huaxin_util.convert_time(order[3])) plates = code_blocks.get(code) plate_codes = get_block_limit_up_plate_code(limit_up_list, code_blocks) if plates: for p in plates: if plate_codes.get(p) and len(plate_codes.get(p)) >= 2: # 统计之前的大单数量 before_big_order_no = set() for bo in big_orders: if bo[0] == order[0]: break before_big_order_no.add(bo[0]) can_buy_infos.append((code, p, plate_codes.get(p), order, len(before_big_order_no))) if not can_buy_infos: continue if is_over_yesterday_high_price(code, order[4], last_trade_day): # print("===========", code, order, "过昨日最高") tick_codes.add(code) __download_tick_data(code, __DataDownloader) if is_over_today_high_price(code, order[4], huaxin_util.convert_time(order[3])): # print(code, order, "过今日前高") # print("======", can_buy_infos) back_test_results.extend(can_buy_infos) print("可能买的代码:", tick_codes) back_test_results.sort(key=lambda e: huaxin_util.convert_time(e[3][3])) print("最终回撤结果:=========") for r in back_test_results: print(r) print("计算完成") def __back_test2(): day = "2025-05-07" data_loader = DataLoader(day) trade_days = data_loader.load_trade_days() __DataDownloader = DataDownloader(day, trade_days) last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0] global_data_loader.load_zyltgb_volume_from_db() special_codes, pre_close_dict = DataLoader(day).load_target_codes() __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(day) big_order_deals = __LowSuctionOriginDataExportManager.export_big_order_deal_by() codes = big_order_deals.keys() codes = codes - special_codes for code in codes: big_order_deals.pop(code) codes = big_order_deals.keys() # ===========涨停原因分析=========== code_blocks = __LowSuctionOriginDataExportManager.export_code_plates() all_limit_up_list = __LowSuctionOriginDataExportManager.export_limit_up_list() all_limit_up_list.reverse() back_test_results = [] tick_codes = set() code_buy_plates = {} can_buy_infos = [] for code in codes: print("==========开始回测代码", code) logger_debug.info("==========开始回测代码:{}", code) # if code != "002265": # continue if tool.is_ge_code(code): continue big_orders = big_order_deals.get(code) limit_up_price = round(pre_close_dict[code] * tool.get_limit_up_rate(code), 2) start_time, end_time = "09:25:00", "11:30:00" # 制造回测时间 for i in range(60 * 60 * 5): time_str = tool.trade_time_add_second(start_time, i) if time_str > end_time: break if code_time_ticks_dict.get(code) and time_str not in code_time_ticks_dict.get(code): continue limit_up_list = get_limit_up_list_by_time(all_limit_up_list, time_str) plates = code_blocks.get(code) plate_codes = get_block_limit_up_plate_code(limit_up_list, code_blocks) target_plates = set() if plates: for p in plates: if plate_codes.get(p) and len(plate_codes.get(p)) >= 2: # 判断 # 加载K线 target_plates.add(p) if not target_plates: # 板块不满足 continue # 下载K线 if code not in code_ticks_dict: __download_tick_data(code, __DataDownloader) # 将tick文件加载到内存中 __load_today_tick_info(code) tick = code_time_ticks_dict[code].get(time_str) if not tick: continue rate = (tick["price"] - pre_close_dict[code]) / pre_close_dict[code] # 涨得过高或过低得不买 threshold_rate = int((tool.get_limit_up_rate(code) - 1) * 10) if rate >= 0.06 * threshold_rate or rate <= -0.03 * threshold_rate: continue if tick["created_at"].strftime("%H:%M:%S") < "09:30:00": continue if not is_over_5days_high_price(code, tick["price"], last_trade_day): continue if not is_over_today_high_price(code, tick["price"], time_str): continue # 大单数据大于2个 order_ids = set([order[0] for order in big_orders if time_str > huaxin_util.convert_time(order[3]) >="09:30:00"]) if len(order_ids) >= 2: code_buy_plates[code] = target_plates can_buy_infos.append( (code, "买入时间:" + tick["created_at"].strftime("%H:%M:%S"), tick["price"], target_plates, {p: plate_codes.get(p) for p in target_plates}, f"大单数量:{len(order_ids)}")) break can_buy_infos.sort(key=lambda e: e[1]) print("最终回撤结果:=========") for r in can_buy_infos: print(r) print("计算完成") if __name__ == "__main__": data_loader = DataLoader("2025-06-10") kline_1d_dict = data_loader.load_kline_data() KTickLineAnalyzer.get_third_limit_up_days(kline_1d_dict.get("002907"), 10)