from code_attribute import gpcode_manager, code_nature_analyse
|
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
|
from strategy.strategy_variable import StockVariables
|
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
|
from utils import tool, huaxin_util
|
|
|
class BackTest:
    """Replay one trading day second by second and run the strategy script.

    The strategy script is loaded once as text; for every big buy order seen
    during the replay it is executed against that stock's variables, and the
    first positive result per stock is printed as a simulated deal.
    """

    def __init__(self, day, script_path="低吸脚本_辨识度.py"):
        """
        Load the strategy script and prepare the per-stock variable cache.

        :param day: the date to back-test (passed on to the data loaders)
        :param script_path: path of the strategy script; defaults to the
            script the back-test has always used
        """
        self.day = day
        # Read the file verbatim. Joining readlines() with "\n" would double
        # every line break, because each line already ends with one.
        with open(script_path, mode='r', encoding='utf-8') as f:
            scripts = f.read()
        # Comment out the script's own imports and its `sv = ...` assignment;
        # `sv` is injected when the script is executed.
        # NOTE(review): this is a plain text replacement, so it also rewrites
        # any other occurrence of "from " / "sv = " inside the script.
        scripts = scripts.replace("from ", "#from ").replace("sv = ", "#sv = ")
        self.scripts = scripts
        # {code: StockVariables}, filled lazily by init_stock_variables()
        self.stock_variables_dict = {}

    @staticmethod
    def _group_orders_by_time(deals_by_code):
        """
        Regroup ``{code: [order, ...]}`` into ``{time_str: [(code, order), ...]}``.

        ``order[3]`` is converted with ``huaxin_util.convert_time`` to build the
        key; entries sharing a key are then sorted with
        ``huaxin_util.convert_time(order[3], True)``.
        NOTE(review): the second argument presumably requests a finer-grained
        timestamp - confirm against huaxin_util.

        :param deals_by_code: mapping of code to orders; falsy input gives ``{}``
        """
        grouped = {}
        if not deals_by_code:
            return grouped
        for code in deals_by_code:
            for order in deals_by_code[code]:
                time_str = huaxin_util.convert_time(order[3])
                grouped.setdefault(time_str, []).append((code, order))
        for same_time_orders in grouped.values():
            same_time_orders.sort(key=lambda x: huaxin_util.convert_time(x[1][3], True))
        return grouped

    @staticmethod
    def _group_limit_up_by_plate(limit_up_list, code_plates):
        """
        Group limit-up entries by plate.

        :param limit_up_list: entries where ``x[0]`` is the code; ``x[2]`` is
            carried along unchanged
        :param code_plates: ``{code: plates}``
        :return: ``{plate: [(x[0], x[2]), ...]}``; codes without plates are skipped
        """
        plate_codes_info = {}
        for x in limit_up_list:
            plates = code_plates.get(x[0])
            if not plates:
                continue
            for p in plates:
                plate_codes_info.setdefault(p, []).append((x[0], x[2]))
        return plate_codes_info

    @staticmethod
    def _average_big_order_price(orders):
        """
        Average price (total money / total volume) over distinct orders.

        Orders are ``(order id, volume, money, ...)``. When an order id occurs
        more than once only its latest record counts, hence the reversed scan.

        :return: the price rounded to 2 decimals, or 0 when the volume is 0
        """
        seen_ids = set()
        total_money = 0
        total_volume = 0
        for order in reversed(orders):
            if order[0] in seen_ids:
                continue
            seen_ids.add(order[0])
            total_money += order[2]
            total_volume += order[1]
        if total_volume > 0:
            return round(total_money / total_volume, 2)
        return 0

    @staticmethod
    def _track_minute_bar(minute_bars, stock_variables, tick):
        """
        Fold one tick into a stock's minute bars and update today's high.

        The last list element is the latest tick of the minute in progress.
        When a tick of a new minute arrives, the finished minute's last price
        is compared with ``今日最高价`` and the tick opens a new bar.

        :param minute_bars: this stock's list of minute bars (mutated in place)
        :param stock_variables: object whose ``今日最高价`` is read and written
        :param tick: dict with at least ``created_at`` and ``price``
        """
        if not minute_bars:
            minute_bars.append(tick)
        last_bar = minute_bars[-1]
        # Dropping the last two characters of created_at removes the seconds.
        if last_bar["created_at"][:-2] == tick["created_at"][:-2]:
            minute_bars[-1] = tick
            return
        closed_price = last_bar["price"]
        if not stock_variables.今日最高价 or closed_price > stock_variables.今日最高价:
            stock_variables.今日最高价 = closed_price
        # Start the new minute. Without this append the list stays on the first
        # minute forever and the high is never updated again.
        minute_bars.append(tick)

    @staticmethod
    def _format_rate(price, base_price):
        """Return the percentage change from base_price to price, e.g. ``"1.25%"``."""
        return f"{round((price - base_price) * 100 / base_price, 2)}%"

    def load_before_date_data_by_timeline(self, data_loader: "DataLoader"):
        """
        Load the data that precedes the back-test day.

        :param data_loader: loader for trade days, daily K-lines, limit-up
            records and the next trade day
        :return: dict with keys ``date``, ``kline_data``, ``minute_data``
            (always an empty dict; minute loading is disabled),
            ``limit_up_record_data``, ``trade_days`` and ``next_trade_day``
        :raises Exception: when trade days, K-lines or limit-up records are empty
        """
        trade_days = data_loader.load_trade_days()
        kline_data = data_loader.load_kline_data()
        minute_data = {}  # data_loader.load_minute_data()
        limit_up_record_data = data_loader.load_limit_up_data()
        next_trade_day = data_loader.load_next_trade_day()
        if not trade_days:
            raise Exception("交易日历获取失败")
        if not kline_data:
            raise Exception("历史日K获取失败")
        # This used to re-test kline_data, so missing limit-up history was
        # never detected.
        if not limit_up_record_data:
            raise Exception("历史涨停获取失败")

        return {
            'date': self.day,
            'kline_data': kline_data,
            'minute_data': minute_data,
            'limit_up_record_data': limit_up_record_data,
            "trade_days": trade_days,
            "next_trade_day": next_trade_day
        }

    def load_current_date_data_by_timeline(self):
        """
        Load the exported data of the back-test day, keyed by time where relevant.

        :return: dict with keys
            ``limit_up_list`` ({first 8 chars of d[0]: d[1]}),
            ``big_order`` / ``big_sell_order`` ({time: [(code, order), ...]}),
            ``zylt_volume`` and ``code_plates``
        :raises Exception: when volume, plate, big buy order or limit-up data
            is empty
        """
        exporter = LowSuctionOriginDataExportManager(self.day)
        fdata = {}

        all_limit_up_list = exporter.export_limit_up_list()
        fdata["limit_up_list"] = {d[0][:8]: d[1] for d in all_limit_up_list}

        # Big buy orders, falling back to the alternative export when empty.
        # Target format: {time: [("code", (order id, volume, money, time, final price))]}
        big_order_deals = exporter.export_big_order_deal()
        if not big_order_deals:
            big_order_deals = exporter.export_big_order_deal_by()
        fdata["big_order"] = self._group_orders_by_time(big_order_deals)

        # Big sell orders, same shape and same fallback.
        big_sell_order_deals = exporter.export_big_sell_order_deal()
        if not big_sell_order_deals:
            big_sell_order_deals = exporter.export_big_sell_order_deal_by()
        fdata["big_sell_order"] = self._group_orders_by_time(big_sell_order_deals)

        # Free-float volume
        fdata["zylt_volume"] = exporter.export_zylt_volume()

        code_plates_dict = exporter.export_code_plates()
        fdata["code_plates"] = code_plates_dict

        # For codes listed as special codes, their plates are replaced by the
        # plates they are listed under.
        special_codes = exporter.export_special_codes()
        special_code_plates = {}
        for plate in special_codes:
            for code in special_codes[plate]:
                special_code_plates.setdefault(code, set()).add(plate)
        for code in special_code_plates:
            code_plates_dict[code] = special_code_plates[code]

        if not fdata["zylt_volume"]:
            raise Exception("无自由流通数据")
        if not fdata["code_plates"]:
            raise Exception("无板块数据")
        if not fdata["big_order"]:
            raise Exception("无大单数据")
        if not fdata["limit_up_list"]:
            raise Exception("无涨停数据")

        return fdata

    def load_current_tick_datas(self, data_loader: "DataLoader"):
        """
        Load tick data and regroup it by time of day.

        :param data_loader: loader whose ``load_tick_data()`` returns
            ``{code: [tick, ...]}``
        :return: ``{last 8 chars of tick["created_at"]: [tick, ...]}``
        :raises Exception: when there are no ticks at all
        """
        code_tick_datas = data_loader.load_tick_data()
        # Merge the ticks of all codes onto one time axis
        fdata = {}
        for code in code_tick_datas:
            for tick in code_tick_datas[code]:
                fdata.setdefault(tick["created_at"][-8:], []).append(tick)
        if not fdata:
            raise Exception("无分时K线数据")

        return fdata

    def __run_backtest(self, code, stock_variables: "StockVariables"):
        """
        Execute the strategy script for one stock.

        The script runs with ``sv`` and ``target_code`` as globals and must
        assign ``compute_result``.

        @param code: stock code, exposed to the script as ``target_code``
        @param stock_variables: exposed to the script as ``sv``
        @return: the script's ``compute_result``
        """
        # Compile once and reuse; this method runs for every big order.
        cached = self.__dict__.get("_compiled_scripts")
        if cached is None or cached[0] != self.scripts:
            cached = (self.scripts, compile(self.scripts, "<string>", "exec"))
            self._compiled_scripts = cached
        global_dict = {
            "sv": stock_variables,
            "target_code": code
        }
        # NOTE(review): exec runs the script with full interpreter privileges;
        # only trusted strategy scripts may be loaded.
        exec(cached[1], global_dict)
        return global_dict["compute_result"]

    def __filter_codes(self, current_data, timeline_data):
        """
        Pick codes by plate strength and free-float market value.

        A plate qualifies when, at some second between 09:25:00 and 11:30:00,
        it has at least two limit-up entries. A code qualifies when it belongs
        to such a plate, has K-line data, and its free-float volume times
        ``kline_data[code][0]["close"]`` lies within [30e8, 300e8].

        @return: set of qualifying codes
        """
        code_plates = current_data["code_plates"]
        start_time, end_time = "09:25:00", "11:30:00"
        fplates = set()
        for i in range(60 * 60 * 5):
            time_str = tool.trade_time_add_second(start_time, i)
            if time_str > end_time:
                break
            current_limit_up_list = current_data["limit_up_list"].get(time_str)
            if not current_limit_up_list:
                continue
            plate_codes_info = self._group_limit_up_by_plate(current_limit_up_list, code_plates)
            # A plate is valid once it holds two or more limit-up entries
            fplates |= {p for p in plate_codes_info if len(plate_codes_info[p]) >= 2}

        fcodes = set()
        for c in code_plates:
            if not code_plates[c] & fplates:
                continue
            if c not in timeline_data["kline_data"]:
                continue
            # Free-float market value between 3 and 30 billion
            pre_close = timeline_data["kline_data"].get(c)[0]["close"]
            if 30e8 <= current_data["zylt_volume"].get(c) * pre_close <= 300e8:
                fcodes.add(c)

        return fcodes

    def __get_target_codes(self):
        """Return the union of all exported special codes across plates."""
        special_codes = LowSuctionOriginDataExportManager(self.day).export_special_codes()
        fcodes = set()
        for plate in special_codes:
            fcodes.update(special_codes[plate])
        return fcodes

    def init_stock_variables(self, code_, timeline_data, current_data):
        """
        Create and cache the StockVariables of one code; no-op when cached.

        ``timeline_data["limit_up_record_data"]`` must already be a mapping
        keyed by code, as ``run`` prepares it.

        @param code_: stock code
        @param timeline_data: result of load_before_date_data_by_timeline
        @param current_data: result of load_current_date_data_by_timeline
        @return: None
        """
        if code_ in self.stock_variables_dict:
            return
        kline_bars = timeline_data["kline_data"].get(code_)
        stock_variables = StrategyVariableFactory.create_from_history_data(
            kline_bars, timeline_data["minute_data"].get(code_),
            timeline_data["limit_up_record_data"].get(code_), timeline_data["trade_days"])
        # Today's limit-up price is derived from the first K-line bar's close
        # (presumably the previous trading day's close - confirm bar ordering).
        pre_close = kline_bars[0]["close"]
        stock_variables.今日涨停价 = round(float(gpcode_manager.get_limit_up_price_by_preprice(code_, pre_close)), 2)
        stock_variables.自由流通市值 = current_data["zylt_volume"].get(code_) * pre_close
        # Plates of the code
        stock_variables.代码板块 = current_data["code_plates"].get(code_)
        is_price_too_high = code_nature_analyse.is_price_too_high_in_days(code_, kline_bars,
                                                                          stock_variables.今日涨停价)
        stock_variables.六个交易日涨幅过高 = is_price_too_high[0]
        self.stock_variables_dict[code_] = stock_variables

    def run(self):
        """
        Replay the day from 09:25:00 to 12:00:00 one second at a time.

        Per second: record big buy and sell orders on their stocks, apply the
        ticks of target codes, then execute the strategy script for every big
        buy order. The first positive result per code is printed together with
        the next-day-open and same-day-close returns measured from the order's
        final deal price.
        """
        data_loader = DataLoader(self.day)
        current_data = self.load_current_date_data_by_timeline()
        # Historical data preceding the back-test day
        timeline_data = self.load_before_date_data_by_timeline(data_loader)
        # TODO output the target codes
        fcodes = self.__get_target_codes()  # __filter_codes(current_data, timeline_data)
        current_tick_data = self.load_current_tick_datas(data_loader)

        # Re-key the limit-up records by code (item[0]) for init_stock_variables
        limit_up_record_data_dict = {}
        for limit_up_item in timeline_data["limit_up_record_data"]:
            limit_up_record_data_dict.setdefault(limit_up_item[0], []).append(limit_up_item)
        timeline_data["limit_up_record_data"] = limit_up_record_data_dict

        next_trade_day = timeline_data["next_trade_day"]
        start_time, end_time = "09:25:00", "12:00:00"
        # {code: [latest tick of each minute]}
        minute_bars_dict = {}
        code_plates = current_data["code_plates"]
        # Plates and the codes bought in them: {"plate": {"000333"}}
        deal_block_codes = {}
        deal_codes = set()
        print("======", self.day)
        # Generate the replay clock
        for i in range(60 * 60 * 5):
            time_str = tool.trade_time_add_second(start_time, i)
            if time_str > end_time:
                break
            ticks = current_tick_data.get(time_str)

            # Limit-up entries of this second grouped by plate; None when
            # there is no limit-up snapshot for this second.
            current_limit_up_list = current_data["limit_up_list"].get(time_str)
            if current_limit_up_list:
                plate_codes_info = self._group_limit_up_by_plate(current_limit_up_list, code_plates)
            else:
                plate_codes_info = None

            # Big buy orders of this second:
            # ("code", (order id, volume, money, time, final deal price))
            current_big_orders = current_data["big_order"].get(time_str)
            if current_big_orders:
                for big_order in current_big_orders:
                    self.init_stock_variables(big_order[0], timeline_data, current_data)
                    stock_variables = self.stock_variables_dict.get(big_order[0])
                    if stock_variables.今日大单数据 is None:
                        stock_variables.今日大单数据 = []
                    stock_variables.今日大单数据.append(big_order[1])
                    stock_variables.今日大单均价 = self._average_big_order_price(stock_variables.今日大单数据)

            # Big sell orders of this second (same tuple layout)
            current_big_sell_orders = current_data["big_sell_order"].get(time_str)
            if current_big_sell_orders:
                for big_order in current_big_sell_orders:
                    self.init_stock_variables(big_order[0], timeline_data, current_data)
                    stock_variables = self.stock_variables_dict.get(big_order[0])
                    if stock_variables.今日卖大单数据 is None:
                        stock_variables.今日卖大单数据 = []
                    stock_variables.今日卖大单数据.append(big_order[1])

            if ticks:
                for tick in ticks:
                    code = tick["symbol"][-6:]
                    if code not in fcodes:
                        continue
                    # Loads the base data on first sight (no-op afterwards)
                    self.init_stock_variables(code, timeline_data, current_data)
                    stock_variables = self.stock_variables_dict.get(code)
                    stock_variables.板块成交代码 = deal_block_codes
                    # Keep the previous plate limit-up info when this second has none
                    if plate_codes_info is not None:
                        stock_variables.板块涨停 = plate_codes_info
                    self._track_minute_bar(minute_bars_dict.setdefault(code, []), stock_variables, tick)
                    # Ticks before 09:30:00 define the opening price
                    if tick["created_at"][-8:] < '09:30:00':
                        stock_variables.今日开盘价 = tick["price"]
                        stock_variables.今日开盘涨幅 = round(
                            (tick["price"] - stock_variables.昨日收盘价) / stock_variables.昨日收盘价, 4)
                    stock_variables.今日成交量 = tick["cum_volume"]
                    stock_variables.当前价 = tick["price"]

            # Strategy evaluation is driven by big buy orders, not by ticks
            if current_big_orders:
                for big_order in current_big_orders:
                    code = big_order[0]
                    self.init_stock_variables(code, timeline_data, current_data)
                    stock_variables = self.stock_variables_dict.get(code)
                    compute_result = self.__run_backtest(code, stock_variables)
                    if not compute_result[0] or code in deal_codes:
                        continue
                    # TODO place the order
                    deal_codes.add(code)
                    deal_price = big_order[1][4]
                    next_k_bars = data_loader.load_kline_data_by_day_and_code(next_trade_day, code)
                    current_k_bars = data_loader.load_kline_data_by_day_and_code(data_loader.now_day, code)
                    if next_k_bars:
                        t_rate = self._format_rate(next_k_bars[0]["open"], deal_price)
                    else:
                        t_rate = "未知"
                    if current_k_bars:
                        c_rate = self._format_rate(current_k_bars[0]["close"], deal_price)
                    else:
                        c_rate = "未知"

                    print("======回测结果:", code, f"溢价率:{t_rate},当日盈亏:{c_rate}", compute_result[2])
                    for b in compute_result[1]:
                        deal_block_codes.setdefault(b, set()).add(code)
|
|
|
if __name__ == "__main__":
    # Back-test the listed trading days, most recent first.
    for backtest_day in reversed(
            ["2025-05-06", "2025-05-07", "2025-05-08", "2025-05-09", "2025-05-12"]):
        BackTest(backtest_day).run()
|