"""
|
L2数据交易影响因子
|
"""
|
|
# l2交易因子
|
|
from code_attribute import big_money_num_manager, limit_up_time_manager, global_data_loader, gpcode_manager
|
import constant
|
from utils import global_util, tool
|
|
|
# 下单参数
|
|
|
class L2PlaceOrderParamsManager:
|
|
# 获取买入等级描述
|
def get_buy_rank_desc(self):
|
continue_count = self.get_begin_continue_buy_count()
|
time_range = self.get_time_range()
|
m = self.get_m_val()[0]
|
m = m//10000
|
|
desc = ""
|
if self.buy_rank == 0:
|
desc = f"买入信号({continue_count})"
|
elif self.buy_rank == 1:
|
desc = f"买入信号({continue_count})+M值≥{m}万"
|
elif self.buy_rank == 2:
|
desc = f"买入信号({continue_count})+M值≥{m}万+至少含1笔大单"
|
elif self.buy_rank == 3:
|
desc = f"买入信号({continue_count})+M值≥{m}万+至少含2笔大单"
|
else:
|
desc = "常规买入"
|
desc += f"+囊括时间{time_range}s"
|
return desc
|
|
# 获取买入等级
|
# 0:买入信号
|
# 1:买入信号+M1000万
|
# 2:买入信号+M1000万+1笔大单
|
# 3:买入信号+M1000+2笔大单
|
# 100:执行之前固有方案
|
def get_buy_rank(self):
|
if self.score_index == 0:
|
return 0
|
elif self.score_index == 1:
|
return 1
|
elif self.score_index == 2:
|
return 2
|
else:
|
return 100
|
# 暂时不需要次此中策略
|
# # 判断有没有炸开
|
# if code_price_manager.Buy1PriceManager.is_can_buy(self.code):
|
# # 回封
|
# if self.score_index == 0:
|
# return 0
|
# elif self.score_index == 1:
|
# return 1
|
# elif self.score_index == 2:
|
# return 2
|
# else:
|
# return 100
|
# else:
|
# # 首封
|
# if tool.trade_time_sub(self.now_time, "10:30:00") < 0 or tool.trade_time_sub(self.now_time, "14:00:00") > 0:
|
# if self.score_index == 0:
|
# return 1
|
# elif self.score_index == 1:
|
# return 2
|
# elif self.score_index == 2:
|
# return 3
|
# else:
|
# return 100
|
# else:
|
# if self.score_index == 0:
|
# return 2
|
# elif self.score_index == 1:
|
# return 3
|
# elif self.score_index == 2:
|
# return 100
|
# else:
|
# return 100
|
|
# 得分
|
def __init__(self, code, is_first_code, volume_rate, volume_rate_index, score, now_time=tool.get_now_time_str()):
|
self.code = code
|
self.is_first_code = is_first_code
|
score_ranks = [constant.BUY_SCORE_RANK_3, constant.BUY_SCORE_RANK_2, constant.BUY_SCORE_RANK_1,
|
constant.BUY_SCORE_RANK_0]
|
self.score = score[0][0]
|
self.score_info = score
|
# 为分数设置等级
|
score_index = -1
|
for i in range(0, len(score_ranks)):
|
if self.score >= score_ranks[i]:
|
score_index = i
|
break
|
self.score_index = score_index
|
# 只要加入想买单的,全部执行主动买入二星方案
|
if gpcode_manager.WantBuyCodesManager().is_in_cache(code):
|
self.score_index = 1
|
self.is_want_buy = True
|
else:
|
self.is_want_buy = False
|
# 没有加入想买单的,2星/3星将为1星
|
if self.score_index == 0 or self.score_index == 1:
|
self.score_index = 2
|
|
self.volume_rate = volume_rate
|
self.volume_rate_index = volume_rate_index
|
self.now_time = now_time
|
self.buy_rank = self.get_buy_rank()
|
|
# 设置分数
|
def set_score(self, score):
|
score_ranks = [constant.BUY_SCORE_RANK_3, constant.BUY_SCORE_RANK_2, constant.BUY_SCORE_RANK_1,
|
constant.BUY_SCORE_RANK_0]
|
self.score = score[0][0]
|
self.score_info = score
|
# 为分数设置等级
|
score_index = -1
|
for i in range(0, len(score_ranks)):
|
if self.score >= score_ranks[i]:
|
score_index = i
|
break
|
self.score_index = score_index
|
|
# 获取信号连续买笔数
|
|
def get_begin_continue_buy_count(self):
|
counts = [3, 3, 3, 2, 2, 2, 2]
|
volume_rate_index = self.volume_rate_index
|
if self.volume_rate_index >= len(counts):
|
volume_rate_index = -1
|
return counts[volume_rate_index]
|
|
# 获取时间计算范围,返回s
|
def get_time_range(self):
|
# ts = [pow(3, 1), pow(3, 1), pow(3, 1), pow(3, 2), pow(3, 2), pow(3, 3), pow(3, 3), pow(3, 3)]
|
ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1)]
|
# 暂时去除分的影响
|
# if -1 < self.score_index < 3:
|
# return ts[0]
|
volume_rate_index = self.volume_rate_index
|
if self.volume_rate_index >= len(ts):
|
volume_rate_index = -1
|
return ts[volume_rate_index]
|
|
# 获取需要的大单个数
|
def get_big_num_count(self):
|
# if self.is_first_code:
|
# if self.buy_rank < 2:
|
# return 0
|
# elif self.buy_rank == 2:
|
# return 1
|
# elif self.buy_rank == 3:
|
# return 2
|
counts = [2, 1, 1, 1, 0, 0, 0, 0]
|
volume_rate_index = self.volume_rate_index
|
if self.volume_rate_index >= len(counts):
|
volume_rate_index = -1
|
return counts[volume_rate_index]
|
|
# 获取安全笔数影响比例
|
def get_safe_count_rate(self):
|
rates = [0, -0.1, -0.2, -0.3, -0.4, -0.5, -0.6, -0.7]
|
volume_rate_index = self.volume_rate_index
|
if self.volume_rate_index >= len(rates):
|
volume_rate_index = -1
|
return rates[volume_rate_index]
|
|
def get_safe_count(self):
|
# if self.is_first_code:
|
# if self.buy_rank < 4:
|
# return 2
|
base_count, min_count, max_count = L2TradeFactorUtil.get_safe_buy_count(self.code, True)
|
rate = self.get_safe_count_rate()
|
count = int(round(base_count * (1 + rate)))
|
return count
|
|
# 获取m值影响比例
|
@classmethod
|
def get_m_val_rate(cls, volume_rate_index):
|
rates = [0.0, -0.1, -0.2, -0.3, -0.4, -0.5, -0.6, -0.7]
|
if volume_rate_index >= len(rates):
|
volume_rate_index = -1
|
return rates[volume_rate_index]
|
|
# 获取m值
|
def get_m_val(self):
|
# 获取固定m值
|
zyltgb = global_util.zyltgb_map.get(self.code)
|
if zyltgb is None:
|
global_data_loader.load_zyltgb()
|
zyltgb = global_util.zyltgb_map.get(self.code)
|
|
# if self.is_first_code:
|
# if self.buy_rank == 0:
|
# return 0, ""
|
# elif self.is_want_buy and zyltgb and zyltgb < 20 * 10000 * 10000:
|
# # 小于20亿的想买单
|
# return 500 * 10000, ""
|
# elif self.buy_rank < 4:
|
# return 1000 * 10000, ""
|
|
base_m = L2TradeFactorUtil.get_base_safe_val(zyltgb)
|
rate = self.get_m_val_rate(self.volume_rate_index)
|
m = round(base_m * (1 + rate))
|
return m, ""
|
|
|
# S撤参数
|
class SCancelParamsManager:
|
# 剔除前几的点火大单
|
@staticmethod
|
def get_max_exclude_count(volume_rate_index):
|
counts = [5, 4, 3, 2, 1, 1, 1, 1]
|
if volume_rate_index >= len(counts):
|
volume_rate_index = -1
|
return counts[volume_rate_index]
|
|
# 获取买时间范围(距离执行位),返回s
|
@staticmethod
|
def get_buy_time_range(volume_rate_index):
|
# rates = [0.2, 0.4, 0.6, 0.8, 1, 1.2, 1.4, 1.6]
|
seconds = [pow(2, 1), pow(2, 2), pow(2, 3), pow(2, 4), pow(2, 5), pow(2, 5), pow(2, 4), pow(2, 4)]
|
if volume_rate_index >= len(seconds):
|
volume_rate_index = -1
|
return seconds[volume_rate_index]
|
|
# 获取撤销比例
|
@staticmethod
|
def get_cancel_rate(volume_rate_index):
|
rates = [0.34, 0.44, 0.54, 0.64, 0.74, 0.84, 0.94, 1.04]
|
if volume_rate_index >= len(rates):
|
volume_rate_index = -1
|
return rates[volume_rate_index]
|
|
|
# H撤参数
|
class HCancelParamsManager:
|
|
# 获取最大的观察笔数
|
@staticmethod
|
def get_max_watch_count(volume_rate_index):
|
counts = [40, 36, 32, 28, 24, 20, 16, 12]
|
if volume_rate_index >= len(counts):
|
volume_rate_index = -1
|
return counts[volume_rate_index]
|
|
# 获取撤销比例
|
@staticmethod
|
def get_cancel_rate(volume_rate_index):
|
rates = [0.35, 0.45, 0.55, 0.65, 0.65, 0.75, 0.75, 0.75]
|
if volume_rate_index >= len(rates):
|
volume_rate_index = -1
|
return rates[volume_rate_index]
|
|
|
class L2TradeFactorUtil:
|
# 获取基础m值,返回单位为元
|
@classmethod
|
def get_base_safe_val(cls, zyltgb):
|
if zyltgb is None:
|
return 10000000
|
yi = round(zyltgb / 100000000)
|
if yi < 1:
|
yi = 1
|
m = 5000000 + (yi - 1) * 500000
|
return round(m * (1 - 0.3))
|
|
# 获取行业影响比例
|
# total_limit_percent为统计的比例之和乘以100
|
@classmethod
|
def get_industry_rate(cls, total_limit_percent):
|
if total_limit_percent is None:
|
return 0
|
t = total_limit_percent / 10
|
if t < 0.9:
|
return 0
|
elif t <= 1.1:
|
return 0.7
|
elif t <= 1.6:
|
return 0.4
|
else:
|
rate = 0
|
for i in range(0, 30):
|
if t <= 2.1 + 0.5 * i:
|
rate = 0.18 + 0.08 * i
|
break
|
if rate > 0.9:
|
rate = 0.9
|
return round(rate, 4)
|
|
# 获取量影响比例
|
@classmethod
|
def get_volumn_rate(cls, day60_max, yest, today):
|
if day60_max is None:
|
return 0
|
if yest is None:
|
return 0
|
if today is None:
|
return 0
|
old_volumn = int(yest)
|
if int(day60_max[0]) > int(yest):
|
old_volumn = int(day60_max[0])
|
r = round(int(today) / old_volumn, 2)
|
if r < 0.01:
|
r = 0.01
|
print("比例:", r)
|
rate = 0
|
if r <= 0.5:
|
rate = 0.6 - (r - 0.01) * 2
|
elif r <= 0.85:
|
rate = -0.38 + (r - 0.5) * 2.8
|
elif r <= 1.15:
|
rate = 0.6 - (r - 0.85) * 4
|
else:
|
rate = -0.6
|
return round(rate, 4)
|
|
@classmethod
|
def get_volumn_rate_by_code(cls, code):
|
volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
|
rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
|
return rate
|
|
# 当前股票首次涨停时间的影响比例
|
@classmethod
|
def get_limit_up_time_rate(cls, time_str):
|
times = time_str.split(":")
|
start_m = 9 * 60 + 30
|
m = int(times[0]) * 60 + int(times[1])
|
dif = m - start_m
|
base_rate = 0.3
|
rate = 0
|
if dif < 1:
|
rate = base_rate
|
elif dif <= 120:
|
# 11:30之前
|
rate = base_rate - dif * 0.0035
|
else:
|
rate = base_rate - (dif - 89) * 0.0035
|
if rate < -0.3020:
|
rate = -0.3020
|
return round(rate, 4)
|
|
# 纯万手哥影响值(手数》=9000 OR 金额》=500w)
|
@classmethod
|
def get_big_money_rate(cls, num):
|
if num < 4:
|
return 0
|
rate = (num - 4) * 0.035 / 4 + 0.06
|
if rate > 0.9:
|
rate = 0.9
|
return round(rate, 4)
|
|
@classmethod
|
def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
|
limit_up_time, big_money_num):
|
|
# 行业涨停影响比例
|
industry_rate = 0
|
if total_industry_limit_percent is not None:
|
industry_rate = cls.get_industry_rate(total_industry_limit_percent)
|
# 量影响比例
|
volumn_rate = 0
|
if volumn_day60_max is not None and volumn_yest is not None and volumn_today is not None:
|
volumn_rate = cls.get_volumn_rate(int(volumn_day60_max), int(volumn_yest), int(volumn_today))
|
# 涨停时间影响比例
|
limit_up_time_rate = 0
|
if limit_up_time is not None:
|
limit_up_time_rate = cls.get_limit_up_time_rate(limit_up_time)
|
# 万手哥影响
|
big_money_rate = 0
|
if big_money_num is not None:
|
big_money_rate = cls.get_big_money_rate(big_money_num)
|
|
msg = "zyltgb:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(zyltgb,
|
industry_rate,
|
volumn_rate,
|
limit_up_time_rate,
|
big_money_rate)
|
|
final_rate = round(1 - (industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4)
|
if final_rate < 0.1:
|
final_rate = 0.1
|
return final_rate, msg
|
|
@classmethod
|
def compute_rate_by_code(cls, code):
|
factors = cls.__get_rate_factors(code)
|
return cls.compute_rate(factors[0], factors[1], factors[2], factors[3], factors[4], factors[5], factors[6])
|
|
# 获取代码当前所在的行业热度
|
@classmethod
|
def __get_industry_limit_percent(cls, code):
|
# 获取行业热度
|
industry = global_util.code_industry_map.get(code)
|
if industry is None:
|
global_data_loader.load_industry()
|
industry = global_util.code_industry_map.get(code)
|
|
total_industry_limit_percent = global_util.industry_hot_num.get(industry) if industry is not None else None
|
# 当前票是否涨停
|
# if total_industry_limit_percent is not None:
|
# if code in global_util.limit_up_codes_percent:
|
# # 减去当前票的涨幅
|
# total_industry_limit_percent -= global_util.limit_up_codes_percent[code]
|
return total_industry_limit_percent
|
|
# 获取量
|
@classmethod
|
def __get_volumns(cls, code):
|
volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
|
code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
|
if volumn_day60_max is None or volumn_yest is None:
|
global_data_loader.load_volumn()
|
volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
|
code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
|
return volumn_day60_max, volumn_yest, volumn_today
|
|
@classmethod
|
def __get_rate_factors(cls, code):
|
zyltgb = global_util.zyltgb_map.get(code)
|
total_industry_limit_percent = cls.__get_industry_limit_percent(code)
|
# 获取量
|
volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
|
# 首次涨停时间
|
limit_up_time = global_util.limit_up_time.get(code)
|
if limit_up_time is None:
|
limit_up_time = limit_up_time_manager.get_limit_up_time(code)
|
|
big_money_num = global_util.big_money_num.get(code)
|
if big_money_num is None:
|
big_money_num = big_money_num_manager.get_num(code)
|
return (
|
zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, limit_up_time,
|
big_money_num)
|
|
@classmethod
|
def factors_to_string(cls, code):
|
vals = cls.__get_rate_factors(code)
|
return "zyltgb:%s, total_industry_limit_percent:%s, volumn_day60_max:%s, volumn_yest:%s, volumn_today:%s,limit_up_time:%s, big_money_num:%s" % vals
|
|
@classmethod
|
def get_zyltgb(cls, code):
|
zyltgb = global_util.zyltgb_map.get(code)
|
if zyltgb is None:
|
global_data_loader.load_zyltgb()
|
zyltgb = global_util.zyltgb_map.get(code)
|
return zyltgb
|
|
@classmethod
|
def compute_m_value(cls, code, volume_rate):
|
zyltgb = global_util.zyltgb_map.get(code)
|
if zyltgb is None:
|
global_data_loader.load_zyltgb()
|
zyltgb = global_util.zyltgb_map.get(code)
|
if zyltgb is None:
|
print("没有获取到自由流通市值")
|
return 10000000, ""
|
zyltgb = cls.get_base_safe_val(zyltgb)
|
# 暂时注释
|
# rate, msg = cls.compute_rate_by_code(code)
|
# print("m值获取:", code, round(zyltgb * rate))
|
rate = L2PlaceOrderParamsManager.get_m_val_rate(volume_rate)
|
|
return round(zyltgb * (1 + rate)), ""
|
|
# 获取安全笔数
|
@classmethod
|
def get_safe_buy_count(cls, code, is_first=False):
|
gb = cls.get_zyltgb(code)
|
return cls.get_safe_buy_count_by_gp(gb, is_first)
|
|
@classmethod
|
def get_safe_buy_count_by_gp(cls, gb, is_first=False):
|
MIN_VAL = 8
|
MAX_VAL = 16
|
if is_first:
|
MIN_VAL = 5
|
MAX_VAL = 13
|
if not gb:
|
# 默认8笔
|
return MIN_VAL, MIN_VAL, MAX_VAL
|
count = gb // 100000000
|
if True:
|
if count < 8:
|
count = MIN_VAL
|
elif count < 11:
|
count = MIN_VAL + 1
|
elif count < 14:
|
count = MIN_VAL + 2
|
elif count < 17:
|
count = MIN_VAL + 3
|
elif count < 20:
|
count = MIN_VAL + 4
|
elif count < 23:
|
count = MIN_VAL + 5
|
elif count < 26:
|
count = MIN_VAL + 6
|
elif count < 29:
|
count = MIN_VAL + 7
|
else:
|
count = MAX_VAL
|
if count < MIN_VAL:
|
count = MIN_VAL
|
elif count > MAX_VAL:
|
count = MAX_VAL
|
return count, MIN_VAL, MAX_VAL
|
|
|
# l2因子归因数据
|
class L2TradeFactorSourceDataUtil:
|
# 是否为大单
|
@classmethod
|
def is_big_money(cls, data):
|
if int(data["val"]["limitPrice"]) != 1:
|
return False
|
|
if int(data["val"]["num"]) >= 9000:
|
return True
|
money = round(float(data["val"]["price"]) * int(data["val"]["num"]) * 100)
|
if money >= 5000000:
|
return True
|
return False
|
|
|
if __name__ == "__main__":
|
# print(L2TradeFactorUtil.get_safe_buy_count("003005"))
|
# print(L2TradeFactorUtil.get_rate_factors("003004"))
|
# print(L2TradeFactorUtil.factors_to_string("003004"))
|
for i in range(2, 50):
|
print(i, L2TradeFactorUtil.get_base_safe_val(100000000 * i))
|
# print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00"))
|
# print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
|
# print(L2TradeFactorUtil.get_limit_up_time_rate("13:48:00"))
|
# print(L2TradeFactorUtil.get_limit_up_time_rate("13:53:23"))
|
# print(L2TradeFactorUtil.get_limit_up_time_rate("14:23:23"))
|
|
# print(L2TradeFactorUtil.get_big_money_rate(2))
|
# print(L2TradeFactorUtil.get_big_money_rate(3))
|