"""
|
买入策略数据管理器
|
"""
|
import decimal
|
|
from log_module import async_log_util, log_export
|
from log_module.log import logger_trade
|
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
|
from trade.l2_transaction_data_manager import HuaXinBuyOrderManager
|
from utils import tool, l2_huaxin_util
|
|
# Strategy: buy at the limit-up price.
STRATEGY_TYPE_LIMIT_UP = 1

# Strategy: buy when the rise is high.
STRATEGY_TYPE_RISE_HIGH = 2

# Strategy: buy when the rise is high and the stock belongs to blocks (sectors).
STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS = 3
|
|
|
class BuyStrategyDataManager:
    """
    Buy-strategy data manager.

    Caches per-code price data (previous close, limit-up price, latest trade
    price, latest market snapshot) and, for every incoming trade, evaluates
    whether each buy strategy would place an order.
    """

    # Recent (trade price, order time) samples per code, used to measure how
    # far the price rose inside a time window.
    # NOTE: class-level attribute, so it is shared by every instance.
    __latest_transaction_price_dict = {}
    # Codes that have traded at their limit-up price.
    # NOTE: class-level attribute, so it is shared by every instance.
    __limit_up_record_codes = set()
    # Switch for the "codes starting with 30 rising >= 9.8%" rule.  The rule was
    # hard-disabled (`... and False`) in the original code; it stays off here.
    _RISE_HIGH_STRATEGY_ENABLED = False

    def __init__(self):
        # code -> limit-up price
        self.__limit_up_price_dict = {}
        # code -> previous close price
        self.__pre_close_price_dict = {}
        # code -> price of the last trade processed by add_transaction_info
        self.__latest_trade_price_dict = {}
        # code -> latest market info tuple
        self.__latest_market_info_dict = {}

    def set_pre_close_price(self, code, price):
        """
        Store the previous close price and derive the limit-up price from it.

        :param code: security code
        :param price: previous close price
        :return:
        """
        self.__pre_close_price_dict[code] = price
        # Decimal arithmetic avoids binary float error in the price * rate product.
        limit_up_rate = decimal.Decimal(f"{tool.get_limit_up_rate(code)}")
        limit_up_price = tool.to_price(decimal.Decimal(str(price)) * limit_up_rate)
        self.__limit_up_price_dict[code] = round(float(limit_up_price), 2)

    def add_market_info(self, info):
        # TODO: add the time
        """
        Store the latest market info of a code.

        :param info: (code, latest price, rise rate, buy-1 price, buy-1 volume,
                      total traded volume, total buy order volume,
                      total sell order volume, previous close price, time)
        :return:
        """
        code = info[0]
        # Initialise the previous close / limit-up price the first time a code is seen.
        if code not in self.__limit_up_price_dict or code not in self.__pre_close_price_dict:
            self.set_pre_close_price(code, info[8])
        self.__latest_market_info_dict[code] = info

    def get_latest_market_info(self, code):
        """
        Return the latest market info stored for ``code``, or None if there is none.

        :param code: security code
        :return: the tuple passed to add_market_info, or None
        """
        return self.__latest_market_info_dict.get(code)

    def add_transaction_info(self, info, backtest=False):
        """
        Feed one trade and evaluate the buy strategies against it.

        :param backtest: whether running in backtest mode
        :param info: example - {'SecurityID': '002842', 'TradePrice': 6.66, 'TradeVolume': 1100,
                     'OrderTime': 95436390, 'MainSeq': 2011, 'SubSeq': 8162612, 'BuyNo': 8162610,
                     'SellNo': 8159424, 'ExecType': '1'}
        :return: for trades before 09:30 a single (False, strategy type, message) tuple;
                 otherwise a list with one (can buy, strategy type, message) tuple per strategy.
                 NOTE(review): the two shapes differ - confirm that callers handle both.
        """
        code = info["SecurityID"]
        if not backtest:
            # Big-buy statistics are only collected for live data.
            HuaXinBuyOrderManager.statistic_big_buy_data(
                code,
                [(code, info['TradePrice'], info['TradeVolume'],
                  info['OrderTime'], info['MainSeq'], info['SubSeq'],
                  info['BuyNo'], info['SellNo'], info['ExecType'])])
        # NOTE(review): this guard is applied in backtest mode as well - confirm that is intended.
        trade_time = l2_huaxin_util.convert_time(info['OrderTime'])
        if int(trade_time.replace(":", "")) < 93000:
            return False, STRATEGY_TYPE_LIMIT_UP, "09:30之前不下单"

        try:
            return self.__can_place_order(code, info, backtest)
        finally:
            # Remember this trade's price only after the strategies have run: they
            # compare the current trade against the previous one.
            self.__latest_trade_price_dict[code] = info["TradePrice"]

    def __can_place_order(self, code, info, backtest):
        """
        Evaluate every strategy for one trade.

        :param code: security code
        :param info: trade info (see add_transaction_info)
        :param backtest: whether running in backtest mode (currently unused)
        :return: list of (can buy, strategy type, message), one entry per strategy, in the
                 order RISE_HIGH_WITH_BLOCKS, RISE_HIGH, LIMIT_UP
        """
        # The evaluation order is kept: the checks update shared state as a side effect.
        return [
            self.__check_rise_high_with_blocks(code, info),
            self.__check_rise_high(code, info),
            self.__check_limit_up(code, info),
        ]

    def __check_rise_high_with_blocks(self, code, info):
        """
        Check whether the price rose fast enough inside the recent time window.

        :param code: security code
        :param info: trade info
        :return: (can buy, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, message); always a tuple,
                 also when the check itself fails with an exception
        """
        strategy_type = STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS
        try:
            limit_up_price = self.__limit_up_price_dict.get(code)
            # Required rise: 0.8% of the limit-up price.
            threshold_rate_money = round(0.8 * float(limit_up_price) / 100, 2)

            pre_close_price = self.__pre_close_price_dict.get(code)
            min_price = None
            if pre_close_price:
                # The window must start at or above +7% (+17% for non-SH/SZ codes)
                # over the previous close.
                min_rate = 0.07 if tool.is_shsz_code(code) else 0.17
                min_price = round((1 + min_rate) * pre_close_price, 2)

            # Price gained inside the window (default window of __get_rised_price).
            rised_price, msg = self.__get_rised_price(code, info, min_price=min_price)
            if rised_price < threshold_rate_money:
                return False, strategy_type, f"2S内涨幅小于{int(threshold_rate_money*100)}档"
            if not pre_close_price:
                return False, strategy_type, "没有获取到收盘价"
            return True, strategy_type, f"2S内上升{int(rised_price * 100)}档({msg})"
        except Exception as e:
            logger_trade.error(f"买入策略1出错:{code} - {str(e)}")
            # Report a failed check as "do not buy" instead of returning None, so every
            # entry of the result list has the same shape.
            return False, strategy_type, f"买入策略1出错:{e}"

    def __check_rise_high(self, code, info):
        """
        Check the "rise of at least 9.8%" rule for codes starting with "30".

        The rule is switched off via _RISE_HIGH_STRATEGY_ENABLED, so this currently always
        answers "do not buy".

        :param code: security code
        :param info: trade info (unused)
        :return: (can buy, STRATEGY_TYPE_RISE_HIGH, message)
        """
        strategy_type = STRATEGY_TYPE_RISE_HIGH
        if self._RISE_HIGH_STRATEGY_ENABLED and code.startswith("30"):
            market = self.__latest_market_info_dict.get(code)
            last_trade_price = self.__latest_trade_price_dict.get(code)
            if market and market[2] >= 0.098 and last_trade_price:
                # Only trigger when the previous trade was still below 9.8%.
                if (last_trade_price - market[8]) / market[8] < 0.098:
                    return True, strategy_type, "创业板涨幅超过9.8"
        return False, strategy_type, "不满足买入条件"

    def __check_limit_up(self, code, info):
        """
        Check whether the trade is a fresh touch of the limit-up price.

        :param code: security code
        :param info: trade info
        :return: (can buy, STRATEGY_TYPE_LIMIT_UP, message)
        """
        strategy_type = STRATEGY_TYPE_LIMIT_UP
        limit_up_price = self.__limit_up_price_dict.get(code)
        if info["TradePrice"] != limit_up_price:
            return False, strategy_type, "不满足买入条件"

        # The trade is at the limit-up price: record the code whatever the verdict is.
        self.__limit_up_record_codes.add(code)

        if code not in self.__latest_trade_price_dict:
            # First trade seen for this code is already at limit-up ("开1"):
            # only buy when the buy-1 amount is at least 100 million.
            market = self.__latest_market_info_dict.get(code)
            if market:
                buy1_money = market[3] * market[4]
                if buy1_money >= 1e8:
                    async_log_util.info(logger_trade, f"{code}:买1({buy1_money})超过1亿")
                    return True, strategy_type, "买1超过1亿,开1可买"
            return False, strategy_type, "开1"

        if self.__latest_trade_price_dict.get(code) != limit_up_price:
            # The previous trade was below limit-up, so this trade reaches it (again).
            return True, strategy_type, "涨停"
        return False, strategy_type, "不满足买入条件"

    def __get_rised_price(self, code, transaction_info, time_space=2000, min_price=None):
        """
        Get how much the price rose inside the most recent time window.

        :param code: security code
        :param transaction_info: trade info
        :param time_space: window length (presumably milliseconds - confirm against
                           tool.trade_time_sub_with_ms)
        :param min_price: when set and the oldest price in the window is below it, the rise
                          is reported as 0
        :return: (price rise inside the window, string dump of the window)
        """
        window = self.__latest_transaction_price_dict.setdefault(code, [])

        # NOTE(review): entries are (price, time) tuples, so this compares the last *price*
        # with the new *time*; in practice the sample is therefore always appended.
        # Left unchanged because the intended de-duplication rule is unclear.
        if not window or window[-1][0] != transaction_info["OrderTime"]:
            window.append((transaction_info["TradePrice"], transaction_info["OrderTime"]))

        # Drop samples that are older than `time_space` relative to the newest one.
        newest_time = l2_huaxin_util.convert_time(window[-1][1], with_ms=True)
        while tool.trade_time_sub_with_ms(
                newest_time, l2_huaxin_util.convert_time(window[0][1], with_ms=True)) > time_space:
            window.pop(0)

        window_dump = f"{window}"
        if min_price and window[0][0] < min_price:
            return 0, window_dump
        return window[-1][0] - window[0][0], window_dump
|
|
|
class StrategyBuyOrderRefManager:
    """
    Singleton that records which strategy placed each buy order.

    The order_ref -> strategy type mapping is loaded through
    log_export.load_order_ref_strategy() when the singleton is first created.
    """
    __instance = None
    # order_ref -> strategy type (class-level, shared through the singleton)
    __order_ref_strategy_dict = {}

    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so none are forwarded.
            instance = super().__new__(cls)
            cls.__load_data()
            # Publish the singleton only after loading succeeded, so a failed load is
            # retried on the next construction instead of leaving an empty manager.
            cls.__instance = instance
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """Load the stored order_ref -> strategy type mapping."""
        cls.__order_ref_strategy_dict = log_export.load_order_ref_strategy() or {}

    def add(self, order_ref, strategy_type):
        """
        Record the strategy that placed an order.

        :param order_ref: order reference
        :param strategy_type: strategy type that placed the order
        :return:
        """
        async_log_util.info(logger_trade, f"可转载策略下单结果:({order_ref},{strategy_type})")
        self.__order_ref_strategy_dict[order_ref] = strategy_type

    def get_strategy_type(self, order_ref):
        """
        Get the strategy type recorded for an order.

        :param order_ref: order reference
        :return: the strategy type, or None when the order is unknown
        """
        return self.__order_ref_strategy_dict.get(order_ref)
|