"""
|
L2撤单策略
|
"""
|
# ---------------------------------S撤-------------------------------
|
|
# s级平均大单计算
|
# 计算范围到申报时间的那一秒
|
import json
|
import logging
|
import time
|
|
import big_money_num_manager
|
import constant
|
import gpcode_manager
|
import l2_data_util
|
from db import redis_manager
|
import tool
|
from l2.safe_count_manager import BuyL2SafeCountManager
|
from l2.transaction_progress import TradeBuyQueue
|
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
|
from l2 import l2_log, l2_data_log, l2_data_source_util
|
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
|
from log import logger_buy_1_volumn, logger_l2_h_cancel, logger_l2_s_cancel
|
|
|
class SecondCancelBigNumComputer:
|
__redis_manager = redis_manager.RedisManager(0)
|
|
@classmethod
|
def __getRedis(cls):
|
return cls.__redis_manager.getRedis()
|
|
# 保存结束位置
|
@classmethod
|
def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
|
key = "s_big_num_cancel_compute_data-{}".format(code)
|
cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))
|
|
@classmethod
|
def __get_compute_data(cls, code):
|
key = "s_big_num_cancel_compute_data-{}".format(code)
|
val = cls.__getRedis().get(key)
|
if val is None:
|
return -1, 0, 0
|
val = json.loads(val)
|
return val[0], val[1], val[2]
|
|
@classmethod
|
def __clear_data(cls, code):
|
ks = ["s_big_num_cancel_compute_data-{}".format(code)]
|
for key in ks:
|
cls.__getRedis().delete(key)
|
|
@classmethod
|
def clear_data(cls):
|
ks = ["s_big_num_cancel_compute_data-*"]
|
for key in ks:
|
keys = cls.__getRedis().keys(key)
|
for k in keys:
|
cls.__getRedis().delete(k)
|
|
# 计算净大单
|
@classmethod
|
def __compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, place_order_count):
|
# 获取大单的最小手数
|
left_big_num = 0
|
# 点火大单数量
|
fire_count = 5
|
if place_order_count <= 4:
|
fire_count = 6 - place_order_count
|
else:
|
fire_count = 2
|
for i in range(start_index, end_index + 1):
|
data = total_data[i]
|
val = data["val"]
|
# 去除非大单
|
if not l2_data_util.is_big_money(val):
|
continue
|
|
if L2DataUtil.is_limit_up_price_buy(val):
|
if i - buy_single_index < fire_count:
|
# 点火大单不算
|
left_big_num += 0
|
else:
|
left_big_num += val["num"] * data["re"]
|
elif L2DataUtil.is_limit_up_price_buy_cancel(val):
|
# 查询买入位置
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
|
local_today_num_operate_map.get(
|
code))
|
if buy_index is not None and start_index <= buy_index <= end_index:
|
if buy_index - buy_single_index < fire_count:
|
left_big_num -= 0
|
else:
|
left_big_num -= val["num"] * data["re"]
|
elif buy_index is None:
|
# 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
|
min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
|
val["cancelTimeUnit"])
|
# 只判断S级撤销,只有s级撤销才有可能相等
|
if max_space - min_space <= 1:
|
buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
|
if int(total_data[start_index]["val"]["time"].replace(":", "")) <= int(
|
buy_time.replace(":", "")) <= int(
|
total_data[end_index]["val"]["time"].replace(":", "")):
|
left_big_num -= val["num"] * data["re"]
|
return left_big_num
|
|
@classmethod
|
def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
|
need_cancel=True):
|
if start_index >= 217:
|
print("进入调试")
|
# 只守护30s
|
if tool.trade_time_sub(total_data[start_index]["val"]["time"],
|
total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
|
return False, None
|
l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
|
logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
|
|
if tool.trade_time_sub(total_data[end_index]["val"]["time"],
|
total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
|
# 结束位置超过了执行位置30s,需要重新确认结束位置
|
for i in range(end_index, start_index - 1, -1):
|
if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
|
end_index = i
|
break
|
|
# 获取处理进度
|
process_index_old, buy_num, cancel_num = cls.__get_compute_data(code)
|
|
# 如果start_index与buy_single_index相同,即是下单后的第一次计算
|
# 需要查询买入信号之前的同1s是否有涨停撤的数据
|
process_index = process_index_old
|
# 下单次数
|
place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
|
|
if buy_single_index == start_index:
|
# 第1次计算需要计算买入信号-执行位的净值
|
left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
|
total_data, place_order_count)
|
buy_num += left_big_num
|
# 设置买入信号-买入执行位的数据不需要处理
|
start_index = end_index + 1
|
process_index = end_index
|
# for i in range(buy_single_index - 1, 0, -1):
|
# data = total_data[i]
|
# val = data["val"]
|
# if val["time"] != total_data[buy_single_index]["val"]["time"]:
|
# break
|
# if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
|
# # 涨停买撤销且撤销的间隔时间为0
|
# # 查询买入信号,如果无法查询到或者是买入位置比买入信号小就不算
|
# buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
|
# local_today_num_operate_map.get(
|
# code))
|
# if buy_index is not None and a_start_index <= buy_index <= a_end_index:
|
# # 在买入信号之后
|
# cls.__save_cancel_data(code, i)
|
|
try:
|
for i in range(start_index, end_index + 1):
|
data = total_data[i]
|
val = data["val"]
|
process_index = i
|
if process_index_old >= i:
|
# 已经处理过的数据不需要处理
|
continue
|
if not l2_data_util.is_big_money(val):
|
continue
|
|
if L2DataUtil.is_limit_up_price_buy(val):
|
buy_num += data["re"] * int(val["num"])
|
elif L2DataUtil.is_limit_up_price_buy_cancel(val):
|
# 查询买入位置
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
|
local_today_num_operate_map.get(
|
code))
|
if buy_index is not None and buy_single_index <= buy_index:
|
cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
|
elif buy_index is None:
|
# 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
|
min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
|
val["cancelTimeUnit"])
|
# 只判断S级撤销,只有s级撤销才有可能相等
|
if max_space - min_space <= 1:
|
buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
|
if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
|
buy_time.replace(":", "")):
|
cancel_num += data["re"] * int(val["num"])
|
|
# 保存数据
|
|
if need_cancel:
|
cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
|
|
if place_order_count <= 1:
|
cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
|
elif place_order_count <= 2:
|
cancel_rate_threshold = constant.S_CANCEL_SECOND_RATE
|
else:
|
cancel_rate_threshold = constant.S_CANCEL_THIRD_RATE
|
if cancel_num / max(buy_num, 1) > cancel_rate_threshold:
|
return True, total_data[i]
|
finally:
|
|
l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
|
buy_num, round(cancel_num / max(buy_num, 1), 2))
|
|
# 保存处理进度与数据
|
cls.__save_compute_data(code, process_index, buy_num, cancel_num)
|
return False, None
|
|
# 下单成功
|
@classmethod
|
def cancel_success(cls, code):
|
cls.__clear_data(code)
|
|
|
# --------------------------------H撤-------------------------------
|
class HourCancelBigNumComputer:
|
__redis_manager = redis_manager.RedisManager(0)
|
__tradeBuyQueue = TradeBuyQueue()
|
__buyL2SafeCountManager = BuyL2SafeCountManager()
|
|
@classmethod
|
def __getRedis(cls):
|
return cls.__redis_manager.getRedis()
|
|
# 保存成交位置到执行位置的揽括范围数据
|
@classmethod
|
def __save_watch_index_set(cls, code, datas, process_index, finish):
|
key = f"h_cancel_watch_indexs-{code}"
|
cls.__getRedis().setex(key, tool.get_expire(), json.dumps((list(datas), process_index, finish)))
|
|
# 保存成交进度
|
@classmethod
|
def __get_watch_index_set(cls, code):
|
key = f"h_cancel_watch_indexs-{code}"
|
val = cls.__getRedis().get(key)
|
if val is None:
|
return None, -1, False
|
val = json.loads(val)
|
return val[0], val[1], val[2]
|
|
# 保存执行位置后面的守护数据
|
@classmethod
|
def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished):
|
key = f"h_cancel_watch_indexs_exec-{code}"
|
cls.__getRedis().setex(key, tool.get_expire(),
|
json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
|
|
# 保存成交进度
|
@classmethod
|
def __get_watch_index_set_after_exec(cls, code):
|
key = f"h_cancel_watch_indexs_exec-{code}"
|
val = cls.__getRedis().get(key)
|
if val is None:
|
return [], -1, 0, 0, False
|
val = json.loads(val)
|
return val[0], val[1], val[2], val[3], val[4]
|
|
# 保存成交进度
|
@classmethod
|
def __save_traded_progress(cls, code, origin_process_index, latest_process_index):
|
key = "h_cancel_traded_progress-{}".format(code)
|
cls.__getRedis().setex(key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index)))
|
|
@classmethod
|
def __get_traded_progress(cls, code):
|
key = "h_cancel_traded_progress-{}".format(code)
|
val = cls.__getRedis().get(key)
|
if val is None:
|
return None, None
|
val = json.loads(val)
|
return val[0], val[1]
|
|
# 保存结算位置
|
@classmethod
|
def __save_compute_data(cls, code, process_index, cancel_num):
|
key = "h_cancel_compute_data-{}".format(code)
|
cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, cancel_num)))
|
|
@classmethod
|
def __get_compute_data(cls, code):
|
key = "h_cancel_compute_data-{}".format(code)
|
val = cls.__getRedis().get(key)
|
if val is None:
|
return -1, 0
|
val = json.loads(val)
|
return val[0], val[1]
|
|
@classmethod
|
def __del_compute_data(cls, code):
|
key = "h_cancel_compute_data-{}".format(code)
|
cls.__getRedis().delete(key)
|
|
@classmethod
|
def __clear_data(cls, code):
|
ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
|
f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}"]
|
for key in ks:
|
cls.__getRedis().delete(key)
|
|
@classmethod
|
def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map):
|
time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
|
total_data[buy_exec_index]["val"]["time"])
|
if time_space >= constant.S_CANCEL_EXPIRE_TIME - 1:
|
# 开始计算需要监控的单
|
cls.__compute_watch_indexs_after_exec(code, buy_exec_index, total_data, local_today_num_operate_map)
|
|
# 守护30s以外的数据
|
if time_space <= constant.S_CANCEL_EXPIRE_TIME:
|
return False, None
|
# 获取成交进度
|
origin_progress_index, latest_progress_index = cls.__get_traded_progress(code)
|
# 监听的数据
|
watch_indexs_dict = {}
|
total_nums = 0
|
if origin_progress_index is not None:
|
# 获取成交位置到执行位置的监控数据
|
watch_indexs = cls.__get_watch_index_set(code)[0]
|
# 监听的总数
|
for indexs in watch_indexs:
|
index = indexs[0]
|
if index < latest_progress_index:
|
continue
|
# 只计算最近的执行位之后的数据
|
watch_indexs_dict[index] = indexs
|
total_nums += total_data[index]["val"]["num"] * indexs[2]
|
# 获取到执行位后的监听数据
|
datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec(code)
|
if datas:
|
for indexs in datas:
|
index = indexs[0]
|
watch_indexs_dict[index] = indexs
|
total_nums += total_data[index]["val"]["num"] * indexs[2]
|
|
processed_index, cancel_num = cls.__get_compute_data(code)
|
|
l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
|
# 获取下单次数
|
place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
|
cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
|
if place_order_count <= 1:
|
cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
|
elif place_order_count <= 2:
|
cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
|
else:
|
cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
|
process_index = start_index
|
try:
|
for i in range(start_index, end_index + 1):
|
if i <= processed_index:
|
# 已经处理过了
|
continue
|
process_index = i
|
data = total_data[i]
|
val = data["val"]
|
if L2DataUtil.is_limit_up_price_buy_cancel(val):
|
# 查询买入位置
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
|
local_today_num_operate_map)
|
if buy_index is not None and buy_index in watch_indexs_dict:
|
cancel_num += data["re"] * val["num"]
|
if cancel_num / total_nums > cancel_rate_threshold:
|
return True, data
|
finally:
|
l2_log.cancel_debug(code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
|
process_index, cancel_num,
|
total_nums)
|
logger_l2_h_cancel.info(
|
f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
|
# 保存处理进度与数据
|
cls.__save_compute_data(code, process_index, cancel_num)
|
return False, None
|
|
# 下单成功
|
@classmethod
|
def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
|
cls.__clear_data(code)
|
|
# 设置成交进度
|
@classmethod
|
def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
|
cls.__tradeBuyQueue.set_traded_index(code, index)
|
# 如果获取时间与执行时间小于29则不需要处理
|
if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time,
|
total_data[buy_exec_index]["val"][
|
"time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
|
return
|
# 保存成交进度
|
origin_index, latest_index = cls.__get_traded_progress(code)
|
if origin_index is None:
|
cls.__save_traded_progress(code, index, index)
|
# 计算揽括范围
|
cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
|
local_today_num_operate_map)
|
else:
|
cls.__save_traded_progress(code, origin_index, index)
|
logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:" + str(total_data[-1]["index"]))
|
|
# 涨停买是否撤单
|
@classmethod
|
def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
|
data = None
|
try:
|
data = total_data[index]
|
except:
|
print("")
|
val = data["val"]
|
if L2DataUtil.is_limit_up_price_buy(val):
|
# 判断当前买是否已经买撤
|
cancel_datas = local_today_num_operate_map.get(
|
"{}-{}-{}".format(val["num"], "1", val["price"]))
|
canceled = False
|
if cancel_datas:
|
for cancel_data in cancel_datas:
|
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
|
local_today_num_operate_map)
|
if buy_index == index:
|
canceled = True
|
count = data["re"] - cancel_data["re"]
|
if count > 0:
|
return count
|
break
|
if not canceled:
|
count = data["re"]
|
return count
|
return 0
|
|
# 计算排名前N的大单
|
|
# 过时数据
|
@classmethod
|
def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count):
|
# 找到还未撤的TOPN大单
|
watch_set = set()
|
for i in range(start_index, total_data[-1]["index"] + 1):
|
not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
|
local_today_num_operate_map)
|
if not_cancel_count > 0:
|
watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
|
# 针按照手数排序
|
watch_list = list(watch_set)
|
watch_list.sort(key=lambda tup: tup[1])
|
watch_list.reverse()
|
watch_list = watch_list[:count]
|
watch_set = set(watch_list)
|
return watch_set
|
|
# 从成交位置到执行位置
|
@classmethod
|
def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data,
|
local_today_num_operate_map):
|
total_count = 0
|
watch_set = set()
|
big_num_count = 0
|
for i in range(progress_index, buy_exec_index):
|
left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
|
if left_count > 0:
|
data = total_data[i]
|
val = data["val"]
|
if val["num"] * float(val["price"]) <= 9900:
|
continue
|
total_count += left_count
|
watch_set.add((i, val["num"], left_count))
|
|
if l2_data_util.is_big_money(val):
|
big_num_count += data["re"]
|
|
final_watch_list = list(watch_set)
|
final_watch_list.sort(key=lambda x: x[0])
|
logger_l2_h_cancel.info(f"code-{code} H撤监控成交位到执行位:{final_watch_list}")
|
cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
|
# 删除原来的计算数据
|
# cls.__del_compute_data(code)
|
|
# 计算执行位置之后的需要监听的数据
|
@classmethod
|
def __compute_watch_indexs_after_exec(cls, code, buy_exec_index, total_data, local_today_num_operate_map):
|
watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec(
|
code)
|
if watch_list and finish:
|
# 已经计算完了不需要再进行计算
|
return
|
watch_set = set()
|
if watch_list:
|
for data in watch_list:
|
watch_set.add((data[0], data[1], data[2]))
|
|
# 暂时不需要使用
|
process_index = process_index_old
|
finished = False
|
big_num_count = big_num_count_old
|
total_count = total_count_old
|
# H撤单
|
MIN_H_COUNT = l2_trade_factor.L2TradeFactorUtil.get_h_cancel_min_count(code)
|
|
for i in range(buy_exec_index, total_data[-1]["index"] + 1):
|
if i <= process_index_old:
|
continue
|
process_index = i
|
left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
|
if left_count > 0:
|
data = total_data[i]
|
val = data["val"]
|
if val["num"] * float(val["price"]) <= 9900:
|
continue
|
total_count += left_count
|
watch_set.add((i, val["num"], left_count))
|
|
if l2_data_util.is_big_money(val):
|
big_num_count += data["re"]
|
|
# 判断是否达到阈值
|
if total_count >= MIN_H_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT: # and total_num >= threshold_num
|
finished = True
|
l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
|
total_data[-1]["index"])
|
break
|
|
final_watch_list = list(watch_set)
|
final_watch_list.sort(key=lambda x: x[0])
|
logger_l2_h_cancel.info(f"code-{code} H撤监控执行位相邻单:{final_watch_list}")
|
# 保存计算范围
|
cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
|
finished)
|
# 删除原来的计算数据
|
# cls.__del_compute_data(code)
|
|
|
# --------------------------------封单额变化撤------------------------
|
# 涨停封单额统计
|
class L2LimitUpMoneyStatisticUtil:
|
_redisManager = redis_manager.RedisManager(1)
|
_thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
|
|
@classmethod
|
def __get_redis(cls):
|
return cls._redisManager.getRedis()
|
|
# 设置l2的每一秒涨停封单额数据
|
@classmethod
|
def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
|
old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
|
if old_num is None:
|
old_num = num
|
old_from = from_index
|
old_to = to_index
|
else:
|
old_num += num
|
old_to = to_index
|
|
key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
|
|
cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
|
|
@classmethod
|
def __get_l2_second_money_record(cls, code, time):
|
key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
|
val = cls.__get_redis().get(key)
|
return cls.__format_second_money_record_val(val)
|
|
@classmethod
|
def __format_second_money_record_val(cls, val):
|
if val is None:
|
return None, None, None
|
val = json.loads(val)
|
return val[0], val[1], val[2]
|
|
@classmethod
|
def __get_l2_second_money_record_keys(cls, code, time_regex):
|
key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
|
keys = cls.__get_redis().keys(key)
|
return keys
|
|
# 设置l2最新的封单额数据
|
@classmethod
|
def __set_l2_latest_money_record(cls, code, index, num):
|
key = "l2_limit_up_money-{}".format(code)
|
cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index)))
|
|
# 返回数量,索引
|
@classmethod
|
def __get_l2_latest_money_record(cls, code):
|
key = "l2_limit_up_money-{}".format(code)
|
result = cls.__get_redis().get(key)
|
if result:
|
result = json.loads(result)
|
return result[0], result[1]
|
else:
|
return 0, -1
|
|
# 矫正数据
|
# 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标
|
@classmethod
|
def verify_num(cls, code, num, time_str):
|
# 记录买1矫正日志
|
logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
|
time_ = time_str.replace(":", "")
|
key = None
|
# 获取矫正时间前1分钟的数据
|
keys = []
|
if not constant.TEST:
|
for i in range(0, 3600):
|
temp_time = tool.trade_time_add_second(time_str, 0 - i)
|
# 只处理9:30后的数据
|
if int(temp_time.replace(":", "")) < int("093000"):
|
break
|
keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
|
if len(keys_) > 0:
|
keys.append(keys_[0])
|
if len(keys) >= 1:
|
break
|
else:
|
keys_ = cls.__get_l2_second_money_record_keys(code, "*")
|
key_list = []
|
for k in keys_:
|
time__ = k.split("-")[-1]
|
key_list.append((int(time__), k))
|
key_list.sort(key=lambda tup: tup[0])
|
for t in key_list:
|
if t[0] <= int(time_):
|
keys.append(t[1])
|
break
|
|
keys.sort(key=lambda tup: int(tup.split("-")[-1]))
|
if len(keys) > 0:
|
key = keys[0]
|
val = cls.__get_redis().get(key)
|
old_num, old_from, old_to = cls.__format_second_money_record_val(val)
|
end_index = old_to
|
# 保存最近的数据
|
cls.__set_l2_latest_money_record(code, end_index, num)
|
logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
|
else:
|
logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
|
# 取消此种方法
|
#
|
# for i in range(4, -2, -2):
|
# # 获取本(分钟/小时/天)内秒分布数据
|
# time_regex = "{}*".format(time_[:i])
|
# keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
|
# if keys_ and len(keys_) > 1:
|
# # 需要排序
|
# keys = []
|
# for k in keys_:
|
# keys.append(k)
|
# keys.sort(key=lambda tup: int(tup.split("-")[-1]))
|
# # if i == 4:
|
# # keys=keys[:5]
|
# # 有2个元素
|
# for index in range(0, len(keys) - 1):
|
# time_1 = keys[index].split("-")[-1]
|
# time_2 = keys[index + 1].split("-")[-1]
|
# if int(time_1) <= int(time_) <= int(time_2):
|
# # 在此时间范围内
|
# if time_ == time_2:
|
# key = keys[index + 1]
|
# else:
|
# key = keys[index]
|
# break
|
# if key:
|
# break
|
# # 如果没有找到匹配的区间
|
# if not key:
|
# # 最后一条数据的时间为相应的区间
|
# total_datas = local_today_datas[code]
|
#
|
# if key:
|
# val = cls.__get_redis().get(key)
|
# old_num, old_from, old_to = cls.__format_second_money_record_val(val)
|
# end_index = old_to
|
# # 保存最近的数据
|
# cls.__set_l2_latest_money_record(code, end_index, num)
|
# logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
|
|
# 计算量,用于涨停封单量的计算
|
@classmethod
|
def __compute_num(cls, code, data, buy_single_data):
|
if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
|
# 涨停买撤与卖
|
return 0 - int(data["val"]["num"]) * data["re"]
|
else:
|
# 卖撤
|
if L2DataUtil.is_sell_cancel(data["val"]):
|
# 卖撤的买数据是否在买入信号之前,如果在之前就不计算,不在之前就计算
|
if l2_data_util.is_sell_index_before_target(data, buy_single_data,
|
local_today_num_operate_map.get(code)):
|
return 0
|
|
return int(data["val"]["num"]) * data["re"]
|
|
@classmethod
|
def clear(cls, code):
|
key = "l2_limit_up_money-{}".format(code)
|
cls.__get_redis().delete(key)
|
|
# 返回取消的标志数据
|
# with_cancel 是否需要判断是否撤销
|
@classmethod
|
def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
|
with_cancel=True):
|
if buy_single_begin_index is None or buy_exec_index is None:
|
return None, None
|
start_time = round(time.time() * 1000)
|
total_datas = local_today_datas[code]
|
time_dict_num = {}
|
# 记录计算的坐标
|
time_dict_num_index = {}
|
# 坐标-量的map
|
num_dict = {}
|
# 统计时间分布
|
time_dict = {}
|
for i in range(start_index, end_index + 1):
|
data = total_datas[i]
|
val = data["val"]
|
time_ = val["time"]
|
if time_ not in time_dict:
|
time_dict[time_] = i
|
|
for i in range(start_index, end_index + 1):
|
data = total_datas[i]
|
val = data["val"]
|
time_ = val["time"]
|
if time_ not in time_dict_num:
|
time_dict_num[time_] = 0
|
time_dict_num_index[time_] = {"s": i, "e": i}
|
time_dict_num_index[time_]["e"] = i
|
num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
|
num_dict[i] = num
|
time_dict_num[time_] = time_dict_num[time_] + num
|
for t_ in time_dict_num:
|
cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
|
time_dict_num_index[t_]["e"])
|
|
print("保存涨停封单额时间:", round(time.time() * 1000) - start_time)
|
|
# 累计最新的金额
|
total_num, index = cls.__get_l2_latest_money_record(code)
|
record_msg = f"同花顺买1信息 {total_num},{index}"
|
|
if index == -1:
|
# 没有获取到最新的矫正封单额,需要从买入信号开始点计算
|
index = buy_single_begin_index - 1
|
total_num = 0
|
|
cancel_index = None
|
cancel_msg = None
|
# 待计算量
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
min_volumn = round(10000000 / (limit_up_price * 100))
|
min_volumn_big = min_volumn * 5
|
# 不同时间的数据开始坐标
|
time_start_index_dict = {}
|
# 数据时间分布
|
time_list = []
|
# 到当前时间累积的买1量
|
time_total_num_dict = {}
|
|
# 大单撤销笔数
|
cancel_big_num_count = 0
|
buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
|
|
# 获取最大封单额
|
max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code)
|
|
# 从同花顺买1矫正过后的位置开始计算,到end_index结束
|
|
for i in range(index + 1, end_index + 1):
|
data = total_datas[i]
|
# 统计撤销数量
|
try:
|
if big_money_num_manager.is_big_num(data["val"]):
|
if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
|
cancel_big_num_count += int(data["re"])
|
# 获取是否在买入执行信号周围2s
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
|
local_today_num_operate_map.get(
|
code))
|
if buy_index is not None:
|
# 相差1s
|
buy_time = total_datas[buy_index]["val"]["time"]
|
if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
|
cancel_big_num_count += int(data["re"])
|
|
elif L2DataUtil.is_limit_up_price_buy(data["val"]):
|
cancel_big_num_count -= int(data["re"])
|
except Exception as e:
|
logging.exception(e)
|
|
threshold_rate = 0.5
|
if cancel_big_num_count >= 0:
|
if cancel_big_num_count < 10:
|
threshold_rate = threshold_rate - cancel_big_num_count * 0.01
|
else:
|
threshold_rate = threshold_rate - 10 * 0.01
|
|
time_ = data["val"]["time"]
|
if time_ not in time_start_index_dict:
|
# 记录每一秒的开始位置
|
time_start_index_dict[time_] = i
|
# 记录时间分布
|
time_list.append(time_)
|
# 上一段时间的总数
|
time_total_num_dict[time_] = total_num
|
|
exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])
|
|
val = num_dict.get(i)
|
if val is None:
|
val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
|
total_num += val
|
# 在处理数据的范围内,就需要判断是否要撤单了
|
if start_index <= i <= end_index:
|
# 如果是减小项
|
if val < 0:
|
# 当前量小于最大量的24%则需要取消
|
if exec_time_offset >= constant.S_CANCEL_EXPIRE_TIME:
|
if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
|
cancel_index = i
|
cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
|
break
|
# 累计封单金额小于1000万
|
if total_num < min_volumn:
|
# 与执行位相隔>=5s时规则生效
|
if exec_time_offset >= 5:
|
cancel_index = i
|
cancel_msg = "封单金额小于1000万,为{}".format(total_num)
|
break
|
# 相邻2s内的数据减小50%
|
# 上1s的总数
|
last_second_total_volumn = time_total_num_dict.get(time_list[-1])
|
if last_second_total_volumn > 0 and (
|
last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
|
# 与执行位相隔>=5s时规则生效
|
if exec_time_offset >= 5:
|
# 相邻2s内的数据减小50%
|
cancel_index = i
|
cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
|
total_num)
|
break
|
# 记录中有上2个数据
|
if len(time_list) >= 2:
|
# 倒数第2个数据
|
last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
|
if last_2_second_total_volumn > 0:
|
if last_2_second_total_volumn > last_second_total_volumn > total_num:
|
dif = last_2_second_total_volumn - total_num
|
if dif / last_2_second_total_volumn >= threshold_rate:
|
# 与执行位相隔>=5s时规则生效
|
if exec_time_offset >= 5:
|
cancel_index = i
|
cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
|
last_2_second_total_volumn,
|
last_second_total_volumn,
|
total_num)
|
break
|
|
if not with_cancel:
|
cancel_index = None
|
|
print("封单额计算时间:", round(time.time() * 1000) - start_time)
|
process_end_index = end_index
|
if cancel_index:
|
process_end_index = cancel_index
|
# 保存最新累计金额
|
# cls.__set_l2_latest_money_record(code, process_end_index, total_num)
|
l2_data_log.l2_time(code, round(time.time() * 1000) - start_time,
|
"l2数据封单额计算时间",
|
False)
|
if cancel_index:
|
l2_log.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
|
total_num)
|
return total_datas[cancel_index], cancel_msg
|
return None, None
|
|
|
# ---------------------------------板上卖-----------------------------
|
# 涨停卖统计
|
class L2LimitUpSellStatisticUtil:
|
_redisManager = redis_manager.RedisManager(0)
|
|
@classmethod
|
def __get_redis(cls):
|
return cls._redisManager.getRedis()
|
|
# 新增卖数据
|
@classmethod
|
def __incre_sell_data(cls, code, num):
|
key = "limit_up_sell_num-{}".format(code)
|
cls.__get_redis().incrby(key, num)
|
|
@classmethod
|
def __get_sell_data(cls, code):
|
key = "limit_up_sell_num-{}".format(code)
|
val = cls.__get_redis().get(key)
|
if val is None:
|
return 0
|
return int(val)
|
|
@classmethod
|
def __save_process_index(cls, code, index):
|
key = "limit_up_sell_index-{}".format(code)
|
cls.__get_redis().setex(key, tool.get_expire(), index)
|
|
@classmethod
|
def __get_process_index(cls, code):
|
key = "limit_up_sell_index-{}".format(code)
|
val = cls.__get_redis().get(key)
|
if val is None:
|
return -1
|
return int(val)
|
|
# 清除数据,当取消成功与买入之前需要清除数据
|
@classmethod
|
def delete(cls, code):
|
key = "limit_up_sell_num-{}".format(code)
|
cls.__get_redis().delete(key)
|
key = "limit_up_sell_index-{}".format(code)
|
cls.__get_redis().delete(key)
|
|
@classmethod
|
def clear(cls):
|
keys = cls.__get_redis().keys("limit_up_sell_num-*")
|
for k in keys:
|
cls.__get_redis().delete(k)
|
|
# 处理数据,返回是否需要撤单
|
# 处理范围:买入执行位-当前最新位置
|
@classmethod
|
def process(cls, code, start_index, end_index, buy_exec_index):
|
# 获取涨停卖的阈值
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
|
# 大于自由流通市值的4.8%
|
threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
|
total_num = cls.__get_sell_data(code)
|
cancel_index = None
|
process_index = cls.__get_process_index(code)
|
total_datas = local_today_datas.get(code)
|
for i in range(start_index, end_index + 1):
|
if i < buy_exec_index:
|
continue
|
if i <= process_index:
|
continue
|
if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]):
|
num = int(total_datas[i]["val"]["num"])
|
cls.__incre_sell_data(code, num)
|
total_num += num
|
if total_num > threshold_num:
|
cancel_index = i
|
break
|
if cancel_index is not None:
|
process_index = cancel_index
|
else:
|
process_index = end_index
|
l2_log.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
|
threshold_num)
|
|
cls.__save_process_index(code, process_index)
|
if cancel_index is not None:
|
return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
|
return None, ""
|