"""L2 data trading impact factors.

This module groups the tunable parameters used when placing and cancelling
orders from L2 data:

* ``L2PlaceOrderParamsManager`` - parameters for placing a buy order.
* ``SCancelParamsManager`` / ``HCancelParamsManager`` - parameters for the
  "S" and "H" cancel strategies.
* ``L2TradeFactorUtil`` - computation of the M value, safe order count and the
  individual influence rates.
* ``L2TradeFactorSourceDataUtil`` - helpers working on raw L2 records.
"""

# L2 trading factors
from code_attribute import big_money_num_manager, limit_up_time_manager, global_data_loader, gpcode_manager
import constant
from trade import trade_manager, deal_big_money_manager, trade_constant
from utils import global_util, tool


def _pick_by_index(table, index):
    """Return ``table[index]``, using the last entry when ``index`` is past the end.

    Most parameter tables in this module are keyed by ``volume_rate_index``;
    an index at or beyond the table length selects the final entry.
    """
    return table[-1] if index >= len(table) else table[index]


# Order placing parameters
class L2PlaceOrderParamsManager:
    """Parameters that control when a buy order is placed for one code."""

    def __init__(self, code, is_first_code, volume_rate, volume_rate_index, score, now_time=None):
        """Initialise the parameters for ``code``.

        :param code: stock code.
        :param is_first_code: stored as-is on the instance.
        :param volume_rate: stored as-is on the instance.
        :param volume_rate_index: index used to look up the per-volume tables.
        :param score: currently unused here; call ``set_score`` to apply a score.
        :param now_time: time string; when omitted, the current time is read
            via ``tool.get_now_time_str()`` at construction time.
        """
        self.code = code
        self.is_first_code = is_first_code
        # The score based ranking is disabled for now: the level is pinned to 2,
        # except for codes in the want-buy list which use level 1.
        self.is_want_buy = bool(gpcode_manager.WantBuyCodesManager().is_in_cache(code))
        self.score_index = 1 if self.is_want_buy else 2
        self.volume_rate = volume_rate
        self.volume_rate_index = volume_rate_index
        # Resolve the default per call. A default of ``tool.get_now_time_str()``
        # in the signature would be evaluated only once, when the class is defined.
        self.now_time = tool.get_now_time_str() if now_time is None else now_time
        self.buy_rank = self.get_buy_rank()

    # Get the description of the buy rank
    def get_buy_rank_desc(self):
        """Return a human readable description of the current buy rank."""
        continue_count = self.get_begin_continue_buy_count()
        time_range = self.get_time_range()
        # The M value is displayed in units of 10,000.
        m = self.get_m_val()[0] // 10000
        descriptions = {
            0: f"买入信号({continue_count})",
            1: f"买入信号({continue_count})+M值≥{m}万",
            2: f"买入信号({continue_count})+M值≥{m}万+至少含1笔大单",
            3: f"买入信号({continue_count})+M值≥{m}万+至少含2笔大单",
        }
        desc = descriptions.get(self.buy_rank, "常规买入")
        desc += f"+囊括时间{time_range}s"
        return desc

    # Get the buy rank
    def get_buy_rank(self):
        """Map ``self.score_index`` to a buy rank.

        Ranks:
            0: buy signal
            1: buy signal + M value
            2: buy signal + M value + 1 big order
            3: buy signal + M value + 2 big orders
            100: run the regular (pre-existing) plan

        Score indexes 0, 1 and 2 map to the same rank; anything else maps to 100.

        A time-of-day dependent variant is currently disabled. When the board
        had been re-sealed it used the mapping above. On a first seal before
        10:30 or after 14:00 it used 0->1, 1->2, 2->3, else 100, and at other
        times 0->2, 1->3, else 100.
        """
        return {0: 0, 1: 1, 2: 2}.get(self.score_index, 100)

    # Whether this is the first order for the code
    @classmethod
    def is_first_place_order(cls, code):
        """Return True when the cached trade state of ``code`` is "not traded"."""
        state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
        return state == trade_constant.TRADE_STATE_NOT_TRADE

    # Set the score
    def set_score(self, score):
        """Store ``score`` and derive ``score_index`` from it.

        ``score[0][0]`` is taken as the numeric score. ``score_index`` becomes
        the position of the first threshold in (RANK_3, RANK_2, RANK_1, RANK_0)
        that the score reaches, or -1 when it reaches none.
        """
        score_ranks = (constant.BUY_SCORE_RANK_3, constant.BUY_SCORE_RANK_2, constant.BUY_SCORE_RANK_1,
                       constant.BUY_SCORE_RANK_0)
        self.score = score[0][0]
        self.score_info = score
        # Assign a level to the score
        self.score_index = next(
            (i for i, threshold in enumerate(score_ranks) if self.score >= threshold), -1)

    # Get the number of consecutive buy records required for a signal
    def get_begin_continue_buy_count(self):
        """Return the number of consecutive buys that form a buy signal."""
        return _pick_by_index((3, 3, 3, 2, 2, 2, 2), self.volume_rate_index)

    # Get the time window used for the computation, in seconds
    def get_time_range(self):
        """Return the time window in seconds.

        The first order for a code must fall within the same second (1);
        otherwise the window is looked up by ``volume_rate_index`` (every
        entry is currently 2).
        """
        if self.is_first_place_order(self.code):
            return 1
        return _pick_by_index((2,) * 8, self.volume_rate_index)

    # Get the number of big orders required
    def get_big_num_count(self):
        """Return 0 when a big order has already been dealt, else 1."""
        deal_big_money_count = deal_big_money_manager.get_deal_big_money_count(self.code)
        return 0 if deal_big_money_count > 0 else 1

    # Get the rate applied to the safe order count
    def get_safe_count_rate(self):
        """Return the adjustment rate for the safe order count."""
        return _pick_by_index((0, -0.1, -0.2, -0.3, -0.4, -0.5, -0.6, -0.7), self.volume_rate_index)

    def get_safe_count(self):
        """Return the safe order count after the volume adjustment (at least 8)."""
        base_count, _, _ = L2TradeFactorUtil.get_safe_buy_count(self.code, True)
        count = int(round(base_count * (1 + self.get_safe_count_rate())))
        # At least 8 orders
        return max(count, 8)

    # Get the rate applied to the M value
    @classmethod
    def get_m_val_rate(cls, volume_rate_index):
        """Return the adjustment rate for the M value."""
        return _pick_by_index((0.0, -0.1, -0.15, -0.2, -0.25, -0.3, -0.35, -0.4), volume_rate_index)

    @classmethod
    def get_base_m_val(cls, code):
        """Return the base M value for ``code`` before any volume adjustment."""
        return L2TradeFactorUtil.get_base_safe_val(L2TradeFactorUtil.get_zyltgb(code))

    # Get the M value
    def get_m_val(self):
        """Return ``(m, "")`` where ``m`` is the volume adjusted M value.

        The base value is doubled for the first order of a code for which
        ``tool.is_sz_code`` is true.
        """
        base_m = self.get_base_m_val(self.code)
        if self.is_first_place_order(self.code) and tool.is_sz_code(self.code):
            base_m = int(base_m * 2)
        rate = self.get_m_val_rate(self.volume_rate_index)
        return round(base_m * (1 + rate)), ""


# S-cancel parameters
class SCancelParamsManager:
    """Parameters of the "S" cancel strategy, keyed by ``volume_rate_index``."""

    # Exclude the first few igniting big orders
    @staticmethod
    def get_max_exclude_count(volume_rate_index):
        """Return how many leading big orders are excluded."""
        return _pick_by_index((5, 4, 3, 2, 1, 1, 1, 1), volume_rate_index)

    # Get the buy time range (distance from the execution position), in seconds
    @staticmethod
    def get_buy_time_range(volume_rate_index):
        """Return the buy time range in seconds."""
        return _pick_by_index((2, 4, 8, 16, 32, 32, 16, 16), volume_rate_index)

    # Get the cancel rate
    @staticmethod
    def get_cancel_rate(volume_rate_index):
        """Return the cancel rate.

        The rate is currently pinned to 0.79 and ``volume_rate_index`` is
        ignored. The per-index table that is switched off was
        [0.34, 0.44, 0.54, 0.64, 0.74, 0.84, 0.94, 1.04].
        """
        return 0.79


# H-cancel parameters
class HCancelParamsManager:
    """Parameters of the "H" cancel strategy, keyed by ``volume_rate_index``."""

    # Get the maximum number of watched orders
    @staticmethod
    def get_max_watch_count(volume_rate_index):
        """Return the maximum number of orders to watch."""
        return _pick_by_index((40, 36, 32, 28, 24, 20, 16, 12), volume_rate_index)

    # Get the cancel rate
    @staticmethod
    def get_cancel_rate(volume_rate_index):
        """Return the cancel rate.

        The rate is currently pinned to 0.89 and ``volume_rate_index`` is
        ignored. The per-index table that is switched off was
        [0.35, 0.45, 0.55, 0.65, 0.65, 0.75, 0.75, 0.75].
        """
        return 0.89


class L2TradeFactorUtil:
    """Computation of the M value, safe order count and influence rates."""

    # Get the base M value, returned in yuan
    @classmethod
    def get_base_safe_val(cls, zyltgb):
        """Return the base M value (in yuan) for the market cap ``zyltgb``.

        Returns 10,000,000 when ``zyltgb`` is None. Otherwise the cap is
        rounded to units of 100 million (minimum 1) and fed into a quadratic.
        """
        if zyltgb is None:
            return 10000000
        yi = max(round(zyltgb / 100000000), 1)
        return int(-0.058 * yi * yi + 60.9 * yi) * 10000

    # Get the industry influence rate
    @classmethod
    def get_industry_rate(cls, total_limit_percent):
        """Return the industry influence rate.

        :param total_limit_percent: sum of the counted percentages times 100,
            or None (treated as no influence).
        """
        if total_limit_percent is None:
            return 0
        t = total_limit_percent / 10
        if t < 0.9:
            return 0
        if t <= 1.1:
            return 0.7
        if t <= 1.6:
            return 0.4
        # NOTE(review): for t above 16.6 no band matches and the rate stays 0
        # instead of the 0.9 cap - confirm whether that is intended.
        rate = 0
        for i in range(30):
            if t <= 2.1 + 0.5 * i:
                rate = 0.18 + 0.08 * i
                break
        return round(min(rate, 0.9), 4)

    # Get the volume influence rate
    @classmethod
    def get_volumn_rate(cls, day60_max, yest, today):
        """Return the volume influence rate.

        :param day60_max: subscriptable whose first item is the 60-day maximum
            volume (presumably a ``(volume, date)`` pair - confirm with the loader).
        :param yest: yesterday's volume.
        :param today: today's volume.
        :return: 0 when any input is None or the baseline volume is not
            positive; otherwise a piecewise-linear rate of today's volume
            relative to the larger of the two baselines.
        """
        if day60_max is None or yest is None or today is None:
            return 0
        old_volumn = max(int(yest), int(day60_max[0]))
        if old_volumn <= 0:
            # No usable baseline: dividing would fail, so treat it as missing data.
            return 0
        r = max(round(int(today) / old_volumn, 2), 0.01)
        print("比例:", r)
        if r <= 0.5:
            rate = 0.6 - (r - 0.01) * 2
        elif r <= 0.85:
            rate = -0.38 + (r - 0.5) * 2.8
        elif r <= 1.15:
            rate = 0.6 - (r - 0.85) * 4
        else:
            rate = -0.6
        return round(rate, 4)

    @classmethod
    def get_volumn_rate_by_code(cls, code):
        """Return the volume influence rate for ``code``."""
        volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        return cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)

    # Influence rate of the first limit-up time of the stock
    @classmethod
    def get_limit_up_time_rate(cls, time_str):
        """Return the influence rate for a first limit-up time ``"HH:MM[:SS]"``.

        The rate starts at 0.3 at 09:30 and drops 0.0035 per minute. For times
        more than 120 minutes after 09:30, 89 minutes are subtracted first and
        the result is floored at -0.302.
        """
        parts = time_str.split(":")
        dif = int(parts[0]) * 60 + int(parts[1]) - (9 * 60 + 30)
        base_rate = 0.3
        if dif < 1:
            rate = base_rate
        elif dif <= 120:
            # Before 11:30
            rate = base_rate - dif * 0.0035
        else:
            rate = max(base_rate - (dif - 89) * 0.0035, -0.3020)
        return round(rate, 4)

    # Influence of pure big orders (lots >= 9000 OR amount >= 5 million)
    @classmethod
    def get_big_money_rate(cls, num):
        """Return the influence rate for ``num`` big orders (0 below 4, capped at 0.9)."""
        if num < 4:
            return 0
        rate = (num - 4) * 0.035 / 4 + 0.06
        return round(min(rate, 0.9), 4)

    @classmethod
    def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
                     limit_up_time, big_money_num):
        """Combine all influence rates into one final rate.

        :return: ``(final_rate, msg)``. ``final_rate`` is 1 minus the sum of
            the individual rates, floored at 0.1. ``msg`` lists the inputs.
        """
        # Industry limit-up influence
        industry_rate = 0
        if total_industry_limit_percent is not None:
            industry_rate = cls.get_industry_rate(total_industry_limit_percent)
        # Volume influence. The values are passed through unchanged because
        # get_volumn_rate indexes day60_max itself and does its own int conversion.
        volumn_rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
        # Limit-up time influence
        limit_up_time_rate = 0
        if limit_up_time is not None:
            limit_up_time_rate = cls.get_limit_up_time_rate(limit_up_time)
        # Big order influence
        big_money_rate = 0
        if big_money_num is not None:
            big_money_rate = cls.get_big_money_rate(big_money_num)
        msg = "zyltgb:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}".format(zyltgb,
                                                                                                     industry_rate,
                                                                                                     volumn_rate,
                                                                                                     limit_up_time_rate,
                                                                                                     big_money_rate)
        final_rate = round(1 - (industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4)
        return max(final_rate, 0.1), msg

    @classmethod
    def compute_rate_by_code(cls, code):
        """Return ``compute_rate`` evaluated on the cached factors of ``code``."""
        return cls.compute_rate(*cls.__get_rate_factors(code))

    # Get the current industry heat of the code
    @classmethod
    def __get_industry_limit_percent(cls, code):
        """Return the heat value of the industry of ``code``, or None if unknown.

        The industry map is loaded on demand when ``code`` is not in it.
        """
        industry = global_util.code_industry_map.get(code)
        if industry is None:
            global_data_loader.load_industry()
            industry = global_util.code_industry_map.get(code)
        if industry is None:
            return None
        return global_util.industry_hot_num.get(industry)

    # Get the volumes
    @classmethod
    def __get_volumns(cls, code):
        """Return ``(day60_max, yesterday, today)`` volumes for ``code``.

        The volume data is loaded on demand when the 60-day maximum or
        yesterday's volume is missing.
        """

        def read():
            return (global_util.max60_volumn.get(code), global_util.yesterday_volumn.get(code),
                    global_util.today_volumn.get(code))

        volumns = read()
        if volumns[0] is None or volumns[1] is None:
            global_data_loader.load_volumn()
            volumns = read()
        return volumns

    @classmethod
    def __get_rate_factors(cls, code):
        """Return the 7-tuple of inputs expected by ``compute_rate`` for ``code``."""
        zyltgb = global_util.zyltgb_map.get(code)
        total_industry_limit_percent = cls.__get_industry_limit_percent(code)
        # Volumes
        volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        # First limit-up time
        limit_up_time = global_util.limit_up_time.get(code)
        if limit_up_time is None:
            limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
        big_money_num = global_util.big_money_num.get(code)
        if big_money_num is None:
            big_money_num = big_money_num_manager.get_num_cache(code)
        return (
            zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, limit_up_time,
            big_money_num)

    @classmethod
    def factors_to_string(cls, code):
        """Return the rate factors of ``code`` formatted as one line of text."""
        vals = cls.__get_rate_factors(code)
        return "zyltgb:%s, total_industry_limit_percent:%s, volumn_day60_max:%s, volumn_yest:%s, volumn_today:%s,limit_up_time:%s, big_money_num:%s" % vals

    @classmethod
    def get_zyltgb(cls, code):
        """Return the cached ``zyltgb`` of ``code``, loading the map once if missing."""
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(code)
        return zyltgb

    @classmethod
    def compute_m_value(cls, code, volume_rate):
        """Return ``(m, "")`` for ``code``.

        :param volume_rate: passed to ``L2PlaceOrderParamsManager.get_m_val_rate``
            as its ``volume_rate_index`` argument.
        :return: ``(10000000, "")`` when no ``zyltgb`` is available, otherwise
            the base M value adjusted by the volume rate.
        """
        zyltgb = cls.get_zyltgb(code)
        if zyltgb is None:
            print("没有获取到自由流通市值")
            return 10000000, ""
        base_m = cls.get_base_safe_val(zyltgb)
        # The factor based rate (compute_rate_by_code) is switched off for now.
        rate = L2PlaceOrderParamsManager.get_m_val_rate(volume_rate)
        return round(base_m * (1 + rate)), ""

    # Get the safe order count
    @classmethod
    def get_safe_buy_count(cls, code, is_first=False):
        """Return ``(count, min, max)`` of safe orders for ``code``."""
        return cls.get_safe_buy_count_by_gp(cls.get_zyltgb(code), is_first)

    @classmethod
    def get_safe_buy_count_by_gp(cls, gb, is_first=False):
        """Return ``(count, min, max)`` of safe orders for the market cap ``gb``.

        The bounds are 8..16, or 5..13 when ``is_first`` is true. ``count`` is
        the minimum when ``gb`` is falsy or below 8 units of 100 million, and
        grows by one for every further 3 units up to the maximum.
        """
        min_val, max_val = (5, 13) if is_first else (8, 16)
        if not gb:
            # Default to the minimum
            return min_val, min_val, max_val
        yi = gb // 100000000
        count = max_val
        for step, upper in enumerate((8, 11, 14, 17, 20, 23, 26, 29)):
            if yi < upper:
                count = min_val + step
                break
        return count, min_val, max_val


# Source data helpers for the L2 factors
class L2TradeFactorSourceDataUtil:
    """Helpers that classify raw L2 records."""

    # Whether the record is a big order
    @classmethod
    def is_big_money(cls, data):
        """Return True when ``data`` is a big order.

        The record must have ``val.limitPrice`` equal to 1 and either
        ``val.num`` >= 9000 or ``price * num * 100`` >= 5,000,000
        (presumably 100 shares per lot - confirm).
        """
        val = data["val"]
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["num"]) >= 9000:
            return True
        money = round(float(val["price"]) * int(val["num"]) * 100)
        return money >= 5000000


if __name__ == "__main__":
    # Manual checks, e.g.:
    # print(L2TradeFactorUtil.get_safe_buy_count("003005"))
    # print(L2TradeFactorUtil.factors_to_string("003004"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
    # print(L2TradeFactorUtil.get_big_money_rate(3))
    print(22, L2TradeFactorUtil.get_base_safe_val(100000000 * 22))