"""
|
策略管理
|
"""
|
import json
|
|
from code_attribute import gpcode_manager, code_nature_analyse
|
from db import redis_manager_delegate as redis_manager
|
from db.mysql_data_delegate import Mysqldb
|
from db.redis_manager_delegate import RedisUtils
|
from log_module import async_log_util
|
from log_module.log import logger_trade, logger_debug
|
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
|
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
|
from strategy.strategy_params_settings import StrategyParamsSettingsManager
|
from strategy.strategy_variable import StockVariables
|
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
|
import constant
|
from third_data import kpl_util
|
from trade.trade_manager import DealCodesManager
|
from utils import huaxin_util, tool
|
|
|
@tool.singleton
class TickSummaryDataManager:
    """
    Tick summary data: per-code opening info, day-high info and day-low price.

    The three dictionaries are kept in memory and mirrored to Redis under the
    keys ``tick_open_price_info-<code>``, ``tick_high_price-<code>`` and
    ``tick_low_price-<code>``. On construction the dictionaries are restored
    from whatever keys currently exist in Redis.
    """
    # Redis database index used for every key this manager reads or writes.
    __db = 13

    # NOTE(review): the original file carried a comment at this position that
    # described a "deal order info per code" mapping
    # ({code: {trade_id: (volume, price, system_order_id)}}); no such attribute
    # is defined in this class.

    def __init__(self):
        # Opening price info per code (value passed to set_open_price_info).
        self.open_price_info_dict = {}
        # Lowest price per code.
        self.low_price_dict = {}
        # Highest price info per code: (price, time).
        self.high_price_info_dict = {}

        # The attribute name (including its spelling) is kept as-is because
        # other code may read it.
        self.musql = Mysqldb()
        self.redis_manager = redis_manager.RedisManager(self.__db)
        self.__load_data()

    def __get_redis(self):
        """Return the Redis connection object from the underlying manager."""
        return self.redis_manager.getRedis()

    def __load_by_pattern(self, pattern, target, parse):
        """
        Fill ``target`` with the values of all Redis keys matching ``pattern``.

        @param pattern: key pattern of the form "<prefix>-*"
        @param target: dictionary to fill, keyed by the code taken from the key
        @param parse: callable converting the stored value to the cached value
        @return: None
        """
        keys = RedisUtils.keys(self.__get_redis(), pattern)
        if not keys:
            return
        for key in keys:
            # Keys are written as "<prefix>-<code>", so the code is the part
            # after the first dash.
            code = key.split("-")[1]
            raw = RedisUtils.get(self.__get_redis(), key)
            if raw:
                target[code] = parse(raw)

    def __load_data(self):
        """Restore the in-memory dictionaries from Redis."""
        self.__load_by_pattern("tick_open_price_info-*", self.open_price_info_dict, json.loads)
        self.__load_by_pattern("tick_high_price-*", self.high_price_info_dict, json.loads)
        # The low price is stored as a bare number. Keep two decimals when
        # reading it back: rounding to an integer (the previous behaviour)
        # turned e.g. 17.58 into 18 and corrupted the restored day-low.
        self.__load_by_pattern("tick_low_price-*", self.low_price_dict,
                               lambda raw: round(float(raw), 2))

    def set_open_price_info(self, code, info):
        """
        Store the opening info for a code in memory and in Redis.

        @param code: stock code
        @param info: JSON-serialisable opening info; the caller in this file
                     passes (open price, open change rate)
        @return: None
        """
        self.open_price_info_dict[code] = info
        # Expiry is whatever tool.get_expire() returns.
        RedisUtils.setex_async(self.__db, f"tick_open_price_info-{code}", tool.get_expire(), json.dumps(info))

    def set_high_price_info(self, code, info):
        """
        Store the day-high info for a code in memory and in Redis.

        @param code: stock code
        @param info: (price, time)
        @return: None
        """
        self.high_price_info_dict[code] = info
        RedisUtils.setex_async(self.__db, f"tick_high_price-{code}", tool.get_expire(), json.dumps(info))

    def set_low_price(self, code, price):
        """
        Store the day-low price for a code in memory and in Redis.

        @param code: stock code
        @param price: lowest price seen so far
        @return: None
        """
        self.low_price_dict[code] = price
        # Stored as a bare value (not JSON); __load_data parses it with float().
        RedisUtils.setex_async(self.__db, f"tick_low_price-{code}", tool.get_expire(), price)
|
|
|
class LowSuctionStrategy:
    """
    Low-suction strategy.

    Holds the historical and intraday data for one trading day, keeps one
    variables object per target code up to date from incoming ticks, and
    executes the strategy script for a code whenever a new big order for that
    code arrives.
    """

    # Minimum amount for an order to be treated as a big order (200e4 = 2,000,000).
    # Shared by the historical export and by add_big_orders so the two cannot drift.
    BIG_ORDER_MONEY_THRESHOLD = 200e4

    def __init__(self, day, script_name="strategy_script_v6.py",
                 settings=None, need_load_data=False):
        """
        @param day: the trading day this strategy instance works on
        @param script_name: file name of the strategy script to execute
        @param settings: strategy parameter settings; when None the current
                         settings are fetched from StrategyParamsSettingsManager
                         at construction time
        @param need_load_data: when True, load_data() is called immediately
        """
        self.now_day = day
        # Big buy orders: {code: [big order data]}
        self.big_order_buy = {}
        # Big sell orders
        self.big_order_sell = {}
        # Free-float volume per code
        self.zylt_volume_dict = {}
        # Historical daily K-line data
        self.kline_data = {}
        # Historical limit-up records
        self.limit_up_record_data = {}
        # Historical data
        self.timeline_data = {}
        # Today's data
        self.current_data = {}
        # Plates of the target codes
        self.code_plates_for_buy = {}
        # Plates of each code (regular)
        self.code_plates = {}
        # Next trading day
        self.next_trade_day = None
        # Target codes
        self.fcodes = set()
        # Variables object per code
        self.stock_variables_dict = {}
        # Current limit-up list
        self.current_limit_up_list = []
        # Codes contained in each plate that currently has limit-up codes
        self.current_limit_up_plate_codes = {}
        # Current plates with net inflow
        self.current_block_in_datas = []

        # Load the strategy script file.
        if constant.is_windows():
            script_path = script_name
        else:
            script_path = f'{constant.get_path_prefix()}/{script_name}'
        with open(script_path, mode='r', encoding='utf-8') as f:
            # Read the text verbatim. readlines() keeps each line terminator,
            # so joining the lines with "\n" (the previous approach) doubled
            # every newline, which shifts traceback line numbers and breaks
            # backslash line continuations.
            scripts = f.read()
        # Comment out the script's own imports and the variables that __run
        # injects instead. This is a plain textual replacement, so it touches
        # every occurrence of these substrings.
        for token in ("from ", "sv = ", "settings = ", "target_code = "):
            scripts = scripts.replace(token, "#" + token)

        self.scripts = scripts
        # Resolve the default here rather than in the signature: a default
        # expression is evaluated only once, when the "def" statement runs.
        if settings is None:
            settings = StrategyParamsSettingsManager().get_settings()
        self.settings = settings

        self.data_loader = DataLoader(self.now_day)
        self.__LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(self.now_day)
        if need_load_data:
            self.load_data()

    def load_data(self):
        """
        Load historical and current-day data and derive the target code set.

        The target codes are those that have both a buy-plate entry and
        K-line data.
        """
        self.__load_before_date_data_by_timeline()
        self.__load_current_date_data_by_timeline()
        self.fcodes = set(self.code_plates_for_buy.keys()) & set(self.kline_data.keys())

    def __load_before_date_data_by_timeline(self):
        """
        Load the K-line data, limit-up records and next trading day via the data loader.

        @return: None; results are stored on the instance
        @raise Exception: when the trade calendar, K-line data or limit-up
                          records are empty
        """
        trade_days = self.data_loader.trade_days
        # Load historical data.
        self.kline_data = self.data_loader.load_kline_data()
        self.limit_up_record_data = self.data_loader.load_limit_up_data()
        self.next_trade_day = self.data_loader.load_next_trade_day()
        if not trade_days:
            raise Exception("交易日历获取失败")
        if not self.kline_data:
            raise Exception("历史日K获取失败")
        if not self.limit_up_record_data:
            raise Exception("历史涨停获取失败")

    def __load_current_date_data_by_timeline(self):
        """
        Load the current day's big orders, free-float volumes and plate data.

        @return: None; results are stored on the instance
        @raise Exception: when free-float data, plate data or the target buy
                          plates are empty
        """
        exporter = self.__LowSuctionOriginDataExportManager
        threshold = self.BIG_ORDER_MONEY_THRESHOLD
        # Switch that forces the "*_by" export variants even when the primary
        # export returned data.
        IS_BY_BIG_ORDER = False

        big_order_deals = exporter.export_big_order_deal(threshold)
        if not big_order_deals or IS_BY_BIG_ORDER:
            big_order_deals = exporter.export_big_order_deal_by(threshold)
        # add_big_orders and __run index this as a mapping, so an empty export
        # is normalised to an empty dict.
        self.big_order_buy = big_order_deals or {}

        big_sell_order_deals = exporter.export_big_sell_order_deal(threshold)
        if not big_sell_order_deals or IS_BY_BIG_ORDER:
            big_sell_order_deals = exporter.export_big_sell_order_deal_by(threshold)
        self.big_order_sell = big_sell_order_deals or {}

        # Load free-float volumes.
        self.zylt_volume_dict = exporter.export_zylt_volume()

        # Load the plates of each code.
        code_plates_dict = exporter.export_code_plates()

        # Invert {plate: codes} into {code: {plates}} for the target plates.
        plate_codes = self.data_loader.load_target_plate_and_codes()
        code_plates_dict_for_buy = {}
        for plate in plate_codes:
            for code in plate_codes.get(plate):
                code_plates_dict_for_buy.setdefault(code, set()).add(plate)
        self.code_plates_for_buy = code_plates_dict_for_buy
        self.code_plates = code_plates_dict

        if not self.zylt_volume_dict:
            raise Exception("无自由流通数据")
        if not self.code_plates:
            raise Exception("无板块数据")
        if not self.code_plates_for_buy:
            raise Exception("无目标票的买入数据")

    def __init_stock_variables(self, code_):
        """
        Create and cache the variables object for a code (no-op if it already exists).

        @param code_: stock code
        @return: None
        """
        if code_ in self.stock_variables_dict:
            return
        kline = self.kline_data.get(code_)
        stock_variables = StrategyVariableFactory.create_from_history_data(
            kline, None,
            self.limit_up_record_data.get(code_), self.data_loader.trade_days)

        # Today's limit-up price is derived from the close of the first K-line entry.
        pre_close = kline[0]["close"]
        stock_variables.今日涨停价 = round(float(gpcode_manager.get_limit_up_price_by_preprice(code_, pre_close)), 2)
        # NOTE(review): raises TypeError when the code has no free-float
        # volume entry — confirm every target code is covered by the export.
        stock_variables.自由流通市值 = self.zylt_volume_dict.get(code_) * pre_close
        # Plates of the code.
        stock_variables.代码板块 = self.code_plates_for_buy.get(code_)
        is_price_too_high = code_nature_analyse.is_price_too_high_in_days(code_, kline,
                                                                          stock_variables.今日涨停价)
        stock_variables.六个交易日涨幅过高 = is_price_too_high[0]
        stock_variables.辨识度代码 = self.fcodes
        # Limit-up reasons over the leading N entries of the trade-day list.
        for day in [2, 5, 10, 30, 60, 120]:
            days = self.data_loader.trade_days[:day]
            setattr(stock_variables, f"日出现的板块_{day}",
                    KPLLimitUpDataAnalyzer.get_limit_up_reasons(
                        self.limit_up_record_data, min_day=days[-1],
                        max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            self.limit_up_record_data, self.data_loader.trade_days[:2])

        # Restore tick summary values cached earlier.
        tick_summary = TickSummaryDataManager()
        open_price_info = tick_summary.open_price_info_dict.get(code_)
        if open_price_info:
            stock_variables.今日开盘价 = open_price_info[0]
            stock_variables.今日开盘涨幅 = open_price_info[1]
        high_price_info = tick_summary.high_price_info_dict.get(code_)
        if high_price_info:
            stock_variables.今日最高价信息 = high_price_info
        low_price = tick_summary.low_price_dict.get(code_)
        if low_price:
            stock_variables.今日最低价 = low_price
        self.stock_variables_dict[code_] = stock_variables

    def add_big_orders(self, big_orders):
        """
        Add big orders and run the strategy for every code that received one.

        @param big_orders: [(code, buy/sell, [order id, volume, amount, last timestamp, last price, first timestamp, first price])]
                           e.g. [('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])]
                           A buy/sell flag of 0 means buy; any other value is
                           treated as sell.
        @return: None
        """
        # A dict is used as an insertion-ordered set of the codes touched.
        touched_codes = {}
        for d in big_orders:
            try:
                code = d[0]
                # Only orders at or above the threshold are counted (buy and sell alike).
                if d[2][2] < self.BIG_ORDER_MONEY_THRESHOLD:
                    continue
                if d[1] == 0:
                    # Buy order
                    self.big_order_buy.setdefault(code, []).append(d[2])
                else:
                    # Sell order
                    self.big_order_sell.setdefault(code, []).append(d[2])
                touched_codes[code] = True
            except Exception as e:
                # Best effort: a malformed record must not block the others,
                # but the cause is logged together with the record.
                logger_debug.error(f"{d}: {e!r}")
        # Drive order placement.
        for code in touched_codes:
            self.__run(code, self.stock_variables_dict.get(code))

    def add_ticks(self, ticks):
        """
        Update the per-code variables from tick data.

        @param ticks: iterable of tick tuples laid out as
                      (code, timestamp, price, cumulative volume, cumulative amount, buy 5, sell 5)
        @return: None
        """
        # Only ticks of target codes are processed.
        fticks = [tick for tick in ticks if tick[0] in self.fcodes]
        tick_summary = TickSummaryDataManager()
        for tick in fticks:
            code = tick[0]
            # Make sure the variables object exists.
            self.__init_stock_variables(code)
            stock_variables = self.stock_variables_dict.get(code)

            time_str = huaxin_util.convert_time(tick[1])
            price, cum_volume, cum_amount, buy_5 = tick[2], tick[3], tick[4], tick[5]

            # Before 09:30:00 the tick also defines the opening price.
            if time_str < '09:30:00':
                if price > 0:
                    stock_variables.今日开盘价 = price
                else:
                    # No price yet: fall back to the first element of the first buy_5 entry.
                    stock_variables.今日开盘价 = buy_5[0][0]
                # Opening change rate relative to yesterday's close.
                stock_variables.今日开盘涨幅 = round(
                    (stock_variables.今日开盘价 - stock_variables.昨日收盘价) / stock_variables.昨日收盘价,
                    4)
                tick_summary.set_open_price_info(code, (stock_variables.今日开盘价, stock_variables.今日开盘涨幅))
            stock_variables.今日成交量 = cum_volume
            stock_variables.今日成交额 = cum_amount
            stock_variables.当前价 = price
            if not stock_variables.今日最高价信息 or price > stock_variables.今日最高价信息[0]:
                stock_variables.今日最高价信息 = (price, time_str)
                tick_summary.set_high_price_info(code, stock_variables.今日最高价信息)
            if not stock_variables.今日最低价 or price < stock_variables.今日最低价:
                stock_variables.今日最低价 = price
                tick_summary.set_low_price(code, stock_variables.今日最低价)

    def add_limit_up_list(self, limit_up_list):
        """
        Store the current limit-up list and regroup its codes by plate.

        @param limit_up_list: limit-up records; index 0 is read as the code,
                              index 4 is passed to kpl_util.get_high_level_count
                              and index 5 is used as the fallback plate
        @return: None
        """
        self.current_limit_up_list = limit_up_list
        # Keep only records whose high-level count is below 3.
        current_limit_up_dict = {x[0]: x for x in limit_up_list
                                 if kpl_util.get_high_level_count(x[4]) < 3}
        plate_to_codes = {}
        for code, record in current_limit_up_dict.items():
            plates = self.code_plates.get(code)
            if not plates:
                # No known plates for the code: fall back to the record's own plate.
                plates = {record[5]}
            # Build a new set: "plates -= ..." would strip the invalid plates
            # from the set stored in self.code_plates as a side effect.
            valid_plates = set(plates).difference(constant.KPL_INVALID_BLOCKS)
            for p in valid_plates:
                plate_to_codes.setdefault(p, []).append(code)
        self.current_limit_up_plate_codes = plate_to_codes

    def add_block_in(self, block_in_datas):
        """
        Store the plates with positive net inflow.

        @param block_in_datas: sequence of items where index 0 is the plate and
                               index 1 the net inflow value
        @return: None
        """
        positive_blocks = [x[0] for x in block_in_datas if x[1] > 0]
        # Only the first 20 are kept, in the order received.
        self.current_block_in_datas = positive_blocks[:20]

    def __run(self, code, sv: "StockVariables"):
        """
        Inject the live data into the variables object and execute the strategy script.

        @param code: stock code
        @param sv: variables object of the code; nothing happens when it is falsy
        @return: None
        """
        if not sv:
            return
        # Inject big orders.
        sv.今日大单数据 = self.big_order_buy.get(code)
        sv.今日卖大单数据 = self.big_order_sell.get(code)
        # Inject the limit-up codes per plate.
        sv.开盘啦最正板块涨停 = self.current_limit_up_plate_codes
        # Inject plate inflow info (only when there is any).
        if self.current_block_in_datas:
            sv.资金流入板块 = self.current_block_in_datas
        # Inject the codes already dealt.
        deal_codes_manager = DealCodesManager()
        sv.板块成交代码 = deal_codes_manager.get_place_order_plate_codes()
        sv.成交代码 = deal_codes_manager.get_deal_codes()
        global_dict = {
            "sv": sv,
            "target_code": code,
            "settings": self.settings
        }
        # NOTE(security): this executes the text of the strategy script file
        # loaded in __init__; that file must come from a trusted location.
        exec(self.scripts, global_dict)
        # The script is expected to leave its result in "compute_result".
        compute_result = global_dict["compute_result"]
        if not compute_result[0]:
            return
        if code in sv.成交代码:
            # Already dealt, do not order again.
            return
        # Place one order per plate returned by the script.
        for b in compute_result[3]:
            deal_codes_manager.place_order(b, code)
        async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
|
|
|
# The current low-suction strategy object
|
# Module-level holder, initialised to None.
# NOTE(review): the identifier is spelled "strtegy" (sic); it is left unchanged
# because code outside this file may reference it — confirm before renaming.
low_suction_strtegy = None
|