import json
|
import time
|
|
import constant
|
from code_attribute import gpcode_manager, code_volumn_manager
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from l2.code_price_manager import Buy1PriceManager
|
from l2.huaxin import l2_huaxin_util
|
from l2.l2_compute_util import L2DataComputeUtil
|
from l2.l2_data_manager import OrderBeginPosInfo
|
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, HuaXinSellOrderStatisticManager, BigOrderDealManager
|
from log_module import async_log_util
|
from third_data import code_plate_key_manager
|
|
from utils import tool
|
from l2.transaction_progress import TradeBuyQueue
|
from trade import l2_trade_factor, trade_record_log_util
|
from l2 import l2_log, l2_data_source_util
|
from l2.l2_data_util import L2DataUtil, local_today_datas, local_today_canceled_buyno_map, local_today_buyno_map
|
from log_module.log import logger_l2_s_cancel, logger_debug, logger_l2_l_cancel, logger_l2_h_cancel
|
from utils.tool import CodeDataCacheUtil
|
import l2_data_util
|
|
|
class SCancelRateManager:
|
TYPE_S_FAST = 0
|
TYPE_S_FAST_BIG = 1
|
TYPE_S_SLOW = 2
|
|
@classmethod
|
def get_threshhold_rate(cls, code, cancel_type):
|
# 加红
|
must_buy = gpcode_manager.MustBuyCodesManager().is_in_cache(code)
|
half_must_buy = False
|
if not must_buy:
|
try:
|
can_buy_result = code_plate_key_manager.CodePlateKeyBuyManager.can_buy(code)
|
if can_buy_result and can_buy_result[0]:
|
if can_buy_result[0][1] <= 1 and can_buy_result[0][2] >= 3:
|
# 炸板率>60%以上就不加半红
|
if can_buy_result[0][3] <= 0 or can_buy_result[0][2] / can_buy_result[0][3] > 0.4:
|
half_must_buy = True
|
except:
|
pass
|
if cancel_type == cls.TYPE_S_FAST:
|
if must_buy:
|
return constant.S_FAST_RATE_WITH_MUST_BUY, "加红"
|
elif half_must_buy:
|
return round((constant.S_FAST_RATE + constant.S_FAST_RATE_WITH_MUST_BUY) / 2, 2), "半红"
|
else:
|
return constant.S_FAST_RATE, "常规"
|
elif cancel_type == cls.TYPE_S_FAST_BIG:
|
# 重砸
|
if must_buy:
|
return constant.S_FAST_BIG_RATE_WITH_MUST_BUY, "加红"
|
elif half_must_buy:
|
return round((constant.S_FAST_RATE + constant.S_FAST_BIG_RATE_WITH_MUST_BUY) / 2, 2), "半红"
|
else:
|
return constant.S_FAST_RATE, "常规"
|
elif cancel_type == cls.TYPE_S_SLOW:
|
if must_buy:
|
return constant.S_SLOW_RATE_WITH_MUST_BUY, "加红"
|
elif half_must_buy:
|
return round((constant.S_SLOW_RATE + constant.S_SLOW_RATE_WITH_MUST_BUY) / 2, 2), "半红"
|
else:
|
return constant.S_SLOW_RATE, "常规"
|
return None, ""
|
|
|
class SCancelBigNumComputer:
|
__db = 0
|
__redis_manager = redis_manager.RedisManager(0)
|
__sCancelParamsManager = l2_trade_factor.SCancelParamsManager
|
__s_cancel_real_place_order_index_cache = {}
|
__s_down_watch_indexes_dict = {}
|
__recompute_l_down_time_dict = {}
|
# 最近处理的卖单号
|
__latest_process_sell_order_no = {}
|
|
# S慢砸包含的范围
|
__s_slow_sell_watch_indexes_dict = {}
|
|
# 成交进度位置
|
__trade_progress_index_dict = {}
|
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(SCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
|
cls.__load_datas()
|
I = 0
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redis_manager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(__redis, "s_cancel_real_place_order_index-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
val = json.loads(val)
|
tool.CodeDataCacheUtil.set_cache(cls.__s_cancel_real_place_order_index_cache, code, val)
|
finally:
|
RedisUtils.realse(__redis)
|
|
# 设置真实下单位置
|
def __save_real_place_order_index(self, code, index, is_default):
|
CodeDataCacheUtil.set_cache(self.__s_cancel_real_place_order_index_cache, code, (index, is_default))
|
key = "s_cancel_real_place_order_index-{}".format(code)
|
RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))
|
|
def __get_real_place_order_index_info_cache(self, code):
|
cache_result = CodeDataCacheUtil.get_cache(self.__s_cancel_real_place_order_index_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return None
|
|
def get_real_place_order_index_cache(self, code):
|
cache_result = self.__get_real_place_order_index_info_cache(code)
|
if cache_result:
|
return cache_result[0]
|
return None
|
|
def __clear_data(self, code):
|
CodeDataCacheUtil.clear_cache(self.__s_cancel_real_place_order_index_cache, code)
|
CodeDataCacheUtil.clear_cache(self.__s_down_watch_indexes_dict, code)
|
CodeDataCacheUtil.clear_cache(self.__trade_progress_index_dict, code)
|
CodeDataCacheUtil.clear_cache(self.__s_slow_sell_watch_indexes_dict, code)
|
ks = ["s_big_num_cancel_compute_data-{}".format(code), "s_cancel_real_place_order_index-{}".format(code)]
|
for key in ks:
|
RedisUtils.delete_async(self.__db, key)
|
|
# 设置真实下单位置
|
def set_real_place_order_index(self, code, index, is_default):
|
self.__save_real_place_order_index(code, index, is_default)
|
if not is_default:
|
self.__compute_s_slow_sell(code)
|
|
def set_transaction_index(self, code, buy_single_index, index):
|
self.__trade_progress_index_dict[code] = index
|
self.__compute_s_slow_sell(code)
|
|
def clear_data(self):
|
ks = ["s_big_num_cancel_compute_data-*", "s_cancel_real_place_order_index-*"]
|
for key in ks:
|
keys = RedisUtils.keys(self.__get_redis(), key)
|
for k in keys:
|
code = k.split("-")[1]
|
self.__clear_data(code)
|
|
# 撤单成功
|
def cancel_success(self, code):
|
self.__clear_data(code)
|
|
# 下单成功
|
def place_order_success(self, code):
|
self.__clear_data(code)
|
|
def __get_fast_threshold_value(self, code):
|
"""
|
获取S快炸的阈值
|
@param code:
|
@return:(大单阈值, 800w内大单阈值)
|
"""
|
max60, yesterday = code_volumn_manager.get_histry_volumn(code)
|
if max60:
|
num = max60[0]
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price:
|
money_y = min(round((num * float(limit_up_price)) / 1e8, 1), 50)
|
money = int(round(15 * money_y + 276.5))
|
money_800 = int(round(money * 0.668))
|
return money, money_800
|
# 默认值
|
return 299, 200
|
|
# S前撤实现
|
def __need_cancel_for_up(self, code, big_sell_order_info, total_datas):
|
# 查询是否是真的真实下单位置
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_index is None:
|
trade_index = 0
|
real_order_index_info = self.__get_real_place_order_index_info_cache(code)
|
if real_order_index_info is None or real_order_index_info[1]:
|
return False, "没找到真实下单位"
|
real_order_index = real_order_index_info[0]
|
total_deal_money = sum([x[1] * x[2] for x in big_sell_order_info[1]])
|
start_order_no = big_sell_order_info[1][0][3][1]
|
# 防止分母位0
|
total_num = 1
|
# 获取正在成交的数据
|
dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code)
|
for i in range(trade_index, real_order_index):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if int(val['orderNo']) < start_order_no:
|
continue
|
if i == trade_index and dealing_info and str(total_datas[trade_index]["val"]["orderNo"]) == str(
|
dealing_info[0]):
|
# 减去当前正在成交的数据中已经成交了的数据
|
total_num -= dealing_info[1] // 100
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
total_num += val["num"]
|
|
# 大于10w
|
if total_deal_money >= 10 * 10000:
|
cancel_result = self.__need_cancel_for_slow_sell(code, total_datas)
|
if cancel_result[0]:
|
return True, f"S慢砸:{cancel_result[1]}"
|
|
if total_deal_money >= 100 * 10000:
|
l2_log.s_cancel_debug(code, "准备更新L后囊括")
|
start_order_no = big_sell_order_info[1][-1][4][1]
|
latest_deal_time_ms = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True)
|
real_trade_index = None
|
for i in range(trade_index, real_order_index):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if int(val['orderNo']) < start_order_no:
|
continue
|
real_trade_index = i
|
break
|
if real_trade_index is None:
|
l2_log.s_cancel_debug(code, f"没找到真实的成交进度:start_order_no-{start_order_no} 卖单-{big_sell_order_info}")
|
return False, ""
|
# 间隔1S以上才能重新囊括
|
if code in self.__recompute_l_down_time_dict and tool.trade_time_sub_with_ms(latest_deal_time_ms,
|
self.__recompute_l_down_time_dict[
|
code]) < 1000:
|
l2_log.s_cancel_debug(code,
|
f"更新L后囊括:更新间隔在1s内,{latest_deal_time_ms}-{self.__recompute_l_down_time_dict[code]}")
|
return False, ""
|
self.__recompute_l_down_time_dict[code] = latest_deal_time_ms
|
# 重新囊括L后
|
# 撤单时间比早成交时间大就需要计算在里面
|
LCancelBigNumComputer().re_compute_l_down_watch_indexes(code, big_sell_info=(
|
real_trade_index, latest_deal_time_ms))
|
l2_log.s_cancel_debug(code, f"更新L后囊括完成:{(real_trade_index, latest_deal_time_ms)}")
|
return False, ""
|
|
def __need_cancel_for_slow_sell(self, code, total_datas):
|
"""
|
S慢撤的目的是判断较大金额卖占整个卖的比例不能太大
|
@param code:
|
@param total_datas:
|
@return:
|
"""
|
if code not in self.__s_slow_sell_watch_indexes_dict:
|
return False, "S慢砸没有囊括"
|
# 下单3分钟内有效
|
real_place_order_info = self.__get_real_place_order_index_info_cache(code)
|
if not real_place_order_info:
|
return False, "没有获取到真实下单位"
|
if tool.trade_time_sub(total_datas[-1]['val']['time'],
|
total_datas[real_place_order_info[0]]['val']['time']) > 180:
|
return False, "超过守护时间"
|
|
# 格式:[{监听index},当时正在成交的订单号,已经成交的手数]
|
watch_info = self.__s_slow_sell_watch_indexes_dict.get(code)
|
# 计算总买额
|
total_num = 0
|
for i in watch_info[0]:
|
data = total_datas[i]
|
val = data['val']
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
total_num += val['num']
|
if val['orderNo'] == watch_info[1]:
|
total_num -= watch_info[2]
|
# 过滤最小金额
|
zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
|
zyltgb_unit_y = round(zyltgb / 100000000, 1)
|
min_sell_money = int((zyltgb_unit_y / 2 + 5) * 10000)
|
sell_orders = HuaXinSellOrderStatisticManager.get_latest_transaction_datas(code, min_sell_order_no=
|
total_datas[real_place_order_info[0]]['val']['orderNo'], min_sell_money=min_sell_money)
|
sell_order_num = sum([x[1] for x in sell_orders]) // 100
|
rate = round(sell_order_num / total_num, 2)
|
threshold_rate, threshold_rate_msg = SCancelRateManager.get_threshhold_rate(code,
|
SCancelRateManager.TYPE_S_SLOW)
|
if rate > threshold_rate:
|
return True, f"慢砸成交比例:{rate}/0.49({threshold_rate_msg}) 成交:{sell_order_num}/{total_num}"
|
else:
|
return False, f"尚未触发撤单比例:{rate}"
|
|
# 计算S后撤监听的数据范围
|
def __compute_down_cancel_watch_index(self, code, big_sell_order_info, total_datas):
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
real_order_index_info = self.__get_real_place_order_index_info_cache(code)
|
if real_order_index_info is None or real_order_index_info[1]:
|
return
|
real_order_index = real_order_index_info[0]
|
start_order_no = big_sell_order_info[1][-1][4][1]
|
latest_deal_time = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0], with_ms=True)
|
if code in self.__s_down_watch_indexes_dict:
|
# 更新囊括范围后1s内不能再次更新
|
if tool.trade_time_sub_with_ms(latest_deal_time, self.__s_down_watch_indexes_dict[code][0]) <= 1000:
|
return
|
|
watch_index_info = []
|
for i in range(trade_index, real_order_index):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if int(val['orderNo']) < start_order_no:
|
continue
|
if val['num'] * float(val['price']) < 5000:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
watch_index_info.append((i, val['num']))
|
watch_index_info.sort(key=lambda x: x[1], reverse=True)
|
watch_indexes = set([x[0] for x in watch_index_info[:5]])
|
l2_log.s_cancel_debug(code, f"S后计算那概括范围:{watch_indexes} 最近成交的卖单信息:{big_sell_order_info[1][-1]}")
|
if watch_indexes:
|
# 保存卖单信息
|
self.__s_down_watch_indexes_dict[code] = (latest_deal_time, watch_indexes)
|
|
# 是否有大卖单需要撤
|
def __need_cancel_for_big_sell_order(self, code, big_sell_order_info, order_begin_pos: OrderBeginPosInfo):
|
# 需要排除成交时间在下单时间之前的
|
total_deal_money = 0
|
total_datas = local_today_datas.get(code)
|
real_order_index_info = self.__get_real_place_order_index_info_cache(code)
|
real_order_time_ms = None
|
real_order_index = None
|
if real_order_index_info and not real_order_index_info[1]:
|
real_order_index = real_order_index_info[0]
|
real_order_time_ms = total_datas[real_order_index]["val"]["time"] + ".{0:0>3}".format(
|
total_datas[real_order_index]["val"]["tms"])
|
|
for x in big_sell_order_info[1]:
|
deal_time = l2_huaxin_util.convert_time(x[4][0], with_ms=True)
|
if real_order_time_ms:
|
if tool.trade_time_sub_with_ms(deal_time, real_order_time_ms) >= 0:
|
m = x[1] * x[2]
|
total_deal_money += m
|
else:
|
m = x[1] * x[2]
|
total_deal_money += m
|
|
zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
|
zyltgb_unit_y = round(zyltgb / 100000000, 1)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
limit_up_price = round(float(limit_up_price), 2)
|
# 有超大单砸 S重砸
|
threshold_big_deal = (2 * zyltgb_unit_y + 339) * 10000
|
if total_deal_money >= threshold_big_deal:
|
# S重砸必撤的金额满足以后,以前是无脑撤。现在优化一下,看成交进度位---真实下单位的区间额≤重砸金额*3.3倍,就撤。
|
try:
|
# 上证,下单3分钟内
|
try:
|
if tool.is_sh_code(code) and real_order_index is not None and tool.trade_time_sub(
|
total_datas[-1]["val"]["time"], total_datas[real_order_index]["val"]["time"]) < 3 * 60:
|
# 上证如果重砸额大于阈值的1.5倍直接撤单
|
if not gpcode_manager.MustBuyCodesManager().is_in_cache(code):
|
if total_deal_money >= threshold_big_deal * 1.5:
|
return True, f"1s内成交({total_deal_money}) 大于大卖单({threshold_big_deal})的1.5倍"
|
# 如果没有大单成交也直接撤单
|
deal_big_order_count = BigOrderDealManager().get_total_buy_count(code)
|
if deal_big_order_count < 1:
|
return True, f"1s内成交({total_deal_money}) 大于大卖单({threshold_big_deal})且无大单成交"
|
except Exception as e:
|
l2_log.s_cancel_debug(code, "S重砸出错了:{}", str(e))
|
|
need_compute = False
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_index is None:
|
trade_index = 0
|
if real_order_index_info and not real_order_index_info[1]:
|
need_compute = True
|
|
if need_compute:
|
total_count, total_num = L2DataComputeUtil.compute_left_buy_order(code, trade_index,
|
real_order_index_info[0],
|
limit_up_price)
|
if total_num == 0:
|
total_num = 1
|
|
threshold_rate, threshold_rate_msg = SCancelRateManager.get_threshhold_rate(code,
|
SCancelRateManager.TYPE_S_FAST_BIG)
|
|
if total_deal_money / (total_num * limit_up_price * 100) >= threshold_rate:
|
# 大单成交额占总剩余总囊括的30%
|
return True, f"1s内大于{threshold_big_deal}({threshold_rate_msg})大卖单({total_deal_money})"
|
else:
|
return True, f"1s内大于{threshold_big_deal}大卖单({total_deal_money})"
|
except Exception as e:
|
l2_log.info(code, logger_debug, f"S快计算出错:{str(e)}")
|
|
# 判断是否为激进下单
|
threash_money_w, threash_money_danger_w = self.__get_fast_threshold_value(code)
|
total_fast_num = 0
|
try:
|
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_index is None:
|
trade_index = 0
|
if real_order_index_info and not real_order_index_info[1]:
|
real_order_index = real_order_index_info[0]
|
# 开始第一笔成交数据
|
fdeal = big_sell_order_info[1][0][3]
|
start_order_no = fdeal[1]
|
sell_time_str = l2_huaxin_util.convert_time(fdeal[0], with_ms=False)
|
|
total_num = 0
|
# 获取正在成交的数据
|
# 计算成交进度到真实下单位的纯买额
|
dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code)
|
for i in range(trade_index, real_order_index):
|
data = total_datas[i]
|
val = data['val']
|
if int(val['orderNo']) < start_order_no:
|
continue
|
if i == trade_index and dealing_info and str(
|
total_datas[trade_index]["val"]["orderNo"]) == str(
|
dealing_info[0]):
|
# 减去当前正在成交的数据中已经成交了的数据
|
total_num -= dealing_info[1] // 100
|
total_fast_num -= dealing_info[1] // 100
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
|
code,
|
i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
total_num += val["num"]
|
total_fast_num += val['num']
|
if total_num * float(limit_up_price) < 800 * 100 and tool.trade_time_sub(sell_time_str,
|
total_datas[real_order_index][
|
'val']['time']) < 15 * 60:
|
# 15分钟内真实成交位距离真实下单位,金额≤800万 ,大单阈值变为200w
|
threash_money_w = threash_money_danger_w
|
except Exception as e:
|
l2_log.s_cancel_debug(code, f"S撤激进下单计算大单卖阈值出错:{str(e)}")
|
total_fast_money = int(total_fast_num * 100 * float(limit_up_price))
|
if total_fast_money == 0:
|
# 防止分母为0
|
total_fast_money = 1
|
rate = round(total_deal_money / total_fast_money, 2)
|
threshold_rate, threshold_rate_msg = SCancelRateManager.get_threshhold_rate(code,
|
SCancelRateManager.TYPE_S_FAST)
|
|
if total_deal_money >= threash_money_w * 10000 and rate >= threshold_rate:
|
return True, f"近1s有大卖单({round(total_deal_money / 10000, 1)}万/{threash_money_w}万({threshold_rate_msg}),成交占比:{total_deal_money}/{total_fast_money})"
|
else:
|
l2_log.s_cancel_debug(code,
|
f"S快撤没达到撤单比例:成交金额({total_deal_money})/囊括金额({total_fast_money}),比例:{rate} 大单阈值:{threash_money_w}w")
|
return False, f"无{threash_money_w}大单"
|
|
# big_sell_order_info格式:[总共的卖单金额, 大卖单详情列表]
|
def set_big_sell_order_info_for_cancel(self, code, big_sell_order_info, order_begin_pos: OrderBeginPosInfo):
|
if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0:
|
return False, "还未下单"
|
|
if big_sell_order_info[0] < 500000 or not big_sell_order_info[1]:
|
return False, "无大单卖"
|
l2_log.s_cancel_debug(code, f"主动卖:{big_sell_order_info}")
|
try:
|
need_cancel, cancel_msg = self.__need_cancel_for_big_sell_order(code, big_sell_order_info, order_begin_pos)
|
if need_cancel:
|
return need_cancel, cancel_msg
|
|
total_datas = local_today_datas.get(code)
|
need_cancel, cancel_msg = self.__need_cancel_for_up(code, big_sell_order_info, total_datas)
|
if need_cancel:
|
return need_cancel, cancel_msg
|
# S后撤取消
|
# if self.__latest_process_sell_order_no.get(code) != big_sell_order_info[1][-1][0]:
|
# # 不处理重复的卖单
|
# # 获取最新的成交时间
|
# latest_trade_time = l2_huaxin_util.convert_time(big_sell_order_info[1][-1][4][0])
|
# if tool.trade_time_sub(latest_trade_time,
|
# total_datas[order_begin_pos.buy_single_index]['val']['time']) >= 180:
|
# self.__compute_down_cancel_watch_index(code, big_sell_order_info, total_datas)
|
return False, "不满足撤单条件"
|
finally:
|
# self.__latest_process_sell_order_no[code] = big_sell_order_info[1][-1][0]
|
pass
|
|
# ----------------------------S慢砸范围计算------------------------------------
|
def __compute_s_slow_sell(self, code):
|
try:
|
if code in self.__s_slow_sell_watch_indexes_dict:
|
return
|
real_place_order_info = self.__get_real_place_order_index_info_cache(code)
|
if not real_place_order_info or real_place_order_info[1]:
|
return
|
trade_index = self.__trade_progress_index_dict.get(code)
|
if trade_index is None:
|
return
|
start_index = trade_index
|
end_index = real_place_order_info[0]
|
total_datas = local_today_datas.get(code)
|
watch_indexes = set()
|
for i in range(start_index, end_index):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val['num'] * float(val['price']) < 5000:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
watch_indexes.add(i)
|
|
dealing_order_no = None
|
dealed_num = 0
|
# 格式:[监听的索引,正在成交索引,已成交的数量]
|
dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code)
|
if dealing_info:
|
dealing_order_no = dealing_info[0]
|
dealed_num = dealing_info[1] // 100
|
watch_info = [watch_indexes, dealing_order_no, dealed_num]
|
self.__s_slow_sell_watch_indexes_dict[code] = watch_info
|
l2_log.s_cancel_debug(code, f"S慢砸监听范围:{watch_info}")
|
except Exception as e:
|
logger_l2_s_cancel.exception(e)
|
|
|
# ---------------------------------L撤-------------------------------
|
class LCancelRateManager:
|
__block_limit_up_count_dict = {}
|
__block_limit_up_count_for_l_up_dict = {}
|
__big_num_deal_rate_dict = {}
|
__MustBuyCodesManager = gpcode_manager.MustBuyCodesManager()
|
|
# 获取撤单比例,返回(撤单比例,是否必买)
|
@classmethod
|
def get_cancel_rate(cls, code, buy_exec_time, is_up=False, is_l_down_recomputed=False):
|
try:
|
must_buy = cls.__MustBuyCodesManager.is_in_cache(code)
|
if must_buy:
|
if is_up:
|
return constant.L_CANCEL_RATE_UP_WITH_MUST_BUY, True
|
else:
|
return constant.L_CANCEL_RATE_WITH_MUST_BUY, True
|
except Exception as e:
|
async_log_util.error(logger_l2_l_cancel, str(e))
|
|
base_rate = constant.L_CANCEL_RATE
|
if is_up:
|
base_rate = constant.L_CANCEL_RATE_UP
|
if tool.is_sh_code(code):
|
base_rate = constant.L_CANCEL_RATE_UP_SH
|
try:
|
block_rate = 0
|
count_dict = cls.__block_limit_up_count_dict
|
if is_up:
|
count_dict = cls.__block_limit_up_count_for_l_up_dict
|
if code in count_dict:
|
count = count_dict[code]
|
rates = [0, 0.03, 0.06, 0.08, 0.12]
|
if count >= len(rates):
|
block_rate = rates[-1]
|
else:
|
block_rate = rates[count]
|
|
deal_rate = 0
|
if code in cls.__big_num_deal_rate_dict:
|
deal_rate = round(cls.__big_num_deal_rate_dict[code] / 100)
|
|
base_rate += block_rate
|
base_rate += deal_rate
|
except Exception as e:
|
l2_log.l_cancel_debug(code, f"计算撤单比例出错:{e}")
|
return round(base_rate, 2), False
|
|
# 获取L后成交太快的撤单比例
|
@classmethod
|
def get_fast_deal_cancel_rate(cls, code):
|
must_buy_cancel_rate = cls.__MustBuyCodesManager.get_cancel_rate_cache(code)
|
if must_buy_cancel_rate is not None:
|
return must_buy_cancel_rate
|
return constant.L_CANCEL_FAST_DEAL_RATE
|
|
# 设置板块涨停数量(除开自己)
|
@classmethod
|
def set_block_limit_up_count(cls, reason_codes_dict, limit_up_time_dict: dict):
|
for reason in reason_codes_dict:
|
codes = reason_codes_dict[reason]
|
codes = list(codes)
|
# 目标票在确认的涨停原因中,在总的身位的≤50%以外,则L前的涨停影响比例因素不生效。
|
codes.sort(key=lambda x: (
|
int(limit_up_time_dict.get(x).replace(":", "")) if x in limit_up_time_dict else int("150000")))
|
for i in range(len(codes)):
|
c = codes[i]
|
cls.__block_limit_up_count_dict[c] = len(codes) - 1
|
if i < len(codes) / 2:
|
cls.__block_limit_up_count_for_l_up_dict[c] = len(codes) - 1
|
else:
|
cls.__block_limit_up_count_for_l_up_dict[c] = 0
|
|
@classmethod
|
def set_big_num_deal_info(cls, code, buy_money, sell_money):
|
left_money_w = (buy_money - sell_money) // 10000
|
if left_money_w > 0:
|
rate = ((left_money_w + 300) // 900) * 2
|
else:
|
rate = ((left_money_w + 599) // 900) * 2
|
if rate < -10:
|
rate = -10
|
if rate > 10:
|
rate = 10
|
cls.__big_num_deal_rate_dict[code] = rate
|
|
@classmethod
|
def compute_big_num_deal_info(cls, code):
|
total_buy_money = BigOrderDealManager().get_total_buy_money(code)
|
total_sell_money = BigOrderDealManager().get_total_sell_money(code)
|
cls.set_big_num_deal_info(code, total_buy_money, total_sell_money)
|
|
|
# 计算成交位置之后的大单(特定笔数)的撤单比例
|
class LCancelBigNumComputer:
|
__db = 0
|
__redis_manager = redis_manager.RedisManager(0)
|
__last_trade_progress_dict = {}
|
__real_place_order_index_dict = {}
|
__cancel_watch_index_info_cache = {}
|
# L后真实成交位之后的位置索引
|
__cancel_l_down_after_place_order_index_cache = {}
|
# 成交位附近临近大单索引
|
__near_by_trade_progress_index_cache = {}
|
|
__SCancelBigNumComputer = SCancelBigNumComputer()
|
|
# L后囊括范围未撤单/未成交的总手数
|
__total_l_down_not_deal_num_dict = {}
|
# L后最近的成交数信息
|
__l_down_latest_deal_info_dict = {}
|
|
__last_l_up_compute_info = {}
|
|
__l_down_after_by_big_order_dict = {}
|
|
# 权重
|
__l_down_after_by_big_order_weight_dict = {}
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
|
|
cls.__load_datas()
|
return cls.__instance
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(__redis, "l_cancel_watch_index_info-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
if val:
|
val = json.loads(val)
|
val[2] = set(val[2])
|
CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_info_cache, code, val)
|
|
keys = RedisUtils.keys(__redis, "l_cancel_real_place_order_index-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
val = json.loads(val)
|
CodeDataCacheUtil.set_cache(cls.__real_place_order_index_dict, code, val)
|
|
keys = RedisUtils.keys(__redis, "l_cancel_near_by_index-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
try:
|
val = set(json.loads(val))
|
CodeDataCacheUtil.set_cache(cls.__near_by_trade_progress_index_cache, code, val)
|
except:
|
pass
|
keys = RedisUtils.keys(__redis, "l_cancel_down_after_place_order_index-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
try:
|
val = json.loads(val)
|
CodeDataCacheUtil.set_cache(cls.__cancel_l_down_after_place_order_index_cache, code, val)
|
except:
|
pass
|
finally:
|
RedisUtils.realse(__redis)
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redis_manager.getRedis()
|
|
def __set_watch_indexes(self, code, buy_single_index, re_compute: int, indexes):
|
self.__cancel_watch_index_info_cache[code] = (buy_single_index, re_compute, indexes)
|
RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
|
RedisUtils.setex_async(self.__db, f"l_cancel_watch_index_info-{code}", tool.get_expire(),
|
json.dumps((buy_single_index, re_compute, list(indexes))))
|
if indexes:
|
trade_record_log_util.add_cancel_watch_indexes_log(code,
|
trade_record_log_util.CancelWatchIndexesInfo(
|
trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_DOWN,
|
buy_single_index,
|
list(indexes)))
|
|
def __get_watch_indexes_cache(self, code):
|
cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_info_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return None
|
|
def __set_near_by_trade_progress_indexes(self, code, buy_single_index, indexes):
|
if indexes:
|
trade_record_log_util.add_cancel_watch_indexes_log(code,
|
trade_record_log_util.CancelWatchIndexesInfo(
|
trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_UP,
|
buy_single_index,
|
list(indexes)))
|
self.__near_by_trade_progress_index_cache[code] = indexes
|
RedisUtils.setex_async(self.__db, f"l_cancel_near_by_index-{code}", tool.get_expire(),
|
json.dumps(list(indexes)))
|
|
def __get_near_by_trade_progress_indexes(self, code):
|
val = RedisUtils.get(self.__get_redis(), f"l_cancel_near_by_index-{code}")
|
if val is None:
|
return None
|
return set(json.loads(val))
|
|
def __get_near_by_trade_progress_indexes_cache(self, code):
|
cache_result = CodeDataCacheUtil.get_cache(self.__near_by_trade_progress_index_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return None
|
|
def __set_cancel_l_down_after_place_order_index(self, code, watch_index, index):
|
if code not in self.__cancel_l_down_after_place_order_index_cache:
|
self.__cancel_l_down_after_place_order_index_cache[code] = {}
|
self.__cancel_l_down_after_place_order_index_cache[code][str(watch_index)] = index
|
RedisUtils.setex_async(self.__db, f"l_cancel_down_after_place_order_index-{code}", tool.get_expire(),
|
json.dumps(self.__cancel_l_down_after_place_order_index_cache[code]))
|
|
def __get_cancel_l_down_after_place_order_index_dict(self, code):
|
return self.__cancel_l_down_after_place_order_index_cache.get(code)
|
|
def del_watch_index(self, code):
|
CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_info_cache, code)
|
RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
|
|
def clear(self, code=None):
|
if code:
|
self.del_watch_index(code)
|
if code in self.__l_down_after_by_big_order_dict:
|
self.__l_down_after_by_big_order_dict.pop(code)
|
if code in self.__l_down_after_by_big_order_weight_dict:
|
self.__l_down_after_by_big_order_weight_dict.pop(code)
|
if code in self.__real_place_order_index_dict:
|
self.__real_place_order_index_dict.pop(code)
|
RedisUtils.delete_async(self.__db, f"l_cancel_real_place_order_index-{code}")
|
if code in self.__cancel_l_down_after_place_order_index_cache:
|
self.__cancel_l_down_after_place_order_index_cache.pop(code)
|
RedisUtils.delete_async(self.__db, f"l_cancel_down_after_place_order_index-{code}")
|
else:
|
keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index_info-*")
|
for k in keys:
|
code = k.replace("l_cancel_watch_index_info-", "")
|
if code in self.__last_trade_progress_dict:
|
self.__last_trade_progress_dict.pop(code)
|
if code in self.__real_place_order_index_dict:
|
self.__real_place_order_index_dict.pop(code)
|
self.del_watch_index(code)
|
keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_real_place_order_index-*")
|
for k in keys:
|
RedisUtils.delete(self.__get_redis(), k)
|
# 清除L后真实下单位置之后囊括的索引
|
self.__cancel_l_down_after_place_order_index_cache.clear()
|
keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_down_after_place_order_index-*")
|
for k in keys:
|
RedisUtils.delete(self.__get_redis(), k)
|
self.__l_down_after_by_big_order_dict.clear()
|
self.__l_down_after_by_big_order_weight_dict.clear()
|
|
# 重新计算L上
|
|
# big_sell_info卖单信息,格式:[最近成交索引,最近成交时间(带毫秒)]
|
def re_compute_l_down_watch_indexes(self, code, big_sell_info=None):
|
watch_index_info = self.__cancel_watch_index_info_cache.get(code)
|
if not watch_index_info or watch_index_info[1] > 0:
|
return
|
# 获取成交进度位与真实下单位置
|
real_place_order_index_info = self.__real_place_order_index_dict.get(code)
|
last_trade_progress_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if big_sell_info:
|
last_trade_progress_index = big_sell_info[0]
|
|
if not real_place_order_index_info or last_trade_progress_index is None:
|
return
|
min_cancel_time_with_ms = None
|
if big_sell_info:
|
min_cancel_time_with_ms = big_sell_info[1]
|
self.compute_watch_index(code, watch_index_info[0], last_trade_progress_index + 1,
|
real_place_order_index_info[0],
|
re_compute=1, min_cancel_time_with_ms=min_cancel_time_with_ms,
|
msg=f"大单卖: 成交进度-{big_sell_info}" if big_sell_info is not None else '')
|
|
# 计算观察索引,倒序计算
|
# re_compute:是否是重新计算的
|
# min_cancel_time_with_ms:最小撤单时间,大于等于此时间的撤单需要囊括进去
|
def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0,
|
min_cancel_time_with_ms=None, msg=""):
|
try:
|
l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}")
|
total_datas = local_today_datas.get(code)
|
if re_compute > 0 and tool.trade_time_sub(total_datas[-1]["val"]["time"],
|
total_datas[buy_single_index]["val"][
|
"time"]) < 2 * 60 and min_cancel_time_with_ms is None:
|
# 封单额稳了以后,间隔超过2分钟才能重新计算
|
l2_log.l_cancel_debug(code, f"要间隔2分钟过后才能重新计算")
|
return
|
if total_datas:
|
# 计算的上截至位距离下截至位纯买额要小于2.5倍m值
|
# thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
|
# thresh_hold_money = int(thresh_hold_money * 2.5)
|
min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
|
# 统计净涨停买的数量
|
not_cancel_indexes_with_num = []
|
re_start_index = start_index
|
MAX_COUNT = 5
|
for j in range(start_index, end_index):
|
data = total_datas[j]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
|
if val["num"] < min_num:
|
continue
|
|
cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
|
j,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if not cancel_data:
|
not_cancel_indexes_with_num.append((j, val["num"]))
|
elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms,
|
L2DataUtil.get_time_with_ms(
|
cancel_data['val'])) <= 0:
|
not_cancel_indexes_with_num.append((j, val["num"]))
|
|
min_num = 0
|
if not_cancel_indexes_with_num:
|
temp_count = len(not_cancel_indexes_with_num)
|
# 取后1/5的数据
|
if temp_count >= 30:
|
temp_index = int(temp_count * 4 / 5)
|
if tool.is_sh_code(code): # 上证取后3/10
|
temp_index = int(temp_count * 7 / 10)
|
re_start_index = not_cancel_indexes_with_num[temp_index][0]
|
MAX_COUNT = len(not_cancel_indexes_with_num[temp_index:])
|
else:
|
not_cancel_indexes_with_num.sort(key=lambda x: x[1])
|
if temp_count >= 10:
|
# 获取中位数
|
min_num = not_cancel_indexes_with_num[temp_count // 2][1]
|
MIN_MONEYS = [300, 200, 100, 50]
|
watch_indexes = set()
|
for min_money in MIN_MONEYS:
|
for i in range(end_index, re_start_index - 1, -1):
|
try:
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
# 小金额过滤
|
if float(val['price']) * val['num'] < min_money * 100:
|
continue
|
|
if val['num'] < min_num:
|
continue
|
|
cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
|
i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if not cancel_data:
|
watch_indexes.add(i)
|
if len(watch_indexes) >= MAX_COUNT:
|
break
|
|
elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms,
|
L2DataUtil.get_time_with_ms(
|
cancel_data['val'])) <= 0:
|
watch_indexes.add(i)
|
if len(watch_indexes) >= MAX_COUNT:
|
break
|
except Exception as e:
|
logger_l2_l_cancel.error(f"{code}: 范围: {start_index}-{end_index} 位置:{i}")
|
logger_l2_l_cancel.exception(e)
|
if len(watch_indexes) >= MAX_COUNT:
|
break
|
if watch_indexes:
|
##判断监听的数据中是否有大单##
|
has_big_num = False
|
for i in watch_indexes:
|
# 是否有大单
|
data = total_datas[i]
|
val = data['val']
|
if float(val['price']) * val['num'] > 100 * 100:
|
has_big_num = True
|
break
|
if not has_big_num:
|
# 无大单,需要找大单
|
for i in range(re_start_index, start_index, -1):
|
data = total_datas[i]
|
val = data['val']
|
# 涨停买,且未撤单
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
# 小金额过滤
|
if float(val['price']) * val['num'] < 100 * 100:
|
continue
|
cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
|
i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if not cancel_data:
|
watch_indexes.add(i)
|
break
|
|
elif min_cancel_time_with_ms and tool.compare_time_with_ms(min_cancel_time_with_ms,
|
L2DataUtil.get_time_with_ms(
|
cancel_data['val'])) <= 0:
|
watch_indexes.add(i)
|
break
|
# 获取真实下单位后面10笔大单
|
watch_indexes_after = self.__compute_l_down_watch_index_after_real_place_order_index(code)
|
if watch_indexes_after:
|
watch_indexes |= watch_indexes_after
|
self.__set_watch_indexes(code, buy_single_index, re_compute, watch_indexes)
|
l2_log.l_cancel_debug(code,
|
f"设置监听范围({msg}){'(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
|
except Exception as e:
|
l2_log.l_cancel_debug(code, f"计算L后囊括范围出错:{str(e)}")
|
async_log_util.exception(logger_l2_l_cancel, e)
|
|
def __need_update_l_down_after(self, code, start_index, end_index):
|
"""
|
是否需要更新l后真实下单位置之后的囊括范围
|
@param code:
|
@return:
|
"""
|
real_place_order_info = self.__real_place_order_index_dict.get(code)
|
if not real_place_order_info or real_place_order_info[1]:
|
return False
|
# 判断L后是否包含后面的数据
|
watch_indexes_info = self.__get_watch_indexes_cache(code)
|
if not watch_indexes_info:
|
# 没有囊括
|
return False
|
watch_indexes = set([int(i) for i in watch_indexes_info[2]])
|
real_place_order_index = real_place_order_info[0]
|
for index in watch_indexes:
|
if index > real_place_order_index:
|
# L后后段已经囊括
|
return False
|
# 下单位置之后数10笔卖单
|
watch_indexes = set()
|
total_datas = local_today_datas.get(code)
|
MIN_NUM = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
|
MAX_COUNT = 10
|
for i in range(real_place_order_index + 1, end_index):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val['num'] < MIN_NUM:
|
continue
|
watch_indexes.add(i)
|
if len(watch_indexes) >= MAX_COUNT:
|
break
|
# 看里面的撤单率是否
|
if len(watch_indexes) < MAX_COUNT:
|
# 数量不够
|
return False
|
# 计算撤单比例是否足够
|
canceled_buyno_map = local_today_canceled_buyno_map.get(code)
|
cancel_count = 0
|
for index in watch_indexes:
|
# 是否撤单
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
|
code, index,
|
total_datas,
|
canceled_buyno_map)
|
if left_count <= 0:
|
cancel_count += 1
|
if cancel_count > len(watch_indexes) * 0.5:
|
return True
|
return False
|
|
def __compute_l_down_watch_index_after_real_place_order_index(self, code):
|
"""
|
计算L后真实下单位置之后的监控索引
|
@return:
|
"""
|
watch_indexes = set()
|
total_datas = local_today_datas.get(code)
|
is_ge_code = tool.is_ge_code(code)
|
try:
|
# 真实下单位置后面的数据就只看大单
|
MIN_NUM = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
|
real_place_order_info = self.__real_place_order_index_dict.get(code)
|
if real_place_order_info and not real_place_order_info[1]:
|
# 从真实下单位往后找
|
after_count = 0
|
for i in range(real_place_order_info[0] + 1, total_datas[-1]['index'] + 1):
|
if after_count > 10:
|
break
|
data = total_datas[i]
|
val = data['val']
|
# 涨停买,且未撤单
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
# 小金额过滤
|
if val['num'] < MIN_NUM:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
|
code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
after_count += 1
|
if l2_data_util.is_big_money(val, is_ge_code):
|
watch_indexes.add(i)
|
# 记录索引的位置
|
self.__set_cancel_l_down_after_place_order_index(code, i, after_count - 1)
|
except Exception as e:
|
pass
|
return watch_indexes
|
|
def __compute_l_down_watch_index_after_by(self, code):
|
"""
|
计算L后后半段大单备用
|
@param code:
|
@return:
|
"""
|
if self.__l_down_after_by_big_order_dict.get(code):
|
# 不用重复计算
|
return
|
real_place_order_info = self.__real_place_order_index_dict.get(code)
|
if not real_place_order_info or real_place_order_info[1]:
|
return
|
real_place_order_index = real_place_order_info[0]
|
total_datas = local_today_datas.get(code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
limit_up_price = round(float(limit_up_price), 2)
|
bigger_num = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code)) // (limit_up_price * 100)
|
min_num = 500000 // (limit_up_price * 100)
|
index = -1
|
watch_indexes = set()
|
if code not in self.__l_down_after_by_big_order_weight_dict:
|
self.__l_down_after_by_big_order_weight_dict[code] = {}
|
for i in range(real_place_order_index + 1, total_datas[-1]["index"]):
|
data = total_datas[i]
|
val = data["val"]
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val["num"] < min_num:
|
continue
|
|
if val["num"] < bigger_num:
|
continue
|
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
if val["num"] >= bigger_num:
|
if index < 0:
|
# 开始计数
|
index = 0
|
# 加入大单监听
|
self.__l_down_after_by_big_order_weight_dict[code][str(i)] = int(str(index))
|
watch_indexes.add(i)
|
if index >= 0:
|
index += 1
|
if index > 10:
|
break
|
self.__l_down_after_by_big_order_dict[code] = watch_indexes
|
l2_log.l_cancel_debug(code, f"L后后半段大单备用囊括范围:{watch_indexes}")
|
|
# 设置真实下单位置
|
def set_real_place_order_index(self, code, index, buy_single_index=None, is_default=False):
|
l2_log.l_cancel_debug(code, f"设置真实下单位-{index},buy_single_index-{buy_single_index}")
|
self.__real_place_order_index_dict[code] = (index, is_default)
|
RedisUtils.setex_async(self.__db, f"l_cancel_real_place_order_index-{code}", tool.get_expire(),
|
json.dumps((index, is_default)))
|
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_index is None:
|
trade_index = 0
|
|
if buy_single_index is not None:
|
# L后撤从成交进度位开始计算
|
self.compute_watch_index(code, buy_single_index, trade_index, index)
|
self.__compute_trade_progress_near_by_indexes(code, buy_single_index, trade_index, index)
|
|
# 重新计算L前
|
def re_compute_l_up_watch_indexes(self, code, buy_single_index):
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_index is None:
|
return
|
if code not in self.__real_place_order_index_dict:
|
return
|
if code not in self.__last_l_up_compute_info or time.time() - self.__last_l_up_compute_info[code][0] >= 3:
|
self.__compute_trade_progress_near_by_indexes(code, buy_single_index,
|
trade_index + 1,
|
self.__real_place_order_index_dict.get(code)[0])
|
|
# 计算范围内的成交位临近未撤大单
|
def __compute_trade_progress_near_by_indexes(self, code, buy_single_index, start_index, end_index):
|
if start_index is None or end_index is None:
|
return
|
total_datas = local_today_datas.get(code)
|
MIN_MONEY = 99 * 100
|
MAX_COUNT = 15
|
watch_indexes = set()
|
total_num = 0
|
# thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
|
# threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
|
for i in range(start_index, end_index):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
# 小金额过滤
|
if float(val['price']) * val['num'] < MIN_MONEY:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
total_num += val['num'] * left_count
|
watch_indexes.add(i)
|
if len(watch_indexes) >= MAX_COUNT:
|
break
|
|
changed = True
|
l_up_compute_info = self.__last_l_up_compute_info.get(code)
|
if l_up_compute_info:
|
if l_up_compute_info[1] == watch_indexes:
|
changed = False
|
# 保存数据
|
if changed:
|
threshold_time = 1 if tool.is_sz_code(code) else 2
|
if l_up_compute_info and l_up_compute_info[1]:
|
if time.time() - l_up_compute_info[0] < threshold_time:
|
l2_log.l_cancel_debug(code, f"L前监控更新太频繁:{threshold_time}")
|
return
|
l2_log.l_cancel_debug(code, f"L前监控范围:{watch_indexes} 计算范围:{start_index}-{end_index}")
|
self.__set_near_by_trade_progress_indexes(code, buy_single_index, watch_indexes)
|
self.__last_l_up_compute_info[code] = (time.time(), watch_indexes)
|
|
# 计算L后还没成交的手数
|
def __compute_total_l_down_not_deal_num(self, code):
|
# 只有真实获取到下单位置后才开始计算
|
|
try:
|
if code in self.__total_l_down_not_deal_num_dict and time.time() - \
|
self.__total_l_down_not_deal_num_dict[code][
|
1] < 1:
|
# 需要间隔1s更新一次
|
return
|
l_down_cancel_info = self.__cancel_watch_index_info_cache.get(code)
|
if not l_down_cancel_info:
|
return
|
|
trade_progress, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_progress is None:
|
trade_progress = 0
|
|
if trade_progress is None:
|
return
|
|
l_down_indexes = l_down_cancel_info[2]
|
# 统计还未成交的数据
|
total_left_num = 0
|
total_datas = local_today_datas.get(code)
|
for i in l_down_indexes:
|
# 已经成交了的不计算
|
if i < trade_progress:
|
continue
|
data = total_datas[i]
|
val = data['val']
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
|
i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
fnum = val["num"]
|
if i == trade_progress:
|
# 需要减去已经成交的数据
|
dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code)
|
if dealing_info:
|
if str(val["orderNo"]) == str(dealing_info[0]):
|
fnum -= dealing_info[1] // 100
|
total_left_num += fnum
|
self.__total_l_down_not_deal_num_dict[code] = (total_left_num, time.time())
|
except Exception as e:
|
async_log_util.exception(logger_l2_l_cancel, e)
|
|
# 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
|
def set_trade_progress(self, code, buy_single_index, index, total_datas):
|
if self.__last_trade_progress_dict.get(code) == index:
|
self.__compute_total_l_down_not_deal_num(code)
|
return
|
self.__last_trade_progress_dict[code] = index
|
self.__compute_total_l_down_not_deal_num(code)
|
|
if total_datas is None:
|
return
|
|
if not self.__is_l_down_can_cancel(code, buy_single_index):
|
# L后已经不能守护
|
l2_log.l_cancel_debug(code, f"L后已经无法生效:buy_single_index-{buy_single_index}")
|
|
HourCancelBigNumComputer().start_compute_watch_indexes(code, buy_single_index)
|
try:
|
# 计算L后后半段大单监控范围
|
self.__compute_l_down_watch_index_after_by(code)
|
except Exception as e:
|
l2_log.l_cancel_debug(code, "__compute_l_down_watch_index_after_by出错")
|
|
real_place_order_index_info = self.__real_place_order_index_dict.get(code)
|
real_place_order_index = None
|
if real_place_order_index_info:
|
real_place_order_index = real_place_order_index_info[0]
|
|
# 重新计算成交位置临近大单撤单
|
self.__compute_trade_progress_near_by_indexes(code, buy_single_index, index + 1, real_place_order_index)
|
|
# 已经成交的索引
|
def add_deal_index(self, code, index, buy_single_index):
|
"""
|
L后囊括范围成交一笔重新囊括1笔
|
@param code:
|
@param index:
|
@param buy_single_index:
|
@return:
|
"""
|
watch_indexes_info = self.__get_watch_indexes_cache(code)
|
if not watch_indexes_info:
|
return
|
watch_indexes = watch_indexes_info[2]
|
if index not in watch_indexes:
|
return
|
if buy_single_index is None:
|
return
|
# 重新囊括1笔
|
real_place_order_info = self.__real_place_order_index_dict.get(code)
|
is_ge_code = tool.is_ge_code(code)
|
if real_place_order_info and real_place_order_info[0] > index:
|
total_datas = local_today_datas.get(code)
|
min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
|
for j in range(real_place_order_info[0] - 1, index, -1):
|
data = total_datas[j]
|
val = data['val']
|
if data["index"] in watch_indexes:
|
continue
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val["num"] < min_num:
|
continue
|
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
|
j,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
watch_indexes.add(data["index"])
|
l2_log.l_cancel_debug(code, f"L后有成交重新囊括:成交索引-{index} 囊括索引-{data['index']}")
|
break
|
# 从真实下单位置往后囊括大单
|
try:
|
left_count_after = 0
|
for j in range(real_place_order_info[0] + 1, total_datas[-1]["index"]):
|
data = total_datas[j]
|
val = data['val']
|
if left_count_after > 10:
|
break
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val["num"] < min_num:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
|
j,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
left_count_after += 1
|
if l2_data_util.is_big_money(val, is_ge_code) and j not in watch_indexes:
|
watch_indexes.add(j)
|
l2_log.l_cancel_debug(code, f"L后有成交后半段增加囊括:{j}")
|
self.__set_cancel_l_down_after_place_order_index(code, j, left_count_after - 1)
|
break
|
except:
|
pass
|
|
self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes)
|
|
def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
|
"""
|
L后撤单;
|
撤单计算规则:计算撤单比例时,将真实下单位置之后的数据按权重(离下单位最近的权重越大)加入分子,不加入分母(总囊括手数)计算
|
@param code:
|
@param buy_exec_index:
|
@param start_index:
|
@param end_index:
|
@param total_data:
|
@param is_first_code:
|
@return:
|
"""
|
watch_indexes_info = self.__get_watch_indexes_cache(code)
|
if not watch_indexes_info:
|
return False, None
|
|
# 这是下单位置之后的索引: key为字符串
|
after_place_order_index_dict = self.__get_cancel_l_down_after_place_order_index_dict(code)
|
if after_place_order_index_dict is None:
|
after_place_order_index_dict = {}
|
after_place_order_index_by_dict = self.__l_down_after_by_big_order_weight_dict.get(code)
|
if after_place_order_index_by_dict is None:
|
after_place_order_index_by_dict = {}
|
|
|
watch_indexes = set([int(i) for i in watch_indexes_info[2]])
|
try:
|
# 将备用订单加进去
|
watch_indexes_by = self.__l_down_after_by_big_order_dict.get(code)
|
if watch_indexes_by:
|
# 是否是下单30分钟内
|
real_place_order_info = self.__real_place_order_index_dict.get(code)
|
if real_place_order_info and tool.trade_time_sub(total_data[-1]["val"]["time"],
|
total_data[real_place_order_info[0]]["val"][
|
"time"]) < 30 * 60:
|
# 下单30分钟内有效
|
watch_indexes |= watch_indexes_by
|
else:
|
# 清除备用大单
|
watch_indexes_by.clear()
|
except Exception as e:
|
l2_log.l_cancel_debug(code, "将L2后后半段备用大单加入计算出错:{}", str(e))
|
# 计算监听的总条数
|
total_num = 0
|
max_num = 0
|
thresh_hold_rate, must_buy = LCancelRateManager.get_cancel_rate(code,
|
total_data[buy_exec_index]["val"]["time"])
|
|
for wi in watch_indexes:
|
if str(wi) in after_place_order_index_dict:
|
# 如果加红就需要计算分母
|
if must_buy:
|
total_num += total_data[wi]["val"]["num"] * (
|
10 - after_place_order_index_dict[str(wi)]) // 10
|
continue
|
elif str(wi) in after_place_order_index_by_dict:
|
if must_buy:
|
total_num += total_data[wi]["val"]["num"] * (
|
10 - after_place_order_index_by_dict[str(wi)]) // 10
|
continue
|
|
|
total_num += total_data[wi]["val"]["num"] * total_data[wi]["re"]
|
if total_data[wi]["val"]["num"] > max_num:
|
max_num = total_data[wi]["val"]["num"]
|
# 判断撤单中是否有监听中的索引
|
need_compute = False
|
for i in range(start_index, end_index + 1):
|
data = total_data[i]
|
val = data["val"]
|
if L2DataUtil.is_limit_up_price_buy_cancel(val):
|
# 查询买入位置
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data,
|
local_today_buyno_map.get(
|
code))
|
if buy_index is not None and buy_index in watch_indexes:
|
need_compute = True
|
break
|
if need_compute:
|
|
# 计算撤单比例
|
watch_indexes_list = list(watch_indexes)
|
watch_indexes_list.sort()
|
canceled_num = 0
|
# 记录撤单索引
|
canceled_indexes = []
|
for wi in watch_indexes:
|
cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
|
wi,
|
total_data,
|
local_today_canceled_buyno_map.get(
|
code))
|
if cancel_data:
|
if str(wi) in after_place_order_index_dict:
|
# 真实下单位置之后的按照权重比例来计算
|
canceled_num += total_data[wi]["val"]["num"] * (
|
10 - after_place_order_index_dict[str(wi)]) // 10
|
elif str(wi) in after_place_order_index_by_dict:
|
canceled_num += total_data[wi]["val"]["num"] * (
|
10 - after_place_order_index_by_dict[str(wi)]) // 10
|
else:
|
canceled_num += total_data[wi]["val"]["num"]
|
|
canceled_indexes.append(cancel_data["index"])
|
# if wi == watch_indexes_list[-1] and left_count == 0:
|
# # 离下单位置最近的一个撤单,必须触发撤单
|
# l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},临近撤单:{wi}")
|
# return True, total_data[-1]
|
|
rate = round(canceled_num / total_num, 3)
|
|
# 除开最大单的影响权重
|
if not must_buy:
|
temp_thresh_hold_rate = round((total_num - max_num) * 0.9 / total_num, 2)
|
thresh_hold_rate = min(thresh_hold_rate, temp_thresh_hold_rate)
|
l2_log.l_cancel_debug(code,
|
f"L后计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}, 下单位之后的索引:{after_place_order_index_dict}")
|
if rate >= thresh_hold_rate:
|
canceled_indexes.sort()
|
l2_log.l_cancel_debug(code, f"L后撤单,撤单位置:{canceled_indexes[-1]}")
|
return True, total_data[canceled_indexes[-1]]
|
|
return False, None
|
|
def __compute_near_by_trade_progress_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data,
|
is_first_code):
|
# L前守护时间为3分钟
|
if tool.trade_time_sub(total_data[-1]['val']['time'],
|
total_data[buy_exec_index]['val']['time']) > constant.L_CANCEL_UP_EXPIRE_TIME:
|
return False, None
|
|
watch_indexes = self.__get_near_by_trade_progress_indexes_cache(code)
|
if not watch_indexes:
|
return False, None
|
|
# 监听范围小于5笔不生效
|
if len(watch_indexes) < 5:
|
return False, None
|
|
# 计算监听的总条数
|
# 权重
|
WATCH_INDEX_WEIGHTS = [3, 2, 1]
|
total_count_weight = 0
|
for wi in range(0, len(watch_indexes)):
|
if wi < len(WATCH_INDEX_WEIGHTS):
|
total_count_weight += WATCH_INDEX_WEIGHTS[wi]
|
else:
|
total_count_weight += WATCH_INDEX_WEIGHTS[-1]
|
# 判断撤单中是否有监听中的索引
|
need_compute = False
|
for i in range(start_index, end_index + 1):
|
data = total_data[i]
|
val = data["val"]
|
if L2DataUtil.is_limit_up_price_buy_cancel(val):
|
# 查询买入位置
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data,
|
local_today_buyno_map.get(
|
code))
|
if buy_index is not None and buy_index in watch_indexes:
|
need_compute = True
|
break
|
if need_compute:
|
watch_indexes_list = list(watch_indexes)
|
watch_indexes_list.sort()
|
# 计算撤单比例
|
canceled_count_weight = 0
|
canceled_indexes = []
|
for wi in watch_indexes:
|
canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
|
wi,
|
total_data,
|
local_today_canceled_buyno_map.get(
|
code))
|
if canceled_data:
|
canceled_indexes.append(canceled_data["index"])
|
# 获取索引权重
|
pos_index = watch_indexes_list.index(wi)
|
if pos_index < len(WATCH_INDEX_WEIGHTS):
|
canceled_count_weight += WATCH_INDEX_WEIGHTS[pos_index]
|
else:
|
canceled_count_weight += WATCH_INDEX_WEIGHTS[-1]
|
rate = round(canceled_count_weight / total_count_weight, 3)
|
thresh_cancel_rate, must_buy = LCancelRateManager.get_cancel_rate(code,
|
total_data[buy_exec_index]["val"]["time"],
|
is_up=True)
|
l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},L前已撤单比例:{rate}/{thresh_cancel_rate}")
|
if rate >= thresh_cancel_rate:
|
# 计算成交进度位置到当前下单位置的纯买额
|
real_place_order_index_info = self.__real_place_order_index_dict.get(code)
|
trade_progress_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_progress_index is None:
|
trade_progress_index = 0
|
if real_place_order_index_info and trade_progress_index:
|
total_num = 0
|
thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
|
thresh_hold_money = thresh_hold_money * 3
|
# 阈值为2倍m值
|
thresh_hold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
|
for i in range(trade_progress_index + 1, real_place_order_index_info[0]):
|
data = total_data[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
|
i,
|
total_data,
|
local_today_canceled_buyno_map.get(
|
code))
|
if not canceled_data:
|
# 没有撤单
|
total_num += val["num"] * data["re"]
|
if total_num > thresh_hold_num:
|
# 成交位到下单位还有足够的单没撤
|
l2_log.l_cancel_debug(code,
|
f"L上撤阻断: 成交位-{trade_progress_index} 真实下单位-{real_place_order_index_info[0]} 阈值-{thresh_hold_money}")
|
return False, None
|
|
canceled_indexes.sort()
|
l2_log.l_cancel_debug(code, f"L上撤单,撤单位置:{canceled_indexes[-1]}")
|
return True, total_data[canceled_indexes[-1]]
|
|
return False, None
|
|
# L后是否还有可能撤单
|
def __is_l_down_can_cancel(self, code, buy_exec_index):
|
watch_indexes_info = self.__get_watch_indexes_cache(code)
|
if not watch_indexes_info:
|
return True
|
trade_index, is_default = TradeBuyQueue().get_traded_index(code)
|
if trade_index is None:
|
trade_index = 0
|
if trade_index is None:
|
return True
|
real_place_order_index_info = self.__real_place_order_index_dict.get(code)
|
if not real_place_order_index_info:
|
return True
|
|
# 计算已经成交的比例
|
total_datas = local_today_datas.get(code)
|
total_deal_nums = 0
|
total_nums = 1
|
for index in watch_indexes_info[2]:
|
# 不能计算L后后半段
|
if index > real_place_order_index_info[0]:
|
continue
|
data = total_datas[index]
|
val = data["val"]
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
|
index,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
total_nums += val["num"]
|
if left_count > 0 and index < trade_index:
|
total_deal_nums += val["num"]
|
thresh_hold_rate, must_buy = LCancelRateManager.get_cancel_rate(code,
|
total_datas[buy_exec_index]["val"]["time"])
|
if total_deal_nums / total_nums > 1 - thresh_hold_rate - 0.05:
|
return False
|
return True
|
|
def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
|
if buy_exec_index is None:
|
return False, None, "尚未找到下单位置"
|
# 守护S撤以外的数据
|
if int(tool.get_now_time_str().replace(":", "")) > int("145700") and not constant.TEST:
|
return False, None, ""
|
# 下单位临近撤
|
can_cancel, cancel_data = False, None
|
try:
|
can_cancel, cancel_data = self.__compute_need_cancel(code, buy_exec_index, start_index, end_index,
|
total_data,
|
is_first_code)
|
except Exception as e:
|
logger_l2_l_cancel.exception(e)
|
raise e
|
extra_msg = "L后"
|
if not can_cancel:
|
# 成交位临近撤
|
try:
|
can_cancel, cancel_data = self.__compute_near_by_trade_progress_need_cancel(code, buy_exec_index,
|
start_index, end_index,
|
total_data,
|
is_first_code)
|
extra_msg = "L前"
|
except Exception as e:
|
logger_l2_l_cancel.exception(e)
|
raise e
|
|
try:
|
if self.__need_update_l_down_after(code, start_index, end_index):
|
# 更新后半段
|
watch_indexes = self.__compute_l_down_watch_index_after_real_place_order_index(code)
|
if watch_indexes:
|
l2_log.l_cancel_debug(code, "L后后半段囊括:{}", watch_indexes)
|
watch_indexes_info = self.__get_watch_indexes_cache(code)
|
if watch_indexes_info and watch_indexes_info[2]:
|
# 没有囊括
|
watch_indexes |= set(watch_indexes_info[2])
|
self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes)
|
except Exception as e:
|
l2_log.l_cancel_debug(code, "L后后半段计算出错:{}", str(e))
|
|
return can_cancel, cancel_data, extra_msg
|
|
def place_order_success(self, code):
|
self.clear(code)
|
|
def cancel_success(self, code):
|
self.clear(code)
|
|
def test(self):
|
code = "000333"
|
self.__set_cancel_l_down_after_place_order_index(code, 121, 0)
|
time.sleep(6)
|
self.__set_cancel_l_down_after_place_order_index(code, 121, 0),
|
time.sleep(6)
|
self.__set_cancel_l_down_after_place_order_index(code, 122, 1)
|
|
print(self.__get_cancel_l_down_after_place_order_index_dict(code))
|
|
|
# --------------------------------H撤-------------------------------
|
class HourCancelBigNumComputer:
|
__db = 0
|
__redis_manager = redis_manager.RedisManager(0)
|
__tradeBuyQueue = TradeBuyQueue()
|
__SCancelBigNumComputer = SCancelBigNumComputer()
|
|
# 计算触发位置
|
__start_compute_index_dict = {}
|
|
# 成交位置
|
__transaction_progress_index_dict = {}
|
# 成交位置更新时间
|
__transaction_progress_index_updatetime_dict = {}
|
# 缓存
|
__cancel_watch_indexs_cache = {}
|
|
# L撤触发的代码
|
__l_cancel_triggered_codes = set()
|
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
|
|
cls.__load_datas()
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redis_manager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(__redis, "h_cancel_watch_indexs-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
val = json.loads(val)
|
if val:
|
val = set(val)
|
else:
|
val = set()
|
CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)
|
|
keys = RedisUtils.keys(__redis, "h_cancel_transaction_index-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
val = int(val)
|
CodeDataCacheUtil.set_cache(cls.__transaction_progress_index_dict, code, val)
|
finally:
|
RedisUtils.realse(__redis)
|
|
# 保存成交位置到执行位置的揽括范围数据
|
def __save_watch_index_set(self, code, buy_single_index, indexes):
|
trade_record_log_util.add_cancel_watch_indexes_log(code,
|
trade_record_log_util.CancelWatchIndexesInfo(
|
trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H,
|
buy_single_index,
|
list(indexes)))
|
CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, indexes)
|
key = f"h_cancel_watch_indexs-{code}"
|
RedisUtils.setex_async(self.__db, key, tool.get_expire(),
|
json.dumps(list(indexes)))
|
|
# 保存成交进度
|
def __get_watch_index_set(self, code):
|
key = f"h_cancel_watch_indexs-{code}"
|
val = RedisUtils.get(self.__get_redis(), key)
|
if val is None:
|
return None
|
val = json.loads(val)
|
return val
|
|
def __get_watch_index_set_cache(self, code):
|
cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return set()
|
|
def __save_transaction_index(self, code, index):
|
CodeDataCacheUtil.set_cache(self.__transaction_progress_index_dict, code, index)
|
key = f"h_cancel_transaction_index-{code}"
|
RedisUtils.setex_async(self.__db, key, tool.get_expire(), index)
|
|
def __clear_data(self, code):
|
CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code)
|
CodeDataCacheUtil.clear_cache(self.__transaction_progress_index_dict, code)
|
CodeDataCacheUtil.clear_cache(self.__start_compute_index_dict, code)
|
self.__l_cancel_triggered_codes.discard(code)
|
ks = [f"h_cancel_watch_indexs-{code}", f"h_cancel_transaction_index-{code}"]
|
for key in ks:
|
RedisUtils.delete_async(self.__db, key)
|
|
# 计算观察索引,倒序计算
|
|
def __compute_watch_index(self, code, buy_single_index):
|
"""
|
@param code:
|
@param buy_single_index:
|
@return:
|
"""
|
if buy_single_index is None:
|
return
|
if self.__cancel_watch_indexs_cache.get(code):
|
return
|
real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code)
|
if not real_place_order_index:
|
l2_log.h_cancel_debug(code, "尚未找到真实下单位置")
|
return
|
start_compute_index = self.__start_compute_index_dict.get(code)
|
if not start_compute_index:
|
l2_log.h_cancel_debug(code, "尚未找到开始计算位置")
|
return
|
transaction_index = self.__transaction_progress_index_dict.get(code)
|
if transaction_index:
|
# 不能计算成交进度以前的数据
|
start_compute_index = transaction_index + 1 # max(transaction_index + 1, start_compute_index)
|
total_datas = local_today_datas.get(code)
|
|
# h撤计算必须超过3分钟
|
if tool.trade_time_sub(total_datas[-1]["val"]["time"],
|
total_datas[real_place_order_index]["val"]["time"]) < 180:
|
l2_log.h_cancel_debug(code, "180s内囊括计算H撤")
|
return
|
# -----------------计算H上-------------------
|
watch_indexes_up = set()
|
for i in range(real_place_order_index - 1, start_compute_index + 1, -1):
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
# 小金额过滤
|
if float(val['price']) * val['num'] < 50 * 100:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
watch_indexes_up.add(i)
|
if len(watch_indexes_up) >= 3:
|
# 最多取3笔
|
break
|
|
# ------------------计算H下-----------------------
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if not limit_up_price:
|
return
|
# 计算结束位置
|
total_num = 0
|
# 获取m值数据
|
thresh_hold_money = Buy1PriceManager().get_latest_buy1_money(code)
|
# 封单额的1/10
|
thresh_hold_money = thresh_hold_money / 10
|
thresh_hold_num = thresh_hold_money // (float(limit_up_price) * 100)
|
end_index = real_place_order_index + 1
|
watch_indexes = set()
|
for i in range(real_place_order_index + 1, total_datas[-1]["index"]):
|
# 看是否撤单
|
data = total_datas[i]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if float(val['price']) * val['num'] < 50 * 100:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
watch_indexes.add(i)
|
total_num += left_count * val["num"]
|
count = len(watch_indexes)
|
# 最小5笔,最大10笔
|
if (total_num > thresh_hold_num and count >= 5) or count >= 10:
|
break
|
if watch_indexes or watch_indexes_up:
|
watch_indexes |= watch_indexes_up
|
self.__save_watch_index_set(code, buy_single_index, watch_indexes)
|
l2_log.h_cancel_debug(code, f"设置监听范围, 数据范围:{real_place_order_index}-{end_index} 监听范围-{watch_indexes}")
|
# 设置真实下单位置
|
|
def __need_compute_watch_indexes(self, code, transaction_index):
|
"""
|
1.成交进度位距离真实下单位置<=5笔触发囊括
|
2.成交到(下单那一刻的成交进度位)到(真实下单位置)的后1/10处也要触发囊括
|
3.L撤无法生效触发囊括
|
@param code:
|
@param transaction_index:
|
@return:
|
"""
|
|
start_compute_index = self.__start_compute_index_dict.get(code)
|
if start_compute_index is None:
|
return False
|
real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code)
|
total_datas = local_today_datas.get(code)
|
if real_place_order_index and real_place_order_index > transaction_index:
|
# 成交位置离我们真实下单的位置只有5笔没撤的大单就需要计算H撤的囊括范围了
|
total_left_count = 0
|
for index in range(transaction_index + 1, real_place_order_index):
|
data = total_datas[index]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if float(val['price']) * val['num'] < 5000:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, index,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
total_left_count += 1
|
if total_left_count > 5:
|
break
|
if total_left_count <= 5:
|
return True
|
|
# 成交位到开始计算位置没有买单之后
|
for index in range(transaction_index + 1, start_compute_index):
|
data = total_datas[index]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if float(val['price']) * val['num'] < 5000:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, index,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
# 中间还有未撤的买单
|
return False
|
return True
|
|
# 设置成交进度
|
def set_transaction_index(self, code, buy_single_index, index):
|
try:
|
# 每3s或者成交进度变化就囊括一次
|
if index == self.__transaction_progress_index_dict.get(code):
|
if code in self.__transaction_progress_index_updatetime_dict and time.time() - self.__transaction_progress_index_updatetime_dict.get(
|
code) < 3:
|
return
|
self.__transaction_progress_index_dict[code] = index
|
self.__transaction_progress_index_updatetime_dict[code] = time.time()
|
if self.__need_compute_watch_indexes(code, index):
|
self.__compute_watch_index(code, buy_single_index)
|
except Exception as e:
|
l2_log.h_cancel_debug(code, "设置成交进度位置出错:{}", str(e))
|
async_log_util.exception(logger_l2_h_cancel, e)
|
|
# 设置真实下单位置
|
def set_real_place_order_index(self, code, index, buy_single_index):
|
if buy_single_index is None:
|
return
|
try:
|
# 计算触发位置
|
min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
|
# 统计净涨停买的数量
|
not_cancel_indexes = []
|
total_datas = local_today_datas.get(code)
|
for j in range(buy_single_index, index):
|
data = total_datas[j]
|
val = data['val']
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val["num"] < min_num:
|
continue
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
|
j,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
not_cancel_indexes.append(j)
|
if not_cancel_indexes:
|
temp_count = len(not_cancel_indexes)
|
temp_index = int(temp_count * 9 / 10)
|
if temp_index + 1 >= temp_count:
|
temp_index = temp_count - 1
|
self.__start_compute_index_dict[code] = not_cancel_indexes[temp_index]
|
except Exception as e:
|
async_log_util.exception(logger_l2_h_cancel, e)
|
l2_log.h_cancel_debug(code, "设置真实下单位置出错:{}", str(e))
|
|
def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
|
buy_volume_index, volume_index,
|
is_first_code):
|
if buy_single_index is None or buy_exec_index is None:
|
return False, "尚未找到下单位置"
|
|
if int(tool.get_now_time_str().replace(":", "")) > int("145000"):
|
return False, None
|
# 设置成交进度
|
if code not in self.__transaction_progress_index_dict:
|
return False, "没找到成交进度"
|
watch_index_set = self.__get_watch_index_set_cache(code)
|
if not watch_index_set:
|
# 是否有涨停买撤
|
need_compute = False
|
# 有涨停买撤才会计算位置
|
for i in range(start_index, end_index + 1):
|
data = total_data[i]
|
val = data['val']
|
if L2DataUtil.is_limit_up_price_buy_cancel(val):
|
need_compute = True
|
break
|
if need_compute:
|
if self.__need_compute_watch_indexes(code, self.__transaction_progress_index_dict.get(code)):
|
self.__compute_watch_index(code, buy_single_index)
|
watch_index_set = self.__get_watch_index_set_cache(code)
|
if not watch_index_set:
|
return False, "没有监听索引"
|
l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
|
if watch_index_set:
|
cancel_num = 0
|
total_num = 0
|
for i in watch_index_set:
|
if i is None:
|
l2_log.h_cancel_debug(code, f"空值:{watch_index_set}")
|
continue
|
data = total_data[i]
|
val = data['val']
|
total_num += val['num'] * data['re']
|
# 判断是否撤单
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_data,
|
local_today_canceled_buyno_map.get(
|
code))
|
cancel_num += val['num'] * (data['re'] - left_count)
|
rate = round(cancel_num / total_num, 4)
|
threshold_rate = constant.H_CANCEL_RATE_WITH_LDOWN_CANT_INVALID if code in self.__l_cancel_triggered_codes else constant.H_CANCEL_RATE
|
try:
|
must_buy = gpcode_manager.MustBuyCodesManager().is_in_cache(code)
|
if must_buy:
|
threshold_rate = constant.H_CANCEL_RATE_WITH_MUST_BUY
|
except Exception as e:
|
async_log_util.error(logger_l2_h_cancel, str(e))
|
if rate >= threshold_rate:
|
l2_log.h_cancel_debug(code, f"撤单比例:{rate}")
|
return True, total_data[-1]
|
return False, None
|
|
# 下单成功
|
def place_order_success(self, code, buy_single_index, buy_exec_index, total_data):
|
self.__clear_data(code)
|
|
# 获取H撤监听的数据索引范围
|
# 返回监听范围与已撤单索引
|
def get_watch_index_dict(self, code):
|
return {}, set()
|
|
def start_compute_watch_indexes(self, code, buy_single_index):
|
"""
|
L后撤无法生效触发的囊括
|
@param code:
|
@param buy_single_index:
|
@return:
|
"""
|
self.__l_cancel_triggered_codes.add(code)
|
self.__compute_watch_index(code, buy_single_index)
|