"""
|
回测
|
"""
|
import datetime
|
import gc
|
import json
|
import logging
|
import os.path
|
import threading
|
import time
|
|
import constant
|
from log_module import log_export, async_log_util
|
from strategy import data_cache, target_codes_manager, instant_time_market
|
from strategy.forbidden_plates_manager import ForbiddenPlatesManager
|
from strategy.order_methods import TodayBuyCodeManager
|
from strategy.trade_setting import BuyMoneyPerCodeManager
|
from utils import tool, juejin_api
|
import unittest
|
from unittest import mock
|
|
TICK_DIR_PATH = "D:/datas/ticks"
|
|
START_TIME, END_TIME = '09:24:00', '10:00:00'
|
|
|
class DataDownloader:
|
def __init__(self, day):
|
"""
|
初始化数据
|
:param day:
|
"""
|
self.day = day
|
|
def download_ticks(self, target_codes):
|
"""
|
下载ticks
|
:return:
|
"""
|
|
def __dowload_ticks(code):
|
tick_path = os.path.join(dir_, f"{code}.json")
|
# tick路径
|
if os.path.exists(tick_path):
|
return
|
results = juejin_api.JueJinApi.history_tick_n(tool.get_symbol(code), 5000, f"{self.day} 11:31:00", 1)
|
for result in results:
|
for k in result:
|
if type(result[k]) == datetime.datetime:
|
result[k] = result[k].strftime('%Y-%m-%d %H:%M:%S')
|
|
results = [x for x in results if x['created_at'] > f'{self.day} 09:24:00']
|
# 保存
|
with open(tick_path, mode='w', encoding='utf-8') as f:
|
f.write(json.dumps(results))
|
|
dir_ = f"{TICK_DIR_PATH}/{self.day}"
|
if not os.path.exists(dir_):
|
os.makedirs(dir_)
|
files = os.listdir(dir_)
|
downloaded_codes = set([name[:6] for name in files])
|
for code in target_codes:
|
if code in downloaded_codes:
|
continue
|
__start_time = time.time()
|
__dowload_ticks(code)
|
downloaded_codes.add(code)
|
left_count = len(target_codes - downloaded_codes)
|
print("剩余数量:", left_count, "剩余时间(秒):", int((time.time() - __start_time) * left_count))
|
|
|
@tool.singleton
|
class DataLoader:
|
def __init__(self, day):
|
self.day = day
|
|
def load_ticks(self, pre_close_price_dict, page: int, page_size: int):
|
"""
|
加载数据
|
:return:
|
"""
|
code_ticks_data_dict, total_count = self.__load_tick_data(pre_close_price_dict, page, page_size)
|
# 将数据改为以时间作为键
|
time_ticks = {}
|
for code in code_ticks_data_dict:
|
ticks = code_ticks_data_dict[code]
|
for tick in ticks:
|
time_str = tick[7][-8:]
|
if time_str not in time_ticks:
|
time_ticks[time_str] = []
|
time_ticks[time_str].append(tick)
|
return time_ticks, total_count
|
|
def load_all_ticks_from_log(self,min_time_str, max_time_str):
|
"""
|
加载所有的K线数据
|
:return:
|
"""
|
ticks_data_list = log_export.load_ticks_data(min_time_str, max_time_str, self.day)
|
ticks_data_dict = {}
|
for d in ticks_data_list:
|
time_str, data_list = d[0], d[1]
|
if time_str < '09:24:00':
|
continue
|
if time_str not in ticks_data_dict:
|
ticks_data_dict[time_str] = []
|
ticks_data_dict[time_str].extend(data_list)
|
return ticks_data_dict
|
|
def load_forbidden_buy_plates(self):
|
"""
|
加载所有的K线数据
|
:return:
|
"""
|
forbidden_plates_data_list = log_export.load_forbidden_plates_data(self.day)
|
return forbidden_plates_data_list
|
|
def __load_tick_data(self, pre_close_price_dict, page: int, page_size: int):
|
"""
|
加载tick数据
|
:return: {"代码":[(代码,昨日收盘价,最新价,总成交量,总成交额,买五档(价格,成交额),卖五档(价格,成交额),更新时间)]}
|
"""
|
dir_ = f"{TICK_DIR_PATH}/{self.day}"
|
code_ticks_data_dict = {}
|
index = 0
|
names = os.listdir(dir_)
|
names.sort()
|
temp_names = names[(page - 1) * page_size: page * page_size]
|
for name in temp_names:
|
index += 1
|
# print(index)
|
code = name[:6]
|
path = os.path.join(dir_, name)
|
with open(path, mode='r', encoding='utf-8') as f:
|
line = f.readline()
|
try:
|
data = json.loads(line)
|
#[(代码,昨日收盘价,最新价,总成交量,总成交额,买五档(价格,成交额),卖五档(价格,成交额),更新时间)]
|
fdata = [(d['symbol'][-6:], pre_close_price_dict.get(d['symbol'][-6:], 0), d['price'],
|
d['cum_volume'], d['cum_amount'],
|
[(dd['bid_p'], dd['bid_v']) for dd in d['quotes']],
|
[(dd['ask_p'], dd['ask_v']) for dd in d['quotes']], d['created_at'][-8:]) for d in data]
|
del data
|
if index % 50 == 0:
|
gc.collect()
|
code_ticks_data_dict[code] = fdata
|
except Exception as e:
|
logging.exception(e)
|
print(path)
|
return code_ticks_data_dict, len(names)
|
|
|
class VirtualTrade(unittest.TestCase):
|
|
def __init_data(self, now_day):
|
"""
|
初始化数据
|
:param now_day:
|
:return:
|
"""
|
# 今日变化的数据
|
current_datas = {}
|
data_cache.target_codes_manager = target_codes_manager.TargetCodesManager(day=now_day)
|
data_cache.DataCache().load_data(now_day)
|
k_bars_dict = log_export.load_k_bars(data_cache.DataCache().pre_trading_day)
|
# 加载K线数据
|
data_cache.all_stocks_all_K_line_property_dict = k_bars_dict
|
# 加载代码板块数据
|
code_plates_dict = log_export.load_kpl_code_plates(data_cache.DataCache().pre_trading_day)
|
data_cache.all_stocks_plate_dict = code_plates_dict
|
|
limit_up_datas = log_export.load_kpl_limit_up_datas(now_day)
|
# 涨停列表
|
current_datas["limit_up_list"] = {x[0]: x[1] for x in limit_up_datas}
|
stock_of_markets_plate_simple = log_export.load_stock_of_markets_plate_simple(START_TIME, END_TIME, now_day)
|
# 过滤涨速大于1
|
current_datas["stock_of_markets_plate_simple"] = {
|
x[0]: (x[1][0], {k: [xx for xx in v if xx[3] > 1 and int(xx[7]) >= 0] for k, v in x[1][1].items()}) for x in
|
stock_of_markets_plate_simple}
|
codes_info = data_cache.DataCache().min_stocks
|
fcodes = set([x[-6:] for x in codes_info])
|
# DataDownloader(now_day).download_ticks(fcodes)
|
pre_close_price_dict = {s[-6:]: v[0]['close'] for s, v in
|
data_cache.all_stocks_all_K_line_property_dict.items()}
|
current_datas["ticks"] = DataLoader(now_day).load_all_ticks_from_log(START_TIME,END_TIME)
|
current_datas["pre_close_price_dict"] = pre_close_price_dict
|
# 获取大单
|
deal_big_order_list = log_export.load_deal_big_order(now_day)
|
current_datas["deal_big_order"] = {x[0]: x[1] for x in deal_big_order_list}
|
|
current_datas["forbidden_plates"] = DataLoader(now_day).load_forbidden_buy_plates()
|
|
return current_datas
|
|
def __read_backtest_result(self):
|
"""
|
读取回测结果
|
:return:
|
"""
|
with open(f"D:/low_suction_log/gp/common/common.{tool.get_now_date_str()}.log", mode='r',
|
encoding='utf-8') as f:
|
lines = f.readlines()
|
results = []
|
for i in range(len(lines)):
|
if lines[i].find("指标下单") > 0:
|
for j in range(10):
|
d = lines[i + j]
|
if d.find("当前时间:") > 0:
|
results.append((d.split("当前时间:")[1][:8], d))
|
break
|
results.sort(key=lambda x: x[0])
|
for r in results:
|
print(r)
|
|
def test_run(self):
|
print("=================回测状态===============")
|
self.__read_backtest_result()
|
threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
|
now_day = '2025-08-21'
|
constant.IS_FOR_BACKTEST = True
|
current_datas = self.__init_data(now_day)
|
# 从早上09:25:00回测到11:30:00
|
start_time, end_time = START_TIME, END_TIME
|
# 初始化
|
data_cache.limit_up_block_names = []
|
data_cache.latest_code_market_info_dict = {}
|
if data_cache.big_order_deal_dict:
|
del data_cache.big_order_deal_dict
|
data_cache.big_order_deal_dict = {}
|
TodayBuyCodeManager().get_buy_codes = mock.Mock(return_value=set())
|
BuyMoneyPerCodeManager().set_money(2000)
|
constant.MAX_BUY_CODE_COUNT = 100
|
|
for i in range(0, 60 * 60 * 3):
|
# 时间驱动
|
time_str = tool.trade_time_add_second(start_time, i)
|
if time_str > end_time:
|
break
|
|
tool.get_now_time_str = mock.Mock(return_value=time_str)
|
now_date = datetime.datetime.strptime(f"{now_day} {time_str}",
|
"%Y-%m-%d %H:%M:%S")
|
with mock.patch('datetime.datetime', wraps=datetime.datetime) as mock_datetime:
|
mock_datetime.now.return_value = now_date
|
print(datetime.datetime.now())
|
|
# =======涨停数据
|
limit_up_list = current_datas["limit_up_list"].get(time_str)
|
stock_of_markets_plate_simple = current_datas["stock_of_markets_plate_simple"].get(time_str)
|
ticks = current_datas["ticks"].get(time_str)
|
deal_big_order = current_datas["deal_big_order"].get(time_str)
|
forbidden_plates_data = current_datas["forbidden_plates"]
|
forbidden_plates = set()
|
for d in forbidden_plates_data:
|
if d[0] > time_str:
|
break
|
if d[1] == 'add':
|
forbidden_plates.add(d[2])
|
elif d[1] == 'remove':
|
forbidden_plates.discard(d[2])
|
ForbiddenPlatesManager().forbidden_plates = list(forbidden_plates)
|
if limit_up_list is not None:
|
data_cache.limit_up_info = limit_up_list
|
# 提取涨停列表中的板块名称
|
limit_up_block_names = []
|
# 循环添加涨停概念
|
for info in data_cache.limit_up_info:
|
limit_up_block_names.append(info[5])
|
data_cache.limit_up_block_names = limit_up_block_names
|
# ======精选流入数据
|
if stock_of_markets_plate_simple is not None:
|
in_plates_info, plate_codes_info = stock_of_markets_plate_simple
|
# plate_codes_info: (s[1], s[0], s[6], s[9], s[4], s[2], s[40])
|
# for plate in plate_codes_info:
|
# code_info_list = plate_codes_info[plate]
|
# new_code_info_list = []
|
# for code_info in code_info_list:
|
# origin_plate_codes_info = [0] * 45
|
# indexes = [1, 0, 6, 9, 4, 2, 40]
|
# for i in range(len(indexes)):
|
# origin_plate_codes_info[indexes[i]] = code_info[i]
|
# new_code_info_list.append(origin_plate_codes_info)
|
# del code_info_list
|
# plate_codes_info[plate] = new_code_info_list
|
data_cache.market_sift_plate_stock_dict = plate_codes_info
|
data_cache.market_sift_plates = in_plates_info
|
# =======tick数据
|
if ticks is not None:
|
data_cache.latest_code_market_info_dict = {x[0]: x for x in ticks}
|
ticks = [t for t in ticks if not DEBUG_CODES or t[0] in DEBUG_CODES]
|
if ticks:
|
instant_time_market.set_current_info(ticks)
|
# ========大单数据
|
if deal_big_order:
|
for d in deal_big_order:
|
if d[1] != 0:
|
continue
|
code, data = d[0], d[2]
|
if code not in data_cache.big_order_deal_dict:
|
data_cache.big_order_deal_dict[code] = []
|
data_cache.big_order_deal_dict[code].append(d)
|
time.sleep(60)
|
self.__read_backtest_result()
|
|
|
DEBUG_CODES = {}
|
|
if __name__ == '__main__':
|
unittest.main()
|