""" L2撤单策略 """ # ---------------------------------S撤------------------------------- # s级平均大单计算 # 计算范围到申报时间的那一秒 import json import logging import time import constant from cancel_strategy import s_l_h_cancel_strategy from code_attribute import big_money_num_manager, gpcode_manager, code_volumn_manager import l2_data_util from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from l2.l2_data_manager import OrderBeginPosInfo from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, HuaXinSellOrderStatisticManager from trade.sell.sell_rule_manager import TradeRuleManager from utils import tool from l2.transaction_progress import TradeBuyQueue from trade import trade_queue_manager, l2_trade_factor, trade_manager, trade_data_manager from l2 import l2_log, l2_data_source_util, code_price_manager from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas, local_today_buyno_map, \ local_today_canceled_buyno_map from log_module.log import logger_buy_1_volumn, logger_l2_error from utils.tool import CodeDataCacheUtil from settings import trade_setting def set_real_place_position(code, index, buy_single_index=None, is_default=True): # DCancelBigNumComputer().set_real_order_index(code, index) s_l_h_cancel_strategy.SCancelBigNumComputer().set_real_place_order_index(code, index, is_default) s_l_h_cancel_strategy.LCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index=buy_single_index, is_default=is_default) s_l_h_cancel_strategy.HourCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index) NewGCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index, is_default) FCancelBigNumComputer().set_real_order_index(code, index, is_default) JCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index, is_default) NBCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index, is_default) RDCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index, is_default) class BaseCancel: def set_real_place_order_index(self, code, index, buy_single_index, is_default): pass # 撤单成功 def cancel_success(self, code): pass # 下单成功 def place_order_success(self, code): pass class L2DataComputeUtil: """ L2数据计算帮助类 """ @classmethod def compute_left_buy_order(cls, code, start_index, end_index, limit_up_price, min_money=500000): """ 计算剩下的委托买单 @param code: 代码 @param start_index:起始索引(包含) @param end_index: 结束索引(包含) @param limit_up_price: 涨停价 @param min_money: 最小的资金 @return:笔数,手数 """ min_volume = min_money // int(limit_up_price * 100) total_datas = local_today_datas.get(code) canceled_buyno_map = local_today_canceled_buyno_map.get(code) total_count = 0 total_num = 0 for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val['num'] < min_volume: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, canceled_buyno_map) if left_count > 0: total_count += 1 total_num += val["num"] return total_count, total_num @classmethod def compute_end_index(cls, code, start_index, end_index, limit_up_price, max_count, min_money=500000): """ 计算结束索引 @param code: 代码 @param start_index:起始索引(包含) @param end_index: 结束索引(包含) @param limit_up_price: 涨停价 @param max_count: 最大数量 @param min_money: 最小的资金 @return:结束索引 """ min_volume = min_money // int(limit_up_price * 100) total_datas = local_today_datas.get(code) canceled_buyno_map = local_today_canceled_buyno_map.get(code) total_count = 0 for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val['num'] < min_volume: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, canceled_buyno_map) if left_count > 0: total_count += 1 if total_count >= max_count: return i return end_index @classmethod def compute_max_buy_order_info(cls, code, start_index, end_index): """ 计算区间最大买单 @param code: @param start_index: @param end_index: @return: """ total_datas = local_today_datas.get(code) canceled_buyno_map = local_today_canceled_buyno_map.get(code) max_info = (0, 0) # 索引,手数 for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, canceled_buyno_map) if left_count > 0: if val["num"] > max_info[1]: max_info = (i, val["num"]) return total_datas[max_info[0]] @classmethod def is_canceled(cls, code, index, total_datas, canceled_buyno_map, trade_index, deal_order_nos): """ 是否已经撤单 @param deal_order_nos: 成交大单集合 @param trade_index: 成交进度位 @param index: 索引 @param code: 代码 @param total_datas: @param canceled_buyno_map:撤单的订单号 @return: """ cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, index, total_datas, canceled_buyno_map) if cancel_data: # 已经撤单 return True else: if trade_index and trade_index > index: # 成交进度大于索引位置,且还没成交 if total_datas[index]["val"]["orderNo"] not in deal_order_nos: return True return False # ---------------------------------D撤------------------------------- # 计算 成交位->真实下单位置 总共还剩下多少手没有撤单 # 成交位变化之后重新计算 class DCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __instance = None # 下单位之后的封单中的大单 __follow_big_order_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: pass finally: RedisUtils.realse(__redis) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() # 是否有撤买的规则 def has_auto_cancel_rules(self, code): rules = self.get_auto_cancel_rules(code) return True if rules else False def get_auto_cancel_rules(self, code): try: rules = TradeRuleManager().list_can_excut_rules_cache([TradeRuleManager.TYPE_BUY_CANCEL], code) return rules except: return None def need_cancel(self, code, buy1_volume): rules = self.get_auto_cancel_rules(code) if rules: for r in rules: if r.buy1_volume >= buy1_volume: # 量低于 return True, r.id_ return False, None def compute_d_cancel_watch_index(self, code): # 计算D撤囊括范围 # real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code) pass def __clear_data(self, code): if code in self.__follow_big_order_cache: self.__follow_big_order_cache.pop(code) def place_order_success(self, code, buy_single_index, buy_exec_index): self.__clear_data(code) # ---------------------------------RD撤------------------------------- # 扫入下单大单撤 class RDCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __instance = None # 下单位之后的封单中的大单 __watch_indexes_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(RDCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): keys = RedisUtils.keys(cls.__get_redis(), "radical_big_order_watch-*") for k in keys: code = k.split("-")[1] val = RedisUtils.get(cls.__get_redis(), k) val = json.loads(val) cls.__watch_indexes_cache[code] = set(val) def set_watch_indexes(self, code, indexes): l2_log.d_cancel_debug(code, f"扫入大单监听:{indexes}") self.__watch_indexes_cache[code] = set(indexes) RedisUtils.setex_async(self.__db, f"radical_big_order_watch-{code}", tool.get_expire(), json.dumps(list(indexes))) def set_real_place_order_index(self, code, index, buy_single_index, is_default): if is_default: return if code in self.__watch_indexes_cache and len(self.__watch_indexes_cache[code]) > 1: # 囊括的单大于1个 return # 从买入信号位开始囊括 limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) min_money = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) min_num = int(round(min_money / limit_up_price / 100)) total_datas = local_today_datas.get(code) watch_indexes = set() for i in range(buy_single_index, index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < min_num: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes.add(i) if watch_indexes: l2_log.d_cancel_debug(code, f"更新扫入大单监听:{watch_indexes}") self.__watch_indexes_cache[code] = watch_indexes def need_cancel(self, code, start_index, end_index): """ 是否需要撤单 @param code: @param start_index: @param end_index: @return: 是否需要撤单,撤单索引, 消息 """ buyno_map = local_today_buyno_map.get(code) watch_indexes = self.__watch_indexes_cache.get(code) if not watch_indexes: return False, None, "无大单监听" total_datas = local_today_datas.get(code) need_compute = False for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue if val["num"] * float(val["price"]) < 5000: continue buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, buyno_map) if buy_index is None: continue if buy_index in watch_indexes: need_compute = True break cancel_indexes = set() canceled_buyno_map = local_today_canceled_buyno_map.get(code) if need_compute or True: cancel_count = 0 cancel_data = None # 成交买单号 deal_order_nos = HuaXinBuyOrderManager().get_deal_buy_order_nos(code) if deal_order_nos is None: deal_order_nos = set() trade_index, is_default = TradeBuyQueue().get_traded_index(code) if is_default: trade_index = None for index in watch_indexes: if L2DataComputeUtil.is_canceled(code, index, total_datas, canceled_buyno_map, trade_index, deal_order_nos): # 买单已撤单 cancel_count += 1 cancel_indexes.add(index) rate = round(cancel_count / len(watch_indexes), 2) if rate >= 0.8: return True, cancel_data, f"撤单比例-{rate},大单撤单({cancel_indexes})" return False, None, "无大单撤单" def __clear_data(self, code): if code in self.__watch_indexes_cache: self.__watch_indexes_cache.pop(code) RedisUtils.delete_async(self.__db, f"radical_big_order_watch-{code}") def clear_data(self, code): self.__clear_data(code) # 新F撤,根据成交数据来撤 class FCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __real_order_index_cache = {} __max_buy_order_num_cache = {} # 下单距离太远计算 __far_away_computed_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(FCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "f_cancel_real_order_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = json.loads(val) CodeDataCacheUtil.set_cache(cls.__real_order_index_cache, code, val) finally: RedisUtils.realse(__redis) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def __set_real_order_index(self, code, index, is_default): CodeDataCacheUtil.set_cache(self.__real_order_index_cache, code, (index, is_default)) RedisUtils.setex_async(self.__db, f"f_cancel_real_order_index-{code}", tool.get_expire(), json.dumps((index, is_default))) def __del_real_order_index(self, code): CodeDataCacheUtil.clear_cache(self.__real_order_index_cache, code) RedisUtils.delete_async(self.__db, f"f_cancel_real_order_index-{code}") def __get_real_order_index(self, code): val = RedisUtils.get(self.__db, f"f_cancel_real_order_index-{code}") if val: val = json.loads(val) return val[0] return None def __get_real_order_index_cache(self, code): cache_result = CodeDataCacheUtil.get_cache(self.__real_order_index_cache, code) if cache_result[0]: return cache_result[1] return None def clear(self, code=None): if code: self.__del_real_order_index(code) if code in self.__max_buy_order_num_cache: self.__max_buy_order_num_cache.pop(code) else: keys = RedisUtils.keys(self.__get_redis(), "f_cancel_real_order_index-*") if keys: for k in keys: code = k.split("-")[1] self.__del_real_order_index(code) # 设置真实的下单位置 def set_real_order_index(self, code, index, is_default): self.__set_real_order_index(code, index, is_default) self.__far_away_computed_cache[code] = False # if not is_default and code.find("60") == 0: try: # 统计未成交的最大单 trade_index = 0 trade_info = TradeBuyQueue().get_traded_index(code) if trade_info and not trade_info[1] and trade_info[0] is not None: trade_index = trade_info[0] data = L2DataComputeUtil.compute_max_buy_order_info(code, trade_index, index) # 保存最大单 self.__max_buy_order_num_cache[code] = data["val"]["num"] l2_log.f_cancel_debug(code, f"最大单计算:索引-{data['index']} 范围:{trade_index}-{index}") except Exception as e: logger_l2_error.exception(e) def place_order_success(self, code): self.clear(code) def cancel_success(self, code): self.clear(code) # 撤单成功之后就清除当前的成交 HuaXinSellOrderStatisticManager.clear_latest_deal_volume(code) def __get_fast_deal_threshold_value(self, code, place_order_time_str): """ 获取 F撤阈值 @param code: @param place_order_time_str:下单时间 @return:(金额(单位:W),笔数) """ max60, yesterday = code_volumn_manager.CodeVolumeManager().get_histry_volumn(code) if max60: money_y = code_volumn_manager.CodeVolumeManager().get_reference_volume_as_money_y(code) money = int(200 * money_y + 280) if tool.trade_time_sub(place_order_time_str, "10:00:00") <= 0: # 10点前下单打7折 money = int(money * 0.7) count = int(money_y * 10) // 10 + 3 return money, count # 默认值 return 300, 3 # 下单3分钟内距离下单位置不足3笔(包含正在成交)且不到300w就需要撤单 def need_cancel_for_deal_fast(self, code, trade_index=None): if trade_index is None: trade_info = TradeBuyQueue().get_traded_index(code) if trade_info and not trade_info[1] and trade_info[0] is not None: trade_index = trade_info[0] if trade_index is None: return False, "没获取到成交进度位" # 判断是否具有真实的下单位置 real_order_index_info = self.__get_real_order_index_cache(code) if not real_order_index_info: return False, "没找到真实下单位" if real_order_index_info[1]: return False, "真实下单位为默认" if real_order_index_info[0] <= trade_index: return False, "真实下单位在成交位之前" real_order_index = real_order_index_info[0] # 统计未撤订单的数量与金额 total_datas = local_today_datas.get(code) # 是否是下单1分钟内 if tool.trade_time_sub(tool.get_now_time_str(), total_datas[real_order_index]['val']['time']) > 1 * 60: return False, "下单超过60s" try: if code in self.__max_buy_order_num_cache: max_num = self.__max_buy_order_num_cache[code] if max_num and tool.is_ge_code(code): max_num = max_num * 6.6 details = HuaXinSellOrderStatisticManager.get_latest_3s_continue_deal_volumes(code) threshold_num = int(max_num * 0.5 * 100) count = 0 for d in details: if d[1] > threshold_num: count += 1 if count >= 2: return True, f"连续3秒有2s抛压过大:{details} 最大值:{max_num}" except Exception as e: l2_log.f_cancel_debug(code, f"大抛压撤单计算出错:{str(e)}") # 查询最近2秒成交是否超过阈值 THRESHOLD_MONEY_W, THRESHOLD_COUNT = self.__get_fast_deal_threshold_value(code, total_datas[real_order_index]['val'][ 'time']) total_left_count = 0 total_left_num = 0 # 成交位到真实下单位剩余的未成交的单 for i in range(trade_index, real_order_index_info[0]): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_left_count += left_count total_left_num += val["num"] * left_count if total_left_count > THRESHOLD_COUNT: break if trade_index == i: dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) if dealing_info and str(dealing_info[0]) == str(val["orderNo"]): total_left_num -= dealing_info[1] // 100 limit_up_price = gpcode_manager.get_limit_up_price(code) total_left_money = total_left_num * float(limit_up_price) * 100 if total_left_money < 1e8: if total_left_count <= 1 or ( total_left_count <= THRESHOLD_COUNT and limit_up_price and total_left_num * float( limit_up_price) < THRESHOLD_MONEY_W * 100): return True, f"剩余笔数({total_left_count})/金额({round(total_left_num * float(limit_up_price) * 100)})不足,成交进度:{trade_index},真实下单位置:{real_order_index} 阈值:({THRESHOLD_MONEY_W},{THRESHOLD_COUNT}) " return False, f"不满足撤单条件: 成交进度-{trade_index} 真实下单位置-{real_order_index} total_left_count-{total_left_count} total_left_num-{total_left_num}" # 距离太近,封单不足 def need_cancel_for_p(self, code, order_begin_pos): if True: return False, "" if gpcode_manager.MustBuyCodesManager().is_in_cache(code): return False, "已加红" if not order_begin_pos or not order_begin_pos.buy_exec_index or order_begin_pos.buy_exec_index < 0: return False, "尚未下单" # 判断是否具有真实的下单位置 real_order_index_info = self.__get_real_order_index_cache(code) if not real_order_index_info: return False, "没找到真实下单位" if real_order_index_info[1]: return False, "真实下单位为默认" trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 real_order_index = real_order_index_info[0] if real_order_index <= trade_index: return False, "真实下单位在成交位之前" total_datas = local_today_datas.get(code) # 是否是下单3分钟内 sub_time = tool.trade_time_sub(total_datas[-1]['val']['time'], total_datas[real_order_index]['val']['time']) if sub_time > 60: return False, "下单超过60s" limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) # 判断累计大单数量 min_money = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) total_count, total_num = L2DataComputeUtil.compute_left_buy_order(code, trade_index, real_order_index, limit_up_price, min_money) # 下单后3秒,排撤比例≥65%则撤掉,视为P撤的一种,排得太后了。 # if sub_time > 3 and not self.__far_away_computed_cache.get(code): # self.__far_away_computed_cache[code] = True # # 成交进度位到真实下单位的位置过远 # total_count_, total_num_ = L2DataComputeUtil.compute_left_buy_order(code, trade_index, real_order_index, # limit_up_price, 500000) # # 获取买1金额 # buy1_money = code_price_manager.Buy1PriceManager().get_latest_buy1_money(code) # if buy1_money: # if total_num_ * limit_up_price * 100 > buy1_money * 0.65: # return True, f"P撤:成交位置距离下单位置太远 成交位-{trade_index} 下单位-{real_order_index} 买1-{buy1_money}" min_time_s, max_time_s = 2, 60 # if total_num * limit_up_price >= 299 * 100: # min_time_s, max_time_s = 30, 60 if sub_time <= min_time_s: return False, f"下单在{min_time_s}s内" if sub_time > max_time_s: return False, f"下单超过{max_time_s}s" # 计算我们后面的大单与涨停纯买额 total_left_num = 0 total_big_num_count = 0 canceled_buyno_map = local_today_canceled_buyno_map.get(code) for i in range(real_order_index + 1, len(total_datas)): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue money = val["num"] * float(val["price"]) * 100 if money < 500000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, canceled_buyno_map) if left_count > 0: if money >= min_money: total_big_num_count += 1 total_left_num += val["num"] left_money = total_left_num * float(limit_up_price) if left_money < 1000 * 100 or total_big_num_count < 2: # 实际下单位后方所有涨停纯买额≤1000万 return True, f"P撤:封单纯买额-{round(left_money / 100, 1)}万 剩余大单数量-{total_big_num_count} 下单位-{real_order_index}" return False, "不满足撤单条件" # w撤 def need_cancel_for_w(self, code): real_order_index_info = self.__get_real_order_index_cache(code) if not real_order_index_info: return False, "没找到真实下单位" if real_order_index_info[1]: return False, "真实下单位为默认" place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) if place_order_count > 1: return False, "不是初次下单" trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: return False, "没获取到成交进度位" real_order_index = real_order_index_info[0] total_datas = local_today_datas.get(code) if tool.trade_time_sub(total_datas[-1]['val']['time'], total_datas[real_order_index]['val']['time']) > 60: return False, "超过守护时间" limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) end_index = L2DataComputeUtil.compute_end_index(code, real_order_index + 1, total_datas[-1]["index"], limit_up_price, 10) # 从成交进度位到截至位置计算大单 min_money = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) left_count, left_money = L2DataComputeUtil.compute_left_buy_order(code, trade_index, end_index, limit_up_price, min_money=min_money) if left_count < 1: return True, f"范围:{trade_index}-{end_index} 大单数量:{left_count}" return False, "大单数量够" # ---------------------------------G撤------------------------------- # 已不效 class GCancelBigNumComputer: __real_place_order_index_dict = {} __trade_progress_index_dict = {} __watch_indexes_dict = {} __watch_indexes_by_dict = {} __expire_time_dict = {} # 最新处理的卖单 __latest_process_sell_order_no_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(GCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) return cls.__instance def set_real_place_order_index(self, code, index, buy_single_index, is_default): self.__real_place_order_index_dict[code] = (index, is_default) start_index = buy_single_index if code in self.__trade_progress_index_dict: start_index = self.__trade_progress_index_dict.get(code) total_datas = local_today_datas.get(code) # 设置生效截至时间 self.__reset_expire_time(code, total_datas[index]['val']['time']) self.__commpute_watch_indexes(code, start_index, (index, is_default), from_real_order_index_changed=True) # 重新设置G撤守护时间 def __reset_expire_time(self, code, now_time): self.__expire_time_dict[code] = tool.trade_time_add_second(now_time, 9) # 大有单卖 # 暂时不启用 def set_big_sell_order_info(self, code, big_sell_order_info): money = big_sell_order_info[0] if money < 50 * 10000: # 无大卖单 return # 获取真实下单位置 real_place_order_info = self.__real_place_order_index_dict.get(code) if not real_place_order_info: # 没有真实下单位置 return if real_place_order_info[1]: # 不能是默认下单位置 return trade_index = self.__trade_progress_index_dict.get(code) if trade_index is None: trade_index = 0 # 下单9s之后才会重新触发 total_datas = local_today_datas.get(code) if tool.trade_time_sub(total_datas[-1]['val']['time'], total_datas[real_place_order_info[0]]['val']['time']) <= 9: return # 不重复处理同一卖单号 if self.__latest_process_sell_order_no_dict.get(code) == big_sell_order_info[1][-1][0]: return # 查找真实成交位置 real_order_index = real_place_order_info[0] start_order_no = big_sell_order_info[1][-1][4][1] real_trade_index = trade_index for i in range(trade_index, real_order_index): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if int(val['orderNo']) < start_order_no: continue real_trade_index = i break l2_log.g_cancel_debug(code, f"大单卖触发重新囊括,真实成交位置:{real_trade_index},卖信息:{big_sell_order_info}") self.__commpute_watch_indexes(code, real_trade_index, real_place_order_info, recompute=True) self.__reset_expire_time(code, total_datas[real_trade_index]['val']['time']) self.__latest_process_sell_order_no_dict[code] = big_sell_order_info[1][-1][0] def clear(self, code=None): if code: if code in self.__expire_time_dict: self.__expire_time_dict.pop(code) if code in self.__real_place_order_index_dict: self.__real_place_order_index_dict.pop(code) if code in self.__watch_indexes_dict: self.__watch_indexes_dict.pop(code) if code in self.__watch_indexes_by_dict: self.__watch_indexes_by_dict.pop(code) if code in self.__trade_progress_index_dict: self.__trade_progress_index_dict.pop(code) else: self.__real_place_order_index_dict.clear() self.__watch_indexes_dict.clear() self.__trade_progress_index_dict.clear() self.__watch_indexes_by_dict.clear() # recompute:是否重新计算 def __commpute_watch_indexes(self, code, traded_index, real_order_index_info, from_real_order_index_changed=False, recompute=False): if traded_index is None or real_order_index_info is None: return real_order_index, is_default = real_order_index_info[0], real_order_index_info[1] origin_watch_index = self.__watch_indexes_dict.get(code) if origin_watch_index is None: origin_watch_index = set() # 不需要备用 origin_watch_index_by = self.__watch_indexes_by_dict.get(code) if origin_watch_index_by is None: origin_watch_index_by = set() # 重新计算需要清除之前的数据 if recompute: origin_watch_index.clear() origin_watch_index_by.clear() start_index = traded_index if traded_index in origin_watch_index or traded_index in origin_watch_index_by: # 正在成交的是已经囊括了的大单 start_index = traded_index + 1 total_datas = local_today_datas.get(code) watch_indexes = set() # G撤的囊括范围向后面延申5笔 end_index = real_order_index count = 0 for i in range(real_order_index + 1, total_datas[-1]["index"] + 1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count <= 0: continue count += 1 if count > 5: break end_index = i for i in range(start_index, end_index + 1): # 判断是否有未撤的大单 data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 29900 and val["num"] < 7999: continue # 是否已撤单 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes.add(i) if watch_indexes: # 还有300万以上的大单没有撤单 if from_real_order_index_changed or recompute: # 真实下单位改变后才会更新 final_watch_indexes = origin_watch_index | watch_indexes self.__watch_indexes_dict[code] = final_watch_indexes l2_log.g_cancel_debug(code, f"大单监听:{final_watch_indexes} 是否重新计算:{recompute} 计算范围:{start_index}-{end_index}") # 有大单监听,需要移除之前的小单监听 if code in self.__watch_indexes_by_dict: self.__watch_indexes_by_dict[code].clear() else: l2_log.g_cancel_debug(code, f"没有大单监听,开始计算小单:{start_index}-{real_order_index}, 是否重新计算:{recompute}") # 没有300万以上的大单了,计算备用 # 只有备用单成交了或者没有备用单,才会再次寻找备用单 need_find_by = False if not origin_watch_index_by: need_find_by = True else: # 备用单是否成交 need_find_by = True for i in origin_watch_index_by: if i >= start_index: # 只要有一个还没成交就不需要重新计算 need_find_by = False break if need_find_by and (not is_default or not watch_indexes): l2_log.g_cancel_debug(code, f"启动小单备用监听:{start_index}-{real_order_index}") temp_list = [] for i in range(start_index, real_order_index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue # 不能囊括已撤单 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count <= 0: continue if val["num"] * float(val["price"]) < 5000: continue temp_list.append((val["num"], data)) if len(temp_list) > 15: break temp_list.sort(key=lambda x: x[0], reverse=True) # 次大单的笔数1-3笔 THRESH_COUNT = len(temp_list) // 5 if THRESH_COUNT < 1: THRESH_COUNT = 1 if temp_list: for i in range(THRESH_COUNT): l2_log.g_cancel_debug(code, f"小单备用监听位置:{temp_list[i][1]['index']}") watch_indexes.add(temp_list[i][1]["index"]) self.__watch_indexes_by_dict[code] = watch_indexes def set_trade_progress(self, code, buy_single_index, index): if self.__trade_progress_index_dict.get(code) != index: self.__trade_progress_index_dict[code] = index self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code)) def need_cancel(self, code, buy_exec_index, start_index, end_index): if 1 > 0: return False, None, "G撤被注释掉" if code not in self.__real_place_order_index_dict: return False, None, "没有找到真实下单位" expire_time = self.__expire_time_dict.get(code) if not expire_time: return False, None, "尚未设置生效时间" real_place_order_index, is_default = self.__real_place_order_index_dict.get(code) total_datas = local_today_datas.get(code) if tool.trade_time_sub(total_datas[end_index]["val"]["time"], total_datas[buy_exec_index]['val']['time']) > 180: return False, None, "超过180s的生效时间" # 30s内有效 if tool.trade_time_sub(total_datas[end_index]["val"]["time"], expire_time) > 0: return False, None, "超过生效时间" watch_indexes = self.__watch_indexes_dict.get(code) if watch_indexes is None: watch_indexes = set() watch_indexes_by = self.__watch_indexes_by_dict.get(code) if watch_indexes_by is None: watch_indexes_by = set() need_compute = False need_compute_by = False for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue if val["num"] * float(val["price"]) < 5000: continue buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get( code)) if buy_index is not None and buy_index < real_place_order_index and ( buy_index in watch_indexes or buy_index in watch_indexes_by): if buy_index in watch_indexes_by: need_compute_by = True break # 备用撤单,直接撤 # return True, data, f"次大单撤:{buy_index}" elif buy_index in watch_indexes: # 大单撤需要重新计算大单撤单比例 need_compute = True break if need_compute and watch_indexes: canceled_indexes = set() for index in watch_indexes: cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, index, total_datas, local_today_canceled_buyno_map.get( code)) if cancel_data: canceled_indexes.add(cancel_data["index"]) cancel_rate = round(len(canceled_indexes) / len(watch_indexes), 2) threshhold_rate = constant.G_CANCEL_RATE situation = trade_setting.MarketSituationManager().get_situation_cache() if situation == trade_setting.MarketSituationManager.SITUATION_GOOD: threshhold_rate = constant.G_CANCEL_RATE_FOR_GOOD_MARKET if cancel_rate > threshhold_rate: canceled_indexes_list = list(canceled_indexes) canceled_indexes_list.sort() return True, total_datas[canceled_indexes_list[-1]], f"撤单比例:{cancel_rate}" # 计算备用 if need_compute_by and watch_indexes_by: canceled_indexes = set() for index in watch_indexes_by: cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, index, total_datas, local_today_canceled_buyno_map.get( code)) if cancel_data: canceled_indexes.add(cancel_data["index"]) cancel_rate = round(len(canceled_indexes) / len(watch_indexes_by), 2) threshhold_rate = constant.G_CANCEL_RATE situation = trade_setting.MarketSituationManager().get_situation_cache() if situation == trade_setting.MarketSituationManager.SITUATION_GOOD: threshhold_rate = constant.G_CANCEL_RATE_FOR_GOOD_MARKET if cancel_rate > threshhold_rate: canceled_indexes_list = list(canceled_indexes) canceled_indexes_list.sort() return True, total_datas[canceled_indexes_list[-1]], f"次大单撤单比例:{cancel_rate}" return False, None, "" # B撤单 # 剩余一个大单撤半截就撤单 def need_cancel_for_b(self, code, start_index, end_index): if gpcode_manager.MustBuyCodesManager().is_in_cache(code): return False, None, "已加红" real_place_order_info = self.__real_place_order_index_dict.get(code) if not real_place_order_info or real_place_order_info[1]: # 没有真实下单位置 return False, None, "没有真实下单位置" trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: return False, None, "未获取到成交进度位" total_datas = local_today_datas.get(code) buyno_map = local_today_buyno_map.get(code) cancel_half_index = None for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue orderNo = val['orderNo'] buy_data = buyno_map.get(f"{orderNo}") if not buy_data: continue # 判断大单 if buy_data["val"]["num"] * float(buy_data["val"]["price"]) < 29900: continue # 判断是否撤半截 if buy_data["val"]["num"] > val["num"]: cancel_half_index = i break if cancel_half_index is None: return False, None, "没有撤半截" # 有大单撤半截桩 # 计算是否还剩下大单 total_big_num_count = 0 for i in range(trade_index, real_place_order_info[0]): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 29900: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_big_num_count += 1 break if total_big_num_count == 0: # 没有剩下大单 return True, total_datas[cancel_half_index], f"B撤:撤单索引-{cancel_half_index}" return False, None, "还有大单没撤单" def place_order_success(self, code): self.clear(code) def cancel_success(self, code): self.clear(code) # ---------------------------------新G撤------------------------------- class NewGCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __real_place_order_index_dict = {} __trade_progress_index_dict = {} __watch_indexes_dict = {} # 最新处理的卖单 __latest_process_sell_order_no_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(NewGCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "g_cancel_watch_index_info-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) if val: val = json.loads(val) val = set(val) CodeDataCacheUtil.set_cache(cls.__watch_indexes_dict, code, val) finally: RedisUtils.realse(__redis) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def __set_watch_index(self, code, indexes): CodeDataCacheUtil.set_cache(self.__watch_indexes_dict, code, indexes) RedisUtils.setex_async(self.__db, f"g_cancel_watch_index_info-{code}", tool.get_expire(), json.dumps(list(indexes))) def set_real_place_order_index(self, code, index, buy_single_index, is_default): self.__real_place_order_index_dict[code] = (index, is_default) start_index = buy_single_index if not is_default: trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: start_index = 0 else: start_index = trade_index self.__commpute_watch_indexes(code, start_index, (index, is_default), from_real_order_index_changed=True) def clear(self, code=None): if code: if code in self.__real_place_order_index_dict: self.__real_place_order_index_dict.pop(code) if code in self.__watch_indexes_dict: self.__watch_indexes_dict.pop(code) RedisUtils.delete_async(self.__db, f"g_cancel_watch_index_info-{code}") if code in self.__trade_progress_index_dict: self.__trade_progress_index_dict.pop(code) else: self.__real_place_order_index_dict.clear() self.__watch_indexes_dict.clear() self.__trade_progress_index_dict.clear() keys = RedisUtils.keys(self.__get_redis(), f"g_cancel_watch_index_info-*") for k in keys: RedisUtils.delete(self.__get_redis(), k) # recompute:是否重新计算 def __commpute_watch_indexes(self, code, traded_index, real_order_index_info, from_real_order_index_changed=False, recompute=False): if traded_index is None or real_order_index_info is None: return real_order_index, is_default = real_order_index_info[0], real_order_index_info[1] origin_watch_index = self.__watch_indexes_dict.get(code) if origin_watch_index is None: origin_watch_index = set() # 重新计算需要清除之前的数据 if recompute: origin_watch_index.clear() start_index = traded_index if traded_index in origin_watch_index: # 正在成交的是已经囊括了的大单 start_index = traded_index + 1 total_datas = local_today_datas.get(code) # 查找成交进度位到真实下单位置所有的索引 watch_indexes_info = [] limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) big_num = int(l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) / (limit_up_price * 100)) for i in range(start_index, real_order_index + 1): # 判断是否有未撤的大单 data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] < big_num: continue # 是否已撤单 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: watch_indexes_info.append((i, val["num"])) # 当所有大单≤6笔时则G撤全部囊括 # 大单>6笔时囊括其三分之一 if len(watch_indexes_info) > 6: watch_indexes = set([x[0] for x in watch_indexes_info[:int(round(len(watch_indexes_info) / 3, 0))]]) # 找出最大单 max_info = watch_indexes_info[0] for mi in watch_indexes_info: if mi[1] > max_info[1]: max_info = mi watch_indexes.add(max_info[0]) else: watch_indexes = set([x[0] for x in watch_indexes_info]) if watch_indexes: # 还有300万以上的大单没有撤单 if from_real_order_index_changed or recompute: # 真实下单位改变后才会更新 final_watch_indexes = origin_watch_index | watch_indexes self.__set_watch_index(code, final_watch_indexes) l2_log.g_cancel_debug(code, f"大单监听:{final_watch_indexes} 是否重新计算:{recompute} 计算范围:{start_index}-{real_order_index}") def set_trade_progress(self, code, buy_single_index, index): # if self.__trade_progress_index_dict.get(code) != index: self.__trade_progress_index_dict[code] = index # self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code)) def need_cancel(self, code, buy_exec_index, start_index, end_index): if code not in self.__real_place_order_index_dict: return False, None, "没有找到真实下单位" real_place_order_index, is_default = self.__real_place_order_index_dict.get(code) total_datas = local_today_datas.get(code) # if tool.trade_time_sub(total_datas[end_index]["val"]["time"], total_datas[buy_exec_index]['val']['time']) > 180: # return False, None, "超过180s的生效时间" watch_indexes = self.__watch_indexes_dict.get(code) if watch_indexes is None: watch_indexes = set() if len(watch_indexes) < 3: return False, None, "大于等于3个大单生效" need_compute = False for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue if val["num"] * float(val["price"]) < 5000: continue buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get( code)) if buy_index is not None and buy_index < real_place_order_index and (buy_index in watch_indexes): if buy_index in watch_indexes: # 大单撤需要重新计算大单撤单比例 need_compute = True break if need_compute and watch_indexes: canceled_indexes = set() for index in watch_indexes: cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, index, total_datas, local_today_canceled_buyno_map.get( code)) if cancel_data: canceled_indexes.add(cancel_data["index"]) cancel_rate = round(len(canceled_indexes) / len(watch_indexes), 2) threshhold_rate = constant.G_CANCEL_RATE situation = trade_setting.MarketSituationManager().get_situation_cache() if situation == trade_setting.MarketSituationManager.SITUATION_GOOD: threshhold_rate = constant.G_CANCEL_RATE_FOR_GOOD_MARKET if gpcode_manager.MustBuyCodesManager().is_in_cache(code): threshhold_rate = constant.G_CANCEL_RATE_WITH_MUST_BUY if cancel_rate > threshhold_rate: canceled_indexes_list = list(canceled_indexes) canceled_indexes_list.sort() return True, total_datas[canceled_indexes_list[-1]], f"撤单比例:{cancel_rate}" return False, None, "" def place_order_success(self, code): self.clear(code) def cancel_success(self, code): self.clear(code) # B撤单 # 剩余一个大单撤半截就撤单 def need_cancel_for_b(self, code, start_index, end_index): real_place_order_info = self.__real_place_order_index_dict.get(code) if not real_place_order_info or real_place_order_info[1]: # 没有真实下单位置 return False, None, "没有真实下单位置" trade_index, is_default = TradeBuyQueue().get_traded_index(code) if trade_index is None: return False, None, "未获取到成交进度位" total_datas = local_today_datas.get(code) buyno_map = local_today_buyno_map.get(code) cancel_half_index = None for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue orderNo = val['orderNo'] buy_data = buyno_map.get(f"{orderNo}") if not buy_data: continue # 判断大单 if buy_data["val"]["num"] * float(buy_data["val"]["price"]) < 29900: continue # 判断是否撤半截 if buy_data["val"]["num"] > val["num"]: cancel_half_index = i break if cancel_half_index is None: return False, None, "没有撤半截" # 有大单撤半截桩 # 计算是否还剩下大单 total_big_num_count = 0 for i in range(trade_index, real_place_order_info[0]): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 29900: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_big_num_count += 1 break if total_big_num_count == 0: # 没有剩下大单 return True, total_datas[cancel_half_index], f"B撤:撤单索引-{cancel_half_index}" return False, None, "还有大单没撤单" def test(self): self.__set_watch_index("000333", {1, 2, 3, 4, 5, 6, 7}) # ---------------------------------独苗撤------------------------------- class UCancelBigNumComputer: __db = 0 __redis_manager = redis_manager.RedisManager(0) __cancel_real_order_index_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(UCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): pass @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() # 是否可以撤单 def need_cancel(self, code, transaction_index, order_begin_pos: OrderBeginPosInfo, current_limit_up_block_codes_dict, volume_rate): if not order_begin_pos or not order_begin_pos.buy_exec_index or order_begin_pos.buy_exec_index < 0: return False, "尚未下单" if not current_limit_up_block_codes_dict: return False, "涨停列表无数据" # 获取涨停原因 block = None for b in current_limit_up_block_codes_dict: if code in current_limit_up_block_codes_dict[b]: block = b break if not block: return False, "没有找到代码的涨停原因" if len(current_limit_up_block_codes_dict[block]) > 1: return False, "有后排无需撤单" total_datas = local_today_datas.get(code) time_sub = tool.trade_time_sub(tool.get_now_time_str(), total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) if 2 < time_sub < 30 * 60: real_place_order_index = s_l_h_cancel_strategy.SCancelBigNumComputer().get_real_place_order_index_cache( code) if not real_place_order_index: return False, "尚未找到真实下单位置" total_left_count = 0 for i in range(transaction_index, real_place_order_index): data = total_datas[i] val = data["val"] if float(val['price']) * val['num'] < 50 * 100: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, local_today_canceled_buyno_map.get( code)) if left_count > 0: total_left_count += 1 if total_left_count > 5: break # 成交进度变化 # if len(current_limit_up_block_codes_dict[block]) == 1 and volume_rate < 0.6 and total_left_count <= 5: # return True, "独苗下单30分钟无后排且成交位离我们很近且量小于60%" # if time_sub > 10 * 60: # if len(current_limit_up_block_codes_dict[block]) == 1 and volume_rate < 0.7: # return True, f"独苗下单10分钟无后排且量({volume_rate})小于70%" return False, "不需要撤单" # --------------------------------封单额变化撤------------------------ # 涨停封单额统计 class L2LimitUpMoneyStatisticUtil: _db = 1 _redisManager = redis_manager.RedisManager(1) _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager() __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(L2LimitUpMoneyStatisticUtil, cls).__new__(cls, *args, **kwargs) return cls.__instance @classmethod def __get_redis(cls): return cls._redisManager.getRedis() # 设置l2的每一秒涨停封单额数据 def __set_l2_second_money_record(self, code, time, num, from_index, to_index): old_num, old_from, old_to = self.__get_l2_second_money_record(code, time) if old_num is None: old_num = num old_from = from_index old_to = to_index else: old_num += num old_to = to_index key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to))) def __get_l2_second_money_record(self, code, time): key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) val = RedisUtils.get(self.__get_redis(), key) return self.__format_second_money_record_val(val) def __format_second_money_record_val(self, val): if val is None: return None, None, None val = json.loads(val) return val[0], val[1], val[2] def __get_l2_second_money_record_keys(self, code, time_regex): key = "l2_limit_up_second_money-{}-{}".format(code, time_regex) keys = RedisUtils.keys(self.__get_redis(), key) return keys # 设置l2最新的封单额数据 def __set_l2_latest_money_record(self, code, index, num): key = "l2_limit_up_money-{}".format(code) RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num, index))) # 返回数量,索引 def __get_l2_latest_money_record(self, code): key = "l2_limit_up_money-{}".format(code) result = RedisUtils.get(self.__get_redis(), key) if result: result = json.loads(result) return result[0], result[1] else: return 0, -1 # 矫正数据 # 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标 def verify_num(self, code, num, time_str): # 记录买1矫正日志 logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str) time_ = time_str.replace(":", "") key = None # 获取矫正时间前1分钟的数据 keys = [] if not constant.TEST: for i in range(0, 3600): temp_time = tool.trade_time_add_second(time_str, 0 - i) # 只处理9:30后的数据 if int(temp_time.replace(":", "")) < int("093000"): break keys_ = self.__get_l2_second_money_record_keys(code, temp_time.replace(":", "")) if len(keys_) > 0: keys.append(keys_[0]) if len(keys) >= 1: break else: keys_ = self.__get_l2_second_money_record_keys(code, "*") key_list = [] for k in keys_: time__ = k.split("-")[-1] key_list.append((int(time__), k)) key_list.sort(key=lambda tup: tup[0]) for t in key_list: if t[0] <= int(time_): keys.append(t[1]) break keys.sort(key=lambda tup: int(tup.split("-")[-1])) if len(keys) > 0: key = keys[0] val = RedisUtils.get(self.__get_redis(), key) old_num, old_from, old_to = self.__format_second_money_record_val(val) end_index = old_to # 保存最近的数据 self.__set_l2_latest_money_record(code, end_index, num) logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num) else: logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num) # 取消此种方法 # # for i in range(4, -2, -2): # # 获取本(分钟/小时/天)内秒分布数据 # time_regex = "{}*".format(time_[:i]) # keys_ = cls.__get_l2_second_money_record_keys(code, time_regex) # if keys_ and len(keys_) > 1: # # 需要排序 # keys = [] # for k in keys_: # keys.append(k) # keys.sort(key=lambda tup: int(tup.split("-")[-1])) # # if i == 4: # # keys=keys[:5] # # 有2个元素 # for index in range(0, len(keys) - 1): # time_1 = keys[index].split("-")[-1] # time_2 = keys[index + 1].split("-")[-1] # if int(time_1) <= int(time_) <= int(time_2): # # 在此时间范围内 # if time_ == time_2: # key = keys[index + 1] # else: # key = keys[index] # break # if key: # break # # 如果没有找到匹配的区间 # if not key: # # 最后一条数据的时间为相应的区间 # total_datas = local_today_datas[code] # # if key: # val = cls.__get_redis().get(key) # old_num, old_from, old_to = cls.__format_second_money_record_val(val) # end_index = old_to # # 保存最近的数据 # cls.__set_l2_latest_money_record(code, end_index, num) # logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num) # 计算量,用于涨停封单量的计算 def __compute_num(self, code, data, buy_single_data): if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]): # 涨停买撤与卖 return 0 - int(data["val"]["num"]) * data["re"] else: # 卖撤 if L2DataUtil.is_sell_cancel(data["val"]): # 卖撤的买数据是否在买入信号之前,如果在之前就不计算,不在之前就计算 if l2_data_util.is_sell_index_before_target(data, buy_single_data, local_today_num_operate_map.get(code)): return 0 return int(data["val"]["num"]) * data["re"] def clear(self, code): key = "l2_limit_up_money-{}".format(code) RedisUtils.delete(self.__get_redis(), key) # 返回取消的标志数据 # with_cancel 是否需要判断是否撤销 def process_data(self, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True): if buy_single_begin_index is None or buy_exec_index is None: return None, None start_time = round(time.time() * 1000) total_datas = local_today_datas[code] time_dict_num = {} # 记录计算的坐标 time_dict_num_index = {} # 坐标-量的map num_dict = {} # 统计时间分布 time_dict = {} for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] time_ = val["time"] if time_ not in time_dict: time_dict[time_] = i for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] time_ = val["time"] if time_ not in time_dict_num: time_dict_num[time_] = 0 time_dict_num_index[time_] = {"s": i, "e": i} time_dict_num_index[time_]["e"] = i num = self.__compute_num(code, data, total_datas[buy_single_begin_index]) num_dict[i] = num time_dict_num[time_] = time_dict_num[time_] + num for t_ in time_dict_num: self.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"], time_dict_num_index[t_]["e"]) # print("保存涨停封单额时间:", round(time.time() * 1000) - start_time) # 累计最新的金额 total_num, index = self.__get_l2_latest_money_record(code) record_msg = f"同花顺买1信息 {total_num},{index}" if index == -1: # 没有获取到最新的矫正封单额,需要从买入信号开始点计算 index = buy_single_begin_index - 1 total_num = 0 cancel_index = None cancel_msg = None # 待计算量 limit_up_price = gpcode_manager.get_limit_up_price(code) min_volumn = round(10000000 / (limit_up_price * 100)) min_volumn_big = min_volumn * 5 # 不同时间的数据开始坐标 time_start_index_dict = {} # 数据时间分布 time_list = [] # 到当前时间累积的买1量 time_total_num_dict = {} # 大单撤销笔数 cancel_big_num_count = 0 buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"]) # 获取最大封单额 max_buy1_volume = self._thsBuy1VolumnManager.get_max_buy1_volume_cache(code) # 从同花顺买1矫正过后的位置开始计算,到end_index结束 for i in range(index + 1, end_index + 1): data = total_datas[i] # 统计撤销数量 try: if big_money_num_manager.is_big_num(data["val"]): if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): cancel_big_num_count += int(data["re"]) # 获取是否在买入执行信号周围2s buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get( code)) if buy_index is not None: # 相差1s buy_time = total_datas[buy_index]["val"]["time"] if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2: cancel_big_num_count += int(data["re"]) elif L2DataUtil.is_limit_up_price_buy(data["val"]): cancel_big_num_count -= int(data["re"]) except Exception as e: logging.exception(e) threshold_rate = 0.5 if cancel_big_num_count >= 0: if cancel_big_num_count < 10: threshold_rate = threshold_rate - cancel_big_num_count * 0.01 else: threshold_rate = threshold_rate - 10 * 0.01 time_ = data["val"]["time"] if time_ not in time_start_index_dict: # 记录每一秒的开始位置 time_start_index_dict[time_] = i # 记录时间分布 time_list.append(time_) # 上一段时间的总数 time_total_num_dict[time_] = total_num exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"]) val = num_dict.get(i) if val is None: val = self.__compute_num(code, data, total_datas[buy_single_begin_index]) total_num += val # 在处理数据的范围内,就需要判断是否要撤单了 if start_index <= i <= end_index: # 如果是减小项 if val < 0: # 累计封单金额小于1000万 if total_num < min_volumn: # 与执行位相隔>=5s时规则生效 if exec_time_offset >= 5: cancel_index = i cancel_msg = "封单金额小于1000万,为{}".format(total_num) break # 相邻2s内的数据减小50% # 上1s的总数 last_second_total_volumn = time_total_num_dict.get(time_list[-1]) if last_second_total_volumn > 0 and ( last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate: # 与执行位相隔>=5s时规则生效 if exec_time_offset >= 5: # 相邻2s内的数据减小50% cancel_index = i cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn, total_num) break # 记录中有上2个数据 if len(time_list) >= 2: # 倒数第2个数据 last_2_second_total_volumn = time_total_num_dict.get(time_list[-2]) if last_2_second_total_volumn > 0: if last_2_second_total_volumn > last_second_total_volumn > total_num: dif = last_2_second_total_volumn - total_num if dif / last_2_second_total_volumn >= threshold_rate: # 与执行位相隔>=5s时规则生效 if exec_time_offset >= 5: cancel_index = i cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_, last_2_second_total_volumn, last_second_total_volumn, total_num) break if not with_cancel: cancel_index = None # print("封单额计算时间:", round(time.time() * 1000) - start_time) process_end_index = end_index if cancel_index: process_end_index = cancel_index # 保存最新累计金额 # cls.__set_l2_latest_money_record(code, process_end_index, total_num) # l2_data_log.l2_time(code, round(time.time() * 1000) - start_time, # "l2数据封单额计算时间", # False) if cancel_index: l2_log.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg, total_num) return total_datas[cancel_index], cancel_msg return None, None # ---------------------------------板上卖----------------------------- # 涨停卖统计 class L2LimitUpSellStatisticUtil: __db = 0 __redisManager = redis_manager.RedisManager(0) __limit_up_sell_num_cache = {} __limit_up_sell_index_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(L2LimitUpSellStatisticUtil, cls).__new__(cls, *args, **kwargs) # 初始化 cls.load_data() return cls.__instance @classmethod def load_data(cls): redis_ = cls.__get_redis() try: keys = RedisUtils.keys(redis_, "limit_up_sell_num-*") for k in keys: code = k.split["-"][-1] cls.__limit_up_sell_num_cache[code] = RedisUtils.get(redis_, k) keys = RedisUtils.keys(redis_, "limit_up_sell_index-*") for k in keys: code = k.split["-"][-1] cls.__limit_up_sell_index_cache[code] = RedisUtils.get(redis_, k) finally: RedisUtils.realse(redis_) @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() # 新增卖数据 def __incre_sell_data(self, code, num): if code not in self.__limit_up_sell_num_cache: self.__limit_up_sell_num_cache[code] = 0 self.__limit_up_sell_num_cache[code] += num key = "limit_up_sell_num-{}".format(code) RedisUtils.incrby_async(self.__db, key, num) RedisUtils.expire_async(self.__db, key, tool.get_expire()) def __get_sell_data(self, code): key = "limit_up_sell_num-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return 0 return int(val) def __get_sell_data_cache(self, code): if code in self.__limit_up_sell_num_cache: return int(self.__limit_up_sell_num_cache[code]) return 0 def __save_process_index(self, code, index): tool.CodeDataCacheUtil.set_cache(self.__limit_up_sell_index_cache, code, index) key = "limit_up_sell_index-{}".format(code) RedisUtils.setex_async(self.__db, key, tool.get_expire(), index) def __get_process_index(self, code): key = "limit_up_sell_index-{}".format(code) val = RedisUtils.get(self.__get_redis(), key) if val is None: return -1 return int(val) def __get_process_index_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__limit_up_sell_index_cache, code) if cache_result[0]: return int(cache_result[1]) return -1 # 清除数据,当取消成功与买入之前需要清除数据 def delete(self, code): tool.CodeDataCacheUtil.clear_cache(self.__limit_up_sell_index_cache, code) tool.CodeDataCacheUtil.clear_cache(self.__limit_up_sell_num_cache, code) key = "limit_up_sell_num-{}".format(code) RedisUtils.delete_async(self.__db, key) key = "limit_up_sell_index-{}".format(code) RedisUtils.delete_async(self.__db, key) def clear(self): keys = RedisUtils.keys(self.__get_redis(), "limit_up_sell_num-*") for k in keys: code = k.split("-")[-1] self.delete(code) # 处理数据,返回是否需要撤单 # 处理范围:买入执行位-当前最新位置 def process(self, code, start_index, end_index, buy_exec_index): # 获取涨停卖的阈值 limit_up_price = gpcode_manager.get_limit_up_price(code) zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code) # 大于自由流通市值的4.8% threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100) total_num = self.__get_sell_data_cache(code) cancel_index = None process_index = self.__get_process_index_cache(code) total_datas = local_today_datas.get(code) for i in range(start_index, end_index + 1): if i < buy_exec_index: continue if i <= process_index: continue if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]): num = int(total_datas[i]["val"]["num"]) self.__incre_sell_data(code, num) total_num += num if total_num > threshold_num: cancel_index = i break if cancel_index is not None: process_index = cancel_index else: process_index = end_index l2_log.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num, threshold_num) self.__save_process_index(code, process_index) if cancel_index is not None: return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num) return None, "" class LatestCancelIndexManager: __db = 0 __redis_manager = redis_manager.RedisManager(0) __latest_cancel_index_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(LatestCancelIndexManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "latest_cancel_index-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = int(val) CodeDataCacheUtil.set_cache(cls.__latest_cancel_index_cache, code, val) finally: RedisUtils.realse(__redis) def __save_latest_cancel_index(self, code, index): RedisUtils.setex_async(self.__db, f"latest_cancel_index-{code}", tool.get_expire(), index) def set_latest_cancel_index(self, code, index): CodeDataCacheUtil.set_cache(self.__latest_cancel_index_cache, code, index) self.__save_latest_cancel_index(code, index) def get_latest_cancel_index_cache(self, code): result = CodeDataCacheUtil.get_cache(self.__latest_cancel_index_cache, code) if result[0]: return result[1] return None pass class JCancelBigNumComputer(BaseCancel): """ J撤: 1000ms内若有三笔,我们后面的涨停撤买L , 此时算一次信号,即开始计算我们后面的未撤的所有涨停总额, 当此总涨停额下降至50%则撤单。 更新屏幕,3分钟以后有新的信号, 则重新计算总额。如果3分钟后没有新的信号, 则沿用上一次计算的后涨停额。 守护时间14点50分00秒, 此撤关于我们后面的不想顶而撤, 导致封单额陡然降低 """ __cancel_single_cache = {} __real_place_order_index_info_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(JCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) return cls.__instance def set_real_place_order_index(self, code, index, buy_single_index, is_default): self.__real_place_order_index_info_dict[code] = (index, is_default) def __compute_cancel_single(self, code, start_index, end_index): """ 计算撤单信号:1000ms内有3笔撤单 @param code: @param start_index: 开始索引 @param end_index: 结束索引 @return: """ # 获取真实下单位置 real_place_order_index_info = self.__real_place_order_index_info_dict.get(code) if not real_place_order_index_info or real_place_order_index_info[1]: # 尚未获取到真实下单位置 return real_place_order_index = real_place_order_index_info[0] # 获取撤单信号[时间,真实下单位置, 信号总手数,目前手数,最新计算的索引] cancel_single_info = self.__cancel_single_cache.get(code) outoftime = False total_datas = local_today_datas.get(code) if cancel_single_info: outoftime = tool.trade_time_sub(total_datas[-1]['val']['time'], cancel_single_info[0]) > 180 if not outoftime: # 上次计算还未超时 return limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) if cancel_single_info and outoftime: # 更新需要计算信号 min_volume = 50 * 10000 // int(limit_up_price * 100) # 计算本批数据是否有撤单 single_info = None # (index, 时间ms) for i in range(end_index, start_index - 1, -1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue if val['num'] < min_volume: continue single_info = (i, L2DataUtil.get_time_with_ms(val)) break if not single_info: # "无涨停撤单" return indexes = [single_info[0]] # 包含信号笔数 for i in range(single_info[0] - 1, real_place_order_index, -1): data = total_datas[i] val = data['val'] if not L2DataUtil.is_limit_up_price_buy_cancel(val): continue if val['num'] < min_volume: continue time_ms = L2DataUtil.get_time_with_ms(val) if tool.trade_time_sub_with_ms(single_info[1], time_ms) > 1000: break indexes.append(i) if len(indexes) >= 3: break if len(indexes) < 3: # 不满足更新条件 return total_count, total_num = L2DataComputeUtil.compute_left_buy_order(code, real_place_order_index + 1, total_datas[-1]['index'], limit_up_price) self.__cancel_single_cache[code] = [total_datas[-1]['val']['time'], real_place_order_index, total_num, total_num, total_datas[-1]['index']] l2_log.j_cancel_debug(code, f"触发囊括:{self.__cancel_single_cache[code]}") def need_cancel(self, code, start_index, end_index): if 1 > 0: return False, None, "J撤不生效" # 需要先计算 self.__compute_cancel_single(code, start_index, end_index) # [时间, 真实下单位置, 信号总手数, 目前手数, 最新计算的索引] cancel_single_info = self.__cancel_single_cache.get(code) if not cancel_single_info: return False, None, "没有监听" # 计算剩余数量 total_datas = local_today_datas.get(code) if int(total_datas[end_index]['val']['time'].replace(":", "")) > 145000: return False, None, "超过生效时间" buyno_map = local_today_buyno_map.get(code) limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) min_volume = 50 * 10000 // int(limit_up_price * 100) # 计算纯买额 for i in range(start_index, end_index + 1): data = total_datas[i] val = data['val'] if data['index'] <= cancel_single_info[4]: continue if val['num'] < min_volume: continue if L2DataUtil.is_limit_up_price_buy_cancel(val): orderNo = val['orderNo'] buy_data = buyno_map.get(f"{orderNo}") if buy_data and buy_data['index'] > cancel_single_info[1]: cancel_single_info[3] -= val['num'] elif L2DataUtil.is_limit_up_price_buy(val): cancel_single_info[3] += val['num'] else: continue cancel_single_info[4] = end_index # self.__cancel_single_cache[code] = cancel_single_info threshold_rate = constant.J_CANCEL_RATE if gpcode_manager.MustBuyCodesManager().is_in_cache(code): threshold_rate = constant.J_CANCEL_RATE_WITH_MUST_BUY cancel_num = cancel_single_info[2] - cancel_single_info[3] rate = round(cancel_num / cancel_single_info[2], 2) if rate >= threshold_rate: return True, total_datas[ end_index], f"撤单比例达到:{rate}/{threshold_rate} 剩余手数:{cancel_single_info[3]}/{cancel_single_info[2]}" return False, None, f"尚未达到撤单比例{rate} , {cancel_num}/{cancel_single_info[2]}" def __clear_data(self, code): if code in self.__cancel_single_cache: self.__cancel_single_cache.pop(code) if code in self.__real_place_order_index_info_dict: self.__real_place_order_index_info_dict.pop(code) def cancel_success(self, code): self.__clear_data(code) def place_order_success(self, code): self.__clear_data(code) class NBCancelBigNumComputer(BaseCancel): """ NB撤: 大市值5分钟内前面没有大单撤 """ __real_place_order_index_info_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(NBCancelBigNumComputer, cls).__new__(cls, *args, **kwargs) return cls.__instance def set_real_place_order_index(self, code, index, buy_single_index, is_default): self.__real_place_order_index_info_dict[code] = (index, is_default) def need_cancel(self, code, trade_index): if gpcode_manager.MustBuyCodesManager().is_in_cache(code): return False, "已加红" # [时间, 真实下单位置, 信号总手数, 目前手数, 最新计算的索引] real_place_order_index_info = self.__real_place_order_index_info_dict.get(code) if not real_place_order_index_info or real_place_order_index_info[1]: return False, "没有找到真实下单位" real_place_order_index = real_place_order_index_info[0] limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) if limit_up_price < 3: return False, "股价小于3块" zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code) zyltgb_unit_y = round(zyltgb / 100000000, 1) if zyltgb_unit_y < 50: return False, '自由流通股本小于50亿' # 计算剩余数量 total_datas = local_today_datas.get(code) if tool.trade_time_sub(tool.get_now_time_str(), total_datas[real_place_order_index]['val']['time']) > 300: return False, "超过生效时间" total_left_count, total_left_num = L2DataComputeUtil.compute_left_buy_order(code, trade_index, real_place_order_index, limit_up_price, 299 * 10000) if total_left_count > 0: return False, '有大单' return True, f'无大单:{trade_index}-{real_place_order_index}' def __clear_data(self, code): if code in self.__real_place_order_index_info_dict: self.__real_place_order_index_info_dict.pop(code) def cancel_success(self, code): self.__clear_data(code) def place_order_success(self, code): self.__clear_data(code) if __name__ == "__main__": pass