"""
|
L2撤单策略
|
"""
|
# ---------------------------------S撤-------------------------------
|
|
# s级平均大单计算
|
# 计算范围到申报时间的那一秒
|
import json
|
import logging
|
import random
|
import time
|
|
import constant
|
from code_attribute import big_money_num_manager, gpcode_manager
|
import l2_data_util
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from l2.code_price_manager import Buy1PriceManager
|
from l2.huaxin import l2_huaxin_util
|
from l2.l2_data_manager import OrderBeginPosInfo
|
from l2.l2_sell_manager import L2LimitUpSellManager
|
from l2.l2_transaction_data_manager import HuaXinTransactionDataManager
|
from log_module import async_log_util
|
from trade.deal_big_money_manager import DealOrderNoManager
|
from utils import tool
|
from l2.transaction_progress import TradeBuyQueue
|
from trade import trade_queue_manager, l2_trade_factor, trade_record_log_util
|
from l2 import l2_log, l2_data_source_util
|
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas, local_today_buyno_map, \
|
local_today_canceled_buyno_map
|
from log_module.log import logger_buy_1_volumn, logger_l2_l_cancel, logger_l2_h_cancel
|
from utils.tool import CodeDataCacheUtil
|
|
|
def set_real_place_position(code, index, buy_single_index=None, is_default=True):
    """Forward the real place-order index of ``code`` to the cancel computers.

    The S, L, H, G and F cancel computers are notified, in that order.

    :param code: stock code
    :param index: index of the real place-order position
    :param buy_single_index: index of the buy signal, passed to the L/H/G computers
    :param is_default: passed through to the L/G/F computers
    """
    # The D-cancel computer is currently not notified:
    # DCancelBigNumComputer().set_real_order_index(code, index)
    s_computer = SecondCancelBigNumComputer()
    s_computer.set_real_place_order_index(code, index)

    l_computer = LCancelBigNumComputer()
    l_computer.set_real_place_order_index(code, index, buy_single_index=buy_single_index, is_default=is_default)

    h_computer = HourCancelBigNumComputer()
    h_computer.set_real_place_order_index(code, index, buy_single_index)

    g_computer = GCancelBigNumComputer()
    g_computer.set_real_place_order_index(code, index, buy_single_index, is_default)

    f_computer = FCancelBigNumComputer()
    f_computer.set_real_order_index(code, index, is_default)
|
|
|
class SecondCancelBigNumComputer:
    """S-cancel: cancel decision based on the ratio of cancelled big limit-up buys.

    The class is a singleton (see ``__new__``). Per-code state is kept in
    class-level dict caches; the compute data and the real place-order index
    are additionally written to redis asynchronously and reloaded from redis
    when the singleton is first created.
    """

    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __sCancelParamsManager = l2_trade_factor.SCancelParamsManager
    # code -> (process_index, buy_num, cancel_num)
    __s_big_num_cancel_compute_data_cache = {}
    # code -> real place-order index
    __s_cancel_real_place_order_index_cache = {}
    # code -> transaction (deal progress) index
    __s_cancel_transaction_index_cache = {}
    # code -> True once the data has been initialised; it is only initialised
    # after both the real place-order index and the transaction index arrived
    __s_cancel_inited_data = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: persisted state is loaded once, on first construction.
        if not cls.__instance:
            cls.__instance = super(SecondCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a redis connection from the class-level redis manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory caches from the keys currently stored in redis."""
        redis_conn = cls.__get_redis()
        try:
            for k in RedisUtils.keys(redis_conn, "s_big_num_cancel_compute_data-*"):
                code = k.split("-")[-1]
                val = json.loads(RedisUtils.get(redis_conn, k))
                tool.CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code, val)

            for k in RedisUtils.keys(redis_conn, "s_cancel_real_place_order_index-*"):
                code = k.split("-")[-1]
                val = int(RedisUtils.get(redis_conn, k))
                tool.CodeDataCacheUtil.set_cache(cls.__s_cancel_real_place_order_index_cache, code, val)
        finally:
            RedisUtils.realse(redis_conn)

    def __save_compute_data(self, code, process_index, buy_num, cancel_num):
        """Store the processing progress and the accumulated buy/cancel numbers."""
        compute_data = (process_index, buy_num, cancel_num)
        CodeDataCacheUtil.set_cache(self.__s_big_num_cancel_compute_data_cache, code, compute_data)
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps(compute_data))

    def __get_compute_data(self, code):
        """Read ``(process_index, buy_num, cancel_num)`` from redis.

        Returns ``(-1, 0, 0)`` when nothing is stored for ``code``.
        """
        key = "s_big_num_cancel_compute_data-{}".format(code)
        redis_conn = self.__get_redis()
        try:
            val = RedisUtils.get(redis_conn, key)
        finally:
            # Release the connection, as __load_datas does.
            RedisUtils.realse(redis_conn)
        if val is None:
            return -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2]

    def __get_compute_data_cache(self, code):
        """Read the compute data from the in-memory cache; ``(-1, 0, 0)`` if absent."""
        found, cached = CodeDataCacheUtil.get_cache(self.__s_big_num_cancel_compute_data_cache, code)
        if found:
            return cached
        return -1, 0, 0

    def __del_compute_data_cache(self, code):
        """Drop the compute data of ``code`` from the cache and from redis."""
        CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code)
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.delete_async(self.__db, key)

    def __save_real_place_order_index(self, code, index):
        """Store the real place-order index in the cache and in redis."""
        CodeDataCacheUtil.set_cache(self.__s_cancel_real_place_order_index_cache, code, index)
        key = "s_cancel_real_place_order_index-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), index)

    def __get_real_place_order_index(self, code):
        """Read the real place-order index from redis; ``None`` if absent."""
        key = "s_cancel_real_place_order_index-{}".format(code)
        redis_conn = self.__get_redis()
        try:
            val = RedisUtils.get(redis_conn, key)
        finally:
            # Release the connection, as __load_datas does.
            RedisUtils.realse(redis_conn)
        if val is None:
            return None
        return int(val)

    def __get_real_place_order_index_cache(self, code):
        """Read the real place-order index from the in-memory cache; ``None`` if absent."""
        found, cached = CodeDataCacheUtil.get_cache(self.__s_cancel_real_place_order_index_cache, code)
        if found:
            return cached
        return None

    def get_real_place_order_index_cache(self, code):
        """Public accessor for the cached real place-order index; ``None`` if absent."""
        return self.__get_real_place_order_index_cache(code)

    def __clear_data(self, code):
        """Remove every cached and persisted S-cancel value of ``code``."""
        for cache in (self.__s_big_num_cancel_compute_data_cache,
                      self.__s_cancel_real_place_order_index_cache,
                      self.__s_cancel_transaction_index_cache,
                      self.__s_cancel_inited_data):
            CodeDataCacheUtil.clear_cache(cache, code)
        ks = ["s_big_num_cancel_compute_data-{}".format(code), "s_cancel_real_place_order_index-{}".format(code)]
        for key in ks:
            RedisUtils.delete_async(self.__db, key)

    def set_real_place_order_index(self, code, index):
        """Set the real place-order index of ``code``."""
        self.__save_real_place_order_index(code, index)

    def set_transaction_index(self, code, index):
        """Set the transaction (deal progress) index of ``code``; kept in memory only."""
        self.__s_cancel_transaction_index_cache[code] = index

    def clear_data(self):
        """Clear the data of every code that currently has an S-cancel key in redis."""
        ks = ["s_big_num_cancel_compute_data-*", "s_cancel_real_place_order_index-*"]
        for pattern in ks:
            redis_conn = self.__get_redis()
            try:
                keys = RedisUtils.keys(redis_conn, pattern)
            finally:
                RedisUtils.realse(redis_conn)
            for k in keys or ():
                # The code is the last "-"-separated part, same as in __load_datas.
                code = k.split("-")[-1]
                self.__clear_data(code)

    def __compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, volume_rate_index):
        """Compute the net big-order number using the configured fire count and minimum money."""
        # Number of "ignition" big orders that are excluded from the count
        fire_count = self.__sCancelParamsManager.get_max_exclude_count(volume_rate_index)
        return self.compute_left_big_num(code, buy_single_index, start_index, end_index, total_data, fire_count,
                                         constant.S_CANCEL_MIN_MONEY)

    def compute_left_big_num(self, code, buy_single_index, start_index, end_index, total_data, fire_count, min_money_w):
        """Compute the net number of big limit-up buys in ``[start_index, end_index]``.

        Big limit-up buys add ``num * re``; big limit-up buy cancels subtract
        ``num * re`` when the cancelled buy lies inside the range. Buys (and
        cancels of buys) whose distance to ``buy_single_index`` is below
        ``fire_count`` are ignored. Records with ``num * price`` not above
        ``min_money_w * 100`` are skipped.

        :return: the net number
        """
        left_big_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # Skip orders that are not big orders
            if val["num"] * float(val["price"]) <= min_money_w * 100:
                continue

            if L2DataUtil.is_limit_up_price_buy(val):
                # Ignition big orders are not counted
                if i - buy_single_index >= fire_count:
                    left_big_num += val["num"] * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # Look up the position of the buy that was cancelled
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                    data, local_today_buyno_map.get(code))
                if buy_index is not None and start_index <= buy_index <= end_index:
                    if buy_index - buy_single_index >= fire_count:
                        left_big_num -= val["num"] * data["re"]
                elif buy_index is None:
                    # The buy cannot be traced (e.g. partial cancel), so estimate the buy
                    # time and check whether it falls in the time span of the range
                    min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                     val["cancelTimeUnit"])
                    # Only second-level cancels are judged
                    if max_space - min_space <= 1:
                        buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                        range_start = int(total_data[start_index]["val"]["time"].replace(":", ""))
                        range_end = int(total_data[end_index]["val"]["time"].replace(":", ""))
                        if range_start <= int(buy_time.replace(":", "")) <= range_end:
                            left_big_num -= val["num"] * data["re"]
        return left_big_num

    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, is_first_code,
                    buy_volume_rate_index,
                    volume_rate_index,
                    need_cancel=True):
        """Process records ``[start_index, end_index]`` and decide whether to cancel.

        When both the real place-order index and the transaction index are
        known, the cancel ratio is computed over the orders between them.
        Otherwise it is computed over the orders around the execution position
        (fixed time range of 1s).

        :param need_cancel: when False the numbers are accumulated but no cancel is reported
        :return: ``(True, data)`` with the record that pushed the ratio above the
            threshold, otherwise ``(False, message_or_None)``
        """
        if buy_single_index is None or buy_exec_index is None:
            return False, "尚未找到下单位置"

        # Only guard while the data is within S_CANCEL_EXPIRE_TIME of the execution position
        buy_exec_time = total_data[buy_exec_index]["val"]["time"]
        if tool.trade_time_sub(total_data[start_index]["val"]["time"],
                               buy_exec_time) > constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.s_cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        real_place_order_index = self.__s_cancel_real_place_order_index_cache.get(code)
        transaction_index = self.__s_cancel_transaction_index_cache.get(code)

        if real_place_order_index and transaction_index:
            # Compute range: transaction index -> real place-order index
            if not self.__s_cancel_inited_data.get(code):
                l2_log.s_cancel_debug(code, "S撤初始化,成交位:{} 下单位:{}", transaction_index, real_place_order_index)
                # Drop the data computed before
                self.__s_cancel_inited_data[code] = True
                self.__del_compute_data_cache(code)
                # Number of big-order lots that are not cancelled yet
                left_big_num = 0
                for i in range(transaction_index + 1, real_place_order_index):
                    data = total_data[i]
                    val = data["val"]
                    if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                        continue
                    if L2DataUtil.is_limit_up_price_buy(val):
                        left_big_num += val["num"] * data["re"]
                    elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                        # Look up the position of the buy that was cancelled
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                            data, local_today_buyno_map.get(code))
                        if buy_index is not None and transaction_index + 1 <= buy_index <= real_place_order_index and i < start_index:
                            left_big_num -= val["num"] * data["re"]
                l2_log.s_cancel_debug(code, "S撤初始化结果,left_big_num:{}", left_big_num)
                self.__save_compute_data(code, real_place_order_index, left_big_num, 0)

            process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
            process_index = process_index_old
            cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
            try:
                for i in range(start_index, end_index + 1):
                    data = total_data[i]
                    val = data["val"]
                    process_index = i
                    if process_index_old >= i:
                        # Already processed
                        continue
                    if L2DataUtil.is_limit_up_price_buy_cancel(val):
                        if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                            continue
                        # Look up the position of the buy that was cancelled
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                            data, local_today_buyno_map.get(code))
                        if buy_index is not None and transaction_index < buy_index < real_place_order_index:
                            cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                    # NOTE(review): the nesting of this check was reconstructed from an
                    # unindented source; it is evaluated once per processed record.
                    if need_cancel:
                        rate__ = round(cancel_num / max(buy_num, 1), 2)
                        if rate__ > cancel_rate_threshold:
                            return True, total_data[i]
            finally:
                # Save the processing progress and the numbers
                self.__save_compute_data(code, process_index, buy_num, cancel_num)
        else:
            if tool.trade_time_sub(total_data[end_index]["val"]["time"],
                                   total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
                # The end position is past the guarded time: move it back to the last
                # record whose time differs from the time of the current end position
                for i in range(end_index, start_index - 1, -1):
                    if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
                        end_index = i
                        break
            # Processing progress
            process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
            process_index = process_index_old

            if process_index_old == -1:
                # First computation: add the net value of buy signal -> execution position
                left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                           total_data, volume_rate_index)
                buy_num += left_big_num
                process_index = buy_exec_index
            # Fixed to 1s
            range_seconds = 1  # self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
            # Real place-order position
            place_order_index = self.__get_real_place_order_index_cache(code)

            cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
            try:
                for i in range(start_index, end_index + 1):
                    data = total_data[i]
                    val = data["val"]
                    process_index = i
                    if process_index_old >= i:
                        # Already processed
                        continue
                    if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                        continue

                    if L2DataUtil.is_limit_up_price_buy(val):
                        if place_order_index is not None and place_order_index < data["index"]:
                            # Must not be after the place-order position
                            continue
                        # Buys inside the time range are counted
                        if tool.trade_time_sub(val["time"], buy_exec_time) <= range_seconds:
                            buy_num += data["re"] * int(val["num"])
                    elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                        # Look up the position of the buy that was cancelled
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                            data, local_today_buyno_map.get(code))
                        if buy_index is not None and buy_single_index <= buy_index:
                            if place_order_index and place_order_index >= buy_index:
                                cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                            # The buy time is inside the time range
                            elif tool.trade_time_sub(tool.trade_time_add_second(buy_exec_time, range_seconds),
                                                     total_data[buy_index]["val"]["time"]) >= 0:
                                cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                        elif buy_index is None:
                            # The buy cannot be traced (e.g. partial cancel), so estimate the buy time
                            min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                             val["cancelTimeUnit"])
                            # Only second-level cancels are judged
                            if max_space - min_space <= 1:
                                buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                                if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
                                        buy_time.replace(":", "")):
                                    # The buy time is inside the time range
                                    if tool.trade_time_sub(tool.trade_time_add_second(buy_exec_time, range_seconds),
                                                           buy_time) >= 0:
                                        cancel_num += data["re"] * int(val["num"])

                    if need_cancel:
                        rate__ = round(cancel_num / max(buy_num, 1), 2)
                        if rate__ > cancel_rate_threshold:
                            return True, total_data[i]
            finally:
                l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{} 目标比例:{} 计算时间范围:{}", start_index, end_index,
                                    cancel_num,
                                    buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold,
                                    range_seconds)
                # Save the processing progress and the numbers
                self.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None

    def cancel_success(self, code):
        """Called after a successful cancel: clear the data of ``code``."""
        self.__clear_data(code)

    def place_order_success(self, code):
        """Called after a successful order placement: clear the data of ``code``."""
        self.__clear_data(code)
|
|
|
# --------------------------------H撤-------------------------------
|
class HourCancelBigNumComputer:
    """H-cancel: cancel decision based on the cancel ratio of a watched set of big buys.

    The class is a singleton (see ``__new__``). The watched indexes are cached
    per code and written to redis asynchronously; they are reloaded from redis
    when the singleton is first created.
    """

    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __tradeBuyQueue = TradeBuyQueue()
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()

    # code -> index from which the computation is triggered
    __start_compute_index_dict = {}

    # code -> transaction (deal progress) index
    __transaction_progress_index_dict = {}
    # code -> time.time() of the last transaction index update
    __transaction_progress_index_updatetime_dict = {}
    # code -> set of watched indexes
    __cancel_watch_indexs_cache = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: persisted state is loaded once, on first construction.
        if not cls.__instance:
            cls.__instance = super(HourCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a redis connection from the class-level redis manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory caches from the keys currently stored in redis."""
        redis_conn = cls.__get_redis()
        try:
            for k in RedisUtils.keys(redis_conn, "h_cancel_watch_indexs-*"):
                code = k.split("-")[-1]
                val = json.loads(RedisUtils.get(redis_conn, k))
                val = set(val) if val else set()
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, val)

            for k in RedisUtils.keys(redis_conn, "h_cancel_transaction_index-*"):
                code = k.split("-")[-1]
                val = int(RedisUtils.get(redis_conn, k))
                CodeDataCacheUtil.set_cache(cls.__transaction_progress_index_dict, code, val)
        finally:
            RedisUtils.realse(redis_conn)

    def __save_watch_index_set(self, code, buy_single_index, indexes):
        """Log, cache and persist the watched index set of ``code``."""
        trade_record_log_util.add_cancel_watch_indexes_log(
            code,
            trade_record_log_util.CancelWatchIndexesInfo(
                trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H,
                buy_single_index,
                list(indexes)))
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, indexes)
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps(list(indexes)))

    def __get_watch_index_set(self, code):
        """Read the watched indexes from redis (as a list); ``None`` if absent."""
        key = f"h_cancel_watch_indexs-{code}"
        redis_conn = self.__get_redis()
        try:
            val = RedisUtils.get(redis_conn, key)
        finally:
            # Release the connection, as __load_datas does.
            RedisUtils.realse(redis_conn)
        if val is None:
            return None
        return json.loads(val)

    def __get_watch_index_set_cache(self, code):
        """Read the watched index set from the cache; an empty set if absent."""
        found, cached = CodeDataCacheUtil.get_cache(self.__cancel_watch_indexs_cache, code)
        if found:
            return cached
        return set()

    def __save_transaction_index(self, code, index):
        """Cache and persist the transaction index of ``code``."""
        CodeDataCacheUtil.set_cache(self.__transaction_progress_index_dict, code, index)
        key = f"h_cancel_transaction_index-{code}"
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), index)

    def __clear_data(self, code):
        """Remove the cached and persisted H-cancel values of ``code``."""
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code)
        CodeDataCacheUtil.clear_cache(self.__transaction_progress_index_dict, code)
        CodeDataCacheUtil.clear_cache(self.__start_compute_index_dict, code)
        ks = [f"h_cancel_watch_indexs-{code}", f"h_cancel_transaction_index-{code}"]
        for key in ks:
            RedisUtils.delete_async(self.__db, key)

    def __compute_watch_index(self, code, buy_single_index):
        """Compute and save the watched indexes of ``code``.

        Nothing is done when a watch set already exists, or when the real
        place-order index, the start-compute index or the limit-up price is
        missing, or when the latest record is less than 5 (as returned by
        ``tool.trade_time_sub``) after the buy signal record.

        "H up": up to 3 not-cancelled big buys scanned backwards from the real
        place-order index. "H down": not-cancelled big buys after the real
        place-order index, at least 5 and at most 10 of them.
        """
        if buy_single_index is None:
            return
        if self.__cancel_watch_indexs_cache.get(code):
            return
        real_place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
        if not real_place_order_index:
            l2_log.h_cancel_debug(code, "尚未找到真实下单位置")
            return
        start_compute_index = self.__start_compute_index_dict.get(code)
        if not start_compute_index:
            l2_log.h_cancel_debug(code, "尚未找到开始计算位置")
            return
        transaction_index = self.__transaction_progress_index_dict.get(code)
        if transaction_index:
            # Data before the transaction progress must not be computed
            start_compute_index = transaction_index + 1  # max(transaction_index + 1, start_compute_index)
        total_datas = local_today_datas.get(code)

        # The H-cancel computation requires more than 5s since the buy signal.
        # Fix: compare against the record's time, not the whole "val" dict.
        if tool.trade_time_sub(total_datas[-1]["val"]["time"], total_datas[buy_single_index]["val"]["time"]) < 5:
            l2_log.h_cancel_debug(code, "5s内囊括计算H撤")
            return

        canceled_buyno_map = local_today_canceled_buyno_map.get(code)

        # ----------------- H up -------------------
        watch_indexes_up = set()
        for i in range(real_place_order_index - 1, start_compute_index + 1, -1):
            val = total_datas[i]['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            # Filter out small amounts
            if float(val['price']) * val['num'] < 50 * 100:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                code, i, total_datas, canceled_buyno_map)
            if left_count > 0:
                watch_indexes_up.add(i)
                if len(watch_indexes_up) >= 3:
                    break

        # ------------------ H down -----------------------
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if not limit_up_price:
            return
        total_num = 0
        # Threshold: latest buy-1 money converted to lots at the limit-up price
        thresh_hold_money = Buy1PriceManager().get_latest_buy1_money(code)
        thresh_hold_num = thresh_hold_money // (float(limit_up_price) * 100)
        end_index = real_place_order_index + 1
        watch_indexes = set()
        # NOTE(review): the upper bound excludes the latest record - confirm this is intended.
        for i in range(real_place_order_index + 1, total_datas[-1]["index"]):
            # Remember how far the scan went so the log below reports the real range
            end_index = i
            val = total_datas[i]['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if float(val['price']) * val['num'] < 50 * 100:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                code, i, total_datas, canceled_buyno_map)
            if left_count > 0:
                watch_indexes.add(i)
                total_num += left_count * val["num"]
                count = len(watch_indexes)
                # At least 5 orders, at most 10
                if (total_num > thresh_hold_num and count >= 5) or count >= 10:
                    break
        if watch_indexes or watch_indexes_up:
            watch_indexes |= watch_indexes_up
            self.__save_watch_index_set(code, buy_single_index, watch_indexes)
            l2_log.h_cancel_debug(code, f"设置监听范围, 数据范围:{real_place_order_index}-{end_index} 监听范围-{watch_indexes}")

    def __need_compute_watch_indexes(self, code, transaction_index):
        """Tell whether the watched indexes should be computed now.

        Returns True when at most 5 not-cancelled big buys remain between the
        transaction index and the real place-order index, or when no
        not-cancelled big buy remains between the transaction index and the
        start-compute index. Returns False when no start-compute index is set.
        """
        start_compute_index = self.__start_compute_index_dict.get(code)
        if start_compute_index is None:
            return False
        real_place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
        total_datas = local_today_datas.get(code)
        canceled_buyno_map = local_today_canceled_buyno_map.get(code)
        if real_place_order_index and real_place_order_index > transaction_index:
            total_left_count = 0
            for index in range(transaction_index + 1, real_place_order_index):
                val = total_datas[index]['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if float(val['price']) * val['num'] < 5000:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                    code, index, total_datas, canceled_buyno_map)
                if left_count > 0:
                    total_left_count += 1
                    if total_left_count > 5:
                        break
            if total_left_count <= 5:
                return True

        # No not-cancelled big buy between the transaction index and the start-compute index
        for index in range(transaction_index + 1, start_compute_index):
            val = total_datas[index]['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if float(val['price']) * val['num'] < 5000:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                code, index, total_datas, canceled_buyno_map)
            if left_count > 0:
                # A not-cancelled buy is still in between
                return False
        return True

    def set_transaction_index(self, code, buy_single_index, index):
        """Record the transaction index and compute the watched indexes if needed.

        An unchanged index is ignored when it was last recorded less than 3s
        (wall clock) ago. Exceptions are logged, not raised.
        """
        try:
            if index == self.__transaction_progress_index_dict.get(code):
                last_update = self.__transaction_progress_index_updatetime_dict.get(code)
                if code in self.__transaction_progress_index_updatetime_dict and time.time() - last_update < 3:
                    return
            self.__transaction_progress_index_dict[code] = index
            self.__transaction_progress_index_updatetime_dict[code] = time.time()
            if self.__need_compute_watch_indexes(code, index):
                self.__compute_watch_index(code, buy_single_index)
        except Exception as e:
            l2_log.h_cancel_debug(code, "设置成交进度位置出错:{}", str(e))
            async_log_util.exception(logger_l2_h_cancel, e)

    def set_real_place_order_index(self, code, index, buy_single_index):
        """Derive the start-compute index from the real place-order index.

        Collects the not-cancelled limit-up buys in ``[buy_single_index, index)``
        with at least ``5000 / limit_up_price`` lots and stores the one at the
        9/10 position (clamped to the last) as the start-compute index.
        Exceptions are logged, not raised.
        """
        if buy_single_index is None:
            return
        try:
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            # Indexes of the limit-up buys that are not cancelled
            not_cancel_indexes = []
            total_datas = local_today_datas.get(code)
            for j in range(buy_single_index, index):
                val = total_datas[j]['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] < min_num:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                    code, j, total_datas, local_today_canceled_buyno_map.get(code))
                if left_count > 0:
                    not_cancel_indexes.append(j)
            if not_cancel_indexes:
                temp_count = len(not_cancel_indexes)
                temp_index = int(temp_count * 9 / 10)
                if temp_index + 1 >= temp_count:
                    temp_index = temp_count - 1
                self.__start_compute_index_dict[code] = not_cancel_indexes[temp_index]
        except Exception as e:
            async_log_util.exception(logger_l2_h_cancel, e)
            l2_log.h_cancel_debug(code, "设置真实下单位置出错:{}", str(e))

    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    buy_volume_index, volume_index,
                    is_first_code):
        """Decide whether the order of ``code`` should be cancelled.

        The cancel ratio is the cancelled number divided by the total number
        over the watched indexes; a cancel is reported when it reaches the
        code's must-buy cancel rate if one is set, else ``constant.H_CANCEL_RATE``.

        :return: ``(True, total_data[-1])`` to cancel, otherwise ``(False, message_or_None)``
        """
        if buy_single_index is None or buy_exec_index is None:
            return False, "尚未找到下单位置"

        if int(tool.get_now_time_str().replace(":", "")) > int("145000"):
            return False, None
        if code not in self.__transaction_progress_index_dict:
            return False, "没找到成交进度"
        watch_index_set = self.__get_watch_index_set_cache(code)
        if not watch_index_set:
            # The watched indexes are only computed when the range has a limit-up buy cancel
            need_compute = any(
                L2DataUtil.is_limit_up_price_buy_cancel(total_data[i]['val'])
                for i in range(start_index, end_index + 1))
            if need_compute:
                if self.__need_compute_watch_indexes(code, self.__transaction_progress_index_dict.get(code)):
                    self.__compute_watch_index(code, buy_single_index)
                    watch_index_set = self.__get_watch_index_set_cache(code)
            if not watch_index_set:
                return False, "没有监听索引"
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        if watch_index_set:
            cancel_num = 0
            total_num = 0
            for i in watch_index_set:
                if i is None:
                    l2_log.h_cancel_debug(code, f"空值:{watch_index_set}")
                    continue
                data = total_data[i]
                val = data['val']
                total_num += val['num'] * data['re']
                # How many of this record's orders are still not cancelled
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                    code, i, total_data, local_today_canceled_buyno_map.get(code))
                cancel_num += val['num'] * (data['re'] - left_count)
            if total_num == 0:
                # Nothing countable in the watch set: avoid dividing by zero
                return False, None
            rate = round(cancel_num / total_num, 4)
            must_buy_cancel_rate = constant.H_CANCEL_RATE
            try:
                temp_rate = gpcode_manager.MustBuyCodesManager().get_cancel_rate_cache(code)
                if temp_rate:
                    must_buy_cancel_rate = temp_rate
            except Exception as e:
                async_log_util.error(logger_l2_h_cancel, str(e))
            if rate >= must_buy_cancel_rate:
                l2_log.h_cancel_debug(code, f"撤单比例:{rate}")
                return True, total_data[-1]
        return False, None

    def place_order_success(self, code, buy_single_index, buy_exec_index, total_data):
        """Called after a successful order placement: clear the data of ``code``."""
        self.__clear_data(code)

    def get_watch_index_dict(self, code):
        """Return the watched range and the cancelled indexes; currently always empty."""
        return {}, set()

    def start_compute_watch_indexes(self, code, buy_single_index):
        """Trigger the computation of the watched indexes of ``code``."""
        self.__compute_watch_index(code, buy_single_index)
|
|
|
# ---------------------------------D撤-------------------------------
|
# 计算 成交位->真实下单位置 总共还剩下多少手没有撤单
|
# 成交位变化之后重新计算
|
class DCancelBigNumComputer:
    """D-cancel: cancel when too little remains between the transaction index and the order.

    Computes how many lots are still not cancelled from the transaction index
    up to the real place-order index. The class is a singleton (see ``__new__``);
    the real place-order index is cached per code and persisted to redis.
    """

    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    # code -> real place-order index
    __cancel_real_order_index_cache = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: persisted state is loaded once, on first construction.
        if not cls.__instance:
            cls.__instance = super(DCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory cache from the keys currently stored in redis."""
        redis_conn = cls.__get_redis()
        try:
            for k in RedisUtils.keys(redis_conn, "d_cancel_real_order_index-*"):
                code = k.split("-")[-1]
                val = RedisUtils.get(redis_conn, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, int(val))
        finally:
            RedisUtils.realse(redis_conn)

    @classmethod
    def __get_redis(cls):
        """Return a redis connection from the class-level redis manager."""
        return cls.__redis_manager.getRedis()

    def __set_real_order_index(self, code, index):
        """Cache and persist the real place-order index of ``code``."""
        CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(self.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")

    def __del_real_order_index(self, code):
        """Remove the real place-order index of ``code`` from the cache and from redis."""
        CodeDataCacheUtil.clear_cache(self.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(self.__db, f"d_cancel_real_order_index-{code}")

    def __get_real_order_index(self, code):
        """Read the real place-order index from redis; ``None`` if absent."""
        # Fix: RedisUtils.get is given a connection everywhere else in this file,
        # not the db number.
        redis_conn = self.__get_redis()
        try:
            val = RedisUtils.get(redis_conn, f"d_cancel_real_order_index-{code}")
        finally:
            RedisUtils.realse(redis_conn)
        if val:
            return int(val)
        return None

    def __get_real_order_index_cache(self, code):
        """Read the real place-order index from the in-memory cache; ``None`` if absent."""
        found, cached = CodeDataCacheUtil.get_cache(self.__cancel_real_order_index_cache, code)
        if found:
            return cached
        return None

    def clear(self, code=None):
        """Clear the data of ``code``, or of every code stored in redis when ``code`` is falsy."""
        if code:
            self.__del_real_order_index(code)
            return
        redis_conn = self.__get_redis()
        try:
            keys = RedisUtils.keys(redis_conn, "d_cancel_real_order_index-*")
        finally:
            RedisUtils.realse(redis_conn)
        if keys:
            for k in keys:
                self.__del_real_order_index(k.replace("d_cancel_real_order_index-", ""))

    def set_trade_progress(self, code, index, buy_exec_index, total_data, m_base_value,
                           limit_up_price):
        """Evaluate the D-cancel condition for a new transaction index.

        Sums the not-cancelled lots of the limit-up buys (``num * price`` at
        least 5900) strictly between ``index`` and the real place-order index,
        converts them to money at ``limit_up_price`` and divides by
        ``m_base_value``. A cancel is reported when that rate is below
        ``constant.D_CANCEL_RATE``.

        :return: ``(need_cancel, message)``
        """
        # Only effective while the elapsed time since the execution position lies
        # within [D_CANCEL_START_TIME, D_CANCEL_EXPIRE_TIME]
        sub_time = tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time'])
        if sub_time > constant.D_CANCEL_EXPIRE_TIME or sub_time < constant.D_CANCEL_START_TIME:
            return False, "超过D撤守护时间"

        real_order_index = self.__get_real_order_index_cache(code)
        if not real_order_index:
            return False, "尚未获取到真实下单位置"

        left_num = 0
        for i in range(index + 1, real_order_index):
            val = total_data[i]['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            # Fix: convert the price with float() like every other money filter
            # in this file, instead of multiplying by the raw field.
            if val['num'] * float(val['price']) < 5900:
                continue

            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                code, i, total_data, local_today_canceled_buyno_map.get(code))
            left_num += val['num'] * left_count
        # Remaining money relative to the M base value
        rate = round(float(limit_up_price) * left_num * 100 / m_base_value, 3)
        l2_log.d_cancel_debug(code,
                              f"成交进度({index})到下单位置({real_order_index})的剩余笔数:{left_num},撤单比例:{rate},m值:{m_base_value}")
        if rate < constant.D_CANCEL_RATE:
            l2_log.cancel_debug(code, "D撤撤单,比例为:{},目标比例{}", rate, constant.D_CANCEL_RATE)
            return True, f"D撤比例为:{rate}"
        return False, ""

    def set_real_order_index(self, code, index):
        """Set the real place-order index of ``code``."""
        self.__set_real_order_index(code, index)
        l2_log.d_cancel_debug(code, f"下单位置设置:{index}")

    def place_order_success(self, code):
        """Called after a successful order placement: clear the data of ``code``."""
        self.clear(code)

    def cancel_success(self, code):
        """Called after a successful cancel: clear the data of ``code``."""
        self.clear(code)
|
|
|
# ---------------------------------L撤-------------------------------
|
class LCancelRateManager:
    """Computes the L-cancel rate of a code from class-level per-code state."""

    # code -> number of other limit-up codes in the same block
    __block_limit_up_count_dict = {}
    # code -> big-order deal number relative to the base M value
    __big_num_deal_rate_dict = {}
    __MustBuyCodesManager = gpcode_manager.MustBuyCodesManager()

    @classmethod
    def get_cancel_rate(cls, code, buy_exec_time, is_up=False, is_l_down_recomputed=False):
        """Return ``(cancel_rate, is_must_buy)`` for ``code``.

        When a must-buy cancel rate is cached for the code it is returned with
        ``True``. Otherwise the base rate (``constant.L_CANCEL_RATE_UP`` if
        ``is_up`` else ``constant.L_CANCEL_RATE``) is increased by a
        block-count bonus and a big-deal bonus, rounded to 2 decimals and
        returned with ``False``.
        """
        # A 29% rate within 15s after placing the order is currently disabled:
        # if not is_up and tool.trade_time_sub(tool.get_now_time_str(), buy_exec_time) <= 15:
        #     return 0.29, False
        try:
            must_buy_cancel_rate = cls.__MustBuyCodesManager.get_cancel_rate_cache(code)
            if must_buy_cancel_rate is not None:
                return must_buy_cancel_rate, True
        except Exception as e:
            async_log_util.error(logger_l2_l_cancel, str(e))

        base_rate = constant.L_CANCEL_RATE_UP if is_up else constant.L_CANCEL_RATE

        try:
            block_rate = 0
            if code in cls.__block_limit_up_count_dict:
                count = cls.__block_limit_up_count_dict[code]
                rates = [0, 0.03, 0.06, 0.08, 0.12]
                # Counts beyond the table use the last entry
                block_rate = rates[-1] if count >= len(rates) else rates[count]

            deal_rate = 0
            if code in cls.__big_num_deal_rate_dict:
                temp_rate = cls.__big_num_deal_rate_dict[code]
                if temp_rate >= 1:
                    # The deal ratio is capped at 3
                    temp_rate = min(temp_rate, 3)
                    if is_up:
                        deal_rate = round((temp_rate * 3.5 - 2.5) / 100, 4)
                    else:
                        deal_rate = round((temp_rate * 5.25 - 3.75) / 100, 4)

            base_rate += block_rate
            base_rate += deal_rate
        except Exception as e:
            l2_log.l_cancel_debug(code, f"计算撤单比例出错:{e}")
        return round(base_rate, 2), False

    @classmethod
    def set_block_limit_up_count(cls, reason_codes_dict):
        """Set, for every code, the number of other codes listed under the same reason.

        ``reason_codes_dict`` maps a reason to its codes; a code listed under
        several reasons keeps the count of the last one iterated.
        """
        for codes in reason_codes_dict.values():
            other_count = len(codes) - 1
            for c in codes:
                cls.__block_limit_up_count_dict[c] = other_count

    @classmethod
    def set_big_num_deal_rate(cls, code, rate):
        """Set the big-order deal ratio (deal number / base M value number) of ``code``."""
        cls.__big_num_deal_rate_dict[code] = rate
        l2_log.l_cancel_debug(code, f"设置大单成交金额比值:{rate}")

    @classmethod
    def compute_big_num_deal_rate(cls, code):
        """Compute and store the big-order deal ratio of ``code``.

        The dealt big-order number minus the limit-up sell number (floored at
        0) is divided by the base M value converted to lots at the limit-up
        price. Nothing is stored when the limit-up price is missing or the
        converted threshold is 0.
        """
        total_datas = local_today_datas.get(code)
        order_no_map = local_today_buyno_map.get(code)
        # Dealt big-order number
        total_deal_nums = DealOrderNoManager().get_deal_nums(code, order_no_map)
        # Number sold at the limit-up price
        total_sell_nums = 0
        sell_indexs = L2LimitUpSellManager().get_limit_up_sell_indexes(code)
        if sell_indexs:
            for index in sell_indexs:
                total_sell_nums += total_datas[int(index)]["val"]["num"]
        l2_log.l_cancel_debug(code, f"大单买-{total_deal_nums} 板上卖-{total_sell_nums}")
        total_deal_nums = max(total_deal_nums - total_sell_nums, 0)
        thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price:
            thresh_hold_num = thresh_hold_money // (float(limit_up_price) * 100)
            if not thresh_hold_num:
                # The base M value is below one lot: avoid dividing by zero
                return
            rate = round(total_deal_nums / thresh_hold_num, 2)
            cls.set_big_num_deal_rate(code, rate)
|
|
|
# 计算成交位置之后的大单(特定笔数)的撤单比例
|
class LCancelBigNumComputer:
    """L-cancel computer: watches resting limit-up buy orders around our own order.

    Two watch lists are kept per stock code:

    * L-down (``L后`` in the logs): uncancelled limit-up buys picked from the
      range handed to :meth:`compute_watch_index`.
    * L-up (``L上`` / ``L前`` in the logs): uncancelled big limit-up buys right
      after the trade-progress position.

    :meth:`need_cancel` reports a cancel once enough of a watch list has been
    withdrawn; :meth:`add_transaction_datas` reports one when the L-down list
    is being dealt too quickly. The class is a singleton, all state is
    class-level, and the watch lists are mirrored to Redis and reloaded when
    the singleton is first created.

    NOTE(review): the source file had lost its indentation; nesting was
    reconstructed from the statements themselves. Spots where the nesting
    changes behaviour carry their own NOTE(review).
    """

    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    # code -> index of the latest known trade-progress position
    __last_trade_progress_dict = {}
    # code -> (real place-order index, is_default)
    __real_place_order_index_dict = {}
    # code -> (buy_single_index, re_compute, set of watched indexes): the L-down list
    __cancel_watch_index_info_cache = {}
    # code -> set of watched big-order indexes near the trade progress: the L-up list
    __near_by_trade_progress_index_cache = {}

    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()

    # code -> (remaining `num` of the L-down list not yet dealt/cancelled, time.time() of computation)
    __total_l_down_not_deal_num_dict = {}
    # code -> [deal volume accumulated for one time string, that time string]
    __l_down_latest_deal_info_dict = {}
    # code -> (time.time() of the last L-up computation, watched index set)
    __last_l_up_compute_info = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the singleton, creating it and loading persisted state on first use."""
        if not cls.__instance:
            cls.__instance = super(LCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Restore the three Redis-backed caches into memory."""
        redis_conn = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, "l_cancel_watch_index_info-*"):
                raw = RedisUtils.get(redis_conn, key)
                if raw:
                    info = json.loads(raw)
                    # JSON has no set type; turn the persisted list back into a set.
                    info[2] = set(info[2])
                    CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_info_cache, key.split("-")[-1], info)

            for key in RedisUtils.keys(redis_conn, "l_cancel_real_place_order_index-*"):
                raw = RedisUtils.get(redis_conn, key)
                if not raw:
                    # Same guard as for the watch-index keys above.
                    continue
                # Only the bare index is persisted (see set_real_place_order_index), but
                # every reader of this cache subscripts an (index, is_default) pair.
                # The flag is not persisted, so a restored position is treated as a
                # default (unconfirmed) one.
                CodeDataCacheUtil.set_cache(cls.__real_place_order_index_dict, key.split("-")[-1],
                                            (json.loads(raw), True))

            for key in RedisUtils.keys(redis_conn, "l_cancel_near_by_index-*"):
                raw = RedisUtils.get(redis_conn, key)
                if not raw:
                    continue
                try:
                    indexes = set(json.loads(raw))
                except (TypeError, ValueError) as e:
                    # Best effort: a malformed value must not abort the load.
                    logger_l2_l_cancel.exception(e)
                    continue
                CodeDataCacheUtil.set_cache(cls.__near_by_trade_progress_index_cache, key.split("-")[-1], indexes)
        finally:
            RedisUtils.realse(redis_conn)

    @classmethod
    def __get_redis(cls):
        """Return a Redis connection from this class's manager."""
        return cls.__redis_manager.getRedis()

    @staticmethod
    def __not_canceled_count(code, index, total_datas):
        """Thin wrapper around ``get_limit_up_buy_no_canceled_count_v2`` for the row at *index*."""
        return l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
            code, index, total_datas, local_today_canceled_buyno_map.get(code))

    @staticmethod
    def __canceled_data(code, index, total_datas):
        """Thin wrapper around ``get_limit_up_buy_canceled_data_v2`` for the row at *index*."""
        return l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
            code, index, total_datas, local_today_canceled_buyno_map.get(code))

    @staticmethod
    def __has_watched_cancel(code, start_index, end_index, total_data, watch_indexes):
        """Return True if rows start_index..end_index contain a limit-up buy cancel of a watched order."""
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            if not L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                continue
            # Map the cancel record back to the buy record it cancels.
            buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                data, local_today_buyno_map.get(code))
            if buy_index is not None and buy_index in watch_indexes:
                return True
        return False

    def __set_watch_indexes(self, code, buy_single_index, re_compute: int, indexes):
        """Store the L-down watch list in memory and Redis; add a trade-record log entry when non-empty."""
        self.__cancel_watch_index_info_cache[code] = (buy_single_index, re_compute, indexes)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
        RedisUtils.setex_async(self.__db, f"l_cancel_watch_index_info-{code}", tool.get_expire(),
                               json.dumps((buy_single_index, re_compute, list(indexes))))
        if indexes:
            trade_record_log_util.add_cancel_watch_indexes_log(
                code,
                trade_record_log_util.CancelWatchIndexesInfo(
                    trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_DOWN,
                    buy_single_index,
                    list(indexes)))

    def __get_watch_indexes_cache(self, code):
        """Return the cached L-down info for *code*, or None when nothing is cached."""
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def __set_near_by_trade_progress_indexes(self, code, buy_single_index, indexes):
        """Store the L-up watch list in memory and Redis; add a trade-record log entry when non-empty."""
        if indexes:
            trade_record_log_util.add_cancel_watch_indexes_log(
                code,
                trade_record_log_util.CancelWatchIndexesInfo(
                    trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_UP,
                    buy_single_index,
                    list(indexes)))
        self.__near_by_trade_progress_index_cache[code] = indexes
        RedisUtils.setex_async(self.__db, f"l_cancel_near_by_index-{code}", tool.get_expire(),
                               json.dumps(list(indexes)))

    def __get_near_by_trade_progress_indexes(self, code):
        """Read the L-up watch list for *code* straight from Redis (None when absent)."""
        redis_conn = self.__get_redis()
        try:
            val = RedisUtils.get(redis_conn, f"l_cancel_near_by_index-{code}")
        finally:
            # Release the connection the same way __load_datas does.
            RedisUtils.realse(redis_conn)
        if val is None:
            return None
        return set(json.loads(val))

    def __get_near_by_trade_progress_indexes_cache(self, code):
        """Return the cached L-up watch list for *code*, or None when nothing is cached."""
        cache_result = CodeDataCacheUtil.get_cache(self.__near_by_trade_progress_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def del_watch_index(self, code):
        """Remove the L-down watch list of *code* from memory and Redis."""
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_info_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")

    def clear(self, code=None):
        """Drop the L-down list and real place-order index of *code*, or of every code when *code* is falsy."""
        if code:
            self.del_watch_index(code)
            if code in self.__real_place_order_index_dict:
                self.__real_place_order_index_dict.pop(code)
                RedisUtils.delete_async(self.__db, f"l_cancel_real_place_order_index-{code}")
            return

        # One connection for the whole sweep, released afterwards like in __load_datas.
        redis_conn = self.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, "l_cancel_watch_index_info-*"):
                key_code = key.replace("l_cancel_watch_index_info-", "")
                self.__last_trade_progress_dict.pop(key_code, None)
                self.__real_place_order_index_dict.pop(key_code, None)
                self.del_watch_index(key_code)
            for key in RedisUtils.keys(redis_conn, "l_cancel_real_place_order_index-*"):
                RedisUtils.delete(redis_conn, key)
        finally:
            RedisUtils.realse(redis_conn)

    def re_compute_l_down_watch_indexes(self, code):
        """Recompute the L-down watch list once, starting right after the trade progress.

        Nothing happens when there is no L-down info yet, when it was already
        recomputed (its ``re_compute`` flag is > 0), or when the real place-order
        index or the trade progress is unknown (or 0).
        """
        watch_index_info = self.__cancel_watch_index_info_cache.get(code)
        if not watch_index_info or watch_index_info[1] > 0:
            return
        # Trade-progress position and real place-order position.
        real_place_order_index_info = self.__real_place_order_index_dict.get(code)
        last_trade_progress_index = self.__last_trade_progress_dict.get(code)
        if not real_place_order_index_info or not last_trade_progress_index:
            return
        self.compute_watch_index(code, watch_index_info[0], last_trade_progress_index + 1,
                                 real_place_order_index_info[0],
                                 re_compute=1)

    def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0):
        """Build the L-down watch list from rows start_index..end_index, scanning backwards.

        :param code: stock code.
        :param buy_single_index: stored alongside the watch list.
        :param start_index: first row of the candidate range.
        :param end_index: last row scanned (the backward scan starts here).
        :param re_compute: stored alongside the list; non-zero marks a recomputation.

        Errors are logged and swallowed.
        """
        try:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}")
            total_datas = local_today_datas.get(code)
            if not total_datas:
                return

            # Orders worth less than 5000 (price * num) are ignored in the census below.
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            # Census of uncancelled limit-up buys: (index, num).
            not_cancel_indexes_with_num = []
            re_start_index = start_index
            MAX_COUNT = 5
            for j in range(start_index, end_index):
                val = total_datas[j]['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] < min_num:
                    continue
                if self.__not_canceled_count(code, j, total_datas) > 0:
                    not_cancel_indexes_with_num.append((j, val["num"]))

            # NOTE(review): assumed to run once after the census loop - confirm.
            min_num = 0
            if not_cancel_indexes_with_num:
                temp_count = len(not_cancel_indexes_with_num)
                if temp_count >= 30:
                    # Large census: only the last fifth of it is eligible.
                    temp_index = int(temp_count * 4 / 5)
                    re_start_index = not_cancel_indexes_with_num[temp_index][0]
                    MAX_COUNT = len(not_cancel_indexes_with_num[temp_index:])
                else:
                    not_cancel_indexes_with_num.sort(key=lambda x: x[1])
                    if temp_count >= 10:
                        # Medium census: require at least the median num.
                        min_num = not_cancel_indexes_with_num[temp_count // 2][1]

            # Amount floors (compared as min_money * 100 against price * num), tried from
            # the largest down until MAX_COUNT orders are collected.
            MIN_MONEYS = [300, 200, 100, 50]
            watch_indexes = set()
            for min_money in MIN_MONEYS:
                for i in range(end_index, re_start_index - 1, -1):
                    try:
                        val = total_datas[i]['val']
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        # Skip small amounts.
                        if float(val['price']) * val['num'] < min_money * 100:
                            continue
                        if val['num'] < min_num:
                            continue
                        if self.__not_canceled_count(code, i, total_datas) > 0:
                            watch_indexes.add(i)
                            if len(watch_indexes) >= MAX_COUNT:
                                break
                    except Exception as e:
                        logger_l2_l_cancel.error(f"{code}: 范围: {start_index}-{end_index} 位置:{i}")
                        logger_l2_l_cancel.exception(e)
                if len(watch_indexes) >= MAX_COUNT:
                    break

            if watch_indexes:
                # Make sure at least one big order (price * num > 100 * 100) is watched.
                has_big_num = False
                for i in watch_indexes:
                    val = total_datas[i]['val']
                    if float(val['price']) * val['num'] > 100 * 100:
                        has_big_num = True
                        break
                if not has_big_num:
                    # None yet: take the nearest uncancelled big order before re_start_index.
                    for i in range(re_start_index, start_index, -1):
                        val = total_datas[i]['val']
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        if float(val['price']) * val['num'] < 100 * 100:
                            continue
                        if self.__not_canceled_count(code, i, total_datas) > 0:
                            watch_indexes.add(i)
                            break

            # NOTE(review): assumed to run even when nothing was found (the setter itself
            # guards its record log with `if indexes`) - confirm.
            self.__set_watch_indexes(code, buy_single_index, re_compute, watch_indexes)
            l2_log.l_cancel_debug(code,
                                  f"设置监听范围{'(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
        except Exception as e:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围出错:{str(e)}")
            async_log_util.exception(logger_l2_l_cancel, e)

    def set_real_place_order_index(self, code, index, buy_single_index=None, is_default=False):
        """Record the real place-order position and rebuild both watch lists.

        :param index: row index of our own order.
        :param buy_single_index: buy-signal row; the L-down list is only rebuilt when given.
        :param is_default: stored with the index; a truthy value disables the
            fast-deal check in :meth:`add_transaction_datas`.
        """
        l2_log.l_cancel_debug(code, f"设置真实下单位-{index},buy_single_index-{buy_single_index}")
        self.__real_place_order_index_dict[code] = (index, is_default)
        RedisUtils.setex_async(self.__db, f"l_cancel_real_place_order_index-{code}", tool.get_expire(), index)
        if buy_single_index is not None:
            self.compute_watch_index(code, buy_single_index, buy_single_index, index)
        # NOTE(review): assumed not to be nested under the `buy_single_index` check - confirm.
        if self.__last_trade_progress_dict.get(code):
            self.__compute_trade_progress_near_by_indexes(code, buy_single_index,
                                                          self.__last_trade_progress_dict.get(code) + 1, index)
        else:
            self.__compute_trade_progress_near_by_indexes(code, buy_single_index, buy_single_index, index)

    def re_compute_l_up_watch_indexes(self, code, buy_single_index):
        """Rebuild the L-up watch list, at most once every 3 seconds per code."""
        if code not in self.__last_trade_progress_dict:
            return
        if code not in self.__real_place_order_index_dict:
            return
        if code not in self.__last_l_up_compute_info or time.time() - self.__last_l_up_compute_info[code][0] >= 3:
            self.__compute_trade_progress_near_by_indexes(code, buy_single_index,
                                                          self.__last_trade_progress_dict.get(code) + 1,
                                                          self.__real_place_order_index_dict.get(code)[0])

    def __compute_trade_progress_near_by_indexes(self, code, buy_single_index, start_index, end_index):
        """Collect up to 15 uncancelled big limit-up buys in [start_index, end_index) as the L-up list."""
        if start_index is None or end_index is None:
            return
        total_datas = local_today_datas.get(code)
        MIN_MONEY = 99 * 100
        MAX_COUNT = 15
        watch_indexes = set()
        for i in range(start_index, end_index):
            val = total_datas[i]['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            # Skip small amounts.
            if float(val['price']) * val['num'] < MIN_MONEY:
                continue
            if self.__not_canceled_count(code, i, total_datas) > 0:
                watch_indexes.add(i)
                if len(watch_indexes) >= MAX_COUNT:
                    break

        # The stored value is a (timestamp, index set) pair, so compare its second
        # element. Comparing the whole pair with a set was never equal, which made
        # every computation look like a change.
        last_info = self.__last_l_up_compute_info.get(code)
        if last_info is None or last_info[1] != watch_indexes:
            l2_log.l_cancel_debug(code, f"设置成交位临近撤单监控范围:{watch_indexes} 计算范围:{start_index}-{end_index}")
            self.__set_near_by_trade_progress_indexes(code, buy_single_index, watch_indexes)
        # Always refresh the timestamp so the 3 s throttle in re_compute_l_up_watch_indexes holds.
        self.__last_l_up_compute_info[code] = (time.time(), watch_indexes)

    def __compute_total_l_down_not_deal_num(self, code):
        """Refresh (at most once per second) the remaining, not-yet-dealt ``num`` of the L-down list.

        Errors are logged and swallowed.
        """
        try:
            cached = self.__total_l_down_not_deal_num_dict.get(code)
            if cached is not None and time.time() - cached[1] < 1:
                # Refreshed less than a second ago.
                return
            l_down_cancel_info = self.__cancel_watch_index_info_cache.get(code)
            if not l_down_cancel_info:
                return
            trade_progress = self.__last_trade_progress_dict.get(code)
            if trade_progress is None:
                return

            total_left_num = 0
            total_datas = local_today_datas.get(code)
            for i in l_down_cancel_info[2]:
                # Rows before the trade progress have already been dealt.
                if i < trade_progress:
                    continue
                val = total_datas[i]['val']
                if self.__not_canceled_count(code, i, total_datas) <= 0:
                    continue
                fnum = val["num"]
                if i == trade_progress:
                    # Subtract what has already been dealt of the order currently dealing.
                    dealing_info = HuaXinTransactionDataManager.get_dealing_order_info(code)
                    if dealing_info and str(val["orderNo"]) == str(dealing_info[0]):
                        fnum -= dealing_info[1] // 100
                total_left_num += fnum
            self.__total_l_down_not_deal_num_dict[code] = (total_left_num, time.time())
        except Exception as e:
            async_log_util.exception(logger_l2_l_cancel, e)

    def set_trade_progress(self, code, buy_single_index, index, total_datas):
        """Record a new trade-progress position and refresh everything that depends on it."""
        if self.__last_trade_progress_dict.get(code) == index:
            self.__compute_total_l_down_not_deal_num(code)
            return
        self.__last_trade_progress_dict[code] = index
        self.__compute_total_l_down_not_deal_num(code)

        if not self.__is_l_down_can_cancel(code, buy_single_index):
            # The L-down list can no longer trigger; hand over to the hour-cancel computer.
            HourCancelBigNumComputer().start_compute_watch_indexes(code, buy_single_index)

        real_place_order_index_info = self.__real_place_order_index_dict.get(code)
        real_place_order_index = None
        if real_place_order_index_info:
            real_place_order_index = real_place_order_index_info[0]

        # Rebuild the L-up list for the new trade progress.
        self.__compute_trade_progress_near_by_indexes(code, buy_single_index, index + 1, real_place_order_index)

    def add_transaction_datas(self, code, transaction_datas):
        """Feed deal records and report whether the L-down list is being dealt too fast.

        Each record is indexed positionally: ``[2]`` is added up as deal volume,
        ``[3]`` is converted to a time string and ``[6]`` is looked up in the buy
        order map. Volume is accumulated per time string for deals that hit
        watched L-down orders.

        :return: ``(need_cancel, message)``.
        """
        if not transaction_datas:
            return False, "成交数据为空"
        buyno_map = local_today_buyno_map.get(code)
        if not buyno_map:
            return False, "没找到买单字典"
        total_datas = local_today_datas.get(code)
        if not total_datas:
            return False, "L2数据为空"

        l_down_cancel_info = self.__cancel_watch_index_info_cache.get(code)
        if not l_down_cancel_info:
            return False, "L撤为空"
        l_down_indexes = l_down_cancel_info[2]

        for transaction_data in transaction_datas:
            buy_data = buyno_map.get(str(transaction_data[6]))
            if not buy_data:
                continue
            if buy_data["index"] not in l_down_indexes:
                continue
            orgin_deal_data = self.__l_down_latest_deal_info_dict.get(code)
            if not orgin_deal_data:
                orgin_deal_data = [0, None]
            time_str = l2_huaxin_util.convert_time(transaction_data[3])
            if orgin_deal_data[1] != time_str:
                # A new time string starts a new accumulation window.
                orgin_deal_data[0] = transaction_data[2]
                orgin_deal_data[1] = time_str
            else:
                orgin_deal_data[0] += transaction_data[2]
            self.__l_down_latest_deal_info_dict[code] = orgin_deal_data

        total_l_down_not_deal_num = self.__total_l_down_not_deal_num_dict.get(code)
        if total_l_down_not_deal_num is None:
            return False, "L撤未成交统计为空"
        orgin_deal_data = self.__l_down_latest_deal_info_dict.get(code)
        if orgin_deal_data is None:
            return False, "L后暂时无成交"

        real_place_order_index_info = self.__real_place_order_index_dict.get(code)
        if real_place_order_index_info and real_place_order_index_info[1]:
            return False, "没获取到真实的下单位"

        if total_l_down_not_deal_num[0] <= 0:
            # Nothing is left to measure against; the division below would raise.
            return False, "L后未成交手数为0"

        threshold_rate = constant.L_CANCEL_FAST_DEAL_RATE
        rate = orgin_deal_data[0] / (total_l_down_not_deal_num[0] * 100)
        if rate <= threshold_rate:
            return False, "尚未达到撤单比例"
        limit_up_price = float(gpcode_manager.get_limit_up_price(code))
        deal_money = limit_up_price * orgin_deal_data[0]
        if deal_money >= constant.L_CANCEL_FAST_DEAL_MIN_MONEY:
            return True, f"达到撤单比例:{rate}/{threshold_rate} 成交详情:{orgin_deal_data}/{total_l_down_not_deal_num}"
        return False, f"已达到撤单比例,未达到撤单金额:{deal_money}"

    def add_deal_index(self, code, index, buy_single_index):
        """After a watched L-down order was dealt, add the next eligible uncancelled order to the list."""
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return
        watch_indexes = watch_indexes_info[2]
        if index not in watch_indexes:
            return
        if buy_single_index is None:
            return
        # Take in one replacement order between the dealt one and our own order.
        real_place_order_info = self.__real_place_order_index_dict.get(code)
        if real_place_order_info and real_place_order_info[0] > index:
            total_datas = local_today_datas.get(code)
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            for j in range(index + 1, real_place_order_info[0]):
                data = total_datas[j]
                val = data['val']
                if data["index"] in watch_indexes:
                    continue
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] < min_num:
                    continue
                if self.__not_canceled_count(code, j, total_datas) > 0:
                    watch_indexes.add(data["index"])
                    break
        # NOTE(review): assumed to run at function level (persist even when nothing was added) - confirm.
        self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes)

    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        """L-down check: has enough of the watched volume been cancelled?

        Only evaluated when rows start_index..end_index contain a cancel of a
        watched order.

        :return: ``(True, row of the latest cancel)`` or ``(False, None)``.
        """
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return False, None
        watch_indexes = set([int(i) for i in watch_indexes_info[2]])
        # Total watched volume and the single largest order.
        total_num = 0
        max_num = 0
        for wi in watch_indexes:
            num = total_data[wi]["val"]["num"]
            total_num += num * total_data[wi]["re"]
            if num > max_num:
                max_num = num

        if not self.__has_watched_cancel(code, start_index, end_index, total_data, watch_indexes):
            return False, None
        if total_num <= 0:
            # No measurable volume; the ratios below would divide by zero.
            return False, None

        canceled_num = 0
        canceled_indexes = []
        for wi in watch_indexes:
            cancel_data = self.__canceled_data(code, wi, total_data)
            if cancel_data:
                canceled_num += total_data[wi]["val"]["num"] * total_data[wi]["re"]
                canceled_indexes.append(cancel_data["index"])

        rate = round(canceled_num / total_num, 3)
        thresh_hold_rate, must_buy = LCancelRateManager.get_cancel_rate(code,
                                                                        total_data[buy_exec_index]["val"]["time"])
        if not must_buy:
            # Cap the threshold so that the largest order alone cannot keep it out of reach.
            temp_thresh_hold_rate = round((total_num - max_num) * 0.9 / total_num, 2)
            thresh_hold_rate = min(thresh_hold_rate, temp_thresh_hold_rate)
        l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},已撤单比例:{rate}/{thresh_hold_rate}")
        # A zero threshold can be met with no resolved cancel at all; indexing an empty
        # list would raise, so require at least one.
        if canceled_indexes and rate >= thresh_hold_rate:
            canceled_indexes.sort()
            l2_log.l_cancel_debug(code, f"L下撤单,撤单位置:{canceled_indexes[-1]}")
            return True, total_data[canceled_indexes[-1]]
        return False, None

    def __compute_near_by_trade_progress_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data,
                                                     is_first_code):
        """L-up check: has enough (position-weighted) of the big orders near the trade progress been cancelled?

        Inactive while fewer than 5 orders are watched. A cancel is vetoed when
        the uncancelled volume between the trade progress and our own order
        still exceeds 3x the base m value.

        :return: ``(True, row of the latest cancel)`` or ``(False, None)``.
        """
        watch_indexes = self.__get_near_by_trade_progress_indexes_cache(code)
        if not watch_indexes:
            return False, None
        if len(watch_indexes) < 5:
            return False, None

        # Weight by position in ascending index order: 3, 2, then 1 for every later one.
        WATCH_INDEX_WEIGHTS = [3, 2, 1]
        total_count_weight = 0
        for wi in range(0, len(watch_indexes)):
            if wi < len(WATCH_INDEX_WEIGHTS):
                total_count_weight += WATCH_INDEX_WEIGHTS[wi]
            else:
                total_count_weight += WATCH_INDEX_WEIGHTS[-1]

        if not self.__has_watched_cancel(code, start_index, end_index, total_data, watch_indexes):
            return False, None

        watch_indexes_list = list(watch_indexes)
        watch_indexes_list.sort()
        canceled_count_weight = 0
        canceled_indexes = []
        for wi in watch_indexes:
            canceled_data = self.__canceled_data(code, wi, total_data)
            if canceled_data:
                canceled_indexes.append(canceled_data["index"])
                # NOTE(review): "re" is added on top of the position weight although
                # total_count_weight only sums position weights - confirm this is intended.
                canceled_count_weight += total_data[wi]["re"]
                pos_index = watch_indexes_list.index(wi)
                if pos_index < len(WATCH_INDEX_WEIGHTS):
                    canceled_count_weight += WATCH_INDEX_WEIGHTS[pos_index]
                else:
                    canceled_count_weight += WATCH_INDEX_WEIGHTS[-1]

        rate = round(canceled_count_weight / total_count_weight, 3)
        thresh_cancel_rate, must_buy = LCancelRateManager.get_cancel_rate(code,
                                                                          total_data[buy_exec_index]["val"]["time"],
                                                                          is_up=True)
        l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},成交位临近已撤单比例:{rate}/{thresh_cancel_rate}")
        # Same guard as in the L-down check: never index an empty cancel list.
        if not canceled_indexes or rate < thresh_cancel_rate:
            return False, None

        # Veto: enough uncancelled volume is still queued between the trade progress and our order.
        real_place_order_index_info = self.__real_place_order_index_dict.get(code)
        trade_progress_index = self.__last_trade_progress_dict.get(code)
        if real_place_order_index_info and trade_progress_index:
            total_num = 0
            thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
            # The threshold is 3x the base m value.
            thresh_hold_money = thresh_hold_money * 3
            thresh_hold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
            for i in range(trade_progress_index + 1, real_place_order_index_info[0]):
                data = total_data[i]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if self.__canceled_data(code, i, total_data):
                    continue
                total_num += val["num"] * data["re"]
                if total_num > thresh_hold_num:
                    l2_log.l_cancel_debug(code,
                                          f"L上撤阻断: 成交位-{trade_progress_index} 真实下单位-{real_place_order_index_info[0]} 阈值-{thresh_hold_money}")
                    return False, None

        canceled_indexes.sort()
        l2_log.l_cancel_debug(code, f"L上撤单,撤单位置:{canceled_indexes[-1]}")
        return True, total_data[canceled_indexes[-1]]

    def __is_l_down_can_cancel(self, code, buy_exec_index):
        """Return False once so much of the L-down list lies before the trade progress that it can no longer trigger."""
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return True
        trade_index = self.__last_trade_progress_dict.get(code)
        if trade_index is None:
            return True
        total_datas = local_today_datas.get(code)
        total_deal_nums = 0
        # Starts at 1 so the ratio below never divides by zero.
        total_nums = 1
        for index in watch_indexes_info[2]:
            val = total_datas[index]["val"]
            left_count = self.__not_canceled_count(code, index, total_datas)
            total_nums += val["num"]
            if left_count > 0 and index < trade_index:
                total_deal_nums += val["num"]
        thresh_hold_rate, must_buy = LCancelRateManager.get_cancel_rate(code,
                                                                        total_datas[buy_exec_index]["val"]["time"])
        if total_deal_nums / total_nums > 1 - thresh_hold_rate - 0.05:
            return False
        return True

    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        """Run the L-down check and, if it does not fire, the L-up check on rows start_index..end_index.

        Disabled after 14:57:00 unless ``constant.TEST`` is set.

        :return: ``(can_cancel, cancel_row_or_None, message)``.
        :raises Exception: whatever either check raises, after logging it.
        """
        if buy_exec_index is None:
            return False, None, "尚未找到下单位置"
        if int(tool.get_now_time_str().replace(":", "")) > int("145700") and not constant.TEST:
            return False, None, ""
        # Check near our own order (L-down).
        can_cancel, cancel_data = False, None
        try:
            can_cancel, cancel_data = self.__compute_need_cancel(code, buy_exec_index, start_index, end_index,
                                                                 total_data,
                                                                 is_first_code)
        except Exception as e:
            logger_l2_l_cancel.exception(e)
            raise
        extra_msg = "L后"
        if not can_cancel:
            # Check near the trade progress (L-up).
            try:
                can_cancel, cancel_data = self.__compute_near_by_trade_progress_need_cancel(code, buy_exec_index,
                                                                                            start_index, end_index,
                                                                                            total_data,
                                                                                            is_first_code)
                extra_msg = "L前"
            except Exception as e:
                logger_l2_l_cancel.exception(e)
                raise
        return can_cancel, cancel_data, extra_msg

    def place_order_success(self, code):
        """Reset the state of *code* after an order was placed."""
        self.clear(code)

    def cancel_success(self, code):
        """Reset the state of *code* after an order was cancelled."""
        self.clear(code)
|
|
|
# 新F撤,根据成交数据来撤
|
class FCancelBigNumComputer:
    """F-cancel computer: decides on a cancel from deal (transaction) data.

    Keeps, per stock code, the real order position as ``(index, is_default)``
    in memory and mirrors it to Redis. The class is a singleton.
    """

    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    # code -> (real order index, is_default); a 2-item list after a reload from Redis
    __real_order_index_cache = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the singleton, creating it and loading persisted state on first use."""
        if not cls.__instance:
            cls.__instance = super(FCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Restore the real-order-index cache from Redis."""
        redis_conn = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, "f_cancel_real_order_index-*"):
                raw = RedisUtils.get(redis_conn, key)
                if not raw:
                    # json.loads(None) would raise; skip keys without a value.
                    continue
                CodeDataCacheUtil.set_cache(cls.__real_order_index_cache, key.split("-")[-1], json.loads(raw))
        finally:
            RedisUtils.realse(redis_conn)

    @classmethod
    def __get_redis(cls):
        """Return a Redis connection from this class's manager."""
        return cls.__redis_manager.getRedis()

    def __set_real_order_index(self, code, index, is_default):
        """Store ``(index, is_default)`` for *code* in memory and Redis."""
        CodeDataCacheUtil.set_cache(self.__real_order_index_cache, code, (index, is_default))
        RedisUtils.setex_async(self.__db, f"f_cancel_real_order_index-{code}", tool.get_expire(),
                               json.dumps((index, is_default)))

    def __del_real_order_index(self, code):
        """Remove the real order index of *code* from memory and Redis."""
        CodeDataCacheUtil.clear_cache(self.__real_order_index_cache, code)
        RedisUtils.delete_async(self.__db, f"f_cancel_real_order_index-{code}")

    def __get_real_order_index(self, code):
        """Read the real order index of *code* straight from Redis (None when absent)."""
        # RedisUtils.get is given a connection everywhere else in this file; passing
        # the db number here was a slip.
        redis_conn = self.__get_redis()
        try:
            val = RedisUtils.get(redis_conn, f"f_cancel_real_order_index-{code}")
        finally:
            RedisUtils.realse(redis_conn)
        if val:
            val = json.loads(val)
            return val[0]
        return None

    def __get_real_order_index_cache(self, code):
        """Return the cached ``(index, is_default)`` for *code*, or None when nothing is cached."""
        cache_result = CodeDataCacheUtil.get_cache(self.__real_order_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def clear(self, code=None):
        """Drop the real order index of *code*, or of every persisted code when *code* is falsy."""
        if code:
            self.__del_real_order_index(code)
            return
        redis_conn = self.__get_redis()
        try:
            keys = RedisUtils.keys(redis_conn, "f_cancel_real_order_index-*")
        finally:
            # Release the connection the same way __load_datas does.
            RedisUtils.realse(redis_conn)
        if keys:
            for key in keys:
                self.__del_real_order_index(key.split("-")[-1])

    def need_cancel(self, transaction_data):
        """Decide from one deal record whether a big buy order is being dealt too fast.

        The record is indexed positionally: ``[0]`` stock code, ``[1] * [2]``
        deal amount (``[2]`` alone is the deal volume), ``[3]`` deal time and
        ``[6]`` the buy order number. The check only applies within 15 s of
        our own order's time.

        :return: ``(need_cancel, message)``.
        """
        if not transaction_data:
            return False, "成交数据为空"
        if transaction_data[2] < 10000:
            return False, "成交量小于10000"
        # Only deals of at least 1,000,000 count.
        if transaction_data[1] * transaction_data[2] < 1000000:
            return False, "金额不满足要求"
        code = transaction_data[0]
        real_order_index_info = self.__real_order_index_cache.get(code)
        if not real_order_index_info:
            return False, "真实下单位置没找到"
        # The cache holds (index, is_default); the row index is element 0. Indexing
        # the L2 rows with the whole pair raised TypeError.
        real_order_index = real_order_index_info[0]
        now_time = l2_huaxin_util.convert_time(transaction_data[3])
        total_datas = local_today_datas.get(code)
        if not total_datas:
            return False, "L2数据为空"
        order_time = total_datas[real_order_index]["val"]["time"]
        if tool.trade_time_sub(now_time, order_time) > 15:
            return False, "只守护15s"
        buyno_map = local_today_buyno_map.get(code)
        if not buyno_map:
            return False, "没找到买单字典"
        buy_data = buyno_map.get(str(transaction_data[6]))
        if not buy_data:
            return False, f"没有找到对应买单({transaction_data[6]})"
        if not l2_data_util.is_big_money(buy_data["val"]):
            return False, f"不为大单"
        # Share of the buy order filled by this single deal.
        if transaction_data[2] > buy_data["val"]["num"] * 100 * 0.5:
            return True, "快速成交了50%以上"
        return False, "快速成交了50%以下"

    def set_real_order_index(self, code, index, is_default):
        """Record the real order position of *code*."""
        self.__set_real_order_index(code, index, is_default)

    def place_order_success(self, code):
        """Reset the state of *code* after an order was placed."""
        self.clear(code)

    def cancel_success(self, code):
        """Reset the state of *code* after an order was cancelled."""
        self.clear(code)

    def need_cancel_for_deal_fast(self, code, trade_index):
        """Decide whether too little is left queued between the trade progress and our order.

        Applies only to a confirmed (non-default) order position that lies
        after *trade_index* and is at most 5 minutes old. A cancel is reported
        when fewer than 10 uncancelled limit-up buys worth at least 5000
        (``num * price``) remain in between, or when their total
        ``num * limit_up_price`` is below ``1000 * 100``.

        :return: ``(need_cancel, message)``.
        """
        real_order_index_info = self.__get_real_order_index_cache(code)
        if not real_order_index_info:
            return False, "没找到真实下单位"
        if real_order_index_info[1]:
            return False, "真实下单位为默认"
        if real_order_index_info[0] <= trade_index:
            return False, "真实下单位在成交位之前"
        real_order_index = real_order_index_info[0]
        total_datas = local_today_datas.get(code)
        if not total_datas:
            # Same guard and message as need_cancel; indexing None would raise.
            return False, "L2数据为空"
        if tool.trade_time_sub(tool.get_now_time_str(), total_datas[real_order_index]['val']['time']) > 5 * 60:
            return False, "下单超过5分钟"

        # Count and volume of the orders still queued ahead of ours.
        total_left_count = 0
        total_left_num = 0
        for i in range(trade_index + 1, real_order_index):
            val = total_datas[i]["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if val["num"] * float(val["price"]) < 5000:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                code, i, total_datas, local_today_canceled_buyno_map.get(code))
            if left_count > 0:
                total_left_count += left_count
                total_left_num += val["num"] * left_count
        if total_left_count < 10:
            return True, f"剩余笔数不足({total_left_count}),成交进度:{trade_index},真实下单位置:{real_order_index}"
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price and total_left_num * float(limit_up_price) < 1000 * 100:
            return True, f"剩余金额不足({round(total_left_num * float(limit_up_price)*100)}),成交进度:{trade_index},真实下单位置:{real_order_index}"
        return False, "不满足撤单条件"
|
|
|
|
|
|
# ---------------------------------G撤-------------------------------
|
class GCancelBigNumComputer:
|
__real_place_order_index_dict = {}
|
__trade_progress_index_dict = {}
|
__watch_indexes_dict = {}
|
__watch_indexes_by_dict = {}
|
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
    """Return the shared instance, creating it on the first call."""
    existing = cls.__instance
    if existing:
        return existing
    cls.__instance = super(GCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
    return cls.__instance
|
|
def set_real_place_order_index(self, code, index, buy_single_index, is_default):
    """Record the real place-order position and recompute the watch sets.

    The scan starts at the stored trade-progress index when one exists for
    *code*, otherwise at *buy_single_index*.
    """
    self.__real_place_order_index_dict[code] = (index, is_default)
    if code in self.__trade_progress_index_dict:
        scan_from = self.__trade_progress_index_dict.get(code)
    else:
        scan_from = buy_single_index
    self.__commpute_watch_indexes(code, scan_from, (index, is_default), from_real_order_index_changed=True)
|
|
def clear(self, code=None):
    """Forget all G-cancel state for *code*, or for every code when *code* is falsy."""
    stores = (
        self.__real_place_order_index_dict,
        self.__watch_indexes_dict,
        self.__watch_indexes_by_dict,
        self.__trade_progress_index_dict,
    )
    for store in stores:
        if code:
            # Missing keys are simply ignored.
            store.pop(code, None)
        else:
            store.clear()
|
|
# Recompute the G-cancel watch sets for `code` over the rows from the traded index
# up to (not including) the real order index.
#   traded_index: trade-progress row; nothing is done when it is None.
#   real_order_index_info: (real order index, is_default); nothing is done when it is None.
#   from_real_order_index_changed: True when called because the real order position changed.
# Two sets are maintained: __watch_indexes_dict (big orders) and
# __watch_indexes_by_dict (backup, smaller orders).
# NOTE(review): this file lost its indentation, so the nesting of the branches below
# cannot be read off the text; the comments describe statements, not nesting - confirm
# against the original before restructuring.
def __commpute_watch_indexes(self, code, traded_index, real_order_index_info, from_real_order_index_changed=False):
|
if traded_index is None or real_order_index_info is None:
|
return
|
real_order_index, is_default = real_order_index_info[0], real_order_index_info[1]
|
# Previously stored big-order watch set (empty set when none).
origin_watch_index = self.__watch_indexes_dict.get(code)
|
if origin_watch_index is None:
|
origin_watch_index = set()
|
# Previously stored backup watch set (empty set when none).
origin_watch_index_by = self.__watch_indexes_by_dict.get(code)
|
if origin_watch_index_by is None:
|
origin_watch_index_by = set()
|
|
# Start at the traded row, or one past it when that row is already watched.
start_index = traded_index
|
if traded_index in origin_watch_index or traded_index in origin_watch_index_by:
|
start_index = traded_index + 1
|
|
total_datas = local_today_datas.get(code)
|
watch_indexes = set()
|
for i in range(start_index, real_order_index):
|
# Look for big orders that have not been cancelled
|
data = total_datas[i]
|
val = data["val"]
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
# Only orders with num * price >= 29900 qualify as big orders here.
if val["num"] * float(val["price"]) < 29900:
|
continue
|
# Has it been cancelled?
|
left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
|
total_datas,
|
local_today_canceled_buyno_map.get(
|
code))
|
if left_count > 0:
|
watch_indexes.add(i)
|
if watch_indexes:
|
# There are still uncancelled big orders (original note: above 3,000,000)
|
if from_real_order_index_changed:
|
# Only updated after the real order position has changed
|
final_watch_indexes = origin_watch_index | watch_indexes
|
self.__watch_indexes_dict[code] = final_watch_indexes
|
l2_log.g_cancel_debug(code, f"大单监听:{final_watch_indexes}")
|
# Big orders are being watched, so the earlier small-order watch is removed
|
if code in self.__watch_indexes_by_dict:
|
self.__watch_indexes_by_dict[code].clear()
|
else:
|
l2_log.g_cancel_debug(code, f"没有大单监听,开始计算小单:{start_index}-{real_order_index}")
|
# No big orders left (original note: above 3,000,000); compute a backup
|
# A new backup order is only searched for when the old one was dealt or there is none
|
need_find_by = False
|
if not origin_watch_index_by:
|
need_find_by = True
|
else:
|
# Has the backup order been dealt?
|
need_find_by = True
|
for i in origin_watch_index_by:
|
if i >= start_index:
|
# It lies after the traded position
|
need_find_by = False
|
break
|
if need_find_by and (not is_default or not watch_indexes):
|
l2_log.g_cancel_debug(code, f"启动小单备用监听:{start_index}-{real_order_index}")
|
# Candidate backup orders as (num, row); collection stops once more than 15 are gathered.
temp_list = []
|
for i in range(start_index, real_order_index):
|
data = total_datas[i]
|
val = data["val"]
|
if not L2DataUtil.is_limit_up_price_buy(val):
|
continue
|
if val["num"] * float(val["price"]) < 5000:
|
continue
|
temp_list.append((val["num"], data))
|
if len(temp_list) > 15:
|
break
|
# Largest num first; only the first entry is used below.
temp_list.sort(key=lambda x: x[0], reverse=True)
|
if temp_list:
|
l2_log.g_cancel_debug(code, f"小单备用监听位置:{temp_list[0][1]['index']}")
|
watch_indexes.add(temp_list[0][1]["index"])
|
self.__watch_indexes_by_dict[code] = origin_watch_index_by | watch_indexes
|
|
def set_trade_progress(self, code, buy_single_index, index):
    """Store a new trade-progress index for ``code`` and recompute the watch indexes.

    Nothing is done when ``index`` equals the value already stored for ``code``.
    ``buy_single_index`` is accepted but not used by this method.
    """
    progress_by_code = self.__trade_progress_index_dict
    if progress_by_code.get(code) == index:
        # Progress unchanged: keep the existing watch sets.
        return
    progress_by_code[code] = index
    # The stored real-place-order entry (or None when absent) is forwarded as-is.
    real_order_entry = self.__real_place_order_index_dict.get(code)
    self.__commpute_watch_indexes(code, index, real_order_entry)
|
|
def need_cancel(self, code, buy_exec_index, start_index, end_index):
    """Check the records in ``[start_index, end_index]`` for a cancel that should trigger our own cancel.

    Returns a 3-tuple ``(need_cancel, data, message)``:

    * ``(False, None, msg)`` when no real place-order index is stored for ``code``,
      when more than 15 seconds separate the record at ``end_index`` from the one at
      ``buy_exec_index``, or when no rule fires.
    * ``(True, data, msg)`` when a watched backup order was cancelled (``data`` is the
      cancel record) or when the share of cancelled watched big orders exceeds
      ``constant.G_CANCEL_RATE`` (``data`` is the cancel record with the highest index).
    """
    if code not in self.__real_place_order_index_dict:
        return False, None, "没有找到真实下单位"
    # The stored entry is a (index, is_default) pair; only the index is needed here.
    real_place_order_index, _is_default = self.__real_place_order_index_dict.get(code)
    total_datas = local_today_datas.get(code)

    # The rule only applies within 15s of the buy-execution record.
    seconds_since_exec = tool.trade_time_sub(total_datas[end_index]["val"]["time"],
                                             total_datas[buy_exec_index]["val"]["time"])
    if seconds_since_exec > 15:
        return False, None, "下单15s内才生效"

    watch_indexes = self.__watch_indexes_dict.get(code)
    if watch_indexes is None:
        watch_indexes = set()
    watch_indexes_by = self.__watch_indexes_by_dict.get(code)
    if watch_indexes_by is None:
        watch_indexes_by = set()

    big_order_cancelled = False
    for pos in range(start_index, end_index + 1):
        data = total_datas[pos]
        val = data["val"]
        if not L2DataUtil.is_limit_up_price_buy_cancel(val):
            continue
        if val["num"] * float(val["price"]) < 5000:
            continue
        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
            data, local_today_buyno_map.get(code))
        # Only buys located before our own order position are relevant.
        if buy_index is None or buy_index >= real_place_order_index:
            continue
        if buy_index in watch_indexes_by:
            # A backup (secondary) watched order was cancelled: cancel immediately.
            return True, data, f"次大单撤:{buy_index}"
        if buy_index in watch_indexes:
            # A watched big order was cancelled: the cancel ratio must be re-evaluated.
            big_order_cancelled = True
            break

    if big_order_cancelled and watch_indexes:
        canceled_indexes = set()
        for watched in watch_indexes:
            cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                code, watched, total_datas, local_today_canceled_buyno_map.get(code))
            if cancel_data:
                canceled_indexes.add(cancel_data["index"])
        cancel_rate = round(len(canceled_indexes) / len(watch_indexes), 2)
        if cancel_rate > constant.G_CANCEL_RATE:
            latest_cancel_index = sorted(canceled_indexes)[-1]
            return True, total_datas[latest_cancel_index], f"撤单比例:{cancel_rate}"

    return False, None, ""
|
|
def place_order_success(self, code):
    """Hook called after an order for ``code`` was placed; delegates to ``self.clear(code)``."""
    self.clear(code)
|
|
def cancel_success(self, code):
    """Hook called after the order for ``code`` was cancelled; delegates to ``self.clear(code)``."""
    self.clear(code)
|
|
|
# ---------------------------------独苗撤-------------------------------
|
class UCancelBigNumComputer:
    """Singleton holding the "sole leader" cancel check (see ``need_cancel``).

    Every call to ``UCancelBigNumComputer()`` returns the same instance.
    """
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __cancel_real_order_index_cache = {}
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()

    __instance = None

    def __new__(cls, *args, **kwargs):
        # *args/**kwargs stay in the signature for compatibility but are NOT
        # forwarded: object.__new__ raises TypeError when it receives extra
        # arguments from a class that overrides __new__.
        if cls.__instance is None:
            cls.__instance = super(UCancelBigNumComputer, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        # Nothing is preloaded for this computer.
        pass

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    def need_cancel(self, code, transaction_index, order_begin_pos: OrderBeginPosInfo,
                    current_limit_up_block_codes_dict, volume_rate):
        """Tell whether the order on ``code`` should be cancelled.

        Returns ``(need_cancel, message)``. With the cancel rules currently
        disabled the first element is always ``False``; the message names the
        reason the check stopped.

        :param code: security code.
        :param transaction_index: index of the current trade progress in today's L2 data.
        :param order_begin_pos: order position info; ``buy_exec_index`` must be a positive index.
        :param current_limit_up_block_codes_dict: mapping block -> collection of limit-up codes.
        :param volume_rate: only referenced by the disabled rules.
        """
        if not order_begin_pos or not order_begin_pos.buy_exec_index or order_begin_pos.buy_exec_index < 0:
            return False, "尚未下单"
        if not current_limit_up_block_codes_dict:
            return False, "涨停列表无数据"

        # Find the block (limit-up reason) that contains this code.
        block = None
        for candidate in current_limit_up_block_codes_dict:
            if code in current_limit_up_block_codes_dict[candidate]:
                block = candidate
                break
        if not block:
            return False, "没有找到代码的涨停原因"
        if len(current_limit_up_block_codes_dict[block]) > 1:
            # Other codes share the block, so this is not a sole leader.
            return False, "有后排无需撤单"

        total_datas = local_today_datas.get(code)
        time_sub = tool.trade_time_sub(tool.get_now_time_str(),
                                       total_datas[order_begin_pos.buy_exec_index]["val"]["time"])
        if 2 < time_sub < 30 * 60:
            real_place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
            if not real_place_order_index:
                return False, "尚未找到真实下单位置"
            # Count orders (amount >= 5000) between the trade progress and our
            # own order that still have uncancelled quantity; stop once there are more than 5.
            total_left_count = 0
            for i in range(transaction_index, real_place_order_index):
                data = total_datas[i]
                val = data["val"]
                if float(val['price']) * val['num'] < 50 * 100:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                    code, i, total_datas, local_today_canceled_buyno_map.get(code))
                if left_count > 0:
                    total_left_count += 1
                    if total_left_count > 5:
                        break
            # NOTE: total_left_count is currently unused. The rules that consumed it are disabled:
            #   * sole code in its block, volume_rate < 0.6 and total_left_count <= 5 -> cancel
            #   * more than 10 minutes after the order, sole code in its block and
            #     volume_rate < 0.7 -> cancel
        return False, "不需要撤单"
|
|
|
# --------------------------------封单额变化撤------------------------
|
# 涨停封单额统计
|
class L2LimitUpMoneyStatisticUtil:
    """Singleton that tracks the limit-up sealing volume of a code and derives cancel signals.

    Per-second volume deltas and the latest corrected total are stored in Redis
    under ``l2_limit_up_second_money-<code>-<HHMMSS>`` and ``l2_limit_up_money-<code>``.
    """
    _db = 1
    _redisManager = redis_manager.RedisManager(1)
    _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()

    __instance = None

    def __new__(cls, *args, **kwargs):
        # *args/**kwargs stay in the signature for compatibility but are NOT
        # forwarded: object.__new__ raises TypeError when it receives extra
        # arguments from a class that overrides __new__.
        if cls.__instance is None:
            cls.__instance = super(L2LimitUpMoneyStatisticUtil, cls).__new__(cls)
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()

    def __set_l2_second_money_record(self, code, time, num, from_index, to_index):
        """Add ``num`` to the record of second ``time`` and extend its end index to ``to_index``."""
        old_num, old_from, old_to = self.__get_l2_second_money_record(code, time)
        if old_num is None:
            # First record for this second.
            old_num = num
            old_from = from_index
            old_to = to_index
        else:
            # Accumulate; the start index of the second is kept.
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))

    def __get_l2_second_money_record(self, code, time):
        """Return ``(num, from_index, to_index)`` for second ``time`` or ``(None, None, None)``."""
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = RedisUtils.get(self.__get_redis(), key)
        return self.__format_second_money_record_val(val)

    def __format_second_money_record_val(self, val):
        """Decode a stored JSON triple; ``None`` maps to ``(None, None, None)``."""
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]

    def __get_l2_second_money_record_keys(self, code, time_regex):
        """Return the Redis keys of per-second records matching ``time_regex`` for ``code``."""
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = RedisUtils.keys(self.__get_redis(), key)
        return keys

    def __set_l2_latest_money_record(self, code, index, num):
        """Store the latest corrected total as the JSON pair ``(num, index)``."""
        key = "l2_limit_up_money-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num, index)))

    def __get_l2_latest_money_record(self, code):
        """Return ``(num, index)`` of the latest corrected total, or ``(0, -1)`` when none is stored."""
        key = "l2_limit_up_money-{}".format(code)
        result = RedisUtils.get(self.__get_redis(), key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
        else:
            return 0, -1

    def verify_num(self, code, num, time_str):
        """Correct the stored sealing volume of ``code`` to ``num`` as of ``time_str``.

        The most recent per-second record at or before ``time_str`` provides the
        data index up to which ``num`` is considered valid; ``(num, index)`` is
        then saved as the latest record. When no usable record exists only a
        failure line is logged.
        """
        logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
        time_ = time_str.replace(":", "")
        keys = []
        if not constant.TEST:
            # Walk backwards second by second (at most 3600 steps, never before
            # 09:30:00) until a per-second record is found.
            for i in range(0, 3600):
                temp_time = tool.trade_time_add_second(time_str, 0 - i)
                if int(temp_time.replace(":", "")) < int("093000"):
                    break
                keys_ = self.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
                if len(keys_) > 0:
                    keys.append(keys_[0])
                if len(keys) >= 1:
                    break
        else:
            # Test mode: scan all per-second keys ordered by time and take the
            # first one that is not later than the correction time.
            keys_ = self.__get_l2_second_money_record_keys(code, "*")
            key_list = []
            for k in keys_:
                time__ = k.split("-")[-1]
                key_list.append((int(time__), k))
            key_list.sort(key=lambda tup: tup[0])
            for t in key_list:
                if t[0] <= int(time_):
                    keys.append(t[1])
                    break

        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        end_index = None
        if len(keys) > 0:
            val = RedisUtils.get(self.__get_redis(), keys[0])
            old_num, old_from, old_to = self.__format_second_money_record_val(val)
            end_index = old_to
        if end_index is not None:
            # Save the corrected total together with the index it is valid up to.
            self.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
        else:
            # No record found, or it disappeared between the key lookup and the
            # read: storing a None index would break later range computations.
            logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
        # NOTE: an earlier approach that located the enclosing interval among the
        # per-minute/hour/day keys was abandoned and has been removed.

    def __compute_num(self, code, data, buy_single_data):
        """Return the signed volume contribution of one L2 record.

        Limit-up buy cancels and sells count negative; a sell cancel for which
        ``l2_data_util.is_sell_index_before_target`` is true counts as 0; every
        other record counts positive. The magnitude is ``num * re``.
        """
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            return 0 - int(data["val"]["num"]) * data["re"]
        else:
            if L2DataUtil.is_sell_cancel(data["val"]):
                # Sell cancels whose sell was placed before the buy signal are ignored.
                if l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                            local_today_num_operate_map.get(code)):
                    return 0

            return int(data["val"]["num"]) * data["re"]

    def clear(self, code):
        """Delete the latest corrected total stored for ``code``."""
        key = "l2_limit_up_money-{}".format(code)
        RedisUtils.delete(self.__get_redis(), key)

    def process_data(self, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
                     with_cancel=True):
        """Accumulate sealing volume over ``[start_index, end_index]`` and look for a cancel signal.

        :param with_cancel: when false the data is still recorded but no cancel is reported.
        :return: ``(data, message)`` of the record that triggered a cancel, otherwise
                 ``(None, None)``.
        """
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
        start_time = round(time.time() * 1000)
        total_datas = local_today_datas[code]
        # second -> accumulated volume delta within the processed range
        time_dict_num = {}
        # second -> first ("s") and last ("e") data index seen for that second
        time_dict_num_index = {}
        # data index -> volume delta, reused by the accumulation loop below
        num_dict = {}

        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            time_ = data["val"]["time"]
            if time_ not in time_dict_num:
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = self.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            self.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                              time_dict_num_index[t_]["e"])

        print("保存涨停封单额时间:", round(time.time() * 1000) - start_time)

        # Start from the latest corrected total, if any.
        total_num, index = self.__get_l2_latest_money_record(code)
        record_msg = f"同花顺买1信息 {total_num},{index}"

        if index == -1:
            # No corrected total available: accumulate from the buy-signal start.
            index = buy_single_begin_index - 1
            total_num = 0

        cancel_index = None
        cancel_msg = None
        # Volume thresholds (in lots) equivalent to 10 million and 50 million at the limit-up price.
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        min_volumn = round(10000000 / (limit_up_price * 100))
        min_volumn_big = min_volumn * 5
        # second -> first data index of that second
        time_start_index_dict = {}
        # seconds in order of first appearance
        time_list = []
        # second -> accumulated total just before the first record of that second
        time_total_num_dict = {}

        # Net count of cancelled big orders (cancels add, new big buys subtract).
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])

        max_buy1_volume = self._thsBuy1VolumnManager.get_max_buy1_volume_cache(code)

        # Accumulate from the position right after the corrected one up to end_index.
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            try:
                if big_money_num_manager.is_big_num(data["val"]):
                    if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                        cancel_big_num_count += int(data["re"])
                        # A cancel whose buy was placed within 2s of the
                        # buy-execution record is counted a second time.
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                            data, local_today_buyno_map.get(code))
                        if buy_index is not None:
                            buy_time = total_datas[buy_index]["val"]["time"]
                            if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                                cancel_big_num_count += int(data["re"])
                    elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                        cancel_big_num_count -= int(data["re"])
            except Exception as e:
                logging.exception(e)

            # The shrink threshold starts at 50% and drops by one point per net
            # cancelled big order, down to 40% at most.
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
                threshold_rate = threshold_rate - min(cancel_big_num_count, 10) * 0.01

            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                time_start_index_dict[time_] = i
                time_list.append(time_)
                # Total accumulated before this second started.
                time_total_num_dict[time_] = total_num

            exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])

            val = num_dict.get(i)
            if val is None:
                val = self.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # Cancel rules are only evaluated for records of the processed
            # range, and only when the record decreases the total.
            if start_index <= i <= end_index and val < 0:
                # Rule 1: total is at most 50 million and below 24% of the maximum sealing volume.
                if exec_time_offset >= constant.S_CANCEL_EXPIRE_TIME:
                    if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                        cancel_index = i
                        cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
                        break
                # Rule 2: total below 10 million, at least 5s after execution.
                if total_num < min_volumn:
                    if exec_time_offset >= 5:
                        cancel_index = i
                        cancel_msg = "封单金额小于1000万,为{}".format(total_num)
                        break
                # Rule 3: total shrank by the threshold relative to the total at
                # the start of the current second, at least 5s after execution.
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                    if exec_time_offset >= 5:
                        cancel_index = i
                        cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                                   total_num)
                        break
                # Rule 4: strictly decreasing over the two previous second
                # boundaries and shrunk by the threshold relative to the older one.
                if len(time_list) >= 2:
                    last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
                    if last_2_second_total_volumn > 0:
                        if last_2_second_total_volumn > last_second_total_volumn > total_num:
                            dif = last_2_second_total_volumn - total_num
                            if dif / last_2_second_total_volumn >= threshold_rate:
                                if exec_time_offset >= 5:
                                    cancel_index = i
                                    cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(
                                        time_,
                                        last_2_second_total_volumn,
                                        last_second_total_volumn,
                                        total_num)
                                    break

        if not with_cancel:
            cancel_index = None

        print("封单额计算时间:", round(time.time() * 1000) - start_time)
        # ``is not None`` so that a cancel at data index 0 is not silently dropped.
        if cancel_index is not None:
            l2_log.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
                                total_num)
            return total_datas[cancel_index], cancel_msg
        return None, None
|
|
|
# ---------------------------------板上卖-----------------------------
|
# 涨停卖统计
|
class L2LimitUpSellStatisticUtil:
    """Singleton that accumulates sell volume after the buy execution and signals a cancel.

    The running sell volume and the last processed data index are cached in
    memory per code and mirrored asynchronously to Redis under
    ``limit_up_sell_num-<code>`` and ``limit_up_sell_index-<code>``.
    """
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __limit_up_sell_num_cache = {}
    __limit_up_sell_index_cache = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        # *args/**kwargs stay in the signature for compatibility but are NOT
        # forwarded: object.__new__ raises TypeError when it receives extra
        # arguments from a class that overrides __new__.
        if cls.__instance is None:
            cls.__instance = super(L2LimitUpSellStatisticUtil, cls).__new__(cls)
            # Warm the in-memory caches from Redis once.
            cls.load_data()
        return cls.__instance

    @classmethod
    def load_data(cls):
        """Populate both in-memory caches from the keys currently stored in Redis."""
        redis_ = cls.__get_redis()
        try:
            for k in RedisUtils.keys(redis_, "limit_up_sell_num-*"):
                # str.split must be *called*; subscripting the method raises TypeError.
                code = k.split("-")[-1]
                val = RedisUtils.get(redis_, k)
                if val is not None:
                    # Stored as int so that __incre_sell_data can add to it.
                    cls.__limit_up_sell_num_cache[code] = int(val)
            for k in RedisUtils.keys(redis_, "limit_up_sell_index-*"):
                code = k.split("-")[-1]
                val = RedisUtils.get(redis_, k)
                if val is not None:
                    cls.__limit_up_sell_index_cache[code] = int(val)
        finally:
            RedisUtils.realse(redis_)

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    def __incre_sell_data(self, code, num):
        """Add ``num`` to the cached sell volume of ``code`` and mirror the increment to Redis."""
        if code not in self.__limit_up_sell_num_cache:
            self.__limit_up_sell_num_cache[code] = 0
        self.__limit_up_sell_num_cache[code] += num
        key = "limit_up_sell_num-{}".format(code)
        RedisUtils.incrby_async(self.__db, key, num)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())

    def __get_sell_data(self, code):
        """Read the sell volume of ``code`` from Redis (0 when absent)."""
        key = "limit_up_sell_num-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return 0
        return int(val)

    def __get_sell_data_cache(self, code):
        """Read the sell volume of ``code`` from the in-memory cache (0 when absent)."""
        if code in self.__limit_up_sell_num_cache:
            return int(self.__limit_up_sell_num_cache[code])
        return 0

    def __save_process_index(self, code, index):
        """Remember ``index`` as the last processed position, in cache and in Redis."""
        tool.CodeDataCacheUtil.set_cache(self.__limit_up_sell_index_cache, code, index)
        key = "limit_up_sell_index-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), index)

    def __get_process_index(self, code):
        """Read the last processed index of ``code`` from Redis (-1 when absent)."""
        key = "limit_up_sell_index-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return -1
        return int(val)

    def __get_process_index_cache(self, code):
        """Read the last processed index of ``code`` from the in-memory cache (-1 when absent)."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__limit_up_sell_index_cache, code)
        if cache_result[0]:
            return int(cache_result[1])
        return -1

    def delete(self, code):
        """Remove the cached and stored statistics of ``code``.

        Per the original note, this is meant to be called after a successful
        cancel and before a new buy.
        """
        tool.CodeDataCacheUtil.clear_cache(self.__limit_up_sell_index_cache, code)
        tool.CodeDataCacheUtil.clear_cache(self.__limit_up_sell_num_cache, code)
        key = "limit_up_sell_num-{}".format(code)
        RedisUtils.delete_async(self.__db, key)
        key = "limit_up_sell_index-{}".format(code)
        RedisUtils.delete_async(self.__db, key)

    def clear(self):
        """Delete the statistics of every code that has a sell-volume key in Redis."""
        keys = RedisUtils.keys(self.__get_redis(), "limit_up_sell_num-*")
        for k in keys:
            code = k.split("-")[-1]
            self.delete(code)

    def process(self, code, start_index, end_index, buy_exec_index):
        """Accumulate sell volume over ``[start_index, end_index]`` and report a cancel if needed.

        Records before ``buy_exec_index`` or at/before the last processed index are skipped.

        :return: ``(data, message)`` of the record at which the accumulated sell
                 volume exceeded the threshold, otherwise ``(None, "")``.
        """
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        # Threshold in lots: 4.8% of the value returned by get_zyltgb
        # (free-float market value, per the original comment) at the limit-up price.
        threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
        total_num = self.__get_sell_data_cache(code)
        cancel_index = None
        process_index = self.__get_process_index_cache(code)
        total_datas = local_today_datas.get(code)
        for i in range(start_index, end_index + 1):
            if i < buy_exec_index:
                continue
            if i <= process_index:
                # Already counted in a previous call.
                continue
            val = total_datas[i]["val"]
            if L2DataUtil.is_limit_up_price_sell(val) or L2DataUtil.is_sell(val):
                num = int(val["num"])
                self.__incre_sell_data(code, num)
                total_num += num
                if total_num > threshold_num:
                    cancel_index = i
                    break
        # Resume after the cancel position, or after the whole range when nothing fired.
        if cancel_index is not None:
            process_index = cancel_index
        else:
            process_index = end_index
        l2_log.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
                            threshold_num)

        self.__save_process_index(code, process_index)
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
        return None, ""
|
|
|
class LatestCancelIndexManager:
    """Singleton cache of the latest cancel index per code, mirrored to Redis.

    Values are stored under ``latest_cancel_index-<code>`` and loaded into the
    in-memory cache when the singleton is first created.
    """
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __latest_cancel_index_cache = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        # *args/**kwargs stay in the signature for compatibility but are NOT
        # forwarded: object.__new__ raises TypeError when it receives extra
        # arguments from a class that overrides __new__.
        if cls.__instance is None:
            cls.__instance = super(LatestCancelIndexManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory cache from every ``latest_cancel_index-*`` key in Redis."""
        redis_ = cls.__get_redis()
        try:
            for k in RedisUtils.keys(redis_, "latest_cancel_index-*"):
                code = k.split("-")[-1]
                val = RedisUtils.get(redis_, k)
                if val is None:
                    # The key vanished between the listing and the read.
                    continue
                CodeDataCacheUtil.set_cache(cls.__latest_cancel_index_cache, code, int(val))
        finally:
            RedisUtils.realse(redis_)

    def __save_latest_cancel_index(self, code, index):
        """Persist ``index`` for ``code`` to Redis asynchronously."""
        RedisUtils.setex_async(self.__db, f"latest_cancel_index-{code}", tool.get_expire(), index)

    def set_latest_cancel_index(self, code, index):
        """Record ``index`` as the latest cancel index of ``code`` (cache and Redis)."""
        CodeDataCacheUtil.set_cache(self.__latest_cancel_index_cache, code, index)
        self.__save_latest_cancel_index(code, index)

    def get_latest_cancel_index_cache(self, code):
        """Return the cached latest cancel index of ``code``, or ``None`` when not cached."""
        result = CodeDataCacheUtil.get_cache(self.__latest_cancel_index_cache, code)
        if result[0]:
            return result[1]
        return None
|
|
|
# Script entry point: intentionally a no-op when the module is run directly.
if __name__ == "__main__":
    pass
|