import logging
|
|
import constant
|
from code_attribute import gpcode_manager, code_nature_analyse
|
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
|
from strategy.data_downloader import DataDownloader
|
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
|
from strategy.strategy_params_settings import StrategyParamsSettings, StrategyParamsSettingsManager
|
from strategy.strategy_variable import StockVariables
|
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
|
from third_data import kpl_util
|
from third_data.third_blocks_manager import BlockMapManager
|
from utils import tool, huaxin_util
|
|
|
class BackTest:
|
|
def __init__(self, day, script_name="低吸脚本_辨识度_v3.py", settings=StrategyParamsSettingsManager().get_settings()):
|
self.day = day
|
scripts = ""
|
with open(script_name, mode='r', encoding='utf-8') as f:
|
lines = f.readlines()
|
scripts = "\n".join(lines)
|
# 注释掉里面的import与变量
|
scripts = scripts.replace("from ", "#from ").replace("sv = ", "#sv = ").replace("settings = ",
|
"#settings = ").replace(
|
"target_code = ", "#target_code = ")
|
self.settings = settings
|
self.scripts = scripts
|
self.RANGE_TIMES = ("09:25:00", "11:30:00")
|
self.current_time = '09:25:00'
|
|
self.stock_variables_dict = {}
|
self.data_loader: DataLoader = None
|
self.timeline_data = None
|
self.current_data = None
|
self.current_tick_data = None
|
self.fcodes = None
|
# 领涨代码的板块,{代码:{"板块":(代码, 领涨次数, 板块最大领涨次数)}}
|
self.head_rise_code_blocks = {}
|
# 已经成交的代码
|
self.deal_codes = set()
|
# 板块已经成交的代码
|
self.deal_block_codes = {}
|
|
def set_script(self, script):
|
self.scripts = script
|
|
def load_before_date_data_by_timeline(self, data_loader: DataLoader):
|
"""
|
加载回测日期之前的K线数据与历史涨停数据
|
:return: 按时间排序的数据列表
|
"""
|
day = self.day
|
trade_days = data_loader.load_trade_days()
|
timeline_data = []
|
# 加载历史数据
|
kline_data = data_loader.load_kline_data()
|
valid_codes = set(kline_data.keys())
|
minute_data = {} # data_loader.load_minute_data()
|
limit_up_record_data = data_loader.load_limit_up_data()
|
next_trade_day = data_loader.load_next_trade_day()
|
if not trade_days:
|
raise Exception("交易日历获取失败")
|
if not kline_data:
|
raise Exception("历史日K获取失败")
|
if not kline_data:
|
raise Exception("历史涨停获取失败")
|
# 统计120个交易日内代码涨停原因对应的涨停次数排名前3的板块
|
min_day = data_loader.trade_days[120 - 1]
|
block_code_dates = {}
|
for d in limit_up_record_data:
|
# 只统计封板
|
if d[3] != 0:
|
continue
|
if d[1] < min_day:
|
continue
|
code, date, block = d[0], d[1], d[2]
|
if block not in block_code_dates:
|
block_code_dates[block] = {}
|
if code not in block_code_dates[block]:
|
block_code_dates[block][code] = set()
|
block_code_dates[block][code].add(date)
|
# 统计的代码的涨停原因
|
code_blocks = {}
|
for b in block_code_dates:
|
if b in constant.KPL_INVALID_BLOCKS:
|
continue
|
# if b == '跨境电商':
|
# print("")
|
code_limit_up_count_list = [(x, len(block_code_dates[b][x])) for x in block_code_dates[b]]
|
code_limit_up_count_list.sort(key=lambda e: e[1], reverse=True)
|
end_index = 3
|
# code_limit_up_count_list = code_limit_up_count_list[:3]
|
for i in range(end_index, len(code_limit_up_count_list)):
|
if code_limit_up_count_list[end_index - 1][1] == code_limit_up_count_list[i][1]:
|
end_index = i + 1
|
code_limit_up_count_list = code_limit_up_count_list[:end_index]
|
|
for x in code_limit_up_count_list:
|
if x[1] < 3:
|
continue
|
if x[0] not in code_blocks:
|
code_blocks[x[0]] = set()
|
code_blocks[x[0]].add(b)
|
return {
|
'date': day,
|
'kline_data': kline_data,
|
'valid_codes': valid_codes,
|
'minute_data': minute_data,
|
'limit_up_record_data': limit_up_record_data,
|
'limit_up_record_data_list': limit_up_record_data,
|
"trade_days": trade_days,
|
"next_trade_day": next_trade_day,
|
"code_blocks": code_blocks
|
}
|
|
def load_current_date_data_by_timeline(self):
    """
    Load the data of the back-test day itself; time-sliced data is keyed by
    an "HH:MM:SS" string.

    Requires self.data_loader to be set.

    :return: dict with the keys 'limit_up_list', 'big_order', 'big_sell_order',
             'zylt_volume', 'code_plates_for_buy', 'code_plates_for_refer',
             'code_plates', 'block_in', 'limit_up_plate_names_of_refer_code'
             and 'all_buy_plates_of_codes'
    :raises Exception: when free-float, plate, big-order, limit-up or
                       leading-reason data is empty
    """

    def group_by_second(deals_by_code):
        # {time: [(code, order), ...]}; order layout per the original comment:
        # (order id, volume, amount, time, final deal price)
        grouped = {}
        for order_code in deals_by_code:
            for order in deals_by_code[order_code]:
                grouped.setdefault(huaxin_util.convert_time(order[3]), []).append((order_code, order))
        for bucket in grouped.values():
            bucket.sort(key=lambda item: huaxin_util.convert_time(item[1][3], True))
        return grouped

    # From this date on the "_by" exports are always used.
    use_by_export = self.day >= '2025-05-26'
    exporter = LowSuctionOriginDataExportManager(self.day)
    result = {}

    result["limit_up_list"] = {row[0][:8]: row[1] for row in exporter.export_limit_up_list()}

    buy_deals = exporter.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD)
    if use_by_export or not buy_deals:
        buy_deals = exporter.export_big_order_deal_by(BIG_ORDER_MONEY_THRESHOLD)
    result["big_order"] = group_by_second(buy_deals)

    sell_deals = exporter.export_big_sell_order_deal(BIG_ORDER_MONEY_THRESHOLD)
    if use_by_export or not sell_deals:
        sell_deals = exporter.export_big_sell_order_deal_by(BIG_ORDER_MONEY_THRESHOLD)
    result["big_sell_order"] = group_by_second(sell_deals)

    # Free-float volume per code
    result["zylt_volume"] = exporter.export_zylt_volume()

    # Plates per code
    code_plates = exporter.export_code_plates()
    refer_plates = self.data_loader.load_code_plates_for_refer()

    # Invert {plate: codes} into {code: plates} for the buy universe
    target_plate_codes = self.data_loader.load_target_plate_and_codes()
    buy_plates = {}
    for plate in target_plate_codes:
        for member in target_plate_codes.get(plate):
            buy_plates.setdefault(member, set()).add(plate)
    result["code_plates_for_buy"] = buy_plates
    result["code_plates_for_refer"] = refer_plates
    result["code_plates"] = code_plates

    # Plate money inflow (inflow is positive, per the original comment)
    result["block_in"] = {row[0][:8]: row[1] for row in exporter.export_block_in_datas()}

    # Special codes replace the plate set of the codes they mention; code_plates
    # is the same object already stored under "code_plates".
    special_codes = exporter.export_special_codes()
    special_plates_by_code = {}
    for plate in special_codes:
        for member in special_codes[plate]:
            special_plates_by_code.setdefault(member, set()).add(plate)
    for member, plates in special_plates_by_code.items():
        code_plates[member] = plates

    # Leading-stock information for all limit-up reasons
    result["limit_up_plate_names_of_refer_code"] = self.data_loader.load_all_refer_plates_of_codes()
    result["all_buy_plates_of_codes"] = self.data_loader.load_all_buy_plates_of_codes()

    required = (
        ("zylt_volume", "无自由流通数据"),
        ("code_plates", "无板块数据"),
        ("big_order", "无大单数据"),
        ("limit_up_list", "无涨停数据"),
        ("limit_up_plate_names_of_refer_code", "无涨停领涨原因数据"),
    )
    for key, message in required:
        if not result[key]:
            raise Exception(message)

    return result
|
|
def load_current_tick_datas(self, data_loader: DataLoader):
|
"""
|
加载Tick数据
|
@param data_loader:
|
@return: Tick数据
|
"""
|
code_tick_datas = data_loader.load_tick_data(target_codes=self.fcodes)
|
# 根据时间集成
|
fdata = {}
|
for code in code_tick_datas:
|
for tick in code_tick_datas[code]:
|
__time_str = tick["created_at"][-8:]
|
if __time_str not in fdata:
|
fdata[__time_str] = []
|
fdata[__time_str].append(tick)
|
if not fdata:
|
raise Exception("无分时K线数据")
|
|
return fdata
|
|
def __run_backtest(self, code, stock_variables: StockVariables):
|
"""
|
执行回测
|
@param stock_variables:
|
@return: 是否可以买
|
"""
|
global_dict = {
|
"sv": stock_variables,
|
"target_code": code,
|
"settings": self.settings
|
}
|
exec(self.scripts, global_dict)
|
return global_dict["compute_result"]
|
|
def __filter_codes(self, current_data, timeline_data):
    """
    Select candidate codes from plates that had at least two limit-up codes in
    the same second inside RANGE_TIMES.

    A code qualifies when it shares a plate with such a "valid" plate, has
    daily K-line history, and its free-float market value (free-float volume
    times the previous close) lies in [30e8, 300e8].

    :param current_data: data of the back-test day (uses 'code_plates',
                         'limit_up_list' and 'zylt_volume')
    :param timeline_data: historical data (uses 'kline_data')
    :return: set of qualifying codes
    """
    code_plates = current_data["code_plates"]
    start_time, end_time = self.RANGE_TIMES[0], self.RANGE_TIMES[1]
    fplates = set()
    for i in range(60 * 60 * 5):
        time_str = tool.trade_time_add_second(start_time, i)
        if time_str > end_time:
            break
        self.current_time = time_str
        current_limit_up_list = current_data["limit_up_list"].get(time_str)
        if not current_limit_up_list:
            continue
        # Group this second's limit-up codes by plate
        plate_codes_info = {}
        for x in current_limit_up_list:
            for p in code_plates.get(x[0]) or ():
                plate_codes_info.setdefault(p, []).append((x[0], x[2]))
        # A plate is valid once it holds at least two limit-up codes
        fplates |= {p for p, members in plate_codes_info.items() if len(members) >= 2}

    kline_data = timeline_data["kline_data"]
    zylt_volume = current_data["zylt_volume"]
    fcodes = set()
    for c, plates in code_plates.items():
        if not (plates & fplates) or c not in kline_data:
            continue
        volume = zylt_volume.get(c)
        if volume is None:
            # Without a free-float figure the market value is unknown; skip the
            # code instead of failing on None * float.
            continue
        pre_close = kline_data.get(c)[0]["close"]
        if 30e8 <= volume * pre_close <= 300e8:
            fcodes.add(c)
    return fcodes
|
|
def __get_target_codes_v4(self):
|
valid_codes = self.timeline_data["valid_codes"]
|
return set(self.current_data["code_plates_for_buy"].keys()) & valid_codes
|
|
def init_stock_variables(self, code_, timeline_data, current_data):
    """
    Create and cache the StockVariables of one code; does nothing when the
    code has already been initialised.

    @param code_: stock code
    @param timeline_data: historical data; 'limit_up_record_data' must already
                          be a dict keyed by code (run() performs that conversion)
    @param current_data: data of the back-test day
    @return: None; the result is stored in self.stock_variables_dict[code_]
    """
    if code_ in self.stock_variables_dict:
        return

    kline = timeline_data["kline_data"].get(code_)
    stock_variables = StrategyVariableFactory.create_from_history_data(
        kline, timeline_data["minute_data"].get(code_),
        timeline_data["limit_up_record_data"].get(code_), timeline_data["trade_days"])

    # Today's limit-up price, derived from the previous close
    pre_close = kline[0]["close"]
    stock_variables.今日涨停价 = round(float(gpcode_manager.get_limit_up_price_by_preprice(code_, pre_close)), 2)
    # Free-float market value = free-float volume * previous close
    stock_variables.自由流通市值 = current_data["zylt_volume"].get(code_) * pre_close
    # Plates of the code (buy universe)
    stock_variables.代码板块 = current_data["code_plates_for_buy"].get(code_)
    is_price_too_high = code_nature_analyse.is_price_too_high_in_days(code_, kline,
                                                                      stock_variables.今日涨停价)
    stock_variables.六个交易日涨幅过高 = is_price_too_high[0]
    stock_variables.新代码板块 = timeline_data["code_blocks"].get(code_)
    stock_variables.辨识度代码 = self.fcodes
    stock_variables.领涨板块信息 = self.head_rise_code_blocks.get(code_)
    if code_ in DEBUG_CODES:
        print(code_, stock_variables.领涨板块信息)

    # Limit-up reasons seen within the latest N trade days, one attribute per N.
    # NOTE(review): the inputs do not depend on code_, so this is recomputed for
    # every code; consider caching if get_limit_up_reasons is pure.
    for day in [2, 5, 10, 30, 60, 120]:
        days = timeline_data["trade_days"][:day]
        setattr(stock_variables, f"日出现的板块_{day}",
                KPLLimitUpDataAnalyzer.get_limit_up_reasons(
                    timeline_data["limit_up_record_data_list"], min_day=days[-1],
                    max_day=days[0]))
    stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
        timeline_data["limit_up_record_data_list"], self.data_loader.trade_days[:2])

    self.stock_variables_dict[code_] = stock_variables
|
|
def load_data(self):
    """
    Load everything run() needs and cache it on the instance: the data loader,
    the current-day data, the historical data, the target codes and, when
    available, the tick data. Pieces that are already set are not reloaded.

    @return: None; results are stored in self.data_loader, self.current_data,
             self.timeline_data, self.fcodes and self.current_tick_data
    """
    # Pre-load the plate constituents for every limit-up reason
    prefetch_loader = DataLoader(self.day)
    for p in prefetch_loader.get_limit_up_reasons_with_plate_code():
        prefetch_loader.load_plate_codes(p[0], p[1])

    if not self.data_loader:
        self.data_loader = DataLoader(self.day)
    if not self.current_data:
        self.current_data = self.load_current_date_data_by_timeline()
    # Historical (before the back-test day) data
    if not self.timeline_data:
        self.timeline_data = self.load_before_date_data_by_timeline(self.data_loader)
    if not self.fcodes:
        self.fcodes, self.head_rise_code_blocks = self.__get_target_codes_v4(), {}

    print(len(self.fcodes), self.fcodes)
    if not self.current_tick_data:
        try:
            self.current_tick_data = self.load_current_tick_datas(self.data_loader)
        except Exception as e:
            # Best effort: run() copes with missing ticks. Do not swallow
            # KeyboardInterrupt/SystemExit, and leave a trace of the reason.
            logging.warning("tick data not loaded for %s: %s", self.day, e)

    downloader = DataDownloader(self.day, self.data_loader.trade_days)
    downloader.download_tick_data(self.fcodes)
|
|
def __statistic_big_order_info(self, stock_variables: StockVariables):
|
"""
|
统计大单信息
|
@param stock_variables:
|
@return:
|
"""
|
infos = []
|
thresholds = [50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 10000]
|
for i in range(len(thresholds)):
|
if i >= len(thresholds) - 1:
|
break
|
start, end = thresholds[i], thresholds[i + 1]
|
info = [f"{start}w-{end}w", 0, None, None]
|
# 统计买单
|
total_buy_count = 0
|
total_buy_volume = 0
|
total_buy_money = 0
|
if stock_variables.今日大单数据:
|
order_ids = set()
|
for d in reversed(stock_variables.今日大单数据):
|
if d[0] in order_ids:
|
continue
|
order_ids.add(d[0])
|
if start * 10000 <= d[2] < end * 10000:
|
total_buy_count += 1
|
total_buy_money += d[2]
|
total_buy_volume += d[1]
|
total_sell_count = 0
|
total_sell_money = 0
|
total_sell_volume = 0
|
if stock_variables.今日卖大单数据:
|
order_ids = set()
|
for d in reversed(stock_variables.今日卖大单数据):
|
if d[0] in order_ids:
|
continue
|
order_ids.add(d[0])
|
if start * 10000 <= d[2] < end * 10000:
|
total_sell_count += 1
|
total_sell_money += d[2]
|
total_sell_volume += d[1]
|
info[1] = f"{round((total_buy_volume - total_sell_volume) * 100 / stock_variables.今日成交量, 2)}%"
|
info[2] = (total_buy_count, total_buy_money, total_buy_volume)
|
info[3] = (total_sell_count, total_sell_money, total_sell_volume)
|
if info[2][0] > 0 or info[3][0] > 0:
|
infos.append(info)
|
return ";".join([f"{x[0]}==净额:{x[1]},买单:{x[2]},卖单:{x[3]}" for x in infos])
|
|
def run(self):
    """
    Replay the back-test day second by second from 09:25:00 to 12:00:00.

    For every second this method refreshes the per-plate limit-up statistics,
    records the big buy/sell orders on the affected codes, applies the ticks of
    the target codes to their StockVariables and, from 09:30:00 on, executes
    the strategy script for every big buy order of a target code and passes
    the outcome to __process_test_result.

    NOTE(review): this method replaces timeline_data["limit_up_record_data"]
    (a list) with a dict keyed by code, while load_data() does not reload
    timeline_data once it is set; a second run() on the same instance would
    therefore iterate over the dict's keys in the grouping loop below.
    """
    self.load_data()
    # Group the historical limit-up records by code (item[0])
    limit_up_record_data_dict = {}
    for limit_up_item in self.timeline_data["limit_up_record_data"]:
        if limit_up_item[0] not in limit_up_record_data_dict:
            limit_up_record_data_dict[limit_up_item[0]] = []
        limit_up_record_data_dict[limit_up_item[0]].append(limit_up_item)
    self.timeline_data["limit_up_record_data"] = limit_up_record_data_dict
    next_trade_day = self.timeline_data["next_trade_day"]
    start_time, end_time = "09:25:00", "12:00:00"
    # Minute K-lines (currently unused: only referenced by disabled code)
    minute_bars_dict = {}
    code_plates = self.current_data["code_plates"]
    # Unused in the active code of this method
    code_plates_for_refer = self.current_data["code_plates_for_refer"]

    # Latest per-plate limit-up information; each keeps its previous value
    # during seconds without limit-up data
    kpl_plate_limit_up_codes_info = None
    plate_limit_up_codes_info = None
    kpl_head_plate_limit_up_codes_info = None

    latest_current_limit_up_list = None

    latest_block_in_datas = None

    # Target codes per plate, sorted by the 2nd tuple field (descending).
    # Built from head_rise_code_blocks, which load_data() currently sets to {};
    # the result is not used by the active code below.
    target_plate_codes_infos = {}
    for code in self.head_rise_code_blocks:
        for p in self.head_rise_code_blocks[code]:
            if p not in target_plate_codes_infos:
                target_plate_codes_infos[p] = []
            target_plate_codes_infos[p].append(self.head_rise_code_blocks[code][p])
    for p in target_plate_codes_infos:
        target_plate_codes_infos[p].sort(key=lambda x: x[1], reverse=True)

    # NOTE(review): never populated by the active code, so the final print
    # always shows an empty set.
    all_new_plates = set()

    for i in range(60 * 60 * 5):
        time_str = tool.trade_time_add_second(start_time, i)
        if time_str > end_time:
            break
        ticks = self.current_tick_data.get(time_str) if self.current_tick_data else None
        # =============== Limit-up statistics of the current second
        origin_current_limit_up_list = self.current_data["limit_up_list"].get(time_str, [])
        # Keep only entries whose high-level count (from x[4]) is below 3
        current_limit_up_list = [x for x in origin_current_limit_up_list if kpl_util.get_high_level_count(x[4]) < 3]

        if current_limit_up_list:
            latest_current_limit_up_list = current_limit_up_list

        if current_limit_up_list:
            plate_codes_info = {}
            # Limit-up codes per plate, using the plates of each code
            for x in current_limit_up_list:
                # A limit-up only counts once it is at least 60 seconds old
                if tool.trade_time_sub(time_str, tool.timestamp_format(x[2], "%H:%M:%S")) < 60:
                    continue
                plates = code_plates.get(x[0])
                if plates:
                    for p in plates:
                        if p not in plate_codes_info:
                            plate_codes_info[p] = []
                        plate_codes_info[p].append((x[0], x[2]))
            plate_limit_up_codes_info = plate_codes_info

            plate_codes_info = {}
            for x in current_limit_up_list:
                # Grouped by the limit-up reason carried in the record (x[5])
                p = x[5]
                if p in constant.KPL_INVALID_BLOCKS:
                    continue
                if p not in plate_codes_info:
                    plate_codes_info[p] = []
                # (A disabled check used to drop codes that are not among the
                # plate's leading codes.)
                plate_codes_info[p].append((x[0], x[2], x[4]))
            kpl_plate_limit_up_codes_info = plate_codes_info

            # Per the original comment: {code: [(plate code, plate name)]}
            limit_up_plate_names_of_refer_code = self.current_data["limit_up_plate_names_of_refer_code"]
            plate_codes_info = {}
            for x in current_limit_up_list:
                # Grouped by the leading-plate reason of the code
                code = x[0]
                # Original intent: fall back to the record's own reason once the
                # limit-up is older than 20 minutes.
                # NOTE(review): "or True" makes this condition always true, so
                # the else branch is unreachable.
                if tool.trade_time_sub(time_str, tool.timestamp_format(x[2], "%H:%M:%S")) < 60 * 20 or True:
                    plates_infos = limit_up_plate_names_of_refer_code.get(code)
                    plates = set([d[1] for d in plates_infos if d[1] == x[5]]) if plates_infos else set()
                else:
                    plates = {x[5]}

                new_plates = set()
                for p in plates:
                    if p in constant.KPL_INVALID_BLOCKS:
                        continue
                    new_plates.add(p)
                for p in new_plates:
                    if p not in plate_codes_info:
                        plate_codes_info[p] = []
                    plate_codes_info[p].append((x[0], x[2]))
            kpl_head_plate_limit_up_codes_info = plate_codes_info

        # ================== Plate money inflow
        block_in_datas = self.current_data["block_in"].get(time_str)
        if block_in_datas:
            # Keep the first 20 plates with a positive inflow
            blocks = [x[0] for x in block_in_datas if x[1] > 0]
            block_in_datas = blocks[:20]
            latest_block_in_datas = block_in_datas

        # ================ Big orders of the current second
        current_big_orders = self.current_data["big_order"].get(time_str)
        if current_big_orders:
            for big_order in current_big_orders:
                # Layout per the original comment:
                # (code, (buy order id, volume, amount, time, final deal price))
                self.init_stock_variables(big_order[0], self.timeline_data, self.current_data)
                stock_variables: StockVariables = self.stock_variables_dict.get(big_order[0])
                if stock_variables.今日大单数据 is None:
                    stock_variables.今日大单数据 = []
                stock_variables.今日大单数据.append(big_order[1])
                # Average price over the latest record of each order id
                order_ids = set()
                total_money = 0
                total_volume = 0
                for order in reversed(stock_variables.今日大单数据):
                    if order[0] in order_ids:
                        continue
                    order_ids.add(order[0])
                    total_money += order[2]
                    total_volume += order[1]
                if total_volume > 0:
                    stock_variables.今日大单均价 = round(total_money / total_volume, 2)
                else:
                    stock_variables.今日大单均价 = 0
        current_big_sell_orders = self.current_data["big_sell_order"].get(time_str)
        if current_big_sell_orders:
            for big_order in current_big_sell_orders:
                # Same (code, order tuple) layout as the buy orders
                self.init_stock_variables(big_order[0], self.timeline_data, self.current_data)
                stock_variables: StockVariables = self.stock_variables_dict.get(big_order[0])
                if stock_variables.今日卖大单数据 is None:
                    stock_variables.今日卖大单数据 = []
                stock_variables.今日卖大单数据.append(big_order[1])

        # "Most authentic" limit-up plates: every code present in the
        # reason-based statistics, re-grouped by its own plates
        most_real_kpl_plate_limit_up_codes_info = {}
        if kpl_plate_limit_up_codes_info:
            current_limit_up_dict = {x[0]: x for x in latest_current_limit_up_list}
            codes = set()
            for plate in kpl_plate_limit_up_codes_info:
                kpl_plate_codes = kpl_plate_limit_up_codes_info.get(plate)
                codes |= set([x[0] for x in kpl_plate_codes])
            for code in codes:
                plates = code_plates.get(code)
                if not plates:
                    # No plate known for the code: use the reason of its record
                    plates = {current_limit_up_dict.get(code)[5]}
                # NOTE(review): "-=" works in place; when plates came from
                # code_plates (and is a set) this also removes the invalid
                # plates from the shared code_plates entry.
                plates -= constant.KPL_INVALID_BLOCKS
                if plates:
                    for p in plates:
                        if p not in most_real_kpl_plate_limit_up_codes_info:
                            most_real_kpl_plate_limit_up_codes_info[p] = []
                        most_real_kpl_plate_limit_up_codes_info[p].append(code)

        if ticks:
            for tick in ticks:
                code = tick["symbol"][-6:]
                if code not in self.fcodes:
                    continue
                # With a non-empty DEBUG_CODES only those codes are processed
                if DEBUG_CODES and code not in DEBUG_CODES:
                    continue

                if code not in self.stock_variables_dict:
                    # Load the base data of the code
                    self.init_stock_variables(code, self.timeline_data, self.current_data)
                stock_variables: StockVariables = self.stock_variables_dict.get(code)
                if plate_limit_up_codes_info is not None:
                    stock_variables.板块涨停 = plate_limit_up_codes_info

                if kpl_plate_limit_up_codes_info is not None:
                    stock_variables.开盘啦板块涨停 = kpl_plate_limit_up_codes_info

                if kpl_head_plate_limit_up_codes_info is not None:
                    stock_variables.开盘啦领涨板块涨停 = kpl_head_plate_limit_up_codes_info

                stock_variables.板块成交代码 = self.deal_block_codes
                # Plate inflow data (latest non-empty value)
                if latest_block_in_datas:
                    stock_variables.资金流入板块 = latest_block_in_datas
                # Minute K-lines are not maintained for now.

                # Ticks stamped before 09:30:00 define the opening price
                if tick["created_at"][-8:] < '09:30:00':
                    stock_variables.今日开盘价 = tick["price"]
                    # Opening change relative to yesterday's close
                    stock_variables.今日开盘涨幅 = round((tick["price"] - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                                                    4)
                stock_variables.今日成交量 = tick["cum_volume"]
                stock_variables.今日成交额 = tick["cum_amount"]
                stock_variables.当前价 = tick["price"]
                if not stock_variables.今日量够信息:
                    # Record the first moment today's volume exceeds 80% of
                    # yesterday's volume
                    if stock_variables.今日成交量 > stock_variables.昨日成交量 * 0.8:
                        stock_variables.今日量够信息 = (time_str, stock_variables.当前价, round(
                            (stock_variables.当前价 - stock_variables.昨日收盘价) * 100 / stock_variables.昨日收盘价, 2),
                                                   self.__statistic_big_order_info(stock_variables))
                        if VOLUME_LOG_ENABLE:
                            # Logs the tuple stored above, including the big-order summary
                            print("****量够", code, stock_variables.今日量够信息)

                # Track today's highest price together with its time
                if not stock_variables.今日最高价信息 or tick["price"] > stock_variables.今日最高价信息[0]:
                    stock_variables.今日最高价信息 = (tick["price"], time_str)

                # Track today's lowest price
                if not stock_variables.今日最低价 or tick["price"] < stock_variables.今日最低价:
                    stock_variables.今日最低价 = tick["price"]
                if most_real_kpl_plate_limit_up_codes_info:
                    stock_variables.开盘啦最正板块涨停 = most_real_kpl_plate_limit_up_codes_info

        # Big-order driven evaluation, only from 09:30:00 on
        if current_big_orders and time_str >= '09:30:00':
            for big_order in current_big_orders:
                code = big_order[0]
                if code not in self.fcodes:
                    continue
                self.init_stock_variables(code, self.timeline_data, self.current_data)
                stock_variables: StockVariables = self.stock_variables_dict.get(code)
                if plate_limit_up_codes_info is not None:
                    stock_variables.板块涨停 = plate_limit_up_codes_info

                if kpl_plate_limit_up_codes_info is not None:
                    stock_variables.开盘啦板块涨停 = kpl_plate_limit_up_codes_info

                if kpl_head_plate_limit_up_codes_info is not None:
                    stock_variables.开盘啦领涨板块涨停 = kpl_head_plate_limit_up_codes_info

                if most_real_kpl_plate_limit_up_codes_info is not None:
                    stock_variables.开盘啦最正板块涨停 = most_real_kpl_plate_limit_up_codes_info

                # NOTE(review): uses this second's value, whereas the tick
                # branch above uses latest_block_in_datas — confirm intended.
                if block_in_datas:
                    stock_variables.资金流入板块 = block_in_datas

                # The order's final deal price becomes the current price
                stock_variables.当前价 = big_order[1][4]
                try:
                    compute_result = self.__run_backtest(code, stock_variables)
                    self.__process_test_result(code, stock_variables, next_trade_day, big_order[1][4],
                                               huaxin_util.convert_time(big_order[1][3]), compute_result)
                except Exception as e:
                    # A failing script evaluation is logged and does not stop the replay
                    logging.exception(e)

    print("可买题材:", all_new_plates)
|
|
def __process_test_result(self, code, stock_variables: StockVariables, next_trade_day, buy_price, time_str,
                          compute_result):
    """
    Handle the outcome of one script evaluation; on the first positive result
    for a code, record the deal and print its performance.

    :param code: evaluated code
    :param stock_variables: variables of the code (uses 昨日收盘价)
    :param next_trade_day: trade day after the back-test day
    :param buy_price: simulated buy price
    :param time_str: order time used in the report
    :param compute_result: script result; [0] is the buy flag, [1] and [2] are
                           printed, [3] is iterated as the plates of the deal
    """

    def as_percent(value):
        return f"{round(value, 2)}%"

    if not compute_result[0]:
        if code in DEBUG_CODES:
            print(time_str, code, compute_result[1])
        return
    if code in self.deal_codes:
        return
    # At most 100 deals per run (the old comment said 5, the code says 100)
    if len(self.deal_codes) >= 100:
        return
    self.deal_codes.add(code)

    api = self.data_loader.jueJinLocalApi
    now_day = self.data_loader.now_day
    next_k_bars = self.data_loader.load_kline_data_by_day_and_code(next_trade_day, code)
    current_k_bars = self.data_loader.load_kline_data_by_day_and_code(now_day, code)

    # Premium: next day's open (or first tick) versus the buy price
    if next_k_bars and buy_price:
        t_rate = as_percent((next_k_bars[0]["open"] - buy_price) * 100 / stock_variables.昨日收盘价)
    else:
        # No daily bar yet: fall back to the tick around the next day's open
        if now_day >= next_trade_day:
            ticks = api.get_history_tick_n(code, 1, frequency='tick',
                                           end_date=f"{next_trade_day} 09:30:03")
        else:
            ticks = None
        if ticks:
            t_rate = as_percent((ticks[-1]["price"] - buy_price) * 100 / stock_variables.昨日收盘价)
        else:
            t_rate = "未知"

    # Same-day result: today's close (or latest price) versus the buy price
    if current_k_bars and buy_price:
        c_rate = as_percent((current_k_bars[0]["close"] - buy_price) * 100 / current_k_bars[0]["pre_close"])
    elif tool.get_now_date_str() == now_day and buy_price:
        # Back-testing today: use the latest tick
        tick = api.get_history_tick_n(code, 1, frequency='tick',
                                      end_date=f"{now_day} {tool.get_now_time_str()}")
        # The tick list may be empty; report "unknown" instead of failing.
        # The "%" suffix matches the other branches.
        if tick:
            c_rate = as_percent((tick[0]["price"] - buy_price) * 100 / stock_variables.昨日收盘价)
        else:
            c_rate = "未知"
    else:
        # NOTE(review): this reads "close"/"pre_close" from a get_history_tick_n
        # result — confirm the returned records carry these fields.
        bar = api.get_history_tick_n(code, 1, end_date=f"{now_day} 15:00:00")
        if bar and buy_price:
            c_rate = as_percent((bar[0]["close"] - buy_price) * 100 / bar[0]["pre_close"])
        else:
            c_rate = "未知"

    print(f"{len(self.deal_codes)}==回测结果:", code, gpcode_manager.CodesNameManager().get_code_name(code),
          f"溢价率:{t_rate},当日盈亏:{c_rate},下单时间:{time_str},涨幅:{round((buy_price - stock_variables.昨日收盘价) * 100 / stock_variables.昨日收盘价, 2)}",
          compute_result[1],
          compute_result[2])
    # Remember which plates this deal belongs to
    for b in compute_result[3]:
        self.deal_block_codes.setdefault(b, set()).add(code)
    stock_variables.板块成交代码 = self.deal_block_codes
|
|
|
# Codes that get extra debug output. When the list is non-empty, tick processing
# in BackTest.run is also restricted to these codes.
# Example: DEBUG_CODES = ['603579', '300884']
DEBUG_CODES = []

# When True, BackTest.run prints a line the first time a code's volume
# exceeds 80% of the previous day's volume.
VOLUME_LOG_ENABLE = False

# NOTE(review): the original comment here read "backup big order" with no code
# attached; DEBUG_BLOCKS is not referenced anywhere in the visible source.
DEBUG_BLOCKS = []

# Threshold passed to the big-order export calls (200e4 == 2,000,000.0;
# presumably an order amount — confirm the unit with the export manager).
BIG_ORDER_MONEY_THRESHOLD = 200e4
|
|
if __name__ == "__main__":
    # BackTest instances cached by trading day
    back_test_dict = {}
    days = [
        "2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16",
        "2025-05-19", "2025-05-20", "2025-05-21", "2025-05-22", "2025-05-23",
        "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29", "2025-05-30",
        "2025-06-03",
    ]
    # Run the most recent day first
    days.reverse()
    for day in days:
        runner = back_test_dict.get(day)
        if runner is None:
            runner = BackTest(day, "strategy_script_v6.py")
            back_test_dict[day] = runner
        print("=========================", day)
        runner.run()
|