""" 策略管理 """ import json from code_attribute import gpcode_manager, code_nature_analyse from db import redis_manager_delegate as redis_manager from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils from log_module import async_log_util from log_module.log import logger_trade from strategy.data_analyzer import KPLLimitUpDataAnalyzer from strategy.low_suction_strategy import LowSuctionOriginDataExportManager from strategy.strategy_params_settings import StrategyParamsSettingsManager from strategy.strategy_variable import StockVariables from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory import constant from third_data import kpl_util from trade.trade_manager import DealCodesManager from utils import huaxin_util, tool @tool.singleton class TickSummaryDataManager: """ Tick概要数据 """ __db = 13 # 成交代码的订单信息:{代码:{交易id:(量,价格,系统订单号)}} def __init__(self): # 开盘价信息 self.open_price_info_dict = {} # 最低价 self.low_price_dict = {} # 最高价 self.high_price_info_dict = {} self.musql = Mysqldb() self.redis_manager = redis_manager.RedisManager(self.__db) self.__load_data() def __get_redis(self): return self.redis_manager.getRedis() def __load_data(self): keys = RedisUtils.keys(self.__get_redis(), "tick_open_price_info-*") if keys: for k in keys: code = k.split("-")[1] val = RedisUtils.get(self.__get_redis(), k) if val: self.open_price_info_dict[code] = json.loads(val) keys = RedisUtils.keys(self.__get_redis(), "tick_high_price-*") if keys: for k in keys: code = k.split("-")[1] val = RedisUtils.get(self.__get_redis(), k) if val: self.high_price_info_dict[code] = json.loads(val) keys = RedisUtils.keys(self.__get_redis(), "tick_low_price-*") if keys: for k in keys: code = k.split("-")[1] val = RedisUtils.get(self.__get_redis(), k) if val: self.low_price_dict[code] = round(float(val)) def set_open_price_info(self, code, info): """ 设置开盘信息 @param code: @param info: @return: """ self.open_price_info_dict[code] = info RedisUtils.setex_async(self.__db, f"tick_open_price_info-{code}", tool.get_expire(), json.dumps(info)) def set_high_price_info(self, code, info): """ 设置最高价 @param code: @param info:(价格, 时间) @return: """ self.high_price_info_dict[code] = info RedisUtils.setex_async(self.__db, f"tick_high_price-{code}", tool.get_expire(), json.dumps(info)) def set_low_price(self, code, price): """ 设置最低价 @param code: @param price: @return: """ self.low_price_dict[code] = price RedisUtils.setex_async(self.__db, f"tick_low_price-{code}", tool.get_expire(), price) class LowSuctionStrategy: """ 低吸策略 """ def __init__(self, day, script_name="strategy_script_v6.py", settings=StrategyParamsSettingsManager().get_settings(), need_load_data = False): self.now_day = day # 买大单:{代码:[大单数据]} self.big_order_buy = {} # 卖大单 self.big_order_sell = {} # 自由流通量 self.zylt_volume_dict = {} # 历史日K数据 self.kline_data = {} # 历史涨停数据 self.limit_up_record_data = {} # 历史数据 self.timeline_data = {} # 今日数据 self.current_data = {} # 目标代码的板块 self.code_plates_for_buy = {} # 代码的板块(常规) self.code_plates = {} # 下一个交易日 self.next_trade_day = None # 目标代码 self.fcodes = set() # 变量对象 self.stock_variables_dict = {} # 当前涨停列表 self.current_limit_up_list = [] # 当前涨停的板块所包含的代码 self.current_limit_up_plate_codes = {} # 当前板块净流入 self.current_block_in_datas = [] # 加载策略脚本文件 with open(script_name if constant.is_windows() else f'{constant.get_path_prefix()}/{script_name}', mode='r', encoding='utf-8') as f: lines = f.readlines() scripts = "\n".join(lines) # 注释掉里面的import与变量 scripts = scripts.replace("from ", "#from ").replace("sv = ", "#sv = ").replace("settings = ", "#settings = ").replace( "target_code = ", "#target_code = ") self.scripts = scripts self.settings = settings self.data_loader = DataLoader(self.now_day, cache_path=f"{constant.get_path_prefix()}/datas") self.__LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(self.now_day) if need_load_data: self.load_data() def load_data(self): # 加载历史数据 self.__load_before_date_data_by_timeline() self.__load_current_date_data_by_timeline() self.fcodes = set(self.code_plates_for_buy.keys()) & set(self.kline_data.keys()) def __load_before_date_data_by_timeline(self): """ 加载回测日期之前的K线数据与历史涨停数据 :return: 按时间排序的数据列表 """ trade_days = self.data_loader.trade_days # 加载历史数据 self.kline_data = self.data_loader.load_kline_data() self.limit_up_record_data = self.data_loader.load_limit_up_data() self.next_trade_day = self.data_loader.load_next_trade_day() if not trade_days: raise Exception("交易日历获取失败") if not self.kline_data: raise Exception("历史日K获取失败") if not self.limit_up_record_data: raise Exception("历史涨停获取失败") def __load_current_date_data_by_timeline(self): """ 加载回测日期当天的数据,将这些数据根据秒切片 :param day: 日期,格式为"YYYY-MM-DD :return: 按时间排序的数据列表 """ IS_BY_BIG_ORDER = False BIG_ORDER_MONEY_THRESHOLD = 200e4 big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal(BIG_ORDER_MONEY_THRESHOLD) if not big_order_deals or IS_BY_BIG_ORDER: big_order_deals = self.__LowSuctionOriginDataExportManager.export_big_order_deal_by( BIG_ORDER_MONEY_THRESHOLD) self.big_order_buy = big_order_deals # 转换格式为:{时间: [("代码", (买单号, 量, 金额, 时间, 最终成交价))] big_sell_order_deals = self.__LowSuctionOriginDataExportManager.export_big_sell_order_deal( BIG_ORDER_MONEY_THRESHOLD) if not big_sell_order_deals or IS_BY_BIG_ORDER: big_sell_order_deals = self.__LowSuctionOriginDataExportManager.export_big_sell_order_deal_by( BIG_ORDER_MONEY_THRESHOLD) self.big_order_sell = big_sell_order_deals # 加载自由流通量 self.zylt_volume_dict = self.__LowSuctionOriginDataExportManager.export_zylt_volume() # 加载板块代码 code_plates_dict = self.__LowSuctionOriginDataExportManager.export_code_plates() plate_codes = self.data_loader.load_target_plate_and_codes() code_plates_dict_for_buy = {} for p in plate_codes: for code in plate_codes.get(p): if code not in code_plates_dict_for_buy: code_plates_dict_for_buy[code] = set() code_plates_dict_for_buy[code].add(p) self.code_plates_for_buy = code_plates_dict_for_buy self.code_plates = code_plates_dict if not self.zylt_volume_dict: raise Exception("无自由流通数据") if not self.code_plates: raise Exception("无板块数据") if not self.code_plates_for_buy: raise Exception("无目标票的买入数据") def __init_stock_variables(self, code_): """ 初始化变量 @param code_: @return: """ if code_ in self.stock_variables_dict: return stock_variables = StrategyVariableFactory.create_from_history_data( self.kline_data.get(code_), None, self.limit_up_record_data.get(code_), self.data_loader.trade_days) # 加载今日涨停价 pre_close = self.kline_data.get(code_)[0]["close"] stock_variables.今日涨停价 = round(float(gpcode_manager.get_limit_up_price_by_preprice(code_, pre_close)), 2) stock_variables.自由流通市值 = self.zylt_volume_dict.get(code_) * pre_close # 获取代码板块 stock_variables.代码板块 = self.code_plates_for_buy.get(code_) is_price_too_high = code_nature_analyse.is_price_too_high_in_days(code_, self.kline_data.get(code_), stock_variables.今日涨停价) stock_variables.六个交易日涨幅过高 = is_price_too_high[0] stock_variables.辨识度代码 = self.fcodes for day in [2, 5, 10, 30, 60, 120]: days = self.data_loader.trade_days[:day] stock_variables.__setattr__(f"日出现的板块_{day}", KPLLimitUpDataAnalyzer.get_limit_up_reasons( self.limit_up_record_data, min_day=days[-1], max_day=days[0])) stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons( self.limit_up_record_data, self.data_loader.trade_days[:2]) # 加载Tick信息 open_price_info = TickSummaryDataManager().open_price_info_dict.get(code_) if open_price_info: stock_variables.今日开盘价 = open_price_info[0] stock_variables.今日开盘涨幅 = open_price_info[1] high_price_info = TickSummaryDataManager().high_price_info_dict.get(code_) if high_price_info: stock_variables.今日最高价信息 = high_price_info low_price = TickSummaryDataManager().low_price_dict.get(code_) if low_price: stock_variables.今日最低价 = low_price self.stock_variables_dict[code_] = stock_variables def add_big_orders(self, big_orders): """ 添加大单 @param big_orders: [(代码, 买/卖, [订单号,量,金额,最后时间戳,最后价格, 初始时间戳, 初始价格])] 如:[ ('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])] @return: """ codes = [] for d in big_orders: code = d[0] if d[1] == 0: # 买单 if code not in self.big_order_buy: self.big_order_buy[code] = [] self.big_order_buy[code].append(d[2]) else: # 卖单 if code not in self.big_order_sell: self.big_order_sell[code] = [] self.big_order_sell[code].append(d[2]) if code not in codes: codes.append(code) # 驱动下单 for code in codes: self.__run(code, self.stock_variables_dict.get(code)) def add_ticks(self, ticks): """ 添加tick数据 @param ticks: @return: """ fticks = [tick for tick in ticks if tick[0] in self.fcodes] # 初始化对象 for tick in fticks: code = tick[0] self.__init_stock_variables(code) stock_variables = self.stock_variables_dict.get(code) # (代码, 时间戳, 价格, 总交易量, 总交易额, 买5, 卖5) code, time_str, price, cum_volume, cum_amount, buy_5 = tick[0], huaxin_util.convert_time(tick[1]), tick[2], \ tick[ 3], tick[4], tick[5] # 计算tick数据的值 if time_str < '09:30:00': if price > 0: stock_variables.今日开盘价 = price else: stock_variables.今日开盘价 = buy_5[0][0] # 今日开盘涨幅 stock_variables.今日开盘涨幅 = round((stock_variables.今日开盘价 - stock_variables.昨日收盘价) / stock_variables.昨日收盘价, 4) TickSummaryDataManager().set_open_price_info(code, (stock_variables.今日开盘价, stock_variables.今日开盘涨幅)) stock_variables.今日成交量 = cum_volume stock_variables.今日成交额 = cum_amount stock_variables.当前价 = price if not stock_variables.今日最高价信息 or price > stock_variables.今日最高价信息[0]: stock_variables.今日最高价信息 = (price, time_str) TickSummaryDataManager().set_high_price_info(code, stock_variables.今日最高价信息) if not stock_variables.今日最低价 or price < stock_variables.今日最低价: stock_variables.今日最低价 = price TickSummaryDataManager().set_low_price(code, stock_variables.今日最低价) def add_limit_up_list(self, limit_up_list): """ 涨停列表数据 @param limit_up_list: @return: """ self.current_limit_up_list = limit_up_list current_limit_up_list = [x for x in limit_up_list if kpl_util.get_high_level_count(x[4]) < 3] most_real_kpl_plate_limit_up_codes_info = {} current_limit_up_dict = {x[0]: x for x in current_limit_up_list} codes = set(current_limit_up_dict.keys()) for code in codes: plates = self.code_plates.get(code) if not plates: plates = {current_limit_up_dict.get(code)[5]} plates -= constant.KPL_INVALID_BLOCKS if plates: for p in plates: if p not in most_real_kpl_plate_limit_up_codes_info: most_real_kpl_plate_limit_up_codes_info[p] = [] most_real_kpl_plate_limit_up_codes_info[p].append(code) self.current_limit_up_plate_codes = most_real_kpl_plate_limit_up_codes_info def add_block_in(self, block_in_datas): """ 添加板块净流入 @param block_in_datas: @return: """ blocks = [x[0] for x in block_in_datas if x[1] > 0] _block_in_datas = blocks[:20] self.current_block_in_datas = _block_in_datas def __run(self, code, sv: StockVariables): if not sv: return # 运行代码 # 注入大单 sv.今日大单数据 = self.big_order_buy.get(code) sv.今日卖大单数据 = self.big_order_sell.get(code) # 注入板块涨停代码 sv.开盘啦最正板块涨停 = self.current_limit_up_plate_codes # 注入板块流入信息 if self.current_block_in_datas: sv.资金流入板块 = self.current_block_in_datas # 注入已成交代码 place_order_plate_codes = DealCodesManager().get_place_order_plate_codes() sv.板块成交代码 = place_order_plate_codes sv.成交代码 = DealCodesManager().get_deal_codes() global_dict = { "sv": sv, "target_code": code, "settings": self.settings } exec(self.scripts, global_dict) compute_result = global_dict["compute_result"] if compute_result[0]: if code in sv.成交代码: return # 可以下单 # 判断是否可以买 for b in compute_result[3]: DealCodesManager().place_order(b, code) async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}") # 当前的低吸策略对象 low_suction_strtegy = None