import copy import json import time import constant from code_attribute import gpcode_manager, code_volumn_manager from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from l2.code_price_manager import Buy1PriceManager from l2.huaxin import l2_huaxin_util from l2.l2_compute_util import L2DataComputeUtil from l2.l2_data_manager import OrderBeginPosInfo from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, HuaXinSellOrderStatisticManager, BigOrderDealManager from log_module import async_log_util from third_data import code_plate_key_manager from trade.buy_radical import radical_buy_data_manager from utils import tool from l2.transaction_progress import TradeBuyQueue from trade import l2_trade_factor, trade_record_log_util, trade_constant from l2 import l2_log, l2_data_source_util from l2.l2_data_util import L2DataUtil, local_today_datas, local_today_canceled_buyno_map, local_today_buyno_map from log_module.log import logger_l2_s_cancel, logger_debug, logger_l2_l_cancel, logger_l2_h_cancel from utils.tool import CodeDataCacheUtil import l2_data_util class SCancelRateManager: TYPE_S_FAST = 0 TYPE_S_FAST_BIG = 1 TYPE_S_SLOW = 2 @classmethod def get_threshhold_rate(cls, code, cancel_type): # 加红 must_buy = gpcode_manager.MustBuyCodesManager().is_in_cache(code) half_must_buy = False if not must_buy: try: can_buy_result = code_plate_key_manager.CodePlateKeyBuyManager.can_buy(code) if can_buy_result and can_buy_result[0]: if can_buy_result[0][1] <= 1 and can_buy_result[0][2] >= 3: # 炸板率>60%以上就不加半红 if can_buy_result[0][3] <= 0 or can_buy_result[0][2] / can_buy_result[0][3] > 0.4: half_must_buy = True except: pass if cancel_type == cls.TYPE_S_FAST: if must_buy: return constant.S_FAST_RATE_WITH_MUST_BUY, "加红" elif half_must_buy: return round((constant.S_FAST_RATE + constant.S_FAST_RATE_WITH_MUST_BUY) / 2, 2), "半红" else: return constant.S_FAST_RATE, "常规" elif cancel_type == cls.TYPE_S_FAST_BIG: # 重砸 if must_buy: return constant.S_FAST_BIG_RATE_WITH_MUST_BUY, "加红" elif half_must_buy: return round((constant.S_FAST_RATE + constant.S_FAST_BIG_RATE_WITH_MUST_BUY) / 2, 2), "半红" else: return constant.S_FAST_RATE, "常规" elif cancel_type == cls.TYPE_S_SLOW: if must_buy: return constant.S_SLOW_RATE_WITH_MUST_BUY, "加红" elif half_must_buy: return round((constant.S_SLOW_RATE + constant.S_SLOW_RATE_WITH_MUST_BUY) / 2, 2), "半红" else: return constant.S_SLOW_RATE, "常规" return None, "" class SCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __sCancelParamsManager = l2_trade_factor.SCancelParamsManager __s_cancel_real_place_order_index_cache = {} __s_down_watch_indexes_dict = {} __recompute_l_down_time_dict = {} # 最近处理的卖单号 __latest_process_sell_order_no = {} # S慢砸包含的范围 __s_slow_sell_watch_indexes_dict = {} # 成交进度位置 __trade_progress_index_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(SCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() I = 0 return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "s_cancel_real_place_order_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = json.loads(val) tool.CodeDataCacheUtil.set_cache(cls.__s_cancel_real_place_order_index_cache, code, val) finally: RedisUtils.realse(__redis) # 设置真实下单位置 def __save_real_place_order_index(self, code, index, is_default): CodeDataCacheUtil.set_cache(self.__s_cancel_real_place_order_index_cache, code, (index, is_default)) key = "s_cancel_real_place_order_index-{}".format(code) RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default))) def __get_real_place_order_index_info_cache(self, code): cache_result = CodeDataCacheUtil.get_cache(self.__s_cancel_real_place_order_index_cache, code) if cache_result[0]: return cache_result[1] return None def get_real_place_order_index_cache(self, code): cache_result = self.__get_real_place_order_index_info_cache(code) if cache_result: return cache_result[0] return None def __clear_data(self, code): CodeDataCacheUtil.clear_cache(self.__s_cancel_real_place_order_index_cache, code) CodeDataCacheUtil.clear_cache(self.__s_down_watch_indexes_dict, code) CodeDataCacheUtil.clear_cache(self.__trade_progress_index_dict, code) CodeDataCacheUtil.clear_cache(self.__s_slow_sell_watch_indexes_dict, code) ks = ["s_big_num_cancel_compute_data-{}".format(code), "s_cancel_real_place_order_index-{}".format(code)] for key in ks: RedisUtils.delete_async(self.__db, key) # 设置真实下单位置 def set_real_place_order_index(self, code, index, is_default): self.__save_real_place_order_index(code, index, is_default) if not is_default: self.__compute_s_slow_sell(code) def set_transaction_index(self, code, buy_single_index, index): self.__trade_progress_index_dict[code] = index self.__compute_s_slow_sell(code) def clear_data(self): ks = ["s_big_num_cancel_compute_data-*", "s_cancel_real_place_order_index-*"] for key in ks: keys = RedisUtils.keys(self.__get_redis(), key) for k in keys: code = k.split("-")[1] self.__clear_data(code) # 撤单成功 def cancel_success(self, code): self.__clear_data(code) # 下单成功 def place_order_success(self, code): self.__clear_data(code) def __get_fast_threshold_value(self, code): """ 获取S快炸的阈值 @param code: @return:(大单阈值, 800w内大单阈值) """ max60, yesterday = code_volumn_manager.CodeVolumeManager().get_histry_volumn(code) if max60: num = max60[0] limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price: money_y = min(round((num * float(limit_up_price)) / 1e8, 1), 50) money = int(round(15 * money_y + 276.5)) money_800 = int(round(money * 0.668)) return money, money_800 # 默认值 return 299, 200 # S前撤实现 def __need_cancel_for_up(self, code, big_sell_order_info, total_datas): # 查询是否是真的真实下单位置 trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 real_order_index_info = self.__get_real_place_order_index_info_cache(code) if real_order_index_info is None or real_order_index_info[1]: return False, "没找到真实下单位" real_order_index = real_order_index_info[0] total_deal_money = sum([x[1] * x[2] for x in big_sell_order_info[1]]) start_order_no = big_sell_order_info[1][0][3][1] # 防止分母位0 total_num = 1 # 获取正在成交的数据 dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if int(val['orderNo']) < start_order_no: continue if i == trade_index and dealing_info and str(total_datas[trade_index]["val"]["orderNo"]) == str( dealing_info[0]): # 减去当前正在成交的数据中已经成交了的数据 total_num -= dealing_info[1] // 100 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_num += val["num"] # 大于10w if total_deal_money >= 10 * 10000: cancel_result = self.__need_cancel_for_slow_sell(code, total_datas) if cancel_result[0]: return True, f"S慢砸:{cancel_result[1]}" # 卖金额>=均大单才触发重新囊括 THRESHOLD_MONEY, is_temp_threshold_money = radical_buy_data_manager.BeforeSubDealBigOrderManager().get_big_order_threshold_info( code) if total_deal_money >= THRESHOLD_MONEY: l2_log.s_cancel_debug(code, "准备更新L后囊括") start_order_no = big_sell_order_info[1][-1][4][1] latest_deal_time_ms = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True) real_trade_index = None for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if int(val['orderNo']) < start_order_no: continue real_trade_index = i break if real_trade_index is None: l2_log.s_cancel_debug(code, f"没找到真实的成交进度:start_order_no-{start_order_no} 卖单-{big_sell_order_info}") return False, "" # 间隔1S以上才能重新囊括 if code in self.__recompute_l_down_time_dict and tool.trade_time_sub_with_ms(latest_deal_time_ms, self.__recompute_l_down_time_dict[ code]) < 1000: l2_log.s_cancel_debug(code, f"更新L后囊括:更新间隔在1s内,{latest_deal_time_ms}-{self.__recompute_l_down_time_dict[code]}") return False, "" self.__recompute_l_down_time_dict[code] = latest_deal_time_ms # 重新囊括L后 # 撤单时间比早成交时间大就需要计算在里面 LCancelBigNumComputer().re_compute_l_down_watch_indexes(code, big_sell_info=( real_trade_index, latest_deal_time_ms)) l2_log.s_cancel_debug(code, f"更新L后囊括完成:{(real_trade_index, latest_deal_time_ms)}") return False, "" def __need_cancel_for_slow_sell(self, code, total_datas): """ S慢撤的目的是判断较大金额卖占整个卖的比例不能太大 @param code: @param total_datas: @return: """ if code not in self.__s_slow_sell_watch_indexes_dict: return False, "S慢砸没有囊括" # 下单3分钟内有效 real_place_order_info = self.__get_real_place_order_index_info_cache(code) if not real_place_order_info: return False, "没有获取到真实下单位" if tool.trade_time_sub(total_datas[-1]['val']['time'], total_datas[real_place_order_info[0]]['val']['time']) > 180: return False, "超过守护时间" # 格式:[{监听index},当时正在成交的订单号,已经成交的手数] watch_info = self.__s_slow_sell_watch_indexes_dict.get(code) # 计算总买额 total_num = 0 for i in watch_info[0]: data = total_datas[i] val = data['val'] left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_num += val['num'] if val['orderNo'] == watch_info[1]: total_num -= watch_info[2] # 过滤最小金额 zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code) zyltgb_unit_y = round(zyltgb / 100000000, 1) min_sell_money = int((zyltgb_unit_y / 2 + 5) * 10000) sell_orders = HuaXinSellOrderStatisticManager.get_latest_transaction_datas(code, min_sell_order_no= total_datas[real_place_order_info[0]]['val']['orderNo'], min_sell_money=min_sell_money) sell_order_num = sum([x[1] for x in sell_orders]) // 100 rate = round(sell_order_num / total_num, 2) threshold_rate, threshold_rate_msg = SCancelRateManager.get_threshhold_rate(code, SCancelRateManager.TYPE_S_SLOW) if rate > threshold_rate: return True, f"慢砸成交比例:{rate}/0.49({threshold_rate_msg}) 成交:{sell_order_num}/{total_num}" else: return False, f"尚未触发撤单比例:{rate}" # 计算S后撤监听的数据范围 def __compute_down_cancel_watch_index(self, code, big_sell_order_info, total_datas): trade_index, is_default = TradeBuyQueue().get_traded_index(code) real_order_index_info = self.__get_real_place_order_index_info_cache(code) if real_order_index_info is None or real_order_index_info[1]: return real_order_index = real_order_index_info[0] start_order_no = big_sell_order_info[1][-1][4][1] latest_deal_time = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True) if code in self.__s_down_watch_indexes_dict: # 更新囊括范围后1s内不能再次更新 if tool.trade_time_sub_with_ms(latest_deal_time, self.__s_down_watch_indexes_dict[code][0]) <= 1000: return watch_index_info = [] for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if int(val['orderNo']) < start_order_no: continue if val['num'] * float(val['price']) < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_index_info.append((i, val['num'])) watch_index_info.sort(key=lambda x: x[1], reverse=True) watch_indexes = set([x[0] for x in watch_index_info[:5]]) l2_log.s_cancel_debug(code, f"S后计算那概括范围:{watch_indexes} 最近成交的卖单信息:{big_sell_order_info[1][-1]}") if watch_indexes: # 保存卖单信息 self.__s_down_watch_indexes_dict[code] = (latest_deal_time, watch_indexes) # 是否有大卖单需要撤 def __need_cancel_for_big_sell_order(self, code, big_sell_order_info, order_begin_pos: OrderBeginPosInfo): """ @param code: @param big_sell_order_info:(总卖金额,[(卖单号,总手数,价格,('开始时间',买单号),('结束时间',买单号)),...]) @param order_begin_pos: @return: """ # 需要排除成交时间在下单时间之前的 total_deal_money = 0 max_single_deal_money = 0 # 最大单笔卖金额 total_datas = local_today_datas.get(code) real_order_index_info = self.__get_real_place_order_index_info_cache(code) # 下单时间 real_order_time_ms = None real_order_index = None if real_order_index_info and not real_order_index_info[1]: real_order_index = real_order_index_info[0] real_order_time_ms = total_datas[real_order_index]["val"]["time"] + ".{0:0>3}".format( total_datas[real_order_index]["val"]["tms"]) for x in big_sell_order_info[1]: deal_time = l2_huaxin_util.convert_time(x[4][0], with_ms=True) if real_order_time_ms: if tool.trade_time_sub_with_ms(deal_time, real_order_time_ms) >= 0: m = x[1] * x[2] max_single_deal_money = max(max_single_deal_money, m) total_deal_money += m else: m = x[1] * x[2] max_single_deal_money = max(max_single_deal_money, m) total_deal_money += m zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code) zyltgb_unit_y = round(zyltgb / 100000000, 1) limit_up_price = gpcode_manager.get_limit_up_price(code) limit_up_price = round(float(limit_up_price), 2) # 有超大单砸 S重砸 threshold_big_deal = (2 * zyltgb_unit_y + 339) * 10000 if total_deal_money >= threshold_big_deal: # S重砸必撤的金额满足以后,以前是无脑撤。现在优化一下,看成交进度位---真实下单位的区间额≤重砸金额*3.3倍,就撤。 try: # 单笔重砸 if max_single_deal_money >= threshold_big_deal: # 上证,下单3分钟内 try: if tool.is_sh_code(code) and real_order_index is not None and tool.trade_time_sub( total_datas[-1]["val"]["time"], total_datas[real_order_index]["val"]["time"]) < 3 * 60: # 上证如果重砸额大于阈值的1.5倍直接撤单 if not gpcode_manager.MustBuyCodesManager().is_in_cache(code): if total_deal_money >= threshold_big_deal * 1.5: return True, f"1s内成交({total_deal_money}) 大于大卖单({threshold_big_deal})的1.5倍" # 如果没有大单成交也直接撤单 deal_big_order_count = BigOrderDealManager().get_total_buy_count(code) if deal_big_order_count < 1: return True, f"1s内成交({total_deal_money}) 大于大卖单({threshold_big_deal})且无大单成交" except Exception as e: l2_log.s_cancel_debug(code, "S重砸出错了:{}", str(e)) need_compute = False trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 if real_order_index_info and not real_order_index_info[1]: need_compute = True if need_compute: total_count, total_num = L2DataComputeUtil.compute_left_buy_order(code, trade_index, real_order_index_info[0], limit_up_price) if total_num == 0: total_num = 1 threshold_rate, threshold_rate_msg = SCancelRateManager.get_threshhold_rate(code, SCancelRateManager.TYPE_S_FAST_BIG) if total_deal_money / (total_num * limit_up_price * 100) >= threshold_rate: # 大单成交额占总剩余总囊括的30% return True, f"1s内大于{threshold_big_deal}({threshold_rate_msg})大卖单({total_deal_money})" else: return True, f"1s内大于{threshold_big_deal}大卖单({total_deal_money})" except Exception as e: l2_log.info(code, logger_debug, f"S快计算出错:{str(e)}") # 判断是否为激进下单 threash_money_w, threash_money_danger_w = self.__get_fast_threshold_value(code) total_fast_num = 0 try: trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 if real_order_index_info and not real_order_index_info[1]: real_order_index = real_order_index_info[0] # 开始第一笔成交数据 fdeal = big_sell_order_info[1][0][3] start_order_no = fdeal[1] sell_time_str = l2_huaxin_util.convert_time(fdeal[0], with_ms=False) total_num = 0 # 获取正在成交的数据 # 计算成交进度到真实下单位的纯买额 dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if int(val['orderNo']) < start_order_no: continue if i == trade_index and dealing_info and str( total_datas[trade_index]["val"]["orderNo"]) == str( dealing_info[0]): # 减去当前正在成交的数据中已经成交了的数据 total_num -= dealing_info[1] // 100 total_fast_num -= dealing_info[1] // 100 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2( code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_num += val["num"] total_fast_num += val['num'] if total_num * float(limit_up_price) < 800 * 100 and tool.trade_time_sub(sell_time_str, total_datas[real_order_index][ 'val']['time']) < 15 * 60: # 15分钟内真实成交位距离真实下单位,金额≤800万 ,大单阈值变为200w threash_money_w = threash_money_danger_w except Exception as e: l2_log.s_cancel_debug(code, f"S撤积极下单计算大单卖阈值出错:{str(e)}") total_fast_money = int(total_fast_num * 100 * float(limit_up_price)) if total_fast_money == 0: # 防止分母为0 total_fast_money = 1 rate = round(total_deal_money / total_fast_money, 2) threshold_rate, threshold_rate_msg = SCancelRateManager.get_threshhold_rate(code, SCancelRateManager.TYPE_S_FAST) if total_deal_money >= threash_money_w * 10000 and rate >= threshold_rate: return True, f"近1s有大卖单({round(total_deal_money / 10000, 1)}万/{threash_money_w}万({threshold_rate_msg}),成交占比:{total_deal_money}/{total_fast_money})" else: l2_log.s_cancel_debug(code, f"S快撤没达到撤单比例:成交金额({total_deal_money})/囊括金额({total_fast_money}),比例:{rate} 大单阈值:{threash_money_w}w") return False, f"无{threash_money_w}大单" # big_sell_order_info格式:[总共的卖单金额, 大卖单详情列表] def set_big_sell_order_info_for_cancel(self, code, big_sell_order_info, order_begin_pos: OrderBeginPosInfo): if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0: return False, "还未下单" if big_sell_order_info[0] < 500000 or not big_sell_order_info[1]: return False, "无大单卖" l2_log.s_cancel_debug(code, f"主动卖:{big_sell_order_info}") try: need_cancel, cancel_msg = self.__need_cancel_for_big_sell_order(code, big_sell_order_info, order_begin_pos) if need_cancel: return need_cancel, cancel_msg total_datas = local_today_datas.get(code) need_cancel, cancel_msg = self.__need_cancel_for_up(code, big_sell_order_info, total_datas) if need_cancel: return need_cancel, cancel_msg # S后撤取消 # if self.__latest_process_sell_order_no.get(code) != big_sell_order_info[1][-1][0]: # # 不处理重复的卖单 # # 获取最新的成交时间 # latest_trade_time = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0]) # if tool.trade_time_sub(latest_trade_time, # total_datas[order_begin_pos.buy_single_index]['val']['time']) >= 180: # self.__compute_down_cancel_watch_index(code, big_sell_order_info, total_datas) return False, "不满足撤单条件" finally: # self.__latest_process_sell_order_no[code] = big_sell_order_info[1][-1][0] pass # ----------------------------S慢砸范围计算------------------------------------ def __compute_s_slow_sell(self, code): try: if code in self.__s_slow_sell_watch_indexes_dict: return real_place_order_info = self.__get_real_place_order_index_info_cache(code) if not real_place_order_info or real_place_order_info[1]: return trade_index = self.__trade_progress_index_dict.get(code) if trade_index is None: return start_index = trade_index end_index = real_place_order_info[0] total_datas = local_today_datas.get(code) watch_indexes = set() for i in range(start_index, end_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val['num'] * float(val['price']) < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes.add(i) dealing_order_no = None dealed_num = 0 # 格式:[监听的索引,正在成交索引,已成交的数量] dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) if dealing_info: dealing_order_no = dealing_info[0] dealed_num = dealing_info[1] // 100 watch_info = [watch_indexes, dealing_order_no, dealed_num] self.__s_slow_sell_watch_indexes_dict[code] = watch_info l2_log.s_cancel_debug(code, f"S慢砸监听范围:{watch_info}") except Exception as e: logger_l2_s_cancel.exception(e) # ---------------------------------L撤------------------------------- class LCancelRateManager: __block_limit_up_count_dict = {} __block_limit_up_count_for_l_up_dict = {} __big_num_deal_rate_dict = {} __MustBuyCodesManager = gpcode_manager.MustBuyCodesManager() # 获取撤单比例,返回(撤单比例,是否必买) @classmethod def get_cancel_rate(cls, code, is_up=False, is_l_down_recomputed=False, buy_mode=None): try: must_buy = cls.__MustBuyCodesManager.is_in_cache(code) if buy_mode == OrderBeginPosInfo.MODE_RADICAL: if must_buy: # 扫入加红 return constant.L_CANCEL_RATE_WITH_MUST_BUY_FOR_REDICAL_BUY, True else: # 根据成交额的大单成交占比来计算撤单比例 big_money_rate = radical_buy_data_manager.TotalDealBigOrderInfoManager.get_big_order_rate(code) if big_money_rate is not None: threshold_rate = min(big_money_rate * 3, 0.95) return threshold_rate, False else: deal_big_order_info = radical_buy_data_manager.get_total_deal_big_order_info(code, gpcode_manager.get_limit_up_price_as_num( code)) deal_rate = round(deal_big_order_info[1] / deal_big_order_info[2], 2) threshold_rate = 0.5 * deal_rate + 0.35 threshold_rate = max(threshold_rate, 0.375) threshold_rate = min(threshold_rate, 0.95) return threshold_rate, False if must_buy: if is_up: return constant.L_CANCEL_RATE_UP_WITH_MUST_BUY, True else: return constant.L_CANCEL_RATE_WITH_MUST_BUY, True except Exception as e: async_log_util.error(logger_l2_l_cancel, str(e)) base_rate = constant.L_CANCEL_RATE if is_up: base_rate = constant.L_CANCEL_RATE_UP if tool.is_sh_code(code): base_rate = constant.L_CANCEL_RATE_UP_SH try: block_rate = 0 count_dict = cls.__block_limit_up_count_dict if is_up: count_dict = cls.__block_limit_up_count_for_l_up_dict if code in count_dict: count = count_dict[code] rates = [0, 0.03, 0.06, 0.08, 0.12] if count >= len(rates): block_rate = rates[-1] else: block_rate = rates[count] deal_rate = 0 if code in cls.__big_num_deal_rate_dict: deal_rate = round(cls.__big_num_deal_rate_dict[code] / 100) base_rate += block_rate base_rate += deal_rate except Exception as e: l2_log.l_cancel_debug(code, f"计算撤单比例出错:{e}") return round(base_rate, 2), False # 获取L后成交太快的撤单比例 @classmethod def get_fast_deal_cancel_rate(cls, code): must_buy_cancel_rate = cls.__MustBuyCodesManager.get_cancel_rate_cache(code) if must_buy_cancel_rate is not None: return must_buy_cancel_rate return constant.L_CANCEL_FAST_DEAL_RATE # 设置板块涨停数量(除开自己) @classmethod def set_block_limit_up_count(cls, reason_codes_dict, limit_up_time_dict: dict): for reason in reason_codes_dict: codes = reason_codes_dict[reason] codes = list(codes) # 目标票在确认的涨停原因中,在总的身位的≤50%以外,则L前的涨停影响比例因素不生效。 codes.sort(key=lambda x: ( int(limit_up_time_dict.get(x).replace(":", "")) if x in limit_up_time_dict else int("150000"))) for i in range(len(codes)): c = codes[i] cls.__block_limit_up_count_dict[c] = len(codes) - 1 if i < len(codes) / 2: cls.__block_limit_up_count_for_l_up_dict[c] = len(codes) - 1 else: cls.__block_limit_up_count_for_l_up_dict[c] = 0 @classmethod def set_big_num_deal_info(cls, code, buy_money, sell_money): left_money_w = (buy_money - sell_money) // 10000 if left_money_w > 0: rate = ((left_money_w + 300) // 900) * 2 else: rate = ((left_money_w + 599) // 900) * 2 if rate < -10: rate = -10 if rate > 10: rate = 10 cls.__big_num_deal_rate_dict[code] = rate @classmethod def compute_big_num_deal_info(cls, code): total_buy_money = BigOrderDealManager().get_total_buy_money(code) total_sell_money = BigOrderDealManager().get_total_sell_money(code) cls.set_big_num_deal_info(code, total_buy_money, total_sell_money) # 计算成交位置之后的大单(特定笔数)的撤单比例 class LCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __last_trade_progress_dict = {} __real_place_order_index_dict = {} __cancel_watch_index_info_cache = {} # L后真实成交位之后的位置索引 __cancel_l_down_after_place_order_index_cache = {} # 成交位附近临近大单索引 __near_by_trade_progress_index_cache = {} __SCancelBigNumComputer = SCancelBigNumComputer() # L后囊括范围未撤单/未成交的总手数 __total_l_down_not_deal_num_dict = {} # L后最近的成交数信息 __l_down_latest_deal_info_dict = {} __last_l_up_compute_info = {} __l_down_after_by_big_order_dict = {} # 权重 __l_down_after_by_big_order_weight_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "l_cancel_watch_index_info-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) if val: val = json.loads(val) val[2] = set(val[2]) CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_info_cache, code, val) keys = RedisUtils.keys(__redis, "l_cancel_real_place_order_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = json.loads(val) CodeDataCacheUtil.set_cache(cls.__real_place_order_index_dict, code, val) keys = RedisUtils.keys(__redis, "l_cancel_near_by_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) try: val = set(json.loads(val)) CodeDataCacheUtil.set_cache(cls.__near_by_trade_progress_index_cache, code, val) except: pass keys = RedisUtils.keys(__redis, "l_cancel_down_after_place_order_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) try: val = json.loads(val) CodeDataCacheUtil.set_cache(cls.__cancel_l_down_after_place_order_index_cache, code, val) except: pass finally: RedisUtils.realse(__redis) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def __set_watch_indexes(self, code, buy_single_index, re_compute: int, indexes): self.__cancel_watch_index_info_cache[code] = (buy_single_index, re_compute, indexes) RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}") RedisUtils.setex_async(self.__db, f"l_cancel_watch_index_info-{code}", tool.get_expire(), json.dumps((buy_single_index, re_compute, list(indexes)))) if indexes: trade_record_log_util.add_cancel_watch_indexes_log(code, trade_record_log_util.CancelWatchIndexesInfo( trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_DOWN, buy_single_index, list(indexes))) def __get_watch_indexes_cache(self, code): cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_info_cache, code) if cache_result[0]: return cache_result[1] return None def get_l_down_watch_indexes_cache(self, code): """ 获取L后的监听范围 @param code: @return:(buy_single_index, re_compute, indexes) """ return self.__get_watch_indexes_cache(code) def get_real_place_order_index_info(self, code): """ 获取真实下单索引信息 @param code: @return:(index, is_default) """ return self.__real_place_order_index_dict.get(code) def set_l_down_watch_index_info(self, code, buy_single_index, re_compute: int, indexes): """ 设置l后索引 @param code: @param buy_single_index: @param re_compute: @param indexes: @return: """ self.__set_watch_indexes(code, buy_single_index, re_compute, indexes) def __set_near_by_trade_progress_indexes(self, code, buy_single_index, indexes): if indexes: trade_record_log_util.add_cancel_watch_indexes_log(code, trade_record_log_util.CancelWatchIndexesInfo( trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_UP, buy_single_index, list(indexes))) self.__near_by_trade_progress_index_cache[code] = indexes RedisUtils.setex_async(self.__db, f"l_cancel_near_by_index-{code}", tool.get_expire(), json.dumps(list(indexes))) def __get_near_by_trade_progress_indexes(self, code): val = RedisUtils.get(self.__get_redis(), f"l_cancel_near_by_index-{code}") if val is None: return None return set(json.loads(val)) def __get_near_by_trade_progress_indexes_cache(self, code): cache_result = CodeDataCacheUtil.get_cache(self.__near_by_trade_progress_index_cache, code) if cache_result[0]: return cache_result[1] return None def __set_cancel_l_down_after_place_order_index(self, code, watch_index, index): if code not in self.__cancel_l_down_after_place_order_index_cache: self.__cancel_l_down_after_place_order_index_cache[code] = {} self.__cancel_l_down_after_place_order_index_cache[code][str(watch_index)] = index RedisUtils.setex_async(self.__db, f"l_cancel_down_after_place_order_index-{code}", tool.get_expire(), json.dumps(self.__cancel_l_down_after_place_order_index_cache[code])) def __get_cancel_l_down_after_place_order_index_dict(self, code): return self.__cancel_l_down_after_place_order_index_cache.get(code) def del_watch_index(self, code): CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_info_cache, code) RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}") def clear(self, code=None): if code: self.del_watch_index(code) if code in self.__l_down_after_by_big_order_dict: self.__l_down_after_by_big_order_dict.pop(code) if code in self.__l_down_after_by_big_order_weight_dict: self.__l_down_after_by_big_order_weight_dict.pop(code) if code in self.__real_place_order_index_dict: self.__real_place_order_index_dict.pop(code) RedisUtils.delete_async(self.__db, f"l_cancel_real_place_order_index-{code}") if code in self.__cancel_l_down_after_place_order_index_cache: self.__cancel_l_down_after_place_order_index_cache.pop(code) RedisUtils.delete_async(self.__db, f"l_cancel_down_after_place_order_index-{code}") else: keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index_info-*") for k in keys: code = k.replace("l_cancel_watch_index_info-", "") if code in self.__last_trade_progress_dict: self.__last_trade_progress_dict.pop(code) if code in self.__real_place_order_index_dict: self.__real_place_order_index_dict.pop(code) self.del_watch_index(code) keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_real_place_order_index-*") for k in keys: RedisUtils.delete(self.__get_redis(), k) # 清除L后真实下单位置之后囊括的索引 self.__cancel_l_down_after_place_order_index_cache.clear() keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_down_after_place_order_index-*") for k in keys: RedisUtils.delete(self.__get_redis(), k) self.__l_down_after_by_big_order_dict.clear() self.__l_down_after_by_big_order_weight_dict.clear() # 重新计算L上 # big_sell_info卖单信息,格式:[最近成交索引,最近成交时间(带毫秒)] def re_compute_l_down_watch_indexes(self, code, big_sell_info=None): watch_index_info = self.__cancel_watch_index_info_cache.get(code) if not watch_index_info or watch_index_info[1] > 0: return # 获取成交进度位与真实下单位置 real_place_order_index_info = self.__real_place_order_index_dict.get(code) last_trade_progress_index, is_default = TradeBuyQueue().get_traded_index(code) if big_sell_info: last_trade_progress_index = big_sell_info[0] if not real_place_order_index_info or last_trade_progress_index is None: return min_cancel_time_with_ms = None if big_sell_info: min_cancel_time_with_ms = big_sell_info[1] self.compute_watch_index(code, watch_index_info[0], last_trade_progress_index + 1, real_place_order_index_info[0], re_compute=1, min_cancel_time_with_ms=min_cancel_time_with_ms, msg=f"大单卖: 成交进度-{big_sell_info}" if big_sell_info is not None else '') # 计算观察索引,倒序计算 # re_compute:是否是重新计算的 # min_cancel_time_with_ms:最小撤单时间,大于等于此时间的撤单需要囊括进去 def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0, min_cancel_time_with_ms=None, msg=""): try: l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}") total_datas = local_today_datas.get(code) if re_compute > 0 and tool.trade_time_sub(total_datas[-1]["val"]["time"], total_datas[buy_single_index]["val"][ "time"]) < 2 * 60 and min_cancel_time_with_ms is None: # 封单额稳了以后,间隔超过2分钟才能重新计算 l2_log.l_cancel_debug(code, f"要间隔2分钟过后才能重新计算") return if total_datas: # 计算的上截至位距离下截至位纯买额要小于2.5倍m值 # thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) # thresh_hold_money = int(thresh_hold_money * 2.5) min_num = int(5000 / gpcode_manager.get_limit_up_price_as_num(code)) # 统计净涨停买的数量 not_cancel_indexes_with_num = [] re_start_index = start_index MAX_COUNT = 5 for j in range(start_index, end_index): data = total_datas[j] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, j, total_datas, local_today_canceled_buyno_map.get( code)) if not cancel_data: not_cancel_indexes_with_num.append((j, val["num"])) elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms, L2DataUtil.get_time_with_ms( cancel_data['val'])) <= 0: not_cancel_indexes_with_num.append((j, val["num"])) min_num = 0 if not_cancel_indexes_with_num: temp_count = len(not_cancel_indexes_with_num) # 取后1/5的数据 if temp_count >= 30: temp_index = int(temp_count * 4 / 5) if tool.is_sh_code(code): # 上证取后3/10 temp_index = int(temp_count * 7 / 10) re_start_index = not_cancel_indexes_with_num[temp_index][0] MAX_COUNT = len(not_cancel_indexes_with_num[temp_index:]) else: not_cancel_indexes_with_num.sort(key=lambda x: x[1]) if temp_count >= 10: # 获取中位数 min_num = not_cancel_indexes_with_num[temp_count // 2][1] MIN_MONEYS = [300, 200, 100] watch_indexes = set() for min_money in MIN_MONEYS: for i in range(end_index, re_start_index - 1, -1): try: data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if float(val['price']) * val['num'] < min_money * 100: continue if val['num'] < min_num: continue cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if not cancel_data: watch_indexes.add(i) if len(watch_indexes) >= MAX_COUNT: break elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms, L2DataUtil.get_time_with_ms( cancel_data['val'])) <= 0: watch_indexes.add(i) if len(watch_indexes) >= MAX_COUNT: break except Exception as e: logger_l2_l_cancel.error(f"{code}: 范围: {start_index}-{end_index} 位置:{i}") logger_l2_l_cancel.exception(e) if len(watch_indexes) >= MAX_COUNT: break if watch_indexes: ##判断监听的数据中是否有大单## # 之前的大单为100w,现在改为正常大单 has_big_num = False BIG_ORDER_NUM_THRESHOLD = l2_data_util.get_big_money_val( gpcode_manager.get_limit_up_price_as_num(code), tool.is_ge_code(code)) BIG_ORDER_NUM_THRESHOLD = int( round(BIG_ORDER_NUM_THRESHOLD / (gpcode_manager.get_limit_up_price_as_num(code) * 100))) for i in watch_indexes: # 是否有大单 data = total_datas[i] val = data['val'] if val['num'] > BIG_ORDER_NUM_THRESHOLD: has_big_num = True break if not has_big_num: # 无大单,需要找大单 for i in range(re_start_index, start_index, -1): data = total_datas[i] val = data['val'] # 涨停买,且未撤单 if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if val['num'] < BIG_ORDER_NUM_THRESHOLD: continue cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if not cancel_data: watch_indexes.add(i) break elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms, L2DataUtil.get_time_with_ms( cancel_data['val'])) <= 0: watch_indexes.add(i) break # 计算真实下单位置到成交进度位的最大未撤大单前2 big_order_list = [] for i in range(start_index, end_index): data = total_datas[i] val = data['val'] # 涨停买,且未撤单 if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if val['num'] < BIG_ORDER_NUM_THRESHOLD: continue cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if not cancel_data: big_order_list.append((i, val['num'])) if big_order_list: big_order_list.sort(key=lambda x: x[1], reverse=True) watch_indexes |= set([x[0] for x in big_order_list[:2]]) # 获取真实下单位后面10笔大单 watch_indexes_after = self.__compute_l_down_watch_index_after_real_place_order_index(code) if watch_indexes_after: watch_indexes |= watch_indexes_after self.__set_watch_indexes(code, buy_single_index, re_compute, watch_indexes) l2_log.l_cancel_debug(code, f"设置监听范围({msg}){'(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}") except Exception as e: l2_log.l_cancel_debug(code, f"计算L后囊括范围出错:{str(e)}") async_log_util.exception(logger_l2_l_cancel, e) def __need_update_l_down_after(self, code, start_index, end_index): """ 是否需要更新l后真实下单位置之后的囊括范围 @param code: @return: """ real_place_order_info = self.__real_place_order_index_dict.get(code) if not real_place_order_info or real_place_order_info[1]: return False # 判断L后是否包含后面的数据 watch_indexes_info = self.__get_watch_indexes_cache(code) if not watch_indexes_info: # 没有囊括 return False watch_indexes = set([int(i) for i in watch_indexes_info[2]]) real_place_order_index = real_place_order_info[0] for index in watch_indexes: if index > real_place_order_index: # L后后段已经囊括 return False # 下单位置之后数10笔买单 watch_indexes = set() total_datas = local_today_datas.get(code) MIN_NUM = int(5000 / gpcode_manager.get_limit_up_price_as_num(code)) MAX_COUNT = 10 for i in range(real_place_order_index + 1, end_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val['num'] < MIN_NUM: continue watch_indexes.add(i) if len(watch_indexes) >= MAX_COUNT: break # 看里面的撤单率是否 if len(watch_indexes) < MAX_COUNT: # 数量不够 return False # 计算撤单比例是否足够 canceled_buyno_map = local_today_canceled_buyno_map.get(code) cancel_count = 0 for index in watch_indexes: # 是否撤单 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2( code, index, total_datas, canceled_buyno_map) if left_count <= 0: cancel_count += 1 if cancel_count > len(watch_indexes) * 0.5: return True return False def __compute_l_down_watch_index_after_real_place_order_index(self, code): """ 计算L后真实下单位置之后的监控索引 @return: """ watch_indexes = set() total_datas = local_today_datas.get(code) is_ge_code = tool.is_ge_code(code) try: # 真实下单位置后面的数据就只看大单 MIN_NUM = int(5000 / gpcode_manager.get_limit_up_price_as_num(code)) real_place_order_info = self.__real_place_order_index_dict.get(code) if real_place_order_info and not real_place_order_info[1]: # 从真实下单位往后找 after_count = 0 for i in range(real_place_order_info[0] + 1, total_datas[-1]['index'] + 1): if after_count > 10: break data = total_datas[i] val = data['val'] # 涨停买,且未撤单 if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if val['num'] < MIN_NUM: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2( code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: after_count += 1 if l2_data_util.is_big_money(val, is_ge_code): watch_indexes.add(i) # 记录索引的位置 self.__set_cancel_l_down_after_place_order_index(code, i, after_count - 1) except Exception as e: pass return watch_indexes def __compute_l_down_watch_index_after_by(self, code): """ 计算L后后半段大单备用 @param code: @return: """ if self.__l_down_after_by_big_order_dict.get(code): # 不用重复计算 return real_place_order_info = self.__real_place_order_index_dict.get(code) if not real_place_order_info or real_place_order_info[1]: return real_place_order_index = real_place_order_info[0] total_datas = local_today_datas.get(code) limit_up_price = gpcode_manager.get_limit_up_price(code) limit_up_price = round(float(limit_up_price), 2) bigger_num = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) // (limit_up_price * 100) min_num = 500000 // (limit_up_price * 100) index = -1 watch_indexes = set() if code not in self.__l_down_after_by_big_order_weight_dict: self.__l_down_after_by_big_order_weight_dict[code] = {} for i in range(real_place_order_index + 1, total_datas[-1]["index"]): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: if val["num"] >= bigger_num: if index < 0: # 开始计数 index = 0 # 加入大单监听 self.__l_down_after_by_big_order_weight_dict[code][str(i)] = int(str(index)) watch_indexes.add(i) if index >= 0: index += 1 if index > 10: break self.__l_down_after_by_big_order_dict[code] = watch_indexes l2_log.l_cancel_debug(code, f"L后后半段大单备用囊括范围:{watch_indexes}") try: watch_indexes_info = self.get_l_down_watch_indexes_cache(code) if watch_indexes_info: total_watch_indexes = set(watch_indexes) | set(watch_indexes_info[2]) trade_record_log_util.add_cancel_watch_indexes_log(code, trade_record_log_util.CancelWatchIndexesInfo( trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_DOWN, watch_indexes_info[0], list(total_watch_indexes))) except: pass # 设置真实下单位置 def set_real_place_order_index(self, code, index, buy_single_index=None, is_default=False): l2_log.l_cancel_debug(code, f"设置真实下单位-{index},buy_single_index-{buy_single_index}") self.__real_place_order_index_dict[code] = (index, is_default) RedisUtils.setex_async(self.__db, f"l_cancel_real_place_order_index-{code}", tool.get_expire(), json.dumps((index, is_default))) trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 if buy_single_index is not None: # L后撤从成交进度位开始计算 self.compute_watch_index(code, buy_single_index, trade_index, index) self.__compute_trade_progress_near_by_indexes(code, buy_single_index, trade_index, index) # 重新计算L前 def re_compute_l_up_watch_indexes(self, code, buy_single_index): trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: return if code not in self.__real_place_order_index_dict: return if code not in self.__last_l_up_compute_info or time.time() - self.__last_l_up_compute_info[code][0] >= 3: self.__compute_trade_progress_near_by_indexes(code, buy_single_index, trade_index + 1, self.__real_place_order_index_dict.get(code)[0]) # 计算范围内的成交位临近未撤大单 def __compute_trade_progress_near_by_indexes(self, code, buy_single_index, start_index, end_index): if start_index is None or end_index is None: return total_datas = local_today_datas.get(code) MIN_MONEY = 99 * 100 MAX_COUNT = 15 watch_indexes = set() total_num = 0 # thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) # threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100) for i in range(start_index, end_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if float(val['price']) * val['num'] < MIN_MONEY: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_num += val['num'] * left_count watch_indexes.add(i) if len(watch_indexes) >= MAX_COUNT: break changed = True l_up_compute_info = self.__last_l_up_compute_info.get(code) if l_up_compute_info: if l_up_compute_info[1] == watch_indexes: changed = False # 保存数据 if changed: threshold_time = 1 if tool.is_sz_code(code) else 2 if l_up_compute_info and l_up_compute_info[1]: if time.time() - l_up_compute_info[0] < threshold_time: l2_log.l_cancel_debug(code, f"L前监控更新太频繁:{threshold_time}") return l2_log.l_cancel_debug(code, f"L前监控范围:{watch_indexes} 计算范围:{start_index}-{end_index}") self.__set_near_by_trade_progress_indexes(code, buy_single_index, watch_indexes) self.__last_l_up_compute_info[code] = (time.time(), watch_indexes) # 计算L后还没成交的手数 def __compute_total_l_down_not_deal_num(self, code): # 只有真实获取到下单位置后才开始计算 try: if code in self.__total_l_down_not_deal_num_dict and time.time() - \ self.__total_l_down_not_deal_num_dict[code][ 1] < 1: # 需要间隔1s更新一次 return l_down_cancel_info = self.__cancel_watch_index_info_cache.get(code) if not l_down_cancel_info: return trade_progress, is_default = TradeBuyQueue().get_traded_index(code) if trade_progress is None: trade_progress = 0 if trade_progress is None: return l_down_indexes = l_down_cancel_info[2] # 统计还未成交的数据 total_left_num = 0 total_datas = local_today_datas.get(code) for i in l_down_indexes: # 已经成交了的不计算 if i < trade_progress: continue data = total_datas[i] val = data['val'] left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: fnum = val["num"] if i == trade_progress: # 需要减去已经成交的数据 dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) if dealing_info: if str(val["orderNo"]) == str(dealing_info[0]): fnum -= dealing_info[1] // 100 total_left_num += fnum self.__total_l_down_not_deal_num_dict[code] = (total_left_num, time.time()) except Exception as e: async_log_util.exception(logger_l2_l_cancel, e) # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化 def set_trade_progress(self, code, buy_single_index, index, total_datas): if self.__last_trade_progress_dict.get(code) == index: self.__compute_total_l_down_not_deal_num(code) return self.__last_trade_progress_dict[code] = index self.__compute_total_l_down_not_deal_num(code) if total_datas is None: return if not self.__is_l_down_can_cancel(code, buy_single_index): # L后已经不能守护 l2_log.l_cancel_debug(code, f"L后已经无法生效:buy_single_index-{buy_single_index}") HourCancelBigNumComputer().start_compute_watch_indexes(code, buy_single_index) try: # 计算L后后半段大单监控范围 self.__compute_l_down_watch_index_after_by(code) except Exception as e: l2_log.l_cancel_debug(code, "__compute_l_down_watch_index_after_by出错") real_place_order_index_info = self.__real_place_order_index_dict.get(code) real_place_order_index = None if real_place_order_index_info: real_place_order_index = real_place_order_index_info[0] # 重新计算成交位置临近大单撤单 self.__compute_trade_progress_near_by_indexes(code, buy_single_index, index + 1, real_place_order_index) # 已经成交的索引 def add_deal_index(self, code, index, buy_single_index): """ L后囊括范围成交一笔重新囊括1笔 @param code: @param index: @param buy_single_index: @return: """ watch_indexes_info = self.__get_watch_indexes_cache(code) if not watch_indexes_info: return watch_indexes = watch_indexes_info[2] if index not in watch_indexes: return if buy_single_index is None: return # 重新囊括1笔 real_place_order_info = self.__real_place_order_index_dict.get(code) is_ge_code = tool.is_ge_code(code) if real_place_order_info and real_place_order_info[0] > index: total_datas = local_today_datas.get(code) min_num = int(5000 / gpcode_manager.get_limit_up_price_as_num(code)) for j in range(real_place_order_info[0] - 1, index, -1): data = total_datas[j] val = data['val'] if data["index"] in watch_indexes: continue if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, j, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes.add(data["index"]) l2_log.l_cancel_debug(code, f"L后有成交重新囊括:成交索引-{index} 囊括索引-{data['index']}") break # 从真实下单位置往后囊括大单 try: left_count_after = 0 for j in range(real_place_order_info[0] + 1, total_datas[-1]["index"]): data = total_datas[j] val = data['val'] if left_count_after > 10: break if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, j, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: left_count_after += 1 if l2_data_util.is_big_money(val, is_ge_code) and j not in watch_indexes: watch_indexes.add(j) l2_log.l_cancel_debug(code, f"L后有成交后半段增加囊括:{j}") self.__set_cancel_l_down_after_place_order_index(code, j, left_count_after - 1) break except: pass self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes) def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code): """ L后撤单; 撤单计算规则:计算撤单比例时,将真实下单位置之后的数据按权重(离下单位最近的权重越大)加入分子,不加入分母(总囊括手数)计算 @param code: @param buy_exec_index: @param start_index: @param end_index: @param total_data: @param is_first_code: @return: """ watch_indexes_info = self.__get_watch_indexes_cache(code) if not watch_indexes_info: return False, None # 这是下单位置之后的索引: key为字符串 after_place_order_index_dict = self.__get_cancel_l_down_after_place_order_index_dict(code) if after_place_order_index_dict is None: after_place_order_index_dict = {} after_place_order_index_by_dict = self.__l_down_after_by_big_order_weight_dict.get(code) if after_place_order_index_by_dict is None: after_place_order_index_by_dict = {} watch_indexes = set([int(i) for i in watch_indexes_info[2]]) try: # 将备用订单加进去 watch_indexes_by = self.__l_down_after_by_big_order_dict.get(code) if watch_indexes_by: # 是否是下单30分钟内 real_place_order_info = self.__real_place_order_index_dict.get(code) if real_place_order_info and tool.trade_time_sub(total_data[-1]["val"]["time"], total_data[real_place_order_info[0]]["val"][ "time"]) < 30 * 60: # 下单30分钟内有效 watch_indexes |= watch_indexes_by else: # 清除备用大单 watch_indexes_by.clear() except Exception as e: l2_log.l_cancel_debug(code, "将L2后后半段备用大单加入计算出错:{}", str(e)) # 计算监听的总条数 total_num = 0 max_num = 0 thresh_hold_rate, must_buy = LCancelRateManager.get_cancel_rate(code) for wi in watch_indexes: if str(wi) in after_place_order_index_dict: # 如果加红就需要计算分母 if must_buy: total_num += total_data[wi]["val"]["num"] * ( 10 - after_place_order_index_dict[str(wi)]) // 10 continue elif str(wi) in after_place_order_index_by_dict: if must_buy: total_num += total_data[wi]["val"]["num"] * ( 10 - after_place_order_index_by_dict[str(wi)]) // 10 continue total_num += total_data[wi]["val"]["num"] * total_data[wi]["re"] if total_data[wi]["val"]["num"] > max_num: max_num = total_data[wi]["val"]["num"] # 判断撤单中是否有监听中的索引 need_compute = False for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy_cancel(val): # 查询买入位置 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get( code)) if buy_index is not None and buy_index in watch_indexes: need_compute = True break if need_compute: # 计算撤单比例 watch_indexes_list = list(watch_indexes) watch_indexes_list.sort() canceled_num = 0 # 记录撤单索引 canceled_indexes = [] deal_order_nos = HuaXinBuyOrderManager().get_deal_buy_order_nos(code) if deal_order_nos is None: deal_order_nos = set() trade_index, is_default = TradeBuyQueue().get_traded_index(code) if is_default: trade_index = None canceled_buyno_map = local_today_canceled_buyno_map.get(code) for wi in watch_indexes: cancel_data = L2DataComputeUtil.is_canceled(code, wi, total_data, canceled_buyno_map, trade_index, deal_order_nos) if cancel_data: if str(wi) in after_place_order_index_dict: # 真实下单位置之后的按照权重比例来计算 canceled_num += total_data[wi]["val"]["num"] * ( 10 - after_place_order_index_dict[str(wi)]) // 10 elif str(wi) in after_place_order_index_by_dict: canceled_num += total_data[wi]["val"]["num"] * ( 10 - after_place_order_index_by_dict[str(wi)]) // 10 else: canceled_num += total_data[wi]["val"]["num"] canceled_indexes.append(cancel_data["index"]) # if wi == watch_indexes_list[-1] and left_count == 0: # # 离下单位置最近的一个撤单,必须触发撤单 # l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},临近撤单:{wi}") # return True, total_data[-1] rate = round(canceled_num / total_num, 3) # 除开最大单的影响权重 if not must_buy: temp_thresh_hold_rate = round((total_num - max_num) * 0.9 / total_num, 2) if thresh_hold_rate > temp_thresh_hold_rate: # 目标撤单比例大于大单撤单比例就取比例均值 thresh_hold_rate = round((thresh_hold_rate + temp_thresh_hold_rate) / 2, 2) l2_log.l_cancel_debug(code, f"L后计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}, 下单位之后的索引:{after_place_order_index_dict}") if rate >= thresh_hold_rate: canceled_indexes.sort() l2_log.l_cancel_debug(code, f"L后撤单,撤单位置:{canceled_indexes[-1]}") return True, total_data[canceled_indexes[-1]] return False, None def __compute_near_by_trade_progress_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code): # L前守护时间为3分钟 if tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time']) > constant.L_CANCEL_UP_EXPIRE_TIME: return False, None watch_indexes = self.__get_near_by_trade_progress_indexes_cache(code) if not watch_indexes: return False, None # 监听范围小于5笔不生效 if len(watch_indexes) < 5: return False, None # 计算监听的总条数 # 权重 WATCH_INDEX_WEIGHTS = [3, 2, 1] total_count_weight = 0 for wi in range(0, len(watch_indexes)): if wi < len(WATCH_INDEX_WEIGHTS): total_count_weight += WATCH_INDEX_WEIGHTS[wi] else: total_count_weight += WATCH_INDEX_WEIGHTS[-1] # 判断撤单中是否有监听中的索引 need_compute = False for i in range(start_index, end_index + 1): data = total_data[i] val = data["val"] if L2DataUtil.is_limit_up_price_buy_cancel(val): # 查询买入位置 buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get( code)) if buy_index is not None and buy_index in watch_indexes: need_compute = True break if need_compute: watch_indexes_list = list(watch_indexes) watch_indexes_list.sort() # 计算撤单比例 canceled_count_weight = 0 canceled_indexes = [] for wi in watch_indexes: canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, wi, total_data, local_today_canceled_buyno_map.get( code)) if canceled_data: canceled_indexes.append(canceled_data["index"]) # 获取索引权重 pos_index = watch_indexes_list.index(wi) if pos_index < len(WATCH_INDEX_WEIGHTS): canceled_count_weight += WATCH_INDEX_WEIGHTS[pos_index] else: canceled_count_weight += WATCH_INDEX_WEIGHTS[-1] rate = round(canceled_count_weight / total_count_weight, 3) thresh_cancel_rate, must_buy = LCancelRateManager.get_cancel_rate(code, is_up=True) l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},L前已撤单比例:{rate}/{thresh_cancel_rate}") if rate >= thresh_cancel_rate: # 计算成交进度位置到当前下单位置的纯买额 real_place_order_index_info = self.__real_place_order_index_dict.get(code) trade_progress_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_progress_index is None: trade_progress_index = 0 if real_place_order_index_info and trade_progress_index: total_num = 0 thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) thresh_hold_money = thresh_hold_money * 3 # 阈值为2倍m值 thresh_hold_num = thresh_hold_money // (gpcode_manager.get_limit_up_price_as_num(code) * 100) for i in range(trade_progress_index + 1, real_place_order_index_info[0]): data = total_data[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, i, total_data, local_today_canceled_buyno_map.get( code)) if not canceled_data: # 没有撤单 total_num += val["num"] * data["re"] if total_num > thresh_hold_num: # 成交位到下单位还有足够的单没撤 l2_log.l_cancel_debug(code, f"L前撤阻断: 成交位-{trade_progress_index} 真实下单位-{real_place_order_index_info[0]} 阈值-{thresh_hold_money}") return False, None canceled_indexes.sort() l2_log.l_cancel_debug(code, f"L前撤单,撤单位置:{canceled_indexes[-1]}") return True, total_data[canceled_indexes[-1]] return False, None # L后重新囊括的时间 __recompute_l_down_time_dict = {} def set_big_sell_order_info(self, code, big_sell_order_info): """ 设置大卖单信息 @param code: @param big_sell_order_info: @return: """ if not big_sell_order_info or not big_sell_order_info[0] or not big_sell_order_info[1]: return False, "" total_datas = local_today_datas.get(code) # 查询是否是真的真实下单位置 trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 real_order_index_info = self.get_real_place_order_index_info(code) if real_order_index_info is None or real_order_index_info[1]: return False, "没找到真实下单位" real_order_index = real_order_index_info[0] total_deal_money = sum([x[1] * x[2] for x in big_sell_order_info[1]]) start_order_no = big_sell_order_info[1][0][3][1] # 防止分母位0 total_num = 1 # 获取正在成交的数据 dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if int(val['orderNo']) < start_order_no: continue if i == trade_index and dealing_info and str(total_datas[trade_index]["val"]["orderNo"]) == str( dealing_info[0]): # 减去当前正在成交的数据中已经成交了的数据 total_num -= dealing_info[1] // 100 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_num += val["num"] # 卖金额>=均大单才触发重新囊括 THRESHOLD_MONEY = radical_buy_data_manager.BeforeSubDealBigOrderManager().get_big_sell_order_threshold(code) if total_deal_money >= THRESHOLD_MONEY: l2_log.l_cancel_debug(code, "准备更新L后囊括(大卖单)") start_order_no = big_sell_order_info[1][-1][4][1] latest_deal_time_ms = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True) real_trade_index = None for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if int(val['orderNo']) < start_order_no: continue real_trade_index = i break if real_trade_index is None: l2_log.l_cancel_debug(code, f"没找到真实的成交进度(大卖单):start_order_no-{start_order_no} 卖单-{big_sell_order_info}") return False, "" # 间隔1S以上才能重新囊括 if code in self.__recompute_l_down_time_dict and tool.trade_time_sub_with_ms(latest_deal_time_ms, self.__recompute_l_down_time_dict[ code]) < 1000: l2_log.s_cancel_debug(code, f"更新L后囊括(大卖单):更新间隔在1s内,{latest_deal_time_ms}-{self.__recompute_l_down_time_dict[code]}") return False, "" self.__recompute_l_down_time_dict[code] = latest_deal_time_ms # 重新囊括L后 # 撤单时间比早成交时间大就需要计算在里面 self.re_compute_l_down_watch_indexes(code, big_sell_info=( real_trade_index, latest_deal_time_ms)) l2_log.l_cancel_debug(code, f"更新L后囊括完成(大卖单):{(real_trade_index, latest_deal_time_ms)}") else: l2_log.l_cancel_debug(code, f"大卖单金额不足({THRESHOLD_MONEY})") return False, "" # L后是否还有可能撤单 def __is_l_down_can_cancel(self, code, buy_exec_index): watch_indexes_info = self.__get_watch_indexes_cache(code) if not watch_indexes_info: return True trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 if trade_index is None: return True real_place_order_index_info = self.__real_place_order_index_dict.get(code) if not real_place_order_index_info: return True # 计算已经成交的比例 total_datas = local_today_datas.get(code) total_deal_nums = 0 total_nums = 1 for index in watch_indexes_info[2]: # 不能计算L后后半段 if index > real_place_order_index_info[0]: continue data = total_datas[index] val = data["val"] left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, index, total_datas, local_today_canceled_buyno_map.get( code)) total_nums += val["num"] if left_count > 0 and index < trade_index: total_deal_nums += val["num"] thresh_hold_rate, must_buy = LCancelRateManager.get_cancel_rate(code) if total_deal_nums / total_nums > 1 - thresh_hold_rate - 0.05: return False return True def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code): if buy_exec_index is None: return False, None, "尚未找到下单位置", None # 守护S撤以外的数据 if int(tool.get_now_time_str().replace(":", "")) > int("145700") and not constant.TEST: return False, None, "", None try: LCancelOutOfDateWatchIndexesManager().process(code, start_index, end_index) except Exception as e: l2_log.l_cancel_debug("L后稳定更新出错:{}", str(e)) # 下单位临近撤 can_cancel, cancel_data, cancel_type = False, None, None try: can_cancel, cancel_data = self.__compute_need_cancel(code, buy_exec_index, start_index, end_index, total_data, is_first_code) if can_cancel: cancel_type = trade_constant.CANCEL_TYPE_L_DOWN except Exception as e: logger_l2_l_cancel.exception(e) raise e extra_msg = "L后" if not can_cancel: # 成交位临近撤 try: can_cancel, cancel_data = self.__compute_near_by_trade_progress_need_cancel(code, buy_exec_index, start_index, end_index, total_data, is_first_code) if can_cancel: cancel_type = trade_constant.CANCEL_TYPE_L_UP extra_msg = "L前" except Exception as e: logger_l2_l_cancel.exception(e) raise e try: if self.__need_update_l_down_after(code, start_index, end_index): # 更新后半段 watch_indexes = self.__compute_l_down_watch_index_after_real_place_order_index(code) if watch_indexes: l2_log.l_cancel_debug(code, "L后后半段囊括:{}", watch_indexes) watch_indexes_info = self.__get_watch_indexes_cache(code) if watch_indexes_info and watch_indexes_info[2]: # 没有囊括 watch_indexes |= set(watch_indexes_info[2]) self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes) except Exception as e: l2_log.l_cancel_debug(code, "L后后半段计算出错:{}", str(e)) return can_cancel, cancel_data, extra_msg, cancel_type def place_order_success(self, code): self.clear(code) def cancel_success(self, code): self.clear(code) def test(self): code = "000333" self.__set_cancel_l_down_after_place_order_index(code, 121, 0) time.sleep(6) self.__set_cancel_l_down_after_place_order_index(code, 121, 0), time.sleep(6) self.__set_cancel_l_down_after_place_order_index(code, 122, 1) print(self.__get_cancel_l_down_after_place_order_index_dict(code)) class LCancelOutOfDateWatchIndexesManager: """ L撤过期数据更新 """ __latest_cancel_time_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(LCancelOutOfDateWatchIndexesManager, cls).__new__(cls, *args, **kwargs) return cls.__instance @classmethod def compute_latest_canceled_watch_index_time(cls, code, watch_indexes): """ 获取 最近撤单的索引 @param code: @param watch_indexes: @return: """ total_datas = local_today_datas.get(code) canceled_buyno_map = local_today_canceled_buyno_map.get(code) max_cancel_index = -1 for index in watch_indexes: orderNo = total_datas[index]["val"]["orderNo"] canceled_data = canceled_buyno_map.get(str(orderNo)) if canceled_data: max_cancel_index = max(max_cancel_index, canceled_data["index"]) if max_cancel_index < 0: # 没有撤单的 return None return total_datas[max_cancel_index]["val"]["time"] @classmethod def compute_l_down_common_watch_indexes(cls, code, start_index, end_index, max_count, min_num, exclude_watch_indexes): """ 获取L后监听数据 @param code: @param re_start_index: @param end_index: @param max_count: @return: """ total_datas = local_today_datas.get(code) MIN_MONEYS = [300, 200, 100] watch_indexes = set() for min_money in MIN_MONEYS: for i in range(end_index, start_index - 1, -1): try: data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if float(val['price']) * val['num'] < min_money * 100: continue if val['num'] < min_num: continue cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if not cancel_data and i not in exclude_watch_indexes: watch_indexes.add(i) if len(watch_indexes) >= max_count: break except Exception as e: logger_l2_l_cancel.error(f"{code}: 范围: {start_index}-{end_index} 位置:{i}") logger_l2_l_cancel.exception(e) if len(watch_indexes) >= max_count: break return watch_indexes def process(self, code, start_index, end_index): # 获取监听数据 watch_indexes_info = LCancelBigNumComputer().get_l_down_watch_indexes_cache(code) if not watch_indexes_info: return if watch_indexes_info[1]: # 已经重新计算过就不再执行 return watch_indexes = set(watch_indexes_info[2]) # 获取真实下单位数据 real_place_order_index_info = LCancelBigNumComputer().get_real_place_order_index_info(code) if not real_place_order_index_info or real_place_order_index_info[1]: return real_place_order_index = real_place_order_index_info[0] total_datas = local_today_datas.get(code) # 计算最近的撤单时间 buyno_map = local_today_buyno_map.get(code) # 已经改过了就不需要修改 for i in range(start_index, end_index + 1): # 判断里面是否有撤单 data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue if str(val["orderNo"]) not in buyno_map: continue buy_index = buyno_map[str(val["orderNo"])]["index"] if buy_index in watch_indexes and real_place_order_index > buy_index: # 下单位之前的L后囊括范围的订单被撤单 self.__latest_cancel_time_dict[code] = val["time"] if tool.trade_time_sub(total_datas[-1]["val"]["time"], total_datas[real_place_order_index]["val"]["time"]) < 10 * 60: # 下单10分钟后才能生效 return if code not in self.__latest_cancel_time_dict: return if tool.trade_time_sub(total_datas[-1]["val"]["time"], self.__latest_cancel_time_dict.get(code)) < 10 * 60: # 最近的撤单时间小于10分钟 return l2_log.l_cancel_debug(code, f"L后稳定") canceled_buyno_map = local_today_canceled_buyno_map.get(code) cancel_time_dict = {} for index in watch_indexes: if index > real_place_order_index: continue orderNo = str(total_datas[index]["val"]["orderNo"]) if orderNo not in canceled_buyno_map: continue if tool.trade_time_sub(total_datas[-1]["val"]["time"], canceled_buyno_map[orderNo]["val"]["time"]) < 10 * 60: continue # 统计撤单时间 cancel_val = canceled_buyno_map[orderNo]["val"] time_str = L2DataUtil.get_time_with_ms(cancel_val) if time_str not in cancel_time_dict: cancel_time_dict[time_str] = set() cancel_time_dict[time_str].add(index) remove_indexes = set() for t in cancel_time_dict: if len(cancel_time_dict[t]) > 1: remove_indexes |= cancel_time_dict[t] if remove_indexes: # 移除索引 add_count = len(remove_indexes) limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) min_num = int(5000 / limit_up_price) trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 add_watch_index = self.compute_l_down_common_watch_indexes(code, trade_index, real_place_order_index, add_count, min_num, watch_indexes) watch_indexes -= remove_indexes watch_indexes |= add_watch_index # 保存数据 / LCancelBigNumComputer().set_l_down_watch_index_info(code, watch_indexes_info[0], True, watch_indexes) l2_log.l_cancel_debug(code, f"L后稳定后更新监控范围:删除-{remove_indexes} 增加-{add_watch_index}") # --------------------------------H撤------------------------------- class HourCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __tradeBuyQueue = TradeBuyQueue() __SCancelBigNumComputer = SCancelBigNumComputer() # 计算触发位置 __start_compute_index_dict = {} # 成交位置 __transaction_progress_index_dict = {} # 成交位置更新时间 __transaction_progress_index_updatetime_dict = {} # 缓存 __cancel_watch_indexs_cache = {} # L撤触发的代码 __l_cancel_triggered_codes = set() __h_cancel_update_time_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = json.loads(val) if val: val = set(val) else: val = set() CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val) keys = RedisUtils.keys(__redis, "h_cancel_transaction_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = int(val) CodeDataCacheUtil.set_cache(cls.__transaction_progress_index_dict, code, val) finally: RedisUtils.realse(__redis) # 保存成交位置到执行位置的揽括范围数据 def __save_watch_index_set(self, code, buy_single_index, indexes): trade_record_log_util.add_cancel_watch_indexes_log(code, trade_record_log_util.CancelWatchIndexesInfo( trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H, buy_single_index, list(indexes))) self.__set_watch_indexes(code, indexes) def __set_watch_indexes(self, code, indexes): CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, indexes) key = f"h_cancel_watch_indexs-{code}" RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps(list(indexes))) # 保存成交进度 def __get_watch_index_set(self, code): key = f"h_cancel_watch_indexs-{code}" val = RedisUtils.get(self.__get_redis(), key) if val is None: return None val = json.loads(val) return val def __get_watch_index_set_cache(self, code): cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code) if cache_result[0]: return cache_result[1] return set() def __save_transaction_index(self, code, index): CodeDataCacheUtil.set_cache(self.__transaction_progress_index_dict, code, index) key = f"h_cancel_transaction_index-{code}" RedisUtils.setex_async(self.__db, key, tool.get_expire(), index) def __clear_data(self, code): CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code) CodeDataCacheUtil.clear_cache(self.__transaction_progress_index_dict, code) CodeDataCacheUtil.clear_cache(self.__start_compute_index_dict, code) self.__l_cancel_triggered_codes.discard(code) ks = [f"h_cancel_watch_indexs-{code}", f"h_cancel_transaction_index-{code}"] for key in ks: RedisUtils.delete_async(self.__db, key) # 计算观察索引,倒序计算 def __compute_watch_index(self, code, buy_single_index): """ @param code: @param buy_single_index: @return: """ if buy_single_index is None: return if self.__cancel_watch_indexs_cache.get(code): return real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code) if not real_place_order_index: l2_log.h_cancel_debug(code, "尚未找到真实下单位置") return start_compute_index = self.__start_compute_index_dict.get(code) if not start_compute_index: l2_log.h_cancel_debug(code, "尚未找到开始计算位置") return transaction_index = self.__transaction_progress_index_dict.get(code) if transaction_index: # 不能计算成交进度以前的数据 start_compute_index = transaction_index + 1 # max(transaction_index + 1, start_compute_index) total_datas = local_today_datas.get(code) # h撤计算必须超过3分钟 if tool.trade_time_sub(total_datas[-1]["val"]["time"], total_datas[real_place_order_index]["val"]["time"]) < 180: l2_log.h_cancel_debug(code, "180s内囊括计算H撤") return # -----------------计算H上------------------- watch_indexes_up = set() for i in range(real_place_order_index - 1, start_compute_index + 1, -1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue # 小金额过滤 if float(val['price']) * val['num'] < 50 * 100: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes_up.add(i) if len(watch_indexes_up) >= 3: # 最多取3笔 break # ------------------计算H下----------------------- limit_up_price = gpcode_manager.get_limit_up_price(code) if not limit_up_price: return # 计算结束位置 total_num = 0 # 获取m值数据 thresh_hold_money = Buy1PriceManager().get_latest_buy1_money(code) # 封单额的1/10 thresh_hold_money = thresh_hold_money / 10 thresh_hold_num = thresh_hold_money // (float(limit_up_price) * 100) end_index = real_place_order_index + 1 watch_indexes = set() for i in range(real_place_order_index + 1, total_datas[-1]["index"]): # 看是否撤单 data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if float(val['price']) * val['num'] < 50 * 100: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes.add(i) total_num += left_count * val["num"] count = len(watch_indexes) # 最小5笔,最大10笔 if (total_num > thresh_hold_num and count >= 5) or count >= 10: break if watch_indexes or watch_indexes_up: watch_indexes |= watch_indexes_up self.__save_watch_index_set(code, buy_single_index, watch_indexes) self.__h_cancel_update_time_cache[code] = total_datas[-1]["val"]["time"] l2_log.h_cancel_debug(code, f"设置监听范围, 数据范围:{real_place_order_index}-{end_index} 监听范围-{watch_indexes}") # 设置真实下单位置 def __remove_cancel_long_time(self, code, buy_single_index): """ 删除已经撤单很久的数据 @param code: @return: """ watch_indexes = self.__get_watch_index_set_cache(code) if not watch_indexes: return if code not in self.__h_cancel_update_time_cache or tool.trade_time_sub(tool.get_now_time_str(), self.__h_cancel_update_time_cache[ code]) < 30 * 60: # 没有更新过或者更新时间小于30分钟就不更新 return watch_indexes = copy.deepcopy(watch_indexes) # 删除撤单半小时之前的数据 total_datas = local_today_datas.get(code) remove_indexes = set() for i in watch_indexes: cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if cancel_data and tool.trade_time_sub(total_datas[-1]["val"]["time"], cancel_data["val"]["time"]) > 60 * 30: # 删除撤单时间30分钟以上的数据 remove_indexes.add(i) if not remove_indexes: return real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code) if not real_place_order_index: return transaction_index = self.__transaction_progress_index_dict.get(code) if transaction_index is None: return # 起点为真实下单位置往上数3笔 start_index = real_place_order_index count = 0 for i in range(real_place_order_index - 1, transaction_index, -1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if float(val['price']) * val['num'] < 50 * 100: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: count += 1 start_index = i if count >= 3: break watch_indexes = watch_indexes - remove_indexes # 新增加囊括 add_indexes = set() for i in range(start_index, total_datas[-1]["index"]): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if i in watch_indexes: continue if float(val['price']) * val['num'] < 50 * 100: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: add_indexes.add(i) if len(add_indexes) >= len(remove_indexes): break watch_indexes |= add_indexes l2_log.h_cancel_debug(code, f"H撤更新:新增索引-{add_indexes} 删除索引-{remove_indexes}") self.__save_watch_index_set(code, buy_single_index, watch_indexes) # 设置更新时间, self.__h_cancel_update_time_cache[code] = total_datas[-1]["val"]["time"] def __need_compute_watch_indexes(self, code, transaction_index): """ 1.成交进度位距离真实下单位置<=5笔触发囊括 2.成交到(下单那一刻的成交进度位)到(真实下单位置)的后1/10处也要触发囊括 3.L撤无法生效触发囊括 @param code: @param transaction_index: @return: """ start_compute_index = self.__start_compute_index_dict.get(code) if start_compute_index is None: return False real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code) total_datas = local_today_datas.get(code) if real_place_order_index and real_place_order_index > transaction_index: # 成交位置离我们真实下单的位置只有5笔没撤的大单就需要计算H撤的囊括范围了 total_left_count = 0 for index in range(transaction_index + 1, real_place_order_index): data = total_datas[index] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if float(val['price']) * val['num'] < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, index, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_left_count += 1 if total_left_count > 5: break if total_left_count <= 5: return True # 成交位到开始计算位置没有买单之后 for index in range(transaction_index + 1, start_compute_index): data = total_datas[index] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if float(val['price']) * val['num'] < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, index, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: # 中间还有未撤的买单 return False return True # 设置成交进度 def set_transaction_index(self, code, buy_single_index, index): try: # 每3s或者成交进度变化就囊括一次 if index == self.__transaction_progress_index_dict.get(code): if code in self.__transaction_progress_index_updatetime_dict and time.time() - self.__transaction_progress_index_updatetime_dict.get( code) < 3: return self.__transaction_progress_index_dict[code] = index self.__transaction_progress_index_updatetime_dict[code] = time.time() if self.__need_compute_watch_indexes(code, index): self.__compute_watch_index(code, buy_single_index) except Exception as e: l2_log.h_cancel_debug(code, "设置成交进度位置出错:{}", str(e)) async_log_util.exception(logger_l2_h_cancel, e) # 设置真实下单位置 def set_real_place_order_index(self, code, index, buy_single_index): if buy_single_index is None: return try: # 计算触发位置 min_num = int(5000 / (gpcode_manager.get_limit_up_price_as_num(code))) # 统计净涨停买的数量 not_cancel_indexes = [] total_datas = local_today_datas.get(code) for j in range(buy_single_index, index): data = total_datas[j] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, j, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: not_cancel_indexes.append(j) if not_cancel_indexes: temp_count = len(not_cancel_indexes) temp_index = int(temp_count * 9 / 10) if temp_index + 1 >= temp_count: temp_index = temp_count - 1 self.__start_compute_index_dict[code] = not_cancel_indexes[temp_index] except Exception as e: async_log_util.exception(logger_l2_h_cancel, e) l2_log.h_cancel_debug(code, "设置真实下单位置出错:{}", str(e)) def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, buy_volume_index, volume_index, is_first_code): if buy_single_index is None or buy_exec_index is None: return False, "尚未找到下单位置" if int(tool.get_now_time_str().replace(":", "")) > int("145000"): return False, None # 设置成交进度 if code not in self.__transaction_progress_index_dict: return False, "没找到成交进度" watch_index_set = self.__get_watch_index_set_cache(code) if not watch_index_set: # 是否有涨停买撤 need_compute = False # 有涨停买撤才会计算位置 for i in range(start_index, end_index + 1): data = total_data[i] val = data['val'] if L2DataUtil.is_limit_up_price_buy_cancel(val): need_compute = True break if need_compute: if self.__need_compute_watch_indexes(code, self.__transaction_progress_index_dict.get(code)): self.__compute_watch_index(code, buy_single_index) watch_index_set = self.__get_watch_index_set_cache(code) if not watch_index_set: return False, "没有监听索引" l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index) if watch_index_set: cancel_num = 0 total_num = 0 for i in watch_index_set: if i is None: l2_log.h_cancel_debug(code, f"空值:{watch_index_set}") continue data = total_data[i] val = data['val'] total_num += val['num'] * data['re'] # 判断是否撤单 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_data, local_today_canceled_buyno_map.get( code)) cancel_num += val['num'] * (data['re'] - left_count) rate = round(cancel_num / total_num, 4) threshold_rate = constant.H_CANCEL_RATE_WITH_LDOWN_CANT_INVALID if code in self.__l_cancel_triggered_codes else constant.H_CANCEL_RATE try: must_buy = gpcode_manager.MustBuyCodesManager().is_in_cache(code) if must_buy: threshold_rate = constant.H_CANCEL_RATE_WITH_MUST_BUY except Exception as e: async_log_util.error(logger_l2_h_cancel, str(e)) if rate >= threshold_rate: l2_log.h_cancel_debug(code, f"撤单比例:{rate}") return True, total_data[-1] try: self.__remove_cancel_long_time(code, buy_single_index) except Exception as e: l2_log.h_cancel_debug(code, f"更新H撤囊括范围出错:{str(e)}") return False, None # 下单成功 def place_order_success(self, code, buy_single_index, buy_exec_index, total_data): self.__clear_data(code) # 获取H撤监听的数据索引范围 # 返回监听范围与已撤单索引 def get_watch_index_dict(self, code): return {}, set() def start_compute_watch_indexes(self, code, buy_single_index): """ L后撤无法生效触发的囊括 @param code: @param buy_single_index: @return: """ self.__l_cancel_triggered_codes.add(code) self.__compute_watch_index(code, buy_single_index)