"""
|
买入策略数据管理器
|
"""
|
import decimal
|
|
from log_module import async_log_util, log_export
|
from log_module.log import logger_trade
|
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
|
from trade.l2_transaction_data_manager import HuaXinBuyOrderManager
|
from utils import tool, l2_huaxin_util
|
|
# 涨停买入策略
|
STRATEGY_TYPE_LIMIT_UP = 1
|
# 涨幅高买入策略
|
STRATEGY_TYPE_RISE_HIGH = 2
|
# 涨幅高且有板块的买入策略
|
STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS = 3
|
|
|
class BuyStrategyDataManager:
|
__latest_transaction_price_dict = {}
|
# 涨停过的代码集合
|
__limit_up_record_codes = set()
|
"""
|
买入策略管理
|
"""
|
|
def __init__(self):
|
# 涨停价管理
|
self.__limit_up_price_dict = {}
|
self.__pre_close_price_dict = {} # 昨日收盘价
|
self.__latest_trade_price_dict = {}
|
self.__latest_market_info_dict = {}
|
|
def set_pre_close_price(self, code, price):
|
self.__pre_close_price_dict[code] = price
|
limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{tool.get_limit_up_rate(code)}"))
|
self.__limit_up_price_dict[code] = round(float(limit_up_price), 2)
|
|
def add_market_info(self, info):
|
# TODO 加入时间
|
"""
|
添加市场信息
|
:param info: (代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量, 委托买入总量, 委托卖出总量, 昨日收盘价, 时间)
|
:return:
|
"""
|
code = info[0]
|
# 设置昨日收盘价格
|
if code not in self.__limit_up_price_dict or code not in self.__pre_close_price_dict:
|
self.set_pre_close_price(code, info[8])
|
self.__latest_market_info_dict[info[0]] = info
|
|
def add_transaction_info(self, info, backtest=False):
|
|
"""
|
添加成交信息
|
:param backtest: 是否是回测
|
:param info: 示例-{'SecurityID': '002842', 'TradePrice': 6.66, 'TradeVolume': 1100, 'OrderTime': 95436390,
|
'MainSeq': 2011, 'SubSeq': 8162612, 'BuyNo': 8162610, 'SellNo': 8159424, 'ExecType': '1'}
|
:return: 返回是否下单
|
"""
|
if not backtest:
|
HuaXinBuyOrderManager.statistic_big_buy_data(info['SecurityID'],
|
[(info['SecurityID'], info['TradePrice'], info['TradeVolume'],
|
info['OrderTime'], info['MainSeq'], info['SubSeq'],
|
info['BuyNo'],
|
info['SellNo'], info['ExecType'])])
|
if int(l2_huaxin_util.convert_time(info['OrderTime']).replace(":", "")) < int("093000"):
|
return False, STRATEGY_TYPE_LIMIT_UP, "09:30之前不下单"
|
|
code = info["SecurityID"]
|
try:
|
results = self.__can_place_order(code, info, backtest)
|
return results
|
finally:
|
self.__latest_trade_price_dict[code] = info["TradePrice"]
|
|
def __can_place_order(self, code, info, backtest):
|
"""
|
是否可以下单
|
:param code: 代码
|
:param transaction_info: 成交信息
|
:param backtest: 是否是回测模式
|
:return: 返回是否每种策略是否可以下单
|
"""
|
|
def can_buy_for_stategy_1():
|
# 获取2秒内吃的档数
|
rised_price = self.__get_rised_price(code, info)
|
if rised_price < 0.1:
|
return False, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, "2S内涨幅小于10档"
|
pre_close_price = self.__pre_close_price_dict.get(code)
|
if pre_close_price is None:
|
return False, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, "没有获取到收盘价"
|
rate = (info["TradePrice"] - pre_close_price) / pre_close_price
|
if rate < 0.07:
|
return False, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, "涨幅小于7%"
|
if code.find("30") == 0 and rate >= 0.19:
|
return True, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, f"2S内上升{int(rised_price * 100)}档 涨幅({rate})>19%"
|
if code.find("30") != 0 and rate >= 0.09:
|
return True, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, f"2S内上升{int(rised_price * 100)}档 涨幅({rate})>9%"
|
return False, STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS, f"不满足买入条件"
|
|
def can_buy_for_stategy_2():
|
if code.find("30") == 0 and False:
|
# 30开始的9.8以上的算涨停
|
markekt = self.__latest_market_info_dict.get(info["SecurityID"])
|
if markekt and markekt[2] >= 0.098 and self.__latest_trade_price_dict.get(code):
|
# 上一次涨幅小于0.098
|
if (self.__latest_trade_price_dict.get(code) - markekt[8]) / markekt[8] < 0.098:
|
return True, STRATEGY_TYPE_RISE_HIGH, f"创业板涨幅超过9.8"
|
return False, STRATEGY_TYPE_RISE_HIGH, "不满足买入条件"
|
|
def can_buy_for_stategy_3():
|
is_limit_up = info["TradePrice"] == self.__limit_up_price_dict.get(code)
|
try:
|
limit_up_price = self.__limit_up_price_dict.get(code)
|
if is_limit_up:
|
# 当前为涨停价
|
if code not in self.__latest_trade_price_dict:
|
# 开1
|
# 查询买1金额
|
markekt = self.__latest_market_info_dict.get(info["SecurityID"])
|
if markekt and markekt[3] * markekt[4] >= 1e8:
|
async_log_util.info(logger_trade, f"{code}:买1({markekt[3] * markekt[4]})超过1亿")
|
return True, STRATEGY_TYPE_LIMIT_UP, "买1超过1亿,开1可买"
|
else:
|
return False, STRATEGY_TYPE_LIMIT_UP, "开1"
|
else:
|
if self.__latest_trade_price_dict.get(
|
code) != limit_up_price and code not in self.__limit_up_record_codes:
|
# 初次涨停之前那一次不是涨停价
|
return True, STRATEGY_TYPE_LIMIT_UP, "涨停"
|
finally:
|
if is_limit_up:
|
# 加入涨停代码列表
|
self.__limit_up_record_codes.add(code)
|
return False, STRATEGY_TYPE_LIMIT_UP, "不满足买入条件"
|
|
return [can_buy_for_stategy_1(), can_buy_for_stategy_2(), can_buy_for_stategy_3()]
|
|
def __get_rised_price(self, code, transaction_info, time_space=2000):
|
"""
|
获取指定时间内股价上升的价格
|
:param code:
|
:param transaction_info:
|
:param time_space:
|
:return:
|
"""
|
if code not in self.__latest_transaction_price_dict:
|
self.__latest_transaction_price_dict[code] = []
|
|
if not self.__latest_transaction_price_dict[code] or self.__latest_transaction_price_dict[code][-1][0] != \
|
transaction_info["OrderTime"]:
|
self.__latest_transaction_price_dict[code].append(
|
(transaction_info["TradePrice"], transaction_info["OrderTime"]))
|
# 删除1s之前的数据
|
while True:
|
end_time, start_time = self.__latest_transaction_price_dict[code][-1][1], \
|
self.__latest_transaction_price_dict[code][0][1]
|
if tool.trade_time_sub_with_ms(l2_huaxin_util.convert_time(end_time, with_ms=True),
|
l2_huaxin_util.convert_time(start_time, with_ms=True)) <= time_space:
|
break
|
else:
|
# 删除第一个元素
|
del self.__latest_transaction_price_dict[code][0]
|
return self.__latest_transaction_price_dict[code][-1][0] - self.__latest_transaction_price_dict[code][0][0]
|
|
def process(self, underlying_code, underlying_transaction_info, underlying_market_info, cb_code, cb_market_info):
|
"""
|
处理数据
|
:param underlying_code: 正股代码
|
:param underlying_transaction_info:正股成交信息
|
:param underlying_market_info: 正股行情
|
:param cb_code: 可转债代码
|
:param cb_market_info: 可转债行情
|
:return: 是否可以买, 买入信息
|
"""
|
pass
|
# 是否开1
|
limit_up_datas = KPLLimitUpDataRecordManager.latest_origin_datas
|
for item in limit_up_datas:
|
pass
|
|
|
class StrategyBuyOrderRefManager:
|
"""
|
策略买入记录管理
|
"""
|
__instance = None
|
__order_ref_strategy_dict = {}
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(StrategyBuyOrderRefManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_data()
|
return cls.__instance
|
|
@classmethod
|
def __load_data(cls):
|
cls.__order_ref_strategy_dict = log_export.load_order_ref_strategy()
|
|
def add(self, order_ref, strategy_type):
|
"""
|
添加数据
|
:param order_ref:
|
:param strategy_type:
|
:return:
|
"""
|
async_log_util.info(logger_trade, f"可转载策略下单结果:({order_ref},{strategy_type})")
|
self.__order_ref_strategy_dict[order_ref] = strategy_type
|
|
def get_strategy_type(self, order_ref):
|
"""
|
获取策略类型
|
:param order_ref:
|
:return:
|
"""
|
return self.__order_ref_strategy_dict.get(order_ref)
|