import datetime
|
import json
|
import logging
|
import random
|
import time as t
|
|
import big_money_num_manager
|
import code_data_util
|
import constant
|
import global_util
|
import gpcode_manager
|
import industry_codes_sort
|
import l2_data_log
|
import l2_data_manager
|
import l2_data_util
|
import l2_trade_factor
|
import l2_trade_test
|
import l2_trade_util
|
import limit_up_time_manager
|
import redis_manager
|
import ths_industry_util
|
import tool
|
import trade_manager
|
import trade_queue_manager
|
import trade_data_manager
|
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
|
local_today_num_operate_map
|
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
|
logger_l2_error
|
|
# TODO: L2 data management
|
from trade_data_manager import CodeActualPriceProcessor
|
|
|
class L2DataManager:
    """Skeleton of an L2 data manager; only ``format_data`` has a body."""

    def format_data(self, datas):
        """Wrap every raw record as ``{"val": record, "re": 1}``.

        Returns a new list holding one wrapper dict per input record, in
        input order.
        """
        return [{"val": record, "re": 1} for record in datas]

    def get_add_datas(self, format_datas):
        """Placeholder for extracting newly added records (not implemented)."""
        return None

    def load_data(self, code=None, force=False):
        """Placeholder for loading data from the database (not implemented)."""
        return None

    def save_datas(self, add_datas, datas):
        """Placeholder for persisting data (not implemented)."""
        return None
|
|
|
# Large-order processing for the m value
|
class L2BigNumForMProcessor:
    """Accumulate a weighted tally of limit-up-price buy orders for the m value.

    Two per-code markers are kept in Redis: the index from which counting may
    begin (``m_big_money_begin-<code>``) and the last index already counted
    (``m_big_money_process_index-<code>``).
    """

    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)

    def __get_redis(self):
        """Return the Redis client handed out by the manager."""
        return self._redis_manager.getRedis()

    def set_begin_pos(self, code, index):
        """Store ``index`` as the begin position for ``code`` unless one exists."""
        if self.__get_begin_pos(code) is None:
            key = "m_big_money_begin-{}".format(code)
            self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_begin_pos(self, code):
        """Return the stored begin position as ``int``, or ``None`` if unset."""
        key = "m_big_money_begin-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    def clear_processed_end_index(self, code):
        """Delete the "last index already counted" marker for ``code``."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().delete(key)

    def __set_processed_end_index(self, code, index):
        """Store ``index`` as the last index already counted for ``code``."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_processed_end_index(self, code):
        """Return the last counted index as ``int``, or ``None`` if unset."""
        key = "m_big_money_process_index-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return int(val)

    @staticmethod
    def _weight_for(num, num_splites):
        """Map an order size to its weight using the four ascending cut-offs."""
        if num < num_splites[0]:
            return 0
        if num < num_splites[1]:
            return 1
        if num < num_splites[2]:
            return round(4 / 3, 3)
        if num < num_splites[3]:
            return 2
        return 4

    def process(self, code, start_index, end_index, limit_up_price):
        """Count weighted limit-up buys/cancels in ``[start_index, end_index]``.

        Does nothing when no begin position has been stored for ``code`` or
        when everything up to ``end_index`` was already counted. Otherwise
        the signed total is passed to ``big_money_num_manager.add_num`` and
        ``end_index`` is stored as the new "already counted" marker.

        Args:
            code: stock code whose records in ``local_today_datas`` are read.
            start_index: first index the caller wants considered.
            end_index: last index (inclusive) to consider.
            limit_up_price: price used to derive the order-size cut-offs.
        """
        begin_pos = self.__get_begin_pos(code)
        if begin_pos is None:
            # No buy-start signal has been recorded yet.
            return

        # The stored marker is the last index ALREADY counted (inclusive), so
        # counting must resume one past it; otherwise that record would be
        # counted twice. -1 means nothing has been counted yet.
        processed_index = self.__get_processed_end_index(code)
        if processed_index is None:
            processed_index = -1
        if processed_index >= end_index:
            return
        first_index = max(start_index, processed_index + 1)

        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]

        num_splites = [round(amount / limit_up_price) for amount in (5000, 10000, 20000, 30000)]
        total_num = 0
        for i in range(first_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            is_cancel = L2DataUtil.is_limit_up_price_buy_cancel(val)
            if not is_cancel and not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if is_cancel:
                # Ignore a cancel whose matching buy lies before the begin position.
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None and buy_index < begin_pos:
                    continue

            count = int(self._weight_for(int(val["num"]), num_splites) * data["re"] * 1000)
            total_num += -count if is_cancel else count

        self.__set_processed_end_index(code, end_index)
        big_money_num_manager.add_num(code, total_num)

        print("m值大单计算范围:{}-{} 时间:{}".format(first_index, end_index,
                                            round(t.time() * 1000) - start_time))
|
|
|
class L2TradeDataProcessor:
    """Class-level (classmethod-only) processor that drives buy/cancel decisions from L2 data."""

    # code -> (buy execution index, capture time) of a pending "virtual" order
    unreal_buy_dict = {}
    # code -> random integer used as the "thread-id" tag in debug log lines
    random_key = {}
    # Shared processor for the m-value large-order tally
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
|
|
@classmethod
def debug(cls, code, content, *args):
    """Write a formatted debug line to the L2 trade log.

    ``content`` is a ``str.format`` template filled with ``args``. The line
    is prefixed with the per-code id from ``random_key`` and the code. A code
    with no id registered yet is logged as ``thread-id=None`` instead of
    raising ``KeyError`` from inside a logging helper.
    """
    prefix = "thread-id={} code={} ".format(cls.random_key.get(code), code)
    logger_l2_trade.debug((prefix + content).format(*args))
|
|
@classmethod
def cancel_debug(cls, code, content, *args):
    """Write a formatted debug line to the L2 trade-cancel log.

    ``content`` is a ``str.format`` template filled with ``args``. The line
    is prefixed with the per-code id from ``random_key`` and the code. A code
    with no id registered yet is logged as ``thread-id=None`` instead of
    raising ``KeyError`` from inside a logging helper.
    """
    prefix = "thread-id={} code={} ".format(cls.random_key.get(code), code)
    logger_l2_trade_cancel.debug((prefix + content).format(*args))
|
|
@classmethod
def buy_debug(cls, code, content, *args):
    """Write a formatted debug line to the L2 trade-buy log.

    ``content`` is a ``str.format`` template filled with ``args``. The line
    is prefixed with the per-code id from ``random_key`` and the code. A code
    with no id registered yet is logged as ``thread-id=None`` instead of
    raising ``KeyError`` from inside a logging helper.
    """
    prefix = "thread-id={} code={} ".format(cls.random_key.get(code), code)
    logger_l2_trade_buy.debug((prefix + content).format(*args))
|
|
@classmethod
def process(cls, code, datas, capture_timestamp):
    """Entry point for one captured batch of L2 records.

    Args:
        code: stock code the batch belongs to.
        datas: records from the current capture.
        capture_timestamp: timestamp of the capture.

    Raises:
        L2DataException: when the first record's price does not match
            ``code`` according to ``code_data_util``.

    Any pending virtual order for ``code`` is dropped on the way out,
    whether or not processing succeeded.
    """
    cls.random_key[code] = random.randint(0, 100000)
    began_ms = round(t.time() * 1000)
    try:
        if len(datas) == 0:
            return

        # Sanity check: the price must be plausible for this code.
        first_price = datas[0]["val"]["price"]
        if not code_data_util.is_same_code_with_price(code, float(first_price)):
            raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                  "股价不匹配 code-{} price-{}".format(code, first_price))

        # Load history, then correct the incoming batch.
        l2_data_manager.load_l2_data(code)
        datas = l2_data_manager.L2DataUtil.correct_data(code, datas)

        # New records continue numbering after the last stored one.
        known = local_today_datas.get(code)
        next_index = 0
        if known is not None and len(known) > 0:
            next_index = known[-1]["index"] + 1
        add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, next_index)

        # Incremental processing; the batch is saved even if it raises.
        try:
            cls.process_add_datas(code, add_datas, capture_timestamp, began_ms)
        finally:
            l2_data_manager.save_l2_data(code, datas, add_datas)
            l2_data_log.l2_time(code, round(t.time() * 1000) - began_ms,
                                "保存数据时间({})".format(len(add_datas)))
    finally:
        cls.unreal_buy_dict.pop(code, None)
|
|
@classmethod
def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
    """Process the records that are new in this capture.

    Appends ``add_datas`` to ``local_today_datas[code]``, refreshes the
    num-operate map, gives both average-large-order computers a chance to
    run, records an opening limit-up, and finally dispatches to the
    "order placed" or "no order" handler depending on the trade state.

    Args:
        code: stock code.
        add_datas: new records; nothing is dispatched when empty.
        capture_timestamp: timestamp of the capture, forwarded to handlers.
        __start_time: millisecond timestamp used for the timing log lines.
    """
    has_new = len(add_datas) > 0
    if has_new:
        now_time_str = tool.get_now_time_str()
        # Append to today's records and index the new ones.
        local_today_datas[code].extend(add_datas)
        l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)

        # Minute-level average large-order computation, if required.
        try:
            average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
                code, local_today_datas[code][-1])
            if average_need:
                # add_datas is non-empty here, so the window ends at its last record.
                AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                              add_datas[-1]["index"])
        except Exception as e:
            logging.exception(e)

        # Second-level average large-order computation, if required.
        try:
            average_need, buy_single_index, buy_exec_index = SecondAverageBigNumComputer.is_need_compute_average(
                code, local_today_datas[code][-1])
            if average_need:
                SecondAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                    add_datas[-1]["index"])
        except Exception as e:
            logging.exception(e)

        # A first record stamped 09:30:00 with a truthy price_data[1] is
        # recorded as a limit-up at the open.
        if add_datas[0]["val"]["time"] == "09:30:00":
            if global_util.cuurent_prices.get(code):
                price_data = global_util.cuurent_prices.get(code)
                if price_data[1]:
                    logger_l2_process.info("开盘涨停:{}", code)
                    limit_up_time_manager.save_limit_up_time(code, "09:30:00")

    total_datas = local_today_datas[code]
    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")

    if not has_new:
        return

    latest_time = add_datas[-1]["val"]["time"]
    # Only act when the newest record is close to "now" and the code is tradable.
    if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time) \
            and not l2_trade_util.is_in_forbidden_trade_codes(code):
        state = trade_manager.get_trade_state(code)
        start_index = len(total_datas) - len(add_datas)
        end_index = len(total_datas) - 1
        order_placed = (state == trade_manager.TRADE_STATE_BUY_DELEGATED
                        or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER
                        or state == trade_manager.TRADE_STATE_BUY_SUCCESS)
        if order_placed:
            cls.__process_order(code, start_index, end_index, capture_timestamp)
        else:
            cls.__process_not_order(code, start_index, end_index, capture_timestamp)

    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                           capture_timestamp)
    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
|
|
# 处理未挂单
|
@classmethod
def __process_not_order(cls, code, start_index, end_index, capture_time):
    """Handle a batch while no order is outstanding for ``code``.

    Fetches the m-value threshold (logging the lookup time when it exceeds
    10 ms) and starts the buy computation over ``[start_index, end_index]``.
    """
    began_ms = round(t.time() * 1000)
    threshold_money, msg = cls.__get_threshmoney(code)
    if round(t.time() * 1000) - began_ms > 10:
        l2_data_log.l2_time(code, round(t.time() * 1000) - began_ms, "获取m值数据耗时")
    cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
|
|
# 测试专用
|
@classmethod
def process_order(cls, code, start_index, end_index, capture_time, new_add=True):
    """Public pass-through to the private order handler (intended for tests)."""
    return_value = cls.__process_order(code, start_index, end_index, capture_time, new_add)
    # The private handler's result is deliberately not propagated.
    del return_value
|
|
# 处理已挂单
|
@classmethod
def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
    """Handle a batch while an order (real or virtual) is outstanding.

    Runs the cancel checks in order: limit-up money statistics, second-level
    large-order cancels, minute-level large-order cancels, and on-board sell
    statistics. It then updates the m-value large-order tally. With a cancel
    signal it cancels and, on success, resumes buy computation after the
    cancel position. Without one it turns a pending virtual order into a
    real one and updates the long-window large-order statistics.
    """
    start_index = max(start_index, 0)
    if end_index < start_index:
        return

    # Positions recorded when the buy signal was found.
    (buy_single_index, buy_exec_index, buy_compute_index,
     num, count, max_num_set) = cls.__get_order_begin_pos(code)

    # Cancel check based on the buy-1 limit-up money statistics.
    cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                      buy_single_index, buy_exec_index)

    # Cancel check: second-level large-order cancels.
    try:
        b_need_cancel, b_cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                              buy_exec_index, start_index,
                                                                              end_index)
        if b_need_cancel and not cancel_data:
            cancel_data = b_cancel_data
            cancel_msg = "申报时间截至大单撤销比例触发阈值"
    except Exception as e:
        logging.exception(e)

    # Cancel check: minute-level large-order cancels.
    try:
        b_need_cancel, b_cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, buy_exec_index,
                                                                        start_index, end_index)
        if b_need_cancel and not cancel_data:
            cancel_data = b_cancel_data
            cancel_msg = "1分钟内大单撤销比例触发阈值"
    except Exception as e:
        logging.exception(e)

    # Cancel check: sells while on the limit-up board (only if nothing fired yet).
    if not cancel_data:
        try:
            cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index, end_index,
                                                                        buy_exec_index)
        except Exception as e:
            logging.exception(e)

    # Keep the m-value large-order tally up to date.
    cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                      gpcode_manager.get_limit_up_price(code))

    if not cancel_data:
        # No cancel signal: a pending virtual order becomes a real one.
        # unreal_buy_info is (buy execution index, capture time).
        unreal_buy_info = cls.unreal_buy_dict.get(code)
        if unreal_buy_info is not None:
            cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
            cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                      unreal_buy_info[0])

        # Update the long-window large-order statistics.
        try:
            LongAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_exec_index)
        except Exception as e:
            logging.exception(e)
        return

    # Hard-coded debugging hook kept from the original.
    if cancel_data["index"] == 175:
        print("进入调试")
    cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
    if cls.cancel_buy(code, cancel_msg):
        # Cancel went through: look for a new buy signal after the cancel position.
        cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
|
|
@classmethod
def __buy(cls, code, capture_timestamp, last_data, last_data_index):
    """Place a real buy order if ``__can_buy`` allows it.

    Any virtual order for ``code`` is discarded first. When buying is not
    allowed, the buy is "broken" via ``trade_manager.break_buy`` unless the
    reason starts with "买1价不为涨停价". Exceptions raised while placing the
    order are logged through ``debug`` and not re-raised.
    """
    can, reason = cls.__can_buy(code)
    # Drop the virtual order regardless of the outcome.
    cls.unreal_buy_dict.pop(code, None)

    if not can:
        cls.debug(code, "不可以下单,原因:{}", reason)
        if not reason.startswith("买1价不为涨停价"):
            trade_manager.break_buy(code, reason)
        return

    cls.debug(code, "可以下单,原因:{}", reason)
    try:
        cls.debug(code, "开始执行买入")
        trade_manager.start_buy(code, capture_timestamp, last_data,
                                last_data_index)
        trade_data_manager.placeordercountmanager.place_order(code)
        # Tell the large-order computers where the order was placed.
        try:
            (buy_single_index, buy_exec_index, buy_compute_index,
             num, count, max_num_set) = cls.__get_order_begin_pos(code)
            SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
            AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
            LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        except Exception as e:
            logging.exception(e)
            logger_l2_error.exception(e)
        l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
        cls.debug(code, "执行买入成功")
    except Exception as e:
        cls.debug(code, "执行买入异常:{}", str(e))
    finally:
        cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
|
|
# 是否可以取消
|
@classmethod
def __can_cancel(cls, code):
    """Return ``(allowed, reason)`` for cancelling the order on ``code``.

    Currently always ``(True, "")``, both in test mode and otherwise.
    """
    # A previously present rule ("after 14:00 the sector leader, and a
    # runner-up whose leaders all hit limit-up at 09:30:00, must not be
    # cancelled") is disabled, so both paths allow the cancel.
    allowed = (True, "")
    if constant.TEST:
        return allowed
    return allowed
|
|
# 是否可以买
|
@classmethod
def __can_buy(cls, code):
    """Decide whether a real buy order may be placed for ``code``.

    Returns:
        ``(True, reason_or_None)`` when buying is allowed, otherwise
        ``(False, reason)``.

    Checks, in order:
      1. net limit-up buy volume since the execution position must be at
         least 49% of the sell-1 volume (best effort: errors are logged
         and the check is skipped);
      2. the volume rate must be below 1.3;
      3. the limit-up time must be before 14:30:00;
      4. within the same industry only rank 0 and rank 1 may buy;
      5. when "under water", a non-leader whose ``zyltgb_map`` value
         exceeds a leader's may not buy.

    A ``None`` ranking from ``industry_codes_sort.sort_codes`` is treated
    as "no ranking available", so the later ranking checks are skipped
    instead of raising ``AttributeError``.
    """
    # NOTE: a former "buy-1 price must equal the limit-up price" check is disabled.
    total_datas = local_today_datas[code]
    try:
        sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
        cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
        if sell1_time is not None and sell1_volumn > 0:
            # Start from the volume recorded at the execution position and
            # add/subtract every later limit-up buy/cancel.
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = \
                cls.__get_order_begin_pos(code)
            buy_nums = num
            for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                _val = total_datas[i]["val"]
                if L2DataUtil.is_limit_up_price_buy(_val):
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            if buy_nums < sell1_volumn * 0.49:
                return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
    except Exception as e:
        logging.exception(e)

    # A volume rate of 1.3 or more blocks the buy.
    volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
    if volumn_rate >= 1.3:
        return False, "最大量比超过1.3不能买"

    # Limit-up reached at or after 14:30:00 blocks the buy.
    limit_up_time = limit_up_time_manager.get_limit_up_time(code)
    if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
            limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second("14:30:00"):
        return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)

    # Industry ranking: anything ranked after the runner-up may not buy.
    industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
    if industry is None:
        return True, "没有获取到行业"

    codes_index = industry_codes_sort.sort_codes(codes, code)
    if codes_index is None:
        # No ranking available: none of the ranking-based rules can apply.
        codes_index = {}
    rank = codes_index.get(code)
    if rank is not None and rank > 1:
        return False, "同一板块中老三,老四,...不能买"

    if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
        # NOTE: a former "under water and few hot codes in the industry" rule is disabled.
        # Under water: a non-leader with a larger zyltgb_map value than a leader may not buy.
        if rank != 0:
            for c in codes_index:
                if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                    return False, "水下捞,不是老大,且自由流通市值大于老大"

    # NOTE: the former runner-up rules (leader already bought / leader not at
    # limit-up by 09:30, hot-code count thresholds) are disabled, so a
    # runner-up needs no further checks here.
    return True, None
|
|
@classmethod
def __cancel_buy(cls, code):
    """Cancel the outstanding buy and clear its recorded trade points.

    Exceptions are logged (via ``logging`` and ``debug``) and not re-raised.
    """
    points = l2_data_manager.TradePointManager
    try:
        cls.debug(code, "开始执行撤单")
        trade_manager.start_cancel_buy(code)
        # Clear the buy markers and the cancel bookkeeping.
        points.delete_buy_point(code)
        points.delete_buy_cancel_point(code)
        points.delete_compute_info_for_cancel_buy(code)
        points.delete_count_info_for_cancel_buy(code)
        # Remove the large-order record kept for batch-cancel events.
        l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
        cls.debug(code, "执行撤单成功")
    except Exception as e:
        logging.exception(e)
        cls.debug(code, "执行撤单异常:{}", str(e))
|
|
@classmethod
def cancel_buy(cls, code, msg=None, source="l2"):
    """Cancel the buy (virtual or real) for ``code``.

    Args:
        code: stock code.
        msg: reason, only used in the final debug line.
        source: ``"trade_queue"`` when triggered by the trade queue; such a
            cancel is refused within 5 seconds of the execution record's time.

    Returns:
        ``True`` when the cancel was carried out, ``False`` when it was
        refused (too early, or ``__can_cancel`` said no).
    """
    if source == "trade_queue":
        (buy_single_index, buy_exec_index, buy_compute_index,
         num, count, max_num_set) = cls.__get_order_begin_pos(code)
        total_datas = local_today_datas[code]
        if buy_exec_index is not None and buy_exec_index > 0:
            now_time_str = tool.get_now_time_str()
            exec_time = total_datas[buy_exec_index]["val"]["time"]
            if tool.trade_time_sub(now_time_str, exec_time) < 5:
                return False

    l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)

    if code in cls.unreal_buy_dict:
        # Only a virtual order exists: forget it and clear the trade points.
        cls.unreal_buy_dict.pop(code)
        points = l2_data_manager.TradePointManager
        points.delete_buy_point(code)
        points.delete_buy_cancel_point(code)
        points.delete_compute_info_for_cancel_buy(code)
        points.delete_count_info_for_cancel_buy(code)
    else:
        can_cancel, reason = cls.__can_cancel(code)
        if not can_cancel:
            cls.cancel_debug(code, "撤单中断,原因:{}", reason)
            cls.debug(code, "撤单中断,原因:{}", reason)
            return False
        cls.__cancel_buy(code)

    l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
    cls.debug(code, "执行撤单成功,原因:{}", msg)
    return True
|
|
# 虚拟下单
|
@classmethod
def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
    """Register a virtual (not yet real) order for ``code``.

    Stores ``(buy_exec_index, capture_time)`` in ``unreal_buy_dict``,
    notifies the second- and minute-level large-order computers, and clears
    earlier on-board sell statistics.
    """
    pending_order = (buy_exec_index, capture_time)
    cls.unreal_buy_dict[code] = pending_order
    SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
    # Sell statistics gathered before this order no longer apply.
    L2LimitUpSellStatisticUtil.delete(code)
|
|
@classmethod
def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                        new_add=True):
    """Look for a buy signal and a buy execution position in a range.

    Steps: reuse or compute the buy-signal index, update the m-value
    large-order tally, sum net buys via ``__sum_buy_num_for_order_3``, then
    either restart from a later position, register a virtual order (and buy
    or continue with cancel processing), or save the partial totals.

    Args:
        code: stock code.
        compute_start_index: first index to consider.
        compute_end_index: last index (inclusive) to consider.
        threshold_money: m-value threshold; re-fetched before the net-buy sum.
        capture_time: capture timestamp forwarded to buy/cancel handlers.
        new_add: when true, the signal search starts 2 records earlier
            (clamped at 0).
    """
    if compute_end_index < compute_start_index:
        return
    _start_time = round(t.time() * 1000)
    total_datas = local_today_datas[code]
    # Fetch the stored buy-signal / execution positions and running totals
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
        code)

    # No stored signal yet: try to find one in this range
    if buy_single_index is None:
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        continue_count = 3
        # A run of 3 is required while place_order_count <= 2, afterwards a run of 2
        if place_order_count > 2:
            continue_count = 2
        # Search for the buy signal
        has_single, _index = cls.__compute_order_begin_pos(code, max(
            compute_start_index - 2 if new_add else compute_start_index, 0), continue_count, compute_end_index)
        buy_single_index = _index
        if has_single:
            num = 0
            count = 0
            cls.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index,
                      compute_end_index, total_datas[buy_single_index])
            # Records the large-order begin position (only stored if none exists yet)
            cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)

    _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "下单信号计算时间")

    if buy_single_index is None:
        # No buy signal found: stop here
        return None

    # Update the m-value large-order tally
    cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                      gpcode_manager.get_limit_up_price(code))

    _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "计算m值大单")

    threshold_money, msg = cls.__get_threshmoney(code)
    # Sum the net buy volume/count
    compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,compute_start_index),compute_end_index,num,count,threshold_money,buy_single_index,max_num_set)
    _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "纯买额统计时间")

    cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)

    # The summing step asked for a restart from a later position
    # (the signal became too old relative to the record being summed)
    if rebegin_buy_pos is not None:
        # Recompute from the restart position; new_add=False so no 2-record look-back
        cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
        return

    if compute_index is not None:
        cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                  buy_count,
                  total_datas[compute_index])
        # Persist the signal / execution positions and totals
        cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
                                    max_num_set_new)
        # Save the time of the execution record as the limit-up time
        limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
        # Register the virtual order
        cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
        # Drop all earlier cancel points
        l2_data_manager.TradePointManager.delete_buy_cancel_point(code)

        # Limit-up money statistics over [signal, execution]
        # NOTE(review): buy_exec_index here is the value read before the save above — confirm intended
        L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index,
                                                 buy_exec_index, False)

        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据")

        # Has the whole range been consumed?
        if compute_index >= compute_end_index:
            need_cancel, cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                              compute_index,
                                                                              buy_single_index, compute_index,
                                                                              True)
            # Minute-level large-order check (disabled)
            # need_cancel, cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
            #                                                              buy_single_index, compute_index, True)
            cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
            # Range fully processed: cancel if required, otherwise place the real order
            if need_cancel:
                if cls.cancel_buy(code, "S级大单撤销"):
                    # Cancel carried out
                    pass
            else:
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
        else:
            # AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
            #                                   buy_single_index, compute_index, False)
            SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                                                    buy_single_index, compute_index, False)

            # Records remain after the execution position: continue with them
            cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
            # Run the cancel checks on the remainder
            cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
    else:
        # Threshold not reached: persist the running totals with execution index -1
        cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count,
                                    max_num_set_new)
        # NOTE(review): _start_time holds the value returned by l2_data_log.l2_time while
        # t.time() is in seconds, so this printed duration looks unreliable — confirm
        print("保存大单时间", round((t.time() - _start_time) * 1000))
        _start_time = t.time()
        pass
|
|
# 获取下单起始信号
|
@classmethod
def __get_order_begin_pos(cls, code):
    """Return the stored buy-computation state for ``code``.

    The result is the 6-tuple ``(buy_single_index, buy_exec_index,
    compute_index, num, count, max_num_set)`` read from
    ``TradePointManager.get_buy_compute_start_data``.
    """
    (single_index, exec_index, compute_index,
     num, count, max_num_set) = l2_data_manager.TradePointManager.get_buy_compute_start_data(code)
    return single_index, exec_index, compute_index, num, count, max_num_set
|
|
# 保存下单起始信号
|
@classmethod
def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set):
    """Persist the buy-computation state for ``code`` via ``TradePointManager``."""
    TradePointManager.set_buy_compute_start_data(
        code,
        buy_single_index,
        buy_exec_index,
        compute_index,
        num,
        count,
        max_num_set,
    )
|
|
# 计算下单起始信号
|
# compute_data_count 用于计算的l2数据数量
|
@classmethod
def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
    """Search ``[start_index, end_index]`` for the start of a buy signal.

    A signal is a run of limit-up buys sharing one timestamp (at or after
    09:30:00) whose summed ``re`` values reach ``continue_count``. Sells and
    sell-cancels inside a run are ignored; any other record resets the run.

    Returns:
        ``(True, index_of_first_record_in_run)`` when found, otherwise
        ``(False, None)``.
    """
    second_930 = 9 * 3600 + 30 * 60 + 0
    datas = local_today_datas[code]
    # Not enough records in the range to possibly form a run.
    if end_index - start_index + 1 < continue_count:
        return False, None

    run_start = None
    run_last = None
    run_count = 0

    for i in range(start_index, end_index + 1):
        record = datas[i]
        _val = record["val"]
        # Records before 09:30:00 never take part.
        if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
            continue

        if L2DataUtil.is_limit_up_price_buy(_val):
            if run_last is not None and datas[run_last]["val"]["time"] != record["val"]["time"]:
                # Different timestamp: this record starts a fresh run.
                run_last = i
                run_count = record["re"]
                run_start = i
                continue
            if run_start is None:
                run_start = i
            run_last = i
            run_count += record["re"]
            if run_count >= continue_count:
                return True, run_start
        elif not (L2DataUtil.is_sell(_val) or L2DataUtil.is_sell_cancel(_val)):
            # Anything other than a sell / sell-cancel breaks the run.
            run_last = None
            run_count = 0
            run_start = None

    return False, None
|
|
@classmethod
def __get_threshmoney(cls, code):
    """Return whatever ``L2TradeFactorUtil.compute_m_value`` yields for ``code``.

    Callers in this class unpack the result as ``(threshold_money, msg)``.
    """
    factor_util = l2_trade_factor.L2TradeFactorUtil
    return factor_util.compute_m_value(code)
|
|
# 是否为万手哥
|
@classmethod
def __is_big_money(cls, limit_up_price, val):
    """Tell whether the order in ``val`` counts as a large order.

    True when ``val["num"]`` reaches ``constant.BIG_MONEY_NUM`` or when
    ``num * limit_up_price`` reaches ``constant.BIG_MONEY_AMOUNT``.
    """
    volume = int(val["num"])
    if volume >= constant.BIG_MONEY_NUM:
        return True
    return bool(volume * limit_up_price >= constant.BIG_MONEY_AMOUNT)
|
|
# 计算万手哥笔数
|
@classmethod
def __compute_big_money_count(cls, total_datas, start_index, end_index):
    """Return the net ``re`` total of limit-up buys minus buy-cancels.

    Covers indices ``start_index`` through ``end_index`` inclusive. Despite
    the name, no order-size filter is applied here.
    """
    net = 0
    for pos in range(start_index, end_index + 1):
        record = total_datas[pos]
        if L2DataUtil.is_limit_up_price_buy(record["val"]):
            net += record["re"]
        elif L2DataUtil.is_limit_up_price_buy_cancel(record["val"]):
            net -= record["re"]
    return net
|
|
# 统计买入净买量,不计算在买入信号之前的买撤单
|
@classmethod
def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                              threshold_money, buy_single_index, max_num_set):
    """Accumulate net limit-up buys and look for the buy execution position.

    Buy-cancels whose matching buy precedes the buy signal (and is not in
    the same second as the signal) are not subtracted.

    Args:
        code: stock code.
        compute_start_index: first index to sum.
        compute_end_index: last index (inclusive) to sum.
        origin_num: net volume carried over from earlier calls.
        origin_count: net order count carried over from earlier calls.
        threshold_money: m-value threshold, converted to a target volume.
        buy_single_index: index of the buy signal.
        max_num_set: indices of large buy orders seen so far.

    Returns:
        ``(exec_index, buy_nums, buy_count, restart_index, big_index_set)``.
        ``exec_index`` is the index at which all thresholds were met, else
        ``None``. ``restart_index`` is non-``None`` when the signal became
        too old and the search should restart from that index.

    Raises:
        Exception: when the limit-up price cannot be obtained.
    """
    def get_threshold_count():
        # Target order count: base target minus large orders already seen,
        # at least 3, scaled by buy1_factor, then clamped.
        count = threshold_count - sub_threshold_count
        if count < 3:
            count = 3
        count = round(count * buy1_factor)
        # Clamp to the range 8..21 (an older comment said the cap was 30)
        if count > 21:
            count = 21
        if count < 8:
            count = 8
        return count

    _start_time = t.time()
    total_datas = local_today_datas[code]
    # Net limit-up buy count between the signal and the start of this range
    sub_threshold_count = cls.__compute_big_money_count(total_datas, buy_single_index, compute_start_index - 1)
    if sub_threshold_count < 0:
        sub_threshold_count = 0

    buy_nums = origin_num
    buy_count = origin_count
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    buy1_price = cls.buy1PriceManager.get_price(code)
    if limit_up_price is None:
        raise Exception("涨停价无法获取")
    # Target volume derived from the money threshold
    threshold_num = threshold_money / (limit_up_price * 100)

    buy1_factor = 1
    # Raise the target by 30% when buy-1 is unknown or not at the limit-up price
    if buy1_price is None:
        buy1_factor = 1.3
    elif limit_up_price is None:
        buy1_factor = 1.3
    elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
        print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price))
        buy1_factor = 1.3

    # Base target order count
    threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)

    buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])

    # Only a qualifying limit-up buy in the current record can trigger the buy
    trigger_buy = True
    place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
    if place_order_count > 3:
        place_order_count = 3
    # Allowed gap after the signal is 3 ** (place_order_count + 1) - 1 seconds: 2, 8, 26, 80
    max_space_time = pow(3, place_order_count + 1) - 1
    # Unused
    max_buy_num = 0
    max_buy_num_set = set(max_num_set)
    for i in range(compute_start_index, compute_end_index + 1):
        data = total_datas[i]
        _val = total_datas[i]["val"]
        trigger_buy = False
        # The record must fall within the allowed gap after the signal
        if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time:
            TradePointManager.delete_buy_point(code)
            if i == compute_end_index:
                # Nothing left to process
                return None, buy_nums, buy_count, None, max_buy_num_set
            else:
                # Restart the signal search at the first record whose time differs from the signal's
                for ii in range(buy_single_index + 1, compute_end_index + 1):
                    if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                        return None, buy_nums, buy_count, ii, max_buy_num_set
        # Limit-up buy
        if L2DataUtil.is_limit_up_price_buy(_val):
            # NOTE(review): nesting of the set update under the large-order check is reconstructed — confirm
            if cls.__is_big_money(limit_up_price, _val):
                sub_threshold_count += int(total_datas[i]["re"])
                max_buy_num_set.add(i)
            if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                trigger_buy = True
                # Only orders with num * price >= 5900 are counted
                # (presumably 590,000 in money terms with num in lots of 100 — TODO confirm)
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                             i,
                                             buy_nums,
                                             threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
        elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
            if cls.__is_big_money(limit_up_price, _val):
                sub_threshold_count -= int(total_datas[i]["re"])
            if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                # Same size filter as for buys
                # Limit-up buy-cancel:
                # find the buy this cancel belongs to and compare it with the signal position
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None:
                    # Matching buy found
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # Same second as the signal: treat as after the signal
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # Matching buy not found: subtract anyway
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count -= int(total_datas[i]["re"])
                cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                              buy_nums, threshold_num)
        # All thresholds met on a qualifying buy, with more than one large order seen
        if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(max_buy_num_set)>1:
            return i, buy_nums, buy_count, None, max_buy_num_set

    cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
                  compute_start_index,
                  buy_nums,
                  threshold_num, buy_count, get_threshold_count(), sub_threshold_count)

    return None, buy_nums, buy_count, None, max_buy_num_set
|
|
@classmethod
def test(cls):
    """Manual harness: replay records 1552-1641 of code 002556.

    Clears the trade data, reloads the L2 data and runs one pass through the
    "order placed" or "no order" handler. The per-second replay after the
    unconditional ``return`` never runs and is kept only for reference.
    """
    code = "002556"
    l2_trade_test.clear_trade_data(code)
    load_l2_data(code, True)

    _start = t.time()

    state = trade_manager.get_trade_state(code)
    cls.random_key[code] = random.randint(0, 100000)
    capture_timestamp = 1999988888
    try:
        if state in (trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER):
            # An order is outstanding
            cls.__process_order(code, 1552, 1641, capture_timestamp)
        else:
            # No order outstanding
            cls.__process_not_order(code, 1552, 1641, capture_timestamp)
    except Exception as e:
        logging.exception(e)
    print("处理时间", round((t.time() - _start) * 1000))
    return

    # ---- unreachable: replay the day's records one second at a time ----
    total_datas = local_today_datas[code]
    start_time = total_datas[0]["val"]["time"]
    start_index = 0
    for i in range(0, len(total_datas)):
        if total_datas[i]["val"]["time"] != start_time:
            cls.random_key[code] = random.randint(0, 100000)
            # Process the records of the second that just ended
            start = start_index
            end = i - 1
            print("处理进度:{},{}".format(start, end))
            capture_timestamp = 1999999999
            state = trade_manager.get_trade_state(code)
            try:
                if state in (trade_manager.TRADE_STATE_BUY_DELEGATED,
                             trade_manager.TRADE_STATE_BUY_PLACE_ORDER):
                    cls.__process_order(code, start, end, capture_timestamp)
                else:
                    cls.__process_not_order(code, start, end, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            start_index = i
            start_time = total_datas[i]["val"]["time"]

    print("时间花费:", round((t.time() - _start) * 1000))
|
|
@classmethod
def test1(cls):
    """Manual debug harness: feed rows 1552..1640 of 002556 to ``process``.

    Resets the stored trade data and the cached latest rows, reloads the
    L2 data, then prints the elapsed processing time in milliseconds.
    """
    code = "002556"
    l2_trade_test.clear_trade_data(code)
    l2_data_manager.local_latest_datas[code] = []
    load_l2_data(code, True)
    began = t.time()
    capture_timestamp = 1999999999
    sample = l2_data_manager.local_today_datas[code][1552:1641]
    cls.process(code, sample, capture_timestamp)
    elapsed_ms = round((t.time() - began) * 1000)
    print("时间花费:", elapsed_ms)
|
|
@classmethod
def test2(cls):
    """Manual debug harness: evaluate the buy-eligibility rules for 002864.

    Returns a ``(allowed, reason)`` tuple for every rule that fires and
    falls off the end (implicit ``None``) when the code is not ranked
    second in its industry.

    NOTE(review): ``codes_index`` is only None-checked in the first ranking
    rule; the later ``codes_index.get`` calls assume it is a mapping.
    NOTE(review): the original comments mentioned thresholds of 21 and 29
    and one message says "2", but the code compares against 16 - confirm
    which value is intended.
    """
    code = "002864"
    load_l2_data(code)
    limit_up_time_manager.load_limit_up_time()
    limit_up_time = limit_up_time_manager.get_limit_up_time(code)
    to_second = l2_data_manager.L2DataUtil.get_time_as_second
    if limit_up_time is not None and to_second(limit_up_time) >= to_second("14:30:00"):
        return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)

    # Codes ranked after the second one of the same industry must not be bought
    industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
    if industry is None:
        return True, "没有获取到行业"
    codes_index = industry_codes_sort.sort_codes(codes, code)
    if codes_index is not None:
        rank = codes_index.get(code)
        if rank is not None and rank > 1:
            return False, "同一板块中老三,老四,...不能买"

    if cls.__codeActualPriceProcessor.is_under_water(code):
        # "Under water" codes need a hot enough industry ...
        hot_num = global_util.industry_hot_num.get(industry)
        if hot_num is not None and hot_num <= 16:
            return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))

        # ... and must be the leader of the industry
        if codes_index.get(code) != 0:
            return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))

    # Limit-up at or after 13:30 requires at least 16 hot codes in the industry
    limit_up_time = limit_up_time_manager.get_limit_up_time(code)
    if limit_up_time is not None:
        late_limit_up = int(limit_up_time.replace(":", "")) >= 133000
        if late_limit_up and global_util.industry_hot_num.get(industry) is not None:
            if global_util.industry_hot_num.get(industry) < 16:
                return False, "13:30后涨停,本板块中涨停票数<16不能买"

    if codes_index.get(code) is not None and codes_index.get(code) == 1:
        # Two former rules are disabled here:
        #   * skip the second-ranked code when the leader was already bought;
        #   * only buy the second-ranked code when the leader hit limit-up at 09:30.
        return True, "老二可以下单"
|
|
@classmethod
def test3(cls):
    """Manual debug harness: run the sealed-volume statistics twice on 002094.

    Rows 480..519 are processed two times with cancel reporting disabled,
    using 426 as buy-signal start row and 479 as buy-execution row.
    """
    code = "002094"
    load_l2_data(code, True)
    cls.random_key[code] = random.randint(0, 100000)
    buy_single_begin_index, buy_exec_index = 426, 479

    for _ in range(2):
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
|
|
@classmethod
def test_can_buy(cls):
    """Manual debug harness: print the buy-eligibility verdict for 002923."""
    code = "002923"
    load_l2_data(code, True)
    limit_up_time_manager.load_limit_up_time()
    allowed, reason = cls.__can_buy(code)
    print(allowed, reason)
|
|
|
# 涨停封单额统计
|
class L2LimitUpMoneyStatisticUtil:
|
_redisManager = redis_manager.RedisManager(1)
|
_thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
|
|
@classmethod
def __get_redis(cls):
    """Return the connection handed out by the class-level Redis manager."""
    manager = cls._redisManager
    return manager.getRedis()
|
|
# Store the per-second sealed-volume record derived from L2 data
@classmethod
def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
    """Merge ``num`` into the stored record of ``code`` for second ``time``.

    A record is the JSON triple ``(num, from_index, to_index)``.  When a
    record already exists its volume is increased by ``num`` and its end
    index is replaced by ``to_index``; the start index is kept.
    """
    stored_num, stored_from, _ = cls.__get_l2_second_money_record(code, time)
    if stored_num is None:
        record = (num, from_index, to_index)
    else:
        record = (stored_num + num, stored_from, to_index)

    key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))

    cls.__get_redis().setex(key, tool.get_expire(), json.dumps(record))
|
|
@classmethod
def __get_l2_second_money_record(cls, code, time):
    """Return ``(num, from_index, to_index)`` stored for ``code`` at ``time``.

    All three items are ``None`` when nothing is stored.
    """
    key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
    raw = cls.__get_redis().get(key)
    return cls.__format_second_money_record_val(raw)
|
|
@classmethod
def __format_second_money_record_val(cls, val):
    """Decode a stored JSON record into ``(num, from_index, to_index)``.

    ``None`` input yields ``(None, None, None)``.
    """
    if val is not None:
        record = json.loads(val)
        return record[0], record[1], record[2]
    return None, None, None
|
|
@classmethod
def __get_l2_second_money_record_keys(cls, code, time_regex):
    """Return the Redis keys of per-second records of ``code`` matching ``time_regex``.

    ``time_regex`` is inserted verbatim into the key pattern passed to
    Redis ``KEYS``.
    """
    pattern = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
    return cls.__get_redis().keys(pattern)
|
|
# Store the latest sealed-volume figure derived from L2 data
@classmethod
def __set_l2_latest_money_record(cls, code, index, num):
    """Persist ``(num, index)`` as the latest record of ``code`` (JSON, with expiry)."""
    payload = json.dumps((num, index))
    key = "l2_limit_up_money-{}".format(code)
    cls.__get_redis().setex(key, tool.get_expire(), payload)
|
|
# Returns (volume, index)
@classmethod
def __get_l2_latest_money_record(cls, code):
    """Return the latest ``(num, index)`` record of ``code``.

    ``(0, -1)`` is returned when nothing (or an empty value) is stored.
    """
    key = "l2_limit_up_money-{}".format(code)
    raw = cls.__get_redis().get(key)
    if not raw:
        return 0, -1
    record = json.loads(raw)
    return record[0], record[1]
|
|
# Correct the running sealed volume.
# The per-second records on both sides of the correction time are used to
# decide at which row index the corrected volume applies.
@classmethod
def verify_num(cls, code, num, time_str):
    """Anchor an externally supplied buy-1 volume to an L2 row index.

    The per-second records of ``code`` are searched in the same minute,
    then the same hour, then the whole day.  The first scope holding two
    adjacent records that enclose ``time_str`` decides the anchor: the
    later record when its time equals ``time_str``, otherwise the earlier
    one.  The end index of that record is stored together with ``num`` as
    the latest record.  Nothing is stored when no enclosing pair exists.
    """
    # Log the buy-1 correction
    logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
    time_ = time_str.replace(":", "")
    key = None
    # Prefix lengths 4 / 2 / 0 select the minute / hour / whole-day scope
    for prefix_len in (4, 2, 0):
        time_regex = "{}*".format(time_[:prefix_len])
        found = cls.__get_l2_second_money_record_keys(code, time_regex)
        if found and len(found) > 1:
            # Order the keys by their trailing time component
            keys = sorted(found, key=lambda name: int(name.split("-")[-1]))
            for earlier, later in zip(keys, keys[1:]):
                time_1 = earlier.split("-")[-1]
                time_2 = later.split("-")[-1]
                if int(time_1) <= int(time_) <= int(time_2):
                    # The correction time lies inside this pair
                    key = later if time_ == time_2 else earlier
                    break
        if key:
            val = cls.__get_redis().get(key)
            _, _, end_index = cls.__format_second_money_record_val(val)
            # Persist the corrected figure at the anchor index
            cls.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
            break
|
|
# Signed volume of one row, used to accumulate the sealed volume
@classmethod
def __compute_num(cls, code, data, buy_single_data):
    """Return the signed contribution of ``data`` to the sealed volume.

    * limit-up buy cancels and sells count negative (``-num * re``);
    * a sell cancel for which ``is_sell_index_before_target`` is true
      relative to ``buy_single_data`` counts as 0;
    * everything else counts positive (``num * re``).
    """
    val = data["val"]
    if L2DataUtil.is_limit_up_price_buy_cancel(val) or L2DataUtil.is_sell(data["val"]):
        # Limit-up buy cancel or sell
        return 0 - int(data["val"]["num"]) * data["re"]

    if L2DataUtil.is_sell_cancel(data["val"]):
        # Sell cancel: ignored when the helper reports it as lying before the buy signal
        before_signal = l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                                local_today_num_operate_map.get(code))
        if before_signal:
            return 0

    return int(data["val"]["num"]) * data["re"]
|
|
@classmethod
def clear(cls, code):
    """Delete the latest sealed-volume record of ``code``."""
    redis_client = cls.__get_redis()
    redis_client.delete("l2_limit_up_money-{}".format(code))
|
|
# Accumulates the sealed (buy-1) volume over rows start_index..end_index and returns (row that triggered a cancel, message), or (None, None) when no rule fired or an index argument is None
|
# with_cancel: when False the computed cancel index is discarded, so (None, None) is returned. NOTE(review): `if cancel_index:` below is falsy for index 0 - confirm that is acceptable
|
@classmethod
|
def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True):
|
if buy_single_begin_index is None or buy_exec_index is None:
|
return None, None
|
start_time = round(t.time() * 1000)
|
total_datas = local_today_datas[code]
|
time_dict_num = {}
|
# Start ("s") and end ("e") row index covered by each second
|
time_dict_num_index = {}
|
# Row index -> signed volume computed for that row
|
num_dict = {}
|
# First row index seen for each timestamp (time_dict is not read again in this method)
|
time_dict = {}
|
for i in range(start_index, end_index + 1):
|
data = total_datas[i]
|
val = data["val"]
|
time_ = val["time"]
|
if time_ not in time_dict:
|
time_dict[time_] = i
|
|
for i in range(start_index, end_index + 1):
|
data = total_datas[i]
|
val = data["val"]
|
time_ = val["time"]
|
if time_ not in time_dict_num:
|
time_dict_num[time_] = 0
|
time_dict_num_index[time_] = {"s": i, "e": i}
|
time_dict_num_index[time_]["e"] = i
|
num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
|
num_dict[i] = num
|
time_dict_num[time_] = time_dict_num[time_] + num
|
for t_ in time_dict_num:
|
cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
|
time_dict_num_index[t_]["e"])
|
|
print("保存涨停封单额时间:", round(t.time() * 1000) - start_time)
|
|
# Load the latest stored running total and the row index it refers to
|
total_num, index = cls.__get_l2_latest_money_record(code)
|
record_msg = f"同花顺买1信息 {total_num},{index}"
|
|
if index == -1:
|
# No corrected record available: accumulate from the buy-signal start row
|
index = buy_single_begin_index - 1
|
total_num = 0
|
|
cancel_index = None
|
cancel_msg = None
|
# Thresholds in lots: min_volumn is 10,000,000 / (limit-up price * 100); min_volumn_big is five times that
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
min_volumn = round(10000000 / (limit_up_price * 100))
|
min_volumn_big = min_volumn * 5
|
# First row index of each second
|
time_start_index_dict = {}
|
# Seconds encountered, in order of appearance
|
time_list = []
|
# Running total at the moment each second was first seen
|
time_total_num_dict = {}
|
|
# Net count of big-order cancels
|
cancel_big_num_count = 0
|
buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
|
|
# Largest buy-1 volume reported by the buy-1 volume manager
|
max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code)
|
|
# Walk from the row after the stored (corrected) position up to end_index
|
|
for i in range(index + 1, end_index + 1):
|
data = total_datas[i]
|
# Count big-order cancels
|
try:
|
if big_money_num_manager.is_big_num(data["val"]):
|
if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
|
cancel_big_num_count += int(data["re"])
|
# TODO: a lot of repeated work here; cache it in memory to reduce computation
|
# Check whether the cancelled order was placed within 2s of the buy-execution row
|
buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
|
local_today_num_operate_map.get(
|
code))
|
if buy_index is not None and buy_data is not None:
|
# At most 1s apart (absolute difference < 2)
|
buy_time = buy_data["val"]["time"]
|
if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
|
cancel_big_num_count += int(data["re"])
|
|
elif L2DataUtil.is_limit_up_price_buy(data["val"]):
|
cancel_big_num_count -= int(data["re"])
|
except Exception as e:
|
logging.exception(e)
|
|
threshold_rate = 0.5
|
if cancel_big_num_count >= 0:
|
if cancel_big_num_count < 10:
|
threshold_rate = threshold_rate - cancel_big_num_count * 0.01
|
else:
|
threshold_rate = threshold_rate - 10 * 0.01
|
|
time_ = data["val"]["time"]
|
if time_ not in time_start_index_dict:
|
# Remember where this second starts
|
time_start_index_dict[time_] = i
|
# Remember the order of seconds
|
time_list.append(time_)
|
# Total accumulated before this second started
|
time_total_num_dict[time_] = total_num
|
|
exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])
|
|
val = num_dict.get(i)
|
if val is None:
|
val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
|
total_num += val
|
# Cancel rules are only evaluated for rows inside the requested range
|
if start_index <= i <= end_index:
|
# Only rows that reduce the total
|
if val < 0:
|
# Cancel when the current volume is below 24% of the maximum
|
if exec_time_offset >= 30:
|
if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
|
cancel_index = i
|
cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
|
break
|
# Accumulated sealed amount below 10 million
|
if total_num < min_volumn:
|
# Rule applies once the row is >= 5s after the execution row
|
if exec_time_offset >= 5:
|
cancel_index = i
|
cancel_msg = "封单金额小于1000万,为{}".format(total_num)
|
break
|
# Drop between adjacent seconds reaches threshold_rate (0.5 minus the big-cancel adjustment)
|
# Total recorded when the most recent second in time_list was first seen
|
last_second_total_volumn = time_total_num_dict.get(time_list[-1])
|
if last_second_total_volumn > 0 and (
|
last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
|
# Rule applies once the row is >= 5s after the execution row
|
if exec_time_offset >= 5:
|
# Drop between adjacent seconds reached the threshold
|
cancel_index = i
|
cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
|
total_num)
|
break
|
# At least two seconds are on record
|
if len(time_list) >= 2:
|
# Total recorded for the second-to-last second
|
last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
|
if last_2_second_total_volumn > 0:
|
if last_2_second_total_volumn > last_second_total_volumn > total_num:
|
dif = last_2_second_total_volumn - total_num
|
if dif / last_2_second_total_volumn >= threshold_rate:
|
# Rule applies once the row is >= 5s after the execution row
|
if exec_time_offset >= 5:
|
cancel_index = i
|
cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
|
last_2_second_total_volumn,
|
last_second_total_volumn,
|
total_num)
|
break
|
# ------ big-order cancel handling ------
|
# if total_num < min_volumn_big:
|
if exec_time_offset < 31:
|
pass
|
# try:
|
# b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, i, i)
|
# if b_need_cancel:
|
# cancel_index = b_cancel_index
|
# cancel_msg = "1分钟内大单撤销比例触发阈值"
|
# break
|
# except Exception as e:
|
# logging.exception(e)
|
# Only evaluated from 31s after the execution row onwards
|
elif 31 <= exec_time_offset:
|
try:
|
b_need_cancel, b_cancel_index = LongAverageBigNumComputer.need_cancel(code, buy_exec_index, i,
|
i)
|
if b_need_cancel:
|
cancel_index = b_cancel_index
|
cancel_msg = "30s后内大单撤销比例触发阈值"
|
break
|
except Exception as e:
|
logging.exception(e)
|
|
# ------ end of big-order cancel handling ------
|
if not with_cancel:
|
cancel_index = None
|
|
print("封单额计算时间:", round(t.time() * 1000) - start_time)
|
process_end_index = end_index
|
if cancel_index:
|
process_end_index = cancel_index
|
# Saving the latest running total is currently disabled
|
# cls.__set_l2_latest_money_record(code, process_end_index, total_num)
|
l2_data_log.l2_time(code, round(t.time() * 1000) - start_time, "l2数据封单额计算时间",
|
False)
|
if cancel_index:
|
L2TradeDataProcessor.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
|
total_num)
|
return total_datas[cancel_index], cancel_msg
|
return None, None
|
|
|
# 涨停卖统计
|
class L2LimitUpSellStatisticUtil:
|
_redisManager = redis_manager.RedisManager(0)
|
|
@classmethod
def __get_redis(cls):
    """Return the connection handed out by the class-level Redis manager."""
    manager = cls._redisManager
    return manager.getRedis()
|
|
# Add sell volume
@classmethod
def __incre_sell_data(cls, code, num):
    """Increase the stored sell counter of ``code`` by ``num`` (Redis ``INCRBY``)."""
    counter_key = "limit_up_sell_num-{}".format(code)
    redis_client = cls.__get_redis()
    redis_client.incrby(counter_key, num)
|
|
@classmethod
def __get_sell_data(cls, code):
    """Return the stored sell counter of ``code`` as int, or 0 when absent."""
    raw = cls.__get_redis().get("limit_up_sell_num-{}".format(code))
    return 0 if raw is None else int(raw)
|
|
@classmethod
def __save_process_index(cls, code, index):
    """Persist ``index`` as the last processed row of ``code`` (with expiry)."""
    index_key = "limit_up_sell_index-{}".format(code)
    redis_client = cls.__get_redis()
    redis_client.setex(index_key, tool.get_expire(), index)
|
|
@classmethod
def __get_process_index(cls, code):
    """Return the last processed row index of ``code``, or -1 when absent."""
    raw = cls.__get_redis().get("limit_up_sell_index-{}".format(code))
    return -1 if raw is None else int(raw)
|
|
# Remove the stored data; to be called after a successful cancel and before buying
@classmethod
def delete(cls, code):
    """Delete both the sell counter and the processed-index entry of ``code``."""
    for template in ("limit_up_sell_num-{}", "limit_up_sell_index-{}"):
        cls.__get_redis().delete(template.format(code))
|
|
@classmethod
def clear(cls):
    """Delete the sell counters of all codes.

    Only ``limit_up_sell_num-*`` keys are removed; the
    ``limit_up_sell_index-*`` entries are left untouched.
    """
    counter_keys = cls.__get_redis().keys("limit_up_sell_num-*")
    for counter_key in counter_keys:
        cls.__get_redis().delete(counter_key)
|
|
# Accumulates sell volume and returns (row that triggered a cancel, message) when the total exceeds the threshold, otherwise (None, "")
|
# Rows before buy_exec_index and rows at or before the stored processed index are skipped. NOTE(review): the row's "re" repeat count is not applied to num here - confirm that is intended
|
@classmethod
|
def process(cls, code, start_index, end_index, buy_exec_index):
|
# Compute the sell-volume threshold
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
|
# Threshold: 4.8% of zyltgb, expressed in lots at the limit-up price
|
threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
|
total_num = cls.__get_sell_data(code)
|
cancel_index = None
|
process_index = cls.__get_process_index(code)
|
total_datas = local_today_datas.get(code)
|
for i in range(start_index, end_index + 1):
|
if i < buy_exec_index:
|
continue
|
if i <= process_index:
|
continue
|
if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]):
|
num = int(total_datas[i]["val"]["num"])
|
cls.__incre_sell_data(code, num)
|
total_num += num
|
if total_num > threshold_num:
|
cancel_index = i
|
break
|
if cancel_index is not None:
|
process_index = cancel_index
|
else:
|
process_index = end_index
|
L2TradeDataProcessor.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
|
threshold_num)
|
|
cls.__save_process_index(code, process_index)
|
if cancel_index is not None:
|
return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
|
return None, ""
|
|
@classmethod
def test(cls):
    """Manual debug harness: run ``process`` on rows 126..171 of 003005."""
    code = "003005"
    load_l2_data(code)
    L2TradeDataProcessor.random_key[code] = 123123
    first_row, last_row = 126, 171
    cls.process(code, first_row, last_row, first_row)
|
|
|
# s级平均大单计算
|
# 计算范围到申报时间的那一秒
|
class SecondAverageBigNumComputer:
|
__redis_manager = redis_manager.RedisManager(0)
|
__place_order_time_dict = {}
|
|
@classmethod
def __getRedis(cls):
    """Return the connection handed out by the class-level Redis manager."""
    manager = cls.__redis_manager
    return manager.getRedis()
|
|
@classmethod
def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
    """Store the big-order statistics of ``code`` for 2000 seconds and log them.

    The stored value is the JSON tuple
    ``(average_num, average_up_count, start_index, end_index)``.
    """
    payload = json.dumps((average_num, average_up_count, start_index, end_index))
    cls.__getRedis().setex("s_average_big_num-{}".format(code), 2000, payload)
    message = "保存秒级大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                               average_up_count,
                                                               start_index,
                                                               end_index)
    L2TradeDataProcessor.cancel_debug(code, message)
|
|
@classmethod
def __get_average_data(cls, code):
    """Return ``(average_num, average_up_count, start_index, end_index)``.

    All four items are ``None`` when nothing is stored for ``code``.
    """
    raw = cls.__getRedis().get("s_average_big_num-{}".format(code))
    if raw is not None:
        stats = json.loads(raw)
        return stats[0], stats[1], stats[2], stats[3]
    return None, None, None, None
|
|
# Remember a buy-cancel row
@classmethod
def __save_cancel_data(cls, code, cancel_index):
    """Add ``cancel_index`` to the Redis set of cancel rows recorded for ``code``."""
    set_key = "s_average_big_num_comput_info-{}".format(code)
    redis_client = cls.__getRedis()
    redis_client.sadd(set_key, cancel_index)
|
|
# Fetch the recorded buy-cancel rows
@classmethod
def __get_cancel_datas(cls, code):
    """Return the members of the Redis set of cancel rows recorded for ``code``."""
    set_key = "s_average_big_num_comput_info-{}".format(code)
    return cls.__getRedis().smembers(set_key)
|
|
# Store the order apply time
@classmethod
def __save_apply_time(cls, code, time_str):
    """Persist ``time_str`` as the apply time of ``code`` (with expiry)."""
    time_key = "s_average_big_num_apply_time-{}".format(code)
    redis_client = cls.__getRedis()
    redis_client.setex(time_key, tool.get_expire(), time_str)
|
|
# Fetch the order apply time
@classmethod
def __get_apply_time(cls, code):
    """Return the stored apply time of ``code`` exactly as Redis returns it."""
    time_key = "s_average_big_num_apply_time-{}".format(code)
    return cls.__getRedis().get(time_key)
|
|
# Remember an end position
@classmethod
def __save_end_index(cls, code, end_index):
    """Add ``end_index`` to the Redis set of end positions recorded for ``code``."""
    set_key = "s_average_big_num_end_index_set-{}".format(code)
    redis_client = cls.__getRedis()
    redis_client.sadd(set_key, end_index)
|
|
@classmethod
def __list_end_indexs(cls, code):
    """Return the recorded end positions of ``code`` as an ascending int list.

    ``None`` is returned only when the Redis call itself yields ``None``.
    """
    members = cls.__getRedis().smembers("s_average_big_num_end_index_set-{}".format(code))
    if members is None:
        return None
    return sorted(int(member) for member in members)
|
|
@classmethod
def __clear_data(cls, code):
    """Delete the cancel rows, statistics and end positions stored for ``code``."""
    templates = ("s_average_big_num_comput_info-{}",
                 "s_average_big_num-{}",
                 "s_average_big_num_end_index_set-{}")
    stale_keys = [template.format(code) for template in templates]
    for stale_key in stale_keys:
        cls.__getRedis().delete(stale_key)
|
|
@classmethod
def clear_data(cls):
    """Delete the cancel rows, statistics and end positions of every code."""
    patterns = ("s_average_big_num_comput_info-*",
                "s_average_big_num-*",
                "s_average_big_num_end_index_set-*")
    for pattern in patterns:
        matched = cls.__getRedis().keys(pattern)
        for matched_key in matched:
            cls.__getRedis().delete(matched_key)
|
|
# Compute the big-order statistics used by the second-level cancel guard.
# Range: from the buy-signal start row up to the first recorded end position
# whose time is at or after the apply time.
@classmethod
def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
    """Count the big limit-up buy orders in the window and store the result.

    ``end_index`` is first added to the recorded end positions; the
    smallest recorded position whose row time is at or after the apply
    time then replaces ``end_index`` (when such a position exists).

    The big-order threshold is ``min(constant.BIG_MONEY_NUM,
    round(constant.BIG_MONEY_AMOUNT / limit_up_price))`` lots.  Limit-up
    buy rows in ``start_index..end_index`` with at least that many lots
    are counted (weighted by ``re``); counting stops at the first limit-up
    buy row later than the apply time.

    The former ``num // count`` average was overwritten by the threshold
    before it was ever used and raised ZeroDivisionError when the window
    held no limit-up buy row, so it (and the loops feeding it) is gone.

    :param buy_single_index: buy-signal start row (only used for logging)
    :param start_index: first row of the window
    :param end_index: requested last row of the window
    """
    cls.__save_end_index(code, end_index)
    end_indexs = cls.__list_end_indexs(code)
    print("compute_average_big_num", code, buy_single_index, start_index, end_index)
    L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
    total_data = local_today_datas[code]
    apply_time = cls.get_apply_time(code)
    apply_time_second = int(apply_time.replace(":", ""))
    # Clamp the window to the first recorded end position at/after the apply time
    for ei in end_indexs:
        if int(total_data[ei]["val"]["time"].replace(":", "")) >= apply_time_second:
            end_index = ei
            break

    average_num = min(constant.BIG_MONEY_NUM,
                      round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code)))
    average_up_count = 0
    for i in range(start_index, end_index + 1):
        data = total_data[i]
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy(val):
            if int(val["time"].replace(":", "")) > apply_time_second:
                break
            if int(val["num"]) >= average_num:
                average_up_count += data["re"]
    print("平均手数:", average_num, "大单总数:", average_up_count)
    # Persist the statistics
    cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
|
|
# Decide whether the pending order must be cancelled
@classmethod
def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
    """Record big-order cancels in ``start_index..end_index`` and test the cancel rate.

    Returns ``(True, row)`` when the share of cancelled big orders exceeds
    the threshold (0.49 / 0.59 / 0.69 for the 1st / 2nd / later order
    placement), otherwise ``(False, None)``.  ``(False, None)`` is also
    returned when no statistics are stored, when ``start_index`` lies more
    than 30s after the execution row, when ``need_cancel`` is False (the
    cancel rows are still recorded in that case), or when no big buy order
    was counted - the rate would be a division by zero, which previously
    raised ZeroDivisionError.

    :param buy_single_index: buy-signal start row
    :param buy_exec_index: buy-execution row
    :param start_index: first row to inspect
    :param end_index: last row to inspect
    :param need_cancel: evaluate the cancel rate after recording
    """
    average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
    L2TradeDataProcessor.cancel_debug(code, "s级是否需要撤单,数据范围:{}-{} 平均大单信息-({},{},{},{})", start_index, end_index,
                                      average_num, average_up_count, a_start_index, a_end_index)
    if average_num is None:
        return False, None
    total_data = local_today_datas[code]

    # The guard is only active for 30 seconds
    if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
        return False, None

    # First evaluation after the order: also look at limit-up cancels that
    # share the second of the buy signal but precede it
    if buy_single_index == start_index:
        for i in range(buy_single_index - 1, 0, -1):
            data = total_data[i]
            val = data["val"]
            if val["time"] != total_data[buy_single_index]["val"]["time"]:
                break
            if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                # Limit-up cancel with zero delay: count it only when its buy
                # row is found inside the statistics window
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                    cls.__save_cancel_data(code, i)

    for i in range(start_index, end_index + 1):
        data = total_data[i]
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy_cancel(val):
            # Locate the buy row of this cancel
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                             local_today_num_operate_map.get(
                                                                                 code))
            if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                cls.__save_cancel_data(code, i)
            else:
                # Partial cancels cannot be traced back; fall back to checking
                # whether the estimated buy time lies inside the window
                min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                 val["cancelTimeUnit"])
                # Only second-level cancels give a usable estimate
                if max_space - min_space <= 1:
                    buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                    if int(total_data[a_start_index]["val"]["time"].replace(":", "")) <= int(
                            buy_time.replace(":", "")) <= int(
                        total_data[a_end_index]["val"]["time"].replace(":", "")):
                        cls.__save_cancel_data(code, i)

    if not need_cancel:
        return False, None

    # Share of big orders that were cancelled
    cancel_datas = cls.__get_cancel_datas(code)
    if cancel_datas is None or len(cancel_datas) == 0:
        return False, None
    L2TradeDataProcessor.cancel_debug(code, "s级大单 取消数量:{}", len(cancel_datas))
    if not average_up_count:
        # No big buy order was counted, so the rate is undefined
        return False, None

    place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
    if place_order_count <= 1:
        cancel_rate_threshold = 0.49
    elif place_order_count <= 2:
        cancel_rate_threshold = 0.59
    else:
        cancel_rate_threshold = 0.69

    cancel_count = 0
    for index in sorted(int(member) for member in cancel_datas):
        data = total_data[index]
        if int(data["val"]["num"]) >= average_num:
            cancel_count += data["re"]
            if cancel_count / average_up_count > cancel_rate_threshold:
                return True, total_data[index]

    return False, None
|
|
# Whether the statistics still have to be (re)computed
@classmethod
def is_need_compute_average(cls, code, latest_data):
    """Tell whether the big-order statistics of ``code`` should be recomputed.

    Returns ``(True, buy_single_index, buy_exec_index)`` while the latest
    row is less than 5 seconds after the stored apply time, i.e. while the
    real apply time may still be uploaded.  Otherwise the pending entry is
    dropped and ``(False, None, None)`` is returned; the same result is
    returned when no order is pending for ``code``.

    The previous unused ``local_today_datas[code]`` lookup was removed; it
    had no effect other than a possible KeyError.
    """
    pending = cls.__place_order_time_dict.get(code)
    if pending is None:
        return False, None, None
    if tool.trade_time_sub(latest_data["val"]["time"], cls.get_apply_time(code)) < 5:
        # Still inside the 5s window for uploading the apply time
        return True, pending[1], pending[2]
    cls.__place_order_time_dict.pop(code)
    return False, None, None
|
|
# Set the apply time
@classmethod
def set_apply_time(cls, code, time_str, force=False):
    """Store ``time_str`` as apply time of ``code``.

    Unless ``force`` is set, an existing apply time is only replaced when
    the new one is 1 to 4 seconds later; otherwise the call is a no-op.
    """
    previous = cls.get_apply_time(code)
    if not force and previous is not None:
        delta = tool.trade_time_sub(time_str, previous)
        # The apply time must not be more than 4s after the order time
        if delta <= 0 or delta > 4:
            return
    cls.__save_apply_time(code, time_str)
|
|
@classmethod
def get_apply_time(cls, code):
    """Return the stored apply time of ``code`` (public wrapper of the private getter)."""
    apply_time = cls.__get_apply_time(code)
    return apply_time
|
|
# Order placed successfully
@classmethod
def place_order_success(cls, code, buy_single_index, buy_exec_index):
    """Reset the stored state of ``code`` and compute the initial statistics.

    The time of the execution row is stored (forced) as provisional apply
    time, and the statistics window runs from the buy-signal row to the
    ``"index"`` of the newest loaded row.
    """
    cls.__clear_data(code)
    cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
    # Store provisional order information right away, just in case
    rows = local_today_datas[code]
    exec_time = rows[buy_exec_index]["val"]["time"]
    cls.set_apply_time(code, exec_time, True)
    newest_index = rows[-1]["index"]
    cls.compute_average_big_num(code, buy_single_index, buy_single_index, newest_index)
|
|
@classmethod
def __test(cls, datas):
    """Manual debug harness driven by a scenario tuple.

    ``datas`` is ``(code, buy_single_index, buy_exec_index,
    compute_end_index, last_index)``.  The loaded rows are truncated to
    ``last_index``, an order placement is simulated, and every row after
    the execution row is checked until a cancel is reported.
    """
    code = datas[0]
    load_l2_data(code)
    L2TradeDataProcessor.random_key[code] = 123123
    # Simulate the order placement first
    buy_single_index = datas[1]
    buy_exec_index = datas[2]
    local_today_datas[code] = local_today_datas[code][0:datas[4]]
    cls.place_order_success(code, buy_single_index, buy_exec_index)
    # Recompute the statistics up to the requested end row
    cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])

    # Record cancels between signal and execution without evaluating the rate
    cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, buy_single_index, buy_exec_index,
                                          False)

    row = buy_exec_index + 1
    while row < datas[4]:
        cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, row, row)
        if cancel:
            print("需要撤单", cancel, cancel_data["index"])
            break
        row += 1
|
|
@classmethod
def test(cls):
    """Manual debug harness: store a few end positions for 000333 and print them sorted.

    Scenario tuples for ``__test`` have the layout
    (code, buy-signal start row, buy-execution row, compute end row, last row),
    e.g. ("000909", 607, 646, 646, 694) or ("002793", 292, 308, 314, 410).
    """
    for end_position in (200, 101, 99, 120, 126, 126):
        cls.__save_end_index("000333", end_position)
    print(cls.__list_end_indexs("000333"))
|
|
# 执行是否需要撤销
|
|
|
# 平均大单计算
|
class AverageBigNumComputer:
|
__redis_manager = redis_manager.RedisManager(0)
|
__place_order_time_dict = {}
|
|
@classmethod
def __getRedis(cls):
    """Return the connection handed out by the class-level Redis manager."""
    manager = cls.__redis_manager
    return manager.getRedis()
|
|
@classmethod
def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
    """Store the big-order statistics of ``code`` for 2000 seconds and log them.

    The stored value is the JSON tuple
    ``(average_num, average_up_count, start_index, end_index)``.
    """
    payload = json.dumps((average_num, average_up_count, start_index, end_index))
    cls.__getRedis().setex("average_big_num-{}".format(code), 2000, payload)
    message = "保存短大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                              average_up_count,
                                                              start_index,
                                                              end_index)
    L2TradeDataProcessor.cancel_debug(code, message)
|
|
@classmethod
def __get_average_data(cls, code):
    """Return ``(average_num, average_up_count, start_index, end_index)``.

    All four items are ``None`` when nothing is stored for ``code``.
    """
    raw = cls.__getRedis().get("average_big_num-{}".format(code))
    if raw is not None:
        stats = json.loads(raw)
        return stats[0], stats[1], stats[2], stats[3]
    return None, None, None, None
|
|
# Remember a buy-cancel row
@classmethod
def __save_cancel_data(cls, code, cancel_index):
    """Add ``cancel_index`` to the Redis set of cancel rows recorded for ``code``."""
    set_key = "average_big_num_comput_info-{}".format(code)
    redis_client = cls.__getRedis()
    redis_client.sadd(set_key, cancel_index)
|
|
# Fetch the recorded buy-cancel rows
@classmethod
def __get_cancel_datas(cls, code):
    """Return the members of the Redis set of cancel rows recorded for ``code``."""
    set_key = "average_big_num_comput_info-{}".format(code)
    return cls.__getRedis().smembers(set_key)
|
|
@classmethod
def __clear_data(cls, code):
    """Delete the cancel rows and the statistics stored for ``code``."""
    for template in ("average_big_num_comput_info-{}", "average_big_num-{}"):
        stale_key = template.format(code)
        cls.__getRedis().delete(stale_key)
|
|
@classmethod
def clear_data(cls):
    """Delete the cancel rows and the statistics of every code."""
    for pattern in ("average_big_num_comput_info-*", "average_big_num-*"):
        matched = cls.__getRedis().keys(pattern)
        for matched_key in matched:
            cls.__getRedis().delete(matched_key)
|
|
# Compute the big-order statistics for the window
# start_index..end_index (both inclusive).
@classmethod
def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
    """Count the big limit-up buy orders in the window and store the result.

    The big-order threshold is ``min(constant.BIG_MONEY_NUM,
    round(constant.BIG_MONEY_AMOUNT / limit_up_price))`` lots.  Limit-up
    buy rows in ``start_index..end_index`` with at least that many lots
    are counted, weighted by ``re``.

    The former ``num // count`` average was overwritten by the threshold
    before it was ever used and raised ZeroDivisionError when the window
    held no limit-up buy row, so it (and the loops feeding it) is gone.

    :param buy_single_index: buy-signal start row (only used for logging)
    :param start_index: first row of the window
    :param end_index: last row of the window
    """
    print("compute_average_big_num", code, buy_single_index, start_index, end_index)
    L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
    total_data = local_today_datas[code]

    average_num = min(constant.BIG_MONEY_NUM,
                      round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code)))
    average_up_count = 0
    for i in range(start_index, end_index + 1):
        data = total_data[i]
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy(val):
            if int(val["num"]) >= average_num:
                average_up_count += data["re"]
    print("平均手数:", average_num, "大单总数:", average_up_count)
    # Persist the statistics
    cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
|
|
# Decide whether the pending order must be cancelled
@classmethod
def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
    """Always report "no cancel": this rule is temporarily switched off.

    The statements after the first ``return`` are the parked
    implementation and never execute; they are retained so the rule can be
    re-enabled by removing the early return.
    """
    # This cancel condition is disabled for now
    return False, None

    average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
    if average_num is None:
        return False, None
    rows = local_today_datas[code]
    # First evaluation after the order: also look at limit-up cancels that
    # share the second of the buy signal but precede it
    if buy_single_index == start_index:
        signal_time = rows[buy_single_index]["val"]["time"]
        for i in range(buy_single_index - 1, 0, -1):
            data = rows[i]
            val = data["val"]
            if val["time"] != signal_time:
                break
            if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                # Zero-delay limit-up cancel: count it only when its buy row
                # is found inside the statistics window
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                    cls.__save_cancel_data(code, i)

    for i in range(start_index, end_index + 1):
        data = rows[i]
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy_cancel(val):
            # Locate the buy row of this cancel
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                             local_today_num_operate_map.get(
                                                                                 code))
            if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                cls.__save_cancel_data(code, i)
    if need_cancel:
        # Share of big orders that were cancelled
        cancel_datas = cls.__get_cancel_datas(code)

        if cancel_datas is not None and len(cancel_datas) > 0:
            cancel_rate_threshold = 0.49
            place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
            if place_order_count <= 1:
                cancel_rate_threshold = 0.49
            elif place_order_count <= 2:
                cancel_rate_threshold = 0.549
            else:
                cancel_rate_threshold = 0.59
            cancel_indexs = sorted(int(member) for member in cancel_datas)
            cancel_count = 0
            for index in cancel_indexs:
                data = rows[index]
                if int(data["val"]["num"]) >= average_num:
                    cancel_count += data["re"]
                    if cancel_count / average_up_count > cancel_rate_threshold:
                        return True, rows[index]

    return False, None
|
|
# 是否需要计算
|
@classmethod
def is_need_compute_average(cls, code, latest_data):
    """Tell whether the average still has to be computed for ``code``.

    The decision is based on the record kept for ``code`` in
    ``__place_order_time_dict`` and on the time of ``latest_data``.

    Args:
        code: Stock code used as key into ``local_today_datas`` and the
            order record dict.
        latest_data: Newest L2 datum; its ``["val"]["time"]`` is read.

    Returns:
        ``(True, record[1], record[2])`` while ``tool.trade_time_sub`` of the
        latest time and the time of the datum at position ``record[2]`` is
        below 3; otherwise ``(False, None, None)``. Once the window has
        passed, the stored record is removed.
    """
    day_datas = local_today_datas[code]
    order_record = cls.__place_order_time_dict.get(code)
    if order_record is None:
        # No order is being tracked for this code.
        return False, None, None

    latest_time = latest_data["val"]["time"]
    reference_time = day_datas[order_record[2]]["val"]["time"]
    if tool.trade_time_sub(latest_time, reference_time) < 3:
        # Only data within 3s needs the average to be computed.
        return True, order_record[1], order_record[2]

    # Window elapsed: stop tracking this order.
    cls.__place_order_time_dict.pop(code)
    return False, None, None
|
|
# 下单成功
|
@classmethod
def place_order_success(cls, code, buy_single_index, buy_exec_index):
    """Register a successfully placed order and run a first computation.

    Args:
        code: Stock code.
        buy_single_index: Index of the buy-signal datum.
        buy_exec_index: Index of the buy-execution datum.
    """
    # Forget whatever was stored for an earlier order on this code.
    cls.__clear_data(code)
    order_record = (t.time(), buy_single_index, buy_exec_index)
    cls.__place_order_time_dict[code] = order_record
    # Just in case, compute right away with the data received so far.
    newest_index = local_today_datas[code][-1]["index"]
    cls.compute_average_big_num(code, buy_single_index, buy_single_index, newest_index)
|
|
@classmethod
def __test(cls, datas):
    """Replay stored L2 data for one stock and print the first cancel hit.

    Args:
        datas: Sequence ``(code, buy_single_index, buy_exec_index,
            compute_end_index, replay_end_index)``.
    """
    code = datas[0]
    load_l2_data(code)
    L2TradeDataProcessor.random_key[code] = 123123

    # Place the order first, on data truncated to the replay range.
    signal_index, exec_index = datas[1], datas[2]
    local_today_datas[code] = local_today_datas[code][:datas[4]]
    cls.place_order_success(code, signal_index, exec_index)

    # Compute the average over the requested range.
    cls.compute_average_big_num(code, signal_index, signal_index, datas[3])

    # Feed the data one position at a time until a cancel is requested.
    for position in range(signal_index, datas[4]):
        should_cancel, trigger_data = cls.need_cancel(code, position, position)
        if not should_cancel:
            continue
        print("需要撤单", should_cancel, trigger_data["index"])
        break
|
|
@classmethod
def test(cls):
    """Run the replay helper on a hard-coded sample."""
    # Tuple layout: code, buy-signal start index, buy execution index,
    # last index used for the computation, furthest index to replay.
    sample = ("000716", 410, 420, 461, 536)
    # Alternative sample: ("002793", 292, 308, 314, 410)
    cls.__test(sample)
|
|
# 执行是否需要撤销
|
|
|
# 平均大单计算
|
class LongAverageBigNumComputer:
    """Cancel guard based on the average size of big limit-up buy orders.

    After a buy is executed, ``compute_average_big_num`` derives the average
    lot count of big limit-up buy orders following the execution position and
    counts how many entries reach that average. ``need_cancel`` then
    accumulates matching limit-up buy cancellations and reports when their
    share reaches 75%.

    State is stored through ``redis_manager.RedisManager(0)`` under two keys
    per stock code:

    * ``l_average_big_num-<code>``: JSON list
      ``[average_num, average_up_count, total_count, start_index, end_index]``
    * ``l_average_big_num_comput_info-<code>``: JSON list
      ``[cancel_count, process_index]``
    """

    __redis_manager = redis_manager.RedisManager(0)

    @classmethod
    def __getRedis(cls):
        """Return the Redis client provided by the class-level manager."""
        return cls.__redis_manager.getRedis()

    @staticmethod
    def __is_big_limit_up_buy(val):
        """Return True for a limit-up buy whose ``num * price`` is >= 5900."""
        return L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) * float(val["price"]) >= 5900

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, total_count, start_index, end_index):
        """Log and persist the average statistics for ``code``."""
        L2TradeDataProcessor.cancel_debug(
            code,
            "获取到长大单位置信息:平均手数-{} 大单数量-{} 样本数量-{} 计算开始范围-{}:{}".format(
                average_num, average_up_count, total_count, start_index, end_index))
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(),
                               json.dumps((average_num, average_up_count, total_count, start_index, end_index)))

    @classmethod
    def __get_average_data(cls, code):
        """Load the stored average statistics.

        Returns:
            ``(average_num, average_up_count, total_count, start_index,
            end_index)``, or five ``None`` values when nothing is stored.
        """
        key = "l_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]

    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
        """Persist the running cancel count and the last processed index."""
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((cancel_count, process_index)))

    @classmethod
    def __get_compute_info(cls, code):
        """Load ``(cancel_count, process_index)``; ``(None, None)`` if absent."""
        key = "l_average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    @classmethod
    def __clear_data(cls, code):
        """Delete both Redis keys kept for ``code``."""
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().delete(key)

    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, buy_exec_index):
        """Compute and store the average big-order size after the execution.

        The sample starts at ``buy_exec_index`` and first covers the data up
        to 2s after the execution time. If fewer than
        ``constant.H_CANCEL_BUY_COUNT`` big orders were found, later data is
        added until that count is reached or the data runs out.

        Nothing is computed (and nothing stored) when:

        * the newest datum is less than 3s after the execution datum,
        * a previous result already holds ``constant.H_CANCEL_BUY_COUNT``
          samples or more, or
        * no big order was found at all (previously a ZeroDivisionError).

        Args:
            code: Stock code, key into ``local_today_datas``.
            buy_single_index: Buy-signal index. Not used by the computation;
                kept for interface compatibility.
            buy_exec_index: Index of the buy-execution datum.
        """
        total_data = local_today_datas[code]
        latest_index = total_data[-1]["index"]
        exec_time = total_data[buy_exec_index]["val"]["time"]

        # Wait until at least 3s of data follow the execution position.
        if tool.trade_time_sub(total_data[latest_index]["val"]["time"], exec_time) < 3:
            return

        # A previous run with enough samples is final; do not recompute.
        o_average_num, _, o_count, _, _ = cls.__get_average_data(code)
        if o_average_num is not None and o_count >= constant.H_CANCEL_BUY_COUNT:
            return

        # Find the last datum that is at most 2s after the execution time.
        start_index = buy_exec_index
        end_index = latest_index
        for i in range(latest_index, buy_exec_index, -1):
            if tool.trade_time_sub(total_data[i]["val"]["time"], exec_time) <= 2:
                end_index = i
                break

        num = 0
        count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if cls.__is_big_limit_up_buy(val):
                count += data["re"]
                num += int(val["num"]) * data["re"]

        # Too few samples: keep reading later data until enough are found.
        # NOTE(review): end_index only moves when the threshold is reached,
        # so a short sample still reports the 2s window as its range.
        if count < constant.H_CANCEL_BUY_COUNT:
            for i in range(end_index + 1, latest_index + 1):
                data = total_data[i]
                val = data["val"]
                if cls.__is_big_limit_up_buy(val):
                    count += data["re"]
                    num += int(val["num"]) * data["re"]
                    if count >= constant.H_CANCEL_BUY_COUNT:
                        end_index = i
                        break

        if count <= 0:
            # No big order in the sample: there is no average to compute.
            return

        average_num = round(num / count)
        # Count every entry in the range whose lot count reaches the average.
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            if int(data["val"]["num"]) >= average_num:
                average_up_count += data["re"]

        cls.__save_average_data(code, average_num, average_up_count, count, start_index, end_index)
        # A fresh average restarts the cancel accounting from the execution.
        cls.__save_compute_info(code, 0, buy_exec_index)

    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        """Decide whether accumulated big-order cancels require a cancel.

        Args:
            code: Stock code.
            buy_exec_index: Index of the buy-execution datum.
            start_index: Accepted for interface compatibility. Processing
                always resumes right after the later of ``buy_exec_index``
                and the stored ``process_index``, which is what the previous
                loop bounds reduced to.
            end_index: Last index (inclusive) to examine.

        Returns:
            ``(True, index)`` with the index of the datum that pushed the
            cancel share to 0.75 or above, else ``(False, None)``.
        """
        average_num, average_up_count, _, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None

        cancel_count, process_index = cls.__get_compute_info(code)
        # Missing progress info: start counting from the execution position.
        if cancel_count is None:
            cancel_count = 0
        if process_index is None:
            process_index = buy_exec_index

        total_data = local_today_datas[code]
        # No guarding after 14:30.
        if int(total_data[end_index]["val"]["time"].replace(":", "")) > int("143000"):
            return False, None

        # Disabled time-based adjustment of the denominator, formerly
        # 5 * tool.trade_time_sub(val["time"], total_data[buy_exec_index]["val"]["time"])
        sj = 0
        if average_up_count - sj <= 0:
            # Without a positive number of big orders no ratio can be formed.
            return False, None

        try:
            first_index = max(buy_exec_index, process_index) + 1
            for i in range(first_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if not (L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num):
                    continue
                # Look up the buy order this cancel belongs to.
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is None or not a_start_index <= buy_index <= a_end_index:
                    # The buy must lie inside the range the average was
                    # computed over.
                    continue
                cancel_count += data["re"]
                process_index = i
                print("h平均大单计算结果:", "取消数量", cancel_count, "大单总数", average_up_count, sj)
                if cancel_count / (average_up_count - sj) >= 0.75:
                    return True, i
        finally:
            # Persist progress on every exit path, including the early return.
            cls.__save_compute_info(code, cancel_count, process_index)
        return False, None

    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset the stored state for ``code`` after an order was placed."""
        cls.__clear_data(code)

    @classmethod
    def __test(cls, datas):
        """Replay stored L2 data for one stock and print the first cancel hit.

        Args:
            datas: Sequence ``(code, buy_single_index, buy_exec_index,
                compute_end_index, replay_end_index)``; the fourth element is
                not used here.
        """
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = random.randint(0, 100000)
        # Place the order first.
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        cls.__clear_data(code)
        cls.place_order_success(code, buy_single_index, buy_exec_index)

        # Compute the average, then feed the data one position at a time.
        cls.compute_average_big_num(code, buy_single_index, buy_exec_index)
        for i in range(buy_exec_index + 1, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                break

    @classmethod
    def test(cls):
        """Run the replay helper on a hard-coded sample."""
        # Tuple layout: code, buy-signal start index, buy execution index,
        # last index used for the computation, furthest index to replay.
        cls.__test(("002793", 292, 308, 332, 410))
|
|
# 执行是否需要撤销
|
|
|
if __name__ == "__main__":
    # Manual debugging entry point. Only the SecondAverageBigNumComputer
    # replay is active; the other entry points used during development are
    # kept below for reference.
    #
    #   trade_manager.start_cancel_buy("000637")
    #   t.sleep(10)
    #   AverageBigNumComputer.test()
    #   LongAverageBigNumComputer.test()
    #   L2TradeDataProcessor.test()
    SecondAverageBigNumComputer.test()
    # Look up the buy order that belongs to one cancel datum:
    #
    #   load_l2_data("600213")
    #   buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
    #       local_today_datas["600213"][84],
    #       local_today_num_operate_map.get("600213"))
    #   print(buy_index, buy_data)
|