"""
|
L2的数据处理
|
"""
|
import decimal
|
import json
|
import logging
|
import random
|
import time as t
|
from datetime import datetime
|
|
import big_money_num_manager
|
import code_data_util
|
import constant
|
import data_process
|
import global_data_loader
|
import global_util
|
import l2_data_log
|
import l2_data_util
|
|
import gpcode_manager
|
import l2_trade_factor
|
|
import redis_manager
|
import ths_industry_util
|
import tool
|
import trade_manager
|
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
|
import trade_data_manager
|
import limit_up_time_manager
|
|
_redisManager = redis_manager.RedisManager(1)
|
# l2数据管理
|
# 本地最新一次上传的数据
|
local_latest_datas = {}
|
# 本地今日数据
|
local_today_datas = {}
|
# 本地手数+操作那类型组成的临时变量
|
# 用于加快数据处理,用空换时间
|
local_today_num_operate_map = {}
|
|
|
class L2DataException(Exception):
|
# 价格不匹配
|
CODE_PRICE_ERROR = 1
|
# 无收盘价
|
CODE_NO_CLOSE_PRICE = 2
|
|
def __init__(self, code, msg):
|
super().__init__(self)
|
self.code = code
|
self.msg = msg
|
|
def __str__(self):
|
return self.msg
|
|
def get_code(self):
|
return self.code
|
|
|
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
|
class TradePointManager:
|
@staticmethod
|
def __get_redis():
|
return _redisManager.getRedis()
|
|
# 删除买入点数据
|
@staticmethod
|
def delete_buy_point(code):
|
redis = TradePointManager.__get_redis()
|
redis.delete("buy_compute_index_info-{}".format(code))
|
|
# 获取买入点信息
|
# 返回数据为:买入点 累计纯买额 已经计算的数据索引
|
@staticmethod
|
def get_buy_compute_start_data(code):
|
redis = TradePointManager.__get_redis()
|
_key = "buy_compute_index_info-{}".format(code)
|
_data_json = redis.get(_key)
|
if _data_json is None:
|
return None, None, None, 0, 0
|
_data = json.loads(_data_json)
|
return _data[0], _data[1], _data[2], _data[3], _data[4]
|
|
# 设置买入点的值
|
# buy_single_index 买入信号位
|
# buy_exec_index 买入执行位
|
# compute_index 计算位置
|
# nums 累计纯买额
|
@staticmethod
|
def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count):
|
redis = TradePointManager.__get_redis()
|
expire = tool.get_expire()
|
_key = "buy_compute_index_info-{}".format(code)
|
if buy_single_index is not None:
|
redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count)))
|
else:
|
_buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data(
|
code)
|
redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count)))
|
|
# 获取撤买入开始计算的信息
|
# 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
|
@staticmethod
|
def get_buy_cancel_single_pos(code):
|
redis = TradePointManager.__get_redis()
|
info = redis.get("buy_cancel_single_pos-{}".format(code))
|
if info is None:
|
return None
|
else:
|
return int(info)
|
|
# 设置买撤点信息
|
# buy_num 纯买额 computed_index计算到的下标 index撤买信号起点
|
|
@classmethod
|
def set_buy_cancel_single_pos(cls, code, index):
|
redis = TradePointManager.__get_redis()
|
expire = tool.get_expire()
|
redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)
|
|
# 删除买撤点数据
|
@classmethod
|
def delete_buy_cancel_point(cls, code):
|
redis = TradePointManager.__get_redis()
|
redis.delete("buy_cancel_single_pos-{}".format(code))
|
|
# 设置买撤纯买额
|
@classmethod
|
def set_compute_info_for_cancel_buy(cls, code, index, nums):
|
redis = TradePointManager.__get_redis()
|
expire = tool.get_expire()
|
redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
|
logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
|
|
# 获取买撤纯买额计算信息
|
@classmethod
|
def get_compute_info_for_cancel_buy(cls, code):
|
redis = TradePointManager.__get_redis()
|
info = redis.get("compute_info_for_cancel_buy-{}".format(code))
|
if info is None:
|
return None, 0
|
else:
|
info = json.loads(info)
|
return info[0], info[1]
|
|
@classmethod
|
def delete_compute_info_for_cancel_buy(cls, code):
|
redis = TradePointManager.__get_redis()
|
redis.delete("compute_info_for_cancel_buy-{}".format(code))
|
|
# 从买入信号开始设置涨停买与涨停撤的单数
|
@classmethod
|
def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
|
redis = TradePointManager.__get_redis()
|
expire = tool.get_expire()
|
redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
|
logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
|
|
# 获取买撤纯买额计算信息
|
@classmethod
|
def get_count_info_for_cancel_buy(cls, code):
|
redis = TradePointManager.__get_redis()
|
info = redis.get("count_info_for_cancel_buy-{}".format(code))
|
if info is None:
|
return None, 0, 0
|
else:
|
info = json.loads(info)
|
return info[0], info[1], info[2]
|
|
@classmethod
|
def delete_count_info_for_cancel_buy(cls, code):
|
redis = TradePointManager.__get_redis()
|
redis.delete("count_info_for_cancel_buy-{}".format(code))
|
|
|
def load_l2_data(code, force=False):
|
redis = _redisManager.getRedis()
|
# 加载最近的l2数据
|
if local_latest_datas.get(code) is None or force:
|
# 获取最近的数据
|
_data = redis.get("l2-data-latest-{}".format(code))
|
if _data is not None:
|
if code in local_latest_datas:
|
local_latest_datas[code] = json.loads(_data)
|
else:
|
local_latest_datas.setdefault(code, json.loads(_data))
|
# 获取今日的数据
|
|
if local_today_datas.get(code) is None or force:
|
datas = []
|
keys = redis.keys("l2-{}-*".format(code))
|
for k in keys:
|
value = redis.get(k)
|
_data = l2_data_util.l2_data_key_2_obj(k, value)
|
datas.append(_data)
|
# 排序
|
new_datas = sorted(datas,
|
key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
|
local_today_datas[code] = new_datas
|
# 根据今日数据加载
|
l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
|
|
|
@tool.async_call
|
def saveL2Data(code, datas, msg=""):
|
start_time = round(t.time() * 1000)
|
# 查询票是否在待监听的票里面
|
if not gpcode_manager.is_in_gp_pool(code):
|
return None
|
# 验证股价的正确性
|
redis_instance = _redisManager.getRedis()
|
|
try:
|
if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
|
|
# 计算保留的时间
|
expire = tool.get_expire()
|
i = 0
|
for _data in datas:
|
i += 1
|
key = "l2-" + _data["key"]
|
value = redis_instance.get(key)
|
if value is None:
|
# 新增
|
try:
|
value = {"index": _data["index"], "re": _data["re"]}
|
redis_instance.setex(key, expire, json.dumps(value))
|
except:
|
logging.error("更正L2数据出错:{} key:{}".format(code, key))
|
else:
|
json_value = json.loads(value)
|
if json_value["re"] != _data["re"]:
|
json_value["re"] = _data["re"]
|
redis_instance.setex(key, expire, json.dumps(json_value))
|
finally:
|
redis_instance.delete("l2-save-{}".format(code))
|
|
print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time))
|
return datas
|
|
|
def parseL2Data(str):
|
day = datetime.now().strftime("%Y%m%d")
|
dict = json.loads(str)
|
data = dict["data"]
|
client = dict["client"]
|
code = data["code"]
|
channel = data["channel"]
|
capture_time = data["captureTime"]
|
process_time = data["processTime"]
|
data = data["data"]
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
|
datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
|
# 获取涨停价
|
return day, client, channel, code, capture_time, process_time, datas, data
|
|
|
# 保存l2数据
|
def save_l2_data(code, datas, add_datas):
|
redis = _redisManager.getRedis()
|
# 只有有新曾数据才需要保存
|
if len(add_datas) > 0:
|
# 保存最近的数据
|
__start_time = round(t.time() * 1000)
|
redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
|
l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
|
# 设置进内存
|
local_latest_datas[code] = datas
|
__set_l2_data_latest_count(code, len(datas))
|
saveL2Data(code, add_datas)
|
|
|
# 清除l2数据
|
def clear_l2_data(code):
|
redis_l2 = redis_manager.RedisManager(1).getRedis()
|
keys = redis_l2.keys("l2-{}-*".format(code))
|
for k in keys:
|
redis_l2.delete(k)
|
|
redis_l2.delete("l2-data-latest-{}".format(code))
|
|
|
class L2DataUtil:
|
@classmethod
|
def is_same_time(cls, time1, time2):
|
if constant.TEST:
|
return True
|
time1_s = time1.split(":")
|
time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
|
time2_s = time2.split(":")
|
time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
|
if abs(time2_second - time1_second) < 3:
|
return True
|
else:
|
return False
|
|
# 获取增量数据
|
@classmethod
|
def get_add_data(cls, code, datas, _start_index):
|
if datas is not None and len(datas) < 1:
|
return []
|
last_data = None
|
latest_datas_ = local_latest_datas.get(code)
|
if latest_datas_ is not None and len(latest_datas_) > 0:
|
last_data = latest_datas_[-1]
|
|
count = 0
|
start_index = -1
|
# 如果原来没有数据
|
# 设置add_data的序号
|
for n in reversed(datas):
|
count += 1
|
if n["key"] == (last_data["key"] if last_data is not None else ""):
|
start_index = len(datas) - count
|
break
|
|
_add_datas = []
|
if last_data is not None:
|
if start_index < 0:
|
if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
|
last_data["val"]["time"]):
|
_add_datas = datas
|
else:
|
_add_datas = []
|
elif start_index + 1 >= len(datas):
|
_add_datas = []
|
else:
|
_add_datas = datas[start_index + 1:]
|
else:
|
_add_datas = datas[start_index + 1:]
|
for i in range(0, len(_add_datas)):
|
_add_datas[i]["index"] = _start_index + i
|
|
return _add_datas
|
|
# 纠正数据,将re字段替换为较大值
|
@classmethod
|
def correct_data(cls, code, _datas):
|
latest_data = local_latest_datas.get(code)
|
if latest_data is None:
|
latest_data = []
|
save_list = []
|
for data in _datas:
|
for _ldata in latest_data:
|
# 新数据条数比旧数据多才保存
|
if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
|
max_re = max(_ldata["re"], data["re"])
|
_ldata["re"] = max_re
|
data["re"] = max_re
|
# 保存到数据库,更新re的数据
|
save_list.append(_ldata)
|
if len(save_list) > 0:
|
saveL2Data(code, save_list, "保存纠正数据")
|
local_latest_datas[code] = latest_data
|
return _datas
|
|
# 处理l2数据
|
@classmethod
|
def format_l2_data(cls, data, code, limit_up_price):
|
datas = []
|
dataIndexs = {}
|
same_time_num = {}
|
for item in data:
|
# 解析数据
|
time = item["time"]
|
if time in same_time_num:
|
same_time_num[time] = same_time_num[time] + 1
|
else:
|
same_time_num[time] = 1
|
|
price = float(item["price"])
|
num = item["num"]
|
limitPrice = item["limitPrice"]
|
# 涨停价
|
if limit_up_price is not None:
|
if limit_up_price == tool.to_price(decimal.Decimal(price)):
|
limitPrice = 1
|
else:
|
limitPrice = 0
|
item["limitPrice"] = "{}".format(limitPrice)
|
operateType = item["operateType"]
|
# 不需要非涨停买与买撤
|
if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
|
continue
|
|
cancelTime = item["cancelTime"]
|
cancelTimeUnit = item["cancelTimeUnit"]
|
key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
|
cancelTimeUnit)
|
if key in dataIndexs:
|
# 数据重复次数+1
|
datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
|
else:
|
# 数据重复次数默认为1
|
datas.append({"key": key, "val": item, "re": 1})
|
dataIndexs.setdefault(key, len(datas) - 1)
|
# TODO 测试的时候开启,方便记录大单数据
|
# l2_data_util.save_big_data(code, same_time_num, data)
|
return datas
|
|
@classmethod
|
def get_time_as_second(cls, time_str):
|
ts = time_str.split(":")
|
return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
|
|
# @classmethod
|
# def get_time_as_str(cls, time_seconds):
|
# ts = time_str.split(":")
|
# return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
|
|
# 是否是涨停价买
|
@classmethod
|
def is_limit_up_price_buy(cls, val):
|
if int(val["limitPrice"]) != 1:
|
return False
|
|
if int(val["operateType"]) != 0:
|
return False
|
|
price = float(val["price"])
|
num = int(val["num"])
|
if price * num * 100 < 50 * 10000:
|
return False
|
return True
|
|
# 是否为涨停卖
|
@classmethod
|
def is_limit_up_price_sell(cls, val):
|
if int(val["limitPrice"]) != 1:
|
return False
|
|
if int(val["operateType"]) != 2:
|
return False
|
|
price = float(val["price"])
|
num = int(val["num"])
|
if price * num * 100 < 50 * 10000:
|
return False
|
return True
|
|
# 是否涨停买撤
|
@classmethod
|
def is_limit_up_price_buy_cancel(cls, val):
|
if int(val["limitPrice"]) != 1:
|
return False
|
|
if int(val["operateType"]) != 1:
|
return False
|
|
price = float(val["price"])
|
num = int(val["num"])
|
if price * num * 100 < 50 * 10000:
|
return False
|
return True
|
|
# 是否卖撤
|
@classmethod
|
def is_sell_cancel(cls, val):
|
if int(val["operateType"]) == 3:
|
return True
|
return False
|
|
# 是否为卖
|
@classmethod
|
def is_sell(cls, val):
|
if int(val["operateType"]) == 2:
|
return True
|
return False
|
|
|
# L2交易数据处理器
|
# 一些常见的概念:
|
# 买入信号位置(出现下单信号的第一条数据的位置):buy_single_index
|
# 买入执行位置(符合下单信号的最后一条数据):buy_exec_index
|
# 计算位置(当前计算的整个计算的位置):compute_index
|
#
|
|
class L2TradeDataProcessor:
|
unreal_buy_dict = {}
|
random_key = {}
|
|
@classmethod
|
def debug(cls, code, content, *args):
|
logger_l2_trade.debug(("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args))
|
|
@classmethod
|
def cancel_debug(cls, code, content, *args):
|
logger_l2_trade_cancel.debug(
|
("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args))
|
|
@classmethod
|
def buy_debug(cls, code, content, *args):
|
logger_l2_trade_buy.debug(
|
("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args))
|
|
@classmethod
|
# 数据处理入口
|
# datas: 本次截图数据
|
# capture_timestamp:截图时间戳
|
def process(cls, code, datas, capture_timestamp):
|
cls.random_key[code] = random.randint(0, 100000)
|
now_time_str = datetime.now().strftime("%H:%M:%S")
|
__start_time = round(t.time() * 1000)
|
try:
|
if len(datas) > 0:
|
|
# 判断价格区间是否正确
|
if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
|
raise L2DataException(L2DataException.CODE_PRICE_ERROR,
|
"股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
|
# 加载历史数据
|
load_l2_data(code)
|
# 纠正数据
|
datas = L2DataUtil.correct_data(code, datas)
|
_start_index = 0
|
if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
|
_start_index = local_today_datas[code][-1]["index"] + 1
|
add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
|
if len(add_datas) > 0:
|
# 拼接数据
|
local_today_datas[code].extend(add_datas)
|
l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
|
total_datas = local_today_datas[code]
|
# 过时 买入确认点处理
|
# TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
|
# total_datas[-1],
|
# add_datas)
|
if len(add_datas) > 0:
|
_start_time = round(t.time() * 1000)
|
latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
|
# 时间差不能太大才能处理
|
# TODO 暂时关闭处理
|
# if L2DataUtil.is_same_time(now_time_str, latest_time):
|
# # 判断是否已经挂单
|
# state = trade_manager.get_trade_state(code)
|
# start_index = len(total_datas) - len(add_datas)
|
# end_index = len(total_datas) - 1
|
# if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
|
# # 已挂单
|
# cls.__process_order(code, start_index, end_index, capture_timestamp)
|
# else:
|
# # 未挂单
|
# cls.__process_not_order(code, start_index, end_index, capture_timestamp)
|
logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
|
add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
|
# 保存数据
|
save_l2_data(code, datas, add_datas)
|
finally:
|
if code in cls.unreal_buy_dict:
|
cls.unreal_buy_dict.pop(code)
|
|
@classmethod
|
def __compute_big_money_data(cls, code, start_index, end_index):
|
# 计算大单
|
total_datas = local_today_datas[code]
|
num = 0
|
for index in range(start_index, end_index + 1):
|
data = total_datas[index]
|
if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
|
if int(data["val"]["operateType"]) == 0:
|
num += data["re"]
|
elif int(data["val"]["operateType"]) == 1:
|
num -= data["re"]
|
big_money_num_manager.add_num(code, num)
|
|
# 处理未挂单
|
@classmethod
|
def __process_not_order(cls, code, start_index, end_index, capture_time):
|
# 获取阈值
|
threshold_money = cls.__get_threshmoney(code)
|
cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
|
|
@classmethod
|
def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
|
index, old_buy_count, old_cancel_count = TradePointManager.get_count_info_for_cancel_buy(code)
|
for i in range(start_index, end_index + 1):
|
buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
|
old_buy_count += buy_count
|
|
old_cancel_count += buy_cancel_count
|
if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
|
return i, True
|
TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
|
old_cancel_count)
|
return end_index, False
|
|
# 处理已挂单
|
@classmethod
|
def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
|
if start_index < 0:
|
start_index = 0
|
|
if end_index < start_index:
|
return
|
# 获取之前是否有记录的撤买信号
|
# cancel_index = TradePointManager.get_buy_cancel_single_pos(code)
|
|
# cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code)
|
# if cancel_computed_index is None:
|
# logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index)
|
|
# 统计群撤大单
|
L2BetchCancelBigNumProcessor.process_new(code, start_index, end_index)
|
|
# 统计最大连续买单
|
L2ContinueLimitUpCountManager.process(code, start_index, end_index)
|
|
# 计算大单撤销
|
need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, start_index, end_index)
|
if need_cancel:
|
# 已经撤单了
|
threshold_money = cls.__get_threshmoney(code)
|
# 重新处理下单
|
cls.__start_compute_buy(code, cancel_data["index"] + 1, end_index, threshold_money, capture_time)
|
return
|
|
# buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
|
# if cancel_index is None:
|
# 无撤单信号起始点记录
|
|
continue_cancel = L2ContinueLimitUpCountManager.get_continue_count(code)
|
order_cancel_begin_start = max(start_index - (continue_cancel - 1),
|
0) if new_add else start_index
|
order_cancel_begin_end = end_index
|
total_datas = local_today_datas[code]
|
|
little_cancel = False
|
# 大单撤单的数据不为空
|
if cancel_data is not None:
|
# 小群撤事件
|
continue_cancel = 5
|
cancel_time_seconds = L2DataUtil.get_time_as_second(cancel_data["val"]["time"])
|
# 查找上一秒与下一秒
|
for i in range(int(cancel_data["index"]), 0, -1):
|
# 查找上一秒和下一秒
|
if total_datas[i]["val"]["time"] != cancel_data["val"][
|
"time"] and cancel_time_seconds - L2DataUtil.get_time_as_second(total_datas[i]["val"]["time"]) > 1:
|
order_cancel_begin_start = i + 1
|
break
|
|
for i in range(int(cancel_data["index"]), len(local_today_datas[code])):
|
# 查找上一秒和下一秒
|
if total_datas[i]["val"]["time"] != cancel_data["val"]["time"] and L2DataUtil.get_time_as_second(
|
total_datas[i]["val"]["time"]) - cancel_time_seconds > 1:
|
order_cancel_begin_end = i - 1
|
break
|
cls.cancel_debug(code, "小群撤事件计算范围:{},{}", order_cancel_begin_start, order_cancel_begin_end)
|
little_cancel = True
|
cancel_start_index = None
|
cancel_end_index = None
|
need_cancel = False
|
if little_cancel:
|
# 小群撤事件
|
cancel_start_index, cancel_end_index = cls.__compute_order_cancel_little_begin_single(code,
|
order_cancel_begin_start
|
, continue_cancel,
|
order_cancel_begin_end)
|
if cancel_start_index is not None:
|
cls.debug(code, "找到小群撤信号,撤单信号范围:{}-{}", cancel_start_index, cancel_end_index)
|
# 有小群撤信号
|
need_cancel = True
|
else:
|
# 不满足小群撤,从小群撤后面一条数据继续处理
|
cls.__process_order(code, cancel_data["index"] + 1, end_index, capture_time, False)
|
return
|
|
else:
|
# 大群撤事件
|
cancel_start_index, cancel_end_index = cls.__compute_order_cancel_begin_single(
|
code, order_cancel_begin_start
|
, continue_cancel, order_cancel_begin_end)
|
if cancel_start_index is not None:
|
cls.debug(code, "找到大群撤信号,连续笔数阈值:{}, 撤单信号范围:{}-{}", continue_cancel, cancel_start_index,
|
cancel_end_index)
|
# 判断是否有大群撤大单撤
|
need_cancel = L2BetchCancelBigNumProcessor.need_cancel(code, cancel_start_index, cancel_end_index)
|
if need_cancel:
|
cls.debug(code, "大群撤信号有大单撤销")
|
else:
|
cls.debug(code, "大群撤信号无大单撤销")
|
|
if need_cancel:
|
# 需要撤买
|
cls.cancel_buy(code)
|
if cancel_end_index >= end_index:
|
return
|
# 继续处理下单信号
|
threshold_money = cls.__get_threshmoney(code)
|
cls.__start_compute_buy(code, cancel_end_index + 1, end_index, threshold_money, capture_time, False)
|
|
else:
|
# 是否有虚拟下单
|
unreal_buy_info = cls.unreal_buy_dict.get(code)
|
if unreal_buy_info is not None:
|
cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
|
# unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
|
# 真实下单
|
cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
|
unreal_buy_info[0])
|
|
# 过时 开始计算撤的信号
|
@classmethod
|
def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
|
# sure_type 0-虚拟挂买位 1-真实挂买位
|
cancel_single = cancel_index is not None
|
computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index,
|
origin_num, threshold_money,
|
cancel_single)
|
|
total_datas = local_today_datas[code]
|
if computed_index is not None:
|
cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
|
total_datas[computed_index])
|
# 发出撤买信号,需要撤买
|
if cls.unreal_buy_dict.get(code) is not None:
|
# 有虚拟下单
|
cls.debug(code, "之前有虚拟下单,执行虚拟撤买")
|
# 删除虚拟下单标记
|
cls.unreal_buy_dict.pop(code)
|
# 删除下单标记位置
|
TradePointManager.delete_buy_point(code)
|
else:
|
# 无虚拟下单,需要执行撤单
|
cls.debug(code, "之前无虚拟下单,执行真实撤单")
|
cls.__cancel_buy(code)
|
|
if computed_index < len(local_today_datas[code]) - 1:
|
# 数据尚未处理完,重新进入下单计算流程
|
cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time, False)
|
pass
|
else:
|
cls.debug(code, "撤买纯买额计算,计算位置:{}-{},目前为止纯买手数:{}", compute_start_index, total_datas[-1]["index"],
|
buy_num_for_cancel)
|
# 无需撤买,设置计算信息
|
TradePointManager.set_compute_info_for_cancel_buy(code, int(total_datas[-1]["index"]), buy_num_for_cancel)
|
# 判断是否有虚拟下单
|
unreal_buy_info = cls.unreal_buy_dict.get(code)
|
if unreal_buy_info is not None:
|
# unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
|
# 真实下单
|
cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单")
|
cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
|
unreal_buy_info[0])
|
pass
|
else:
|
# 终止执行
|
pass
|
|
@classmethod
|
def __buy(cls, code, capture_timestamp, last_data, last_data_index):
|
can, reason = cls.__can_buy(code)
|
# 不能购买
|
if not can:
|
cls.debug(code, "不可以下单,原因:{}", reason)
|
return
|
else:
|
cls.debug(code, "可以下单,原因:{}", reason)
|
|
# 删除虚拟下单
|
if code in cls.unreal_buy_dict:
|
cls.unreal_buy_dict.pop(code)
|
cls.debug(code, "开始执行买入")
|
try:
|
trade_manager.start_buy(code, capture_timestamp, last_data,
|
last_data_index)
|
TradePointManager.delete_buy_cancel_point(code)
|
cls.debug(code, "执行买入成功")
|
except Exception as e:
|
cls.debug(code, "执行买入异常:{}", str(e))
|
pass
|
finally:
|
cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
|
|
# 是否可以买
|
@classmethod
|
def __can_buy(cls, code):
|
limit_up_time = limit_up_time_manager.get_limit_up_time(code)
|
if limit_up_time is not None and L2DataUtil.get_time_as_second(limit_up_time) >= L2DataUtil.get_time_as_second(
|
"14:30:00"):
|
return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
|
|
# 同一板块中老二后面的不能买
|
industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
|
if industry is None:
|
return True, "没有获取到行业"
|
codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
|
if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
|
return False, "同一板块中老三,老四,...不能买"
|
|
# 13:00后涨停,本板块中涨停票数<29不能买
|
limit_up_time = limit_up_time_manager.get_limit_up_time(code)
|
if limit_up_time is not None:
|
if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
|
if global_util.industry_hot_num.get(industry) < 29:
|
return False, "13:00后涨停,本板块中涨停票数<29不能买"
|
# 老二,本板块中涨停票数<29 不能买
|
if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
|
industry) is not None:
|
if global_util.industry_hot_num.get(industry) < 29:
|
return False, "老二,本板块中涨停票数<29不能买"
|
# 可以下单
|
return True, None
|
|
@classmethod
|
def __cancel_buy(cls, code):
|
try:
|
cls.debug(code, "开始执行撤单")
|
trade_manager.start_cancel_buy(code)
|
# 取消买入标识
|
TradePointManager.delete_buy_point(code)
|
TradePointManager.delete_buy_cancel_point(code)
|
TradePointManager.delete_compute_info_for_cancel_buy(code)
|
TradePointManager.delete_count_info_for_cancel_buy(code)
|
# 删除大群撤事件的大单
|
L2BetchCancelBigNumProcessor.del_recod(code)
|
cls.debug(code, "执行撤单成功")
|
except Exception as e:
|
cls.debug(code, "执行撤单异常:{}", str(e))
|
|
@classmethod
|
def cancel_buy(cls, code):
|
# 删除大群撤事件的大单
|
L2BetchCancelBigNumProcessor.del_recod(code)
|
L2ContinueLimitUpCountManager.del_data(code)
|
|
if code in cls.unreal_buy_dict:
|
cls.unreal_buy_dict.pop(code)
|
# 取消买入标识
|
TradePointManager.delete_buy_point(code)
|
TradePointManager.delete_buy_cancel_point(code)
|
TradePointManager.delete_compute_info_for_cancel_buy(code)
|
TradePointManager.delete_count_info_for_cancel_buy(code)
|
# 删除大群撤事件的大单
|
L2BetchCancelBigNumProcessor.del_recod(code)
|
else:
|
cls.__cancel_buy(code)
|
|
L2BigNumProcessor.del_big_num_pos(code)
|
|
@classmethod
|
def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
|
new_add=True):
|
if compute_end_index < compute_start_index:
|
return
|
|
total_datas = local_today_datas[code]
|
# 获取买入信号计算起始位置
|
buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
|
# 是否为新获取到的位置
|
new_get_pos = False
|
if buy_single_index is None:
|
# 有买入信号
|
has_single, _index = cls.__compute_order_begin_pos(code, max(
|
compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index)
|
buy_single_index = _index
|
if has_single:
|
num = 0
|
new_get_pos = True
|
cls.debug(code, "获取到买入信号起始点:{} 数据:{}", buy_single_index, total_datas[buy_single_index])
|
limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"])
|
# 重置大单计算
|
big_money_num_manager.reset(code)
|
if buy_single_index is None:
|
# 未获取到买入信号,终止程序
|
return None
|
|
# TODO 可能存在问题 计算大单数量
|
cls.__compute_big_money_data(code, max(compute_start_index, buy_single_index), compute_end_index)
|
# 买入纯买额统计
|
compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
|
compute_start_index),
|
compute_end_index, num,
|
threshold_money, buy_single_index,
|
capture_time)
|
if rebegin_buy_pos is not None:
|
# 需要重新计算纯买额
|
cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
|
return
|
|
if compute_index is not None:
|
cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 数据:{}", compute_index, threshold_money, buy_nums,
|
total_datas[compute_index])
|
# 记录买入信号位置
|
cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
|
# 虚拟下单
|
cls.unreal_buy_dict[code] = (compute_index, capture_time)
|
# 删除之前的所有撤单信号
|
TradePointManager.delete_buy_cancel_point(code)
|
TradePointManager.delete_compute_info_for_cancel_buy(code)
|
TradePointManager.delete_count_info_for_cancel_buy(code)
|
trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
|
# 已过时 为买撤保存基础纯买额
|
# TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
|
b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
|
buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, buy_single_index, compute_index)
|
TradePointManager.set_count_info_for_cancel_buy(code, compute_index, b_buy_count + buy_count,
|
b_buy_cancel_count + buy_cancel_count)
|
# 计算大单(从买入信号起始点到挂单执行点),返回是否取消
|
cancel_result, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, buy_single_index,
|
compute_index)
|
# 计算大群撤的大单
|
L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index)
|
# 连续涨停数计算
|
L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index)
|
|
# 数据是否处理完毕
|
if compute_index >= compute_end_index:
|
cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
|
# 数据已经处理完毕,如果还没撤单就实际下单
|
if not cancel_result:
|
cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
|
else:
|
# 数据尚未处理完毕,进行下一步处理
|
cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
|
# 如果还没撤单,就继续处理已下单的步骤
|
if not cancel_result:
|
cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
|
else:
|
cls.__start_compute_buy(code, compute_index + 1, compute_end_index, threshold_money, capture_time,
|
False)
|
else:
|
# 未达到下单条件,保存纯买额,设置纯买额
|
# 记录买入信号位置
|
cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums)
|
pass
|
|
# 获取下单起始信号
|
@classmethod
|
def __get_order_begin_pos(cls, code):
|
buy_single_index, buy_exec_index, compute_index, num = TradePointManager.get_buy_compute_start_data(code)
|
return buy_single_index, buy_exec_index, compute_index, num
|
|
@classmethod
|
def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
|
TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)
|
|
# 计算下单起始信号
|
# compute_data_count 用于计算的l2数据数量
|
@classmethod
|
def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
|
# 倒数100条数据查询
|
datas = local_today_datas[code]
|
if end_index - start_index + 1 < continue_count:
|
return False, None
|
__time = None
|
|
last_index = None
|
count = 0
|
start = None
|
|
for i in range(start_index, end_index + 1):
|
_val = datas[i]["val"]
|
# 时间要>=09:30:00
|
if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
|
continue
|
|
if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (
|
i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
|
if start is None:
|
start = i
|
last_index = i
|
count += datas[i]["re"]
|
if count >= continue_count:
|
return True, start
|
elif not L2DataUtil.is_limit_up_price_sell(_val):
|
last_index = None
|
count = 0
|
start = None
|
|
return False, None
|
|
# 大群撤事件,最多相隔1s
|
@classmethod
|
def __compute_order_cancel_begin_single(cls, code, start_index, continue_count, end_index):
|
datas = local_today_datas[code]
|
if end_index - start_index + 1 < continue_count:
|
return None, None
|
count = 0
|
start = -1
|
start_time = None
|
for i in range(start_index, end_index + 1):
|
_val = datas[i]["val"]
|
_timestamp = L2DataUtil.get_time_as_second(_val["time"])
|
if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
|
continue
|
if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
|
if start == -1:
|
start = i
|
start_time = L2DataUtil.get_time_as_second(_val["time"])
|
count += datas[i]["re"]
|
elif not L2DataUtil.is_limit_up_price_sell(_val):
|
if count >= continue_count:
|
return start, i - 1
|
start = -1
|
count = 0
|
start_time = None
|
if count >= continue_count:
|
return start, end_index
|
else:
|
return None, None
|
|
# 小群撤事件
|
@classmethod
|
def __compute_order_cancel_little_begin_single(cls, code, start_index, continue_count, end_index=None):
|
# 必须为同一秒的数据
|
same_second = True
|
datas = local_today_datas[code]
|
__len = len(datas)
|
if len(datas) - start_index < continue_count:
|
return None, None
|
count = 0
|
start = -1
|
start_time = None
|
if end_index is None:
|
end_index = __len - continue_count
|
for i in range(start_index, end_index + 1):
|
_val = datas[i]["val"]
|
_timestamp = L2DataUtil.get_time_as_second(_val["time"])
|
if _timestamp < second_930:
|
continue
|
# 间隔时间不能多于1s
|
if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
|
if start == -1:
|
start = i
|
start_time = L2DataUtil.get_time_as_second(_val["time"])
|
count += int(datas[i]["re"])
|
elif not L2DataUtil.is_limit_up_price_sell(_val):
|
if count >= continue_count:
|
return start, i - 1
|
start = -1
|
count = 0
|
start_time = None
|
if count >= continue_count:
|
return start, end_index
|
else:
|
return None, None
|
|
# 虚拟下单
|
def __unreal_order(self):
|
pass
|
|
@classmethod
|
def __get_threshmoney(cls, code):
|
money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
|
return money
|
|
# 获取预估挂买位
|
@classmethod
|
def __get_sure_order_pos(cls, code):
|
index, data = trade_data_manager.TradeBuyDataManager.get_buy_sure_position(code)
|
if index is None:
|
return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
|
else:
|
return 1, index, data
|
|
# 过时 统计买入净买量
|
@classmethod
|
def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
|
total_datas = local_today_datas[code]
|
buy_nums = origin_num
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is None:
|
raise Exception("涨停价无法获取")
|
threshold_num = threshold_money / (limit_up_price * 100)
|
for i in range(compute_start_index, len(total_datas)):
|
_val = total_datas[i]["val"]
|
# 有连续4个涨停买就标记计算起始点
|
if L2DataUtil.is_limit_up_price_buy(_val):
|
# 涨停买
|
buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
|
if buy_nums >= threshold_num:
|
cls.debug(code, "获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", i, buy_nums, threshold_num)
|
return i, buy_nums
|
elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
|
# 涨停买撤
|
buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
|
cls.debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums,
|
threshold_num)
|
return None, buy_nums
|
|
# 过时 统计买入净买量,不计算在买入信号之前的买撤单
|
@classmethod
|
def __sum_buy_num_for_order_2(cls, code, compute_start_index, origin_num, threshold_money, buy_single_index):
|
total_datas = local_today_datas[code]
|
buy_nums = origin_num
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is None:
|
raise Exception("涨停价无法获取")
|
threshold_num = threshold_money / (limit_up_price * 100)
|
property_buy_num_count = 0
|
same_time_property = cls.__get_same_time_property(code)
|
for i in range(compute_start_index, len(total_datas)):
|
data = total_datas[i]
|
_val = total_datas[i]["val"]
|
# 有连续4个涨停买就标记计算起始点
|
if L2DataUtil.is_limit_up_price_buy(_val):
|
# 涨停买
|
buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
|
if buy_nums >= threshold_num:
|
logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
|
elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
|
# 涨停买撤
|
# 判断买入位置是否在买入信号之前
|
buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
|
local_today_num_operate_map.get(code))
|
if buy_index is not None:
|
# 找到买撤数据的买入点
|
if buy_index >= buy_single_index:
|
buy_nums -= int(_val["num"]) * int(data["re"])
|
cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
|
else:
|
cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
|
if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
|
# 同一秒,而且还在预估买入位之后按概率计算
|
property_buy_num_count -= int(_val["num"]) * int(data["re"])
|
cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
|
else:
|
# 未找到买撤数据的买入点
|
cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
|
buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
|
property_buy_num = round(property_buy_num_count * same_time_property)
|
cls.buy_debug(code, "买入信号点之前同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
|
buy_nums + property_buy_num, threshold_num)
|
# 有撤单信号,且小于阈值
|
if buy_nums + property_buy_num >= threshold_num:
|
return i, buy_nums + property_buy_num
|
|
cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
|
buy_nums + property_buy_num,
|
threshold_num)
|
return None, buy_nums + property_buy_num
|
|
# 统计买入净买量,不计算在买入信号之前的买撤单
|
@classmethod
|
def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, threshold_money,
|
buy_single_index,
|
capture_time):
|
total_datas = local_today_datas[code]
|
buy_nums = origin_num
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is None:
|
raise Exception("涨停价无法获取")
|
threshold_num = threshold_money / (limit_up_price * 100)
|
buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
|
for i in range(compute_start_index, compute_end_index + 1):
|
data = total_datas[i]
|
_val = total_datas[i]["val"]
|
if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
|
TradePointManager.delete_buy_point(code)
|
if i == compute_end_index:
|
# 数据处理完毕
|
return None, buy_nums, None
|
else:
|
# 计算买入信号,不能同一时间开始计算
|
for ii in range(buy_single_index + 1, compute_end_index + 1):
|
if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
|
return None, buy_nums, ii
|
|
# 涨停买
|
if L2DataUtil.is_limit_up_price_buy(_val):
|
# 涨停买
|
buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
|
if buy_nums >= threshold_num:
|
logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
|
elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
|
# 涨停买撤
|
# 判断买入位置是否在买入信号之前
|
buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
|
local_today_num_operate_map.get(code))
|
if buy_index is not None:
|
# 找到买撤数据的买入点
|
if buy_index >= buy_single_index:
|
buy_nums -= int(_val["num"]) * int(data["re"])
|
cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
|
else:
|
cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
|
if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
|
# 同一秒,当作买入信号之后处理
|
buy_nums -= int(_val["num"]) * int(data["re"])
|
cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
|
else:
|
# 未找到买撤数据的买入点
|
cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
|
buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
|
cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
|
buy_nums, threshold_num)
|
# 有撤单信号,且小于阈值
|
if buy_nums >= threshold_num:
|
return i, buy_nums, None
|
|
cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
|
buy_nums,
|
threshold_num)
|
return None, buy_nums, None
|
|
# 计算买入信号之前的且和买入信号数据在同一时间的数量
|
@classmethod
|
def __count_l2_data_before_for_cancel(cls, code, buy_single_index):
|
total_data = local_today_datas[code]
|
single_time = total_data[buy_single_index]["val"]["time"]
|
buy_count = 0
|
cancel_count = 0
|
for i in range(buy_single_index, -1, -1):
|
if single_time == total_data[i]["val"]["time"]:
|
if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
|
buy_count += int(total_data[i]["re"])
|
elif L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
|
cancel_count += int(total_data[i]["re"])
|
else:
|
break
|
return buy_count, cancel_count
|
|
@classmethod
|
def __count_l2_data_for_cancel(cls, code, start_index, end_index):
|
total_data = local_today_datas[code]
|
buy_count = 0
|
cancel_count = 0
|
for i in range(start_index, end_index + 1):
|
if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
|
buy_count += int(total_data[i]["re"])
|
elif L2DataUtil.is_limit_up_price_buy_cancel(total_data[i]["val"]):
|
cancel_count += int(total_data[i]["re"])
|
return buy_count, cancel_count
|
|
# 同一时间买入的概率计算
|
@classmethod
|
def __get_same_time_property(cls, code):
|
# 计算板块热度
|
industry = global_util.code_industry_map.get(code)
|
if industry is not None:
|
hot_num = global_util.industry_hot_num.get(industry)
|
if hot_num is not None:
|
return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
|
return 0.5
|
|
# 过时 统计买撤净买量
|
@classmethod
|
def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money, cancel_single=True):
|
buy_nums = origin_num
|
total_datas = local_today_datas[code]
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is None:
|
raise Exception("涨停价无法获取")
|
threshold_num = threshold_money / (limit_up_price * 100)
|
# 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
|
sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code)
|
same_time_property = cls.__get_same_time_property(code)
|
# 同一秒,在预估买入位之后的数据之和
|
property_buy_num_count = 0
|
cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{} 是否有撤单信号:{}", start_index, len(total_datas) - 1, sure_pos,
|
cancel_single)
|
for i in range(start_index, len(total_datas)):
|
data = total_datas[i]
|
_val = data["val"]
|
if L2DataUtil.is_limit_up_price_buy(_val):
|
# 涨停买
|
if i < sure_pos:
|
buy_nums += int(_val["num"]) * int(data["re"])
|
elif sure_data["val"]["time"] == _val["time"]:
|
# 同一秒买入,而且还在预估买入位之后
|
property_buy_num_count += int(_val["num"]) * int(data["re"])
|
|
elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
|
# 涨停撤买
|
# 判断买入位置是否在买入信号之前
|
buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
|
local_today_num_operate_map.get(code))
|
if buy_index is not None:
|
# 找到买撤数据的买入点
|
if buy_index < sure_pos:
|
buy_nums -= int(_val["num"]) * int(data["re"])
|
cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
|
else:
|
cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
|
if sure_data["val"]["time"] == buy_data["val"]["time"]:
|
# 同一秒,而且还在预估买入位之后按概率计算
|
property_buy_num_count -= int(_val["num"]) * int(data["re"])
|
cls.debug(code, "{}数据买入位与预估买入位在同一秒", i)
|
else:
|
# 未找到买撤数据的买入点
|
cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
|
|
property_buy_num = round(property_buy_num_count * same_time_property)
|
cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
|
buy_nums + property_buy_num, threshold_num)
|
# 有撤单信号,且小于阈值
|
if buy_nums + property_buy_num <= threshold_num and cancel_single:
|
return i, buy_nums + property_buy_num, sure_type
|
buy_num_news = buy_nums + round(property_buy_num_count * same_time_property)
|
cls.cancel_debug(code, "处理起始位置:{} 最终纯买额:{}", start_index, buy_num_news)
|
return None, buy_num_news, sure_type
|
|
# 统计买撤净买量
|
|
@classmethod
|
def __count_num_for_cancel_order(cls, code, start_index, origin_buy_num, origin_cancel_num, min_rate,
|
betch_cancel_single=True):
|
buy_nums = origin_buy_num
|
buy_cancel_num = origin_cancel_num
|
total_datas = local_today_datas[code]
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is None:
|
raise Exception("涨停价无法获取")
|
# 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
|
for i in range(start_index, len(total_datas)):
|
data = total_datas[i]
|
_val = data["val"]
|
if L2DataUtil.is_limit_up_price_buy(_val):
|
# 涨停买
|
buy_nums += int(data["re"])
|
elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
|
buy_cancel_num += int(data["re"])
|
# 有撤单信号,且小于阈值
|
if (buy_nums - buy_cancel_num) / buy_cancel_num <= min_rate and betch_cancel_single:
|
return i, buy_nums, buy_cancel_num
|
return None, buy_nums, buy_cancel_num
|
|
@classmethod
|
def test(cls):
|
code = "000593"
|
load_l2_data(code, True)
|
|
if False:
|
state = trade_manager.get_trade_state(code)
|
cls.random_key[code] = random.randint(0, 100000)
|
capture_timestamp = 1999988888
|
try:
|
if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
|
# 已挂单
|
cls.__process_order(code, 201, 237, capture_timestamp)
|
else:
|
# 未挂单
|
cls.__process_not_order(code, 201, 237, capture_timestamp)
|
except Exception as e:
|
logging.exception(e)
|
return
|
|
_start = t.time()
|
# 按s批量化数据
|
total_datas = local_today_datas[code]
|
start_time = total_datas[0]["val"]["time"]
|
start_index = 0
|
for i in range(0, len(total_datas)):
|
if total_datas[i]["val"]["time"] != start_time:
|
cls.random_key[code] = random.randint(0, 100000)
|
# 处理数据
|
start = start_index
|
# if start != 201:
|
# continue
|
end = i - 1
|
print("处理进度:{},{}".format(start, end))
|
capture_timestamp = 1999999999
|
state = trade_manager.get_trade_state(code)
|
try:
|
if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
|
# 已挂单
|
cls.__process_order(code, start, end, capture_timestamp)
|
else:
|
# 未挂单
|
cls.__process_not_order(code, start, end, capture_timestamp)
|
except Exception as e:
|
logging.exception(e)
|
# t.sleep(1)
|
start_index = i
|
start_time = total_datas[i]["val"]["time"]
|
|
print("时间花费:", round((t.time() - _start) * 1000))
|
|
@classmethod
|
def test1(cls):
|
code = "000593"
|
load_l2_data(code, True)
|
print(cls.__compute_order_begin_pos(code, 232, 3, 239))
|
|
@classmethod
|
def test2(cls):
|
code = "600082"
|
load_l2_data(code, True)
|
cls.random_key[code] = random.randint(0, 100000)
|
need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, 121, 123)
|
|
@classmethod
|
def test_can_order(cls):
|
code = "000948"
|
|
global_data_loader.load_industry()
|
limit_up_time_manager.load_limit_up_time()
|
print(cls.__can_buy(code))
|
|
|
# 连续涨停买单数最大值管理器
|
class L2ContinueLimitUpCountManager:
|
@classmethod
|
def del_data(cls, code):
|
cls.__del_last_record(code)
|
cls.__del_max(code)
|
|
# 获取最大值
|
@classmethod
|
def __get_max(cls, code):
|
key = "max_same_time_buy_count-{}".format(code)
|
redis = _redisManager.getRedis()
|
val = redis.get(key)
|
if val is not None:
|
return int(val)
|
else:
|
return None
|
|
# 保存最大值
|
@classmethod
|
def __save_max(cls, code, max_num):
|
key = "max_same_time_buy_count-{}".format(code)
|
redis = _redisManager.getRedis()
|
redis.setex(key, tool.get_expire(), max_num)
|
|
@classmethod
|
def __del_max(cls, code):
|
key = "max_same_time_buy_count-{}".format(code)
|
redis = _redisManager.getRedis()
|
redis.delete(key)
|
|
# 保存上一条数据最大值
|
@classmethod
|
def __save_last_record(cls, code, _time, count, index):
|
key = "same_time_buy_last_count-{}".format(code)
|
redis = _redisManager.getRedis()
|
redis.setex(key, tool.get_expire(), json.dumps((_time, count, index)))
|
|
@classmethod
|
def __del_last_record(cls, code):
|
key = "same_time_buy_last_count-{}".format(code)
|
redis = _redisManager.getRedis()
|
redis.delete(key)
|
|
@classmethod
|
def __get_last_record(cls, code):
|
key = "same_time_buy_last_count-{}".format(code)
|
redis = _redisManager.getRedis()
|
val = redis.get(key)
|
if val is None:
|
return None, None, None
|
else:
|
val = json.loads(val)
|
return val[0], val[1], val[2]
|
|
@classmethod
|
def process(cls, code, start_index, end_index):
|
last_time, last_count, last_index = cls.__get_last_record(code)
|
total_datas = local_today_datas[code]
|
time_count_dict = {}
|
for index in range(start_index, end_index + 1):
|
if last_index is not None and last_index >= index:
|
continue
|
|
if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
|
if last_count is None:
|
last_count = 0
|
last_time = total_datas[index]["val"]["time"]
|
last_index = index
|
if last_time == total_datas[index]["val"]["time"]:
|
last_count += total_datas[index]["re"]
|
last_index = index
|
else:
|
if last_count is not None and last_count > 0:
|
time_count_dict[last_time] = last_count
|
last_count = total_datas[index]["re"]
|
last_time = total_datas[index]["val"]["time"]
|
last_index = index
|
else:
|
if last_count is not None and last_count > 0:
|
time_count_dict[last_time] = last_count
|
last_count = 0
|
last_time = None
|
last_index = None
|
if last_count is not None and last_count > 0:
|
time_count_dict[last_time] = last_count
|
# 保存latest
|
cls.__save_last_record(code, last_time, last_count, last_index)
|
else:
|
# 移除
|
cls.__del_last_record(code)
|
|
# 查找这批数据中的最大数量
|
max_time = None
|
max_num = None
|
for key in time_count_dict:
|
if max_time is None:
|
max_time = key
|
max_num = time_count_dict[key]
|
if time_count_dict[key] > max_num:
|
max_num = time_count_dict[key]
|
max_time = key
|
if max_num is not None:
|
old_max = cls.__get_max(code)
|
if old_max is None or max_num > old_max:
|
cls.__save_max(code, max_num)
|
|
@classmethod
|
def get_continue_count(cls, code):
|
count = cls.__get_max(code)
|
if count is None:
|
count = 0
|
count = count // 3
|
if count < 15:
|
count = 15
|
return count
|
|
|
# 大单处理器
|
class L2BigNumProcessor:
|
# 是否需要根据大单撤单,返回是否需要撤单与撤单信号的数据
|
@classmethod
|
def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index):
|
if max_num_info is None:
|
return False, None
|
# 如果是买入单,需要看他前面同一秒是否有撤单
|
if int(max_num_info["val"]["operateType"]) == 0:
|
# 只有买撤信号在买入信号之前的同一秒的单才会撤单情况
|
_map = local_today_num_operate_map.get(code)
|
if _map is not None:
|
cancel_datas = _map.get(
|
"{}-{}-{}".format(max_num_info["val"]["num"], "1", max_num_info["val"]["price"]))
|
|
if cancel_datas is not None:
|
for cancel_data in cancel_datas:
|
# 只能在当前规定的数据范围查找,以防出现重复查找
|
if cancel_data["index"] < start_index or cancel_data["index"] > end_index:
|
continue
|
if cancel_data["index"] > max_num_info["index"]:
|
buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
|
local_today_num_operate_map[
|
code])
|
if buy_index is None:
|
continue
|
if buy_data["val"]["time"] != max_num_info["val"]["time"]:
|
continue
|
|
min_space, max_space = l2_data_util.compute_time_space_as_second(
|
cancel_data["val"]["cancelTime"],
|
cancel_data["val"][
|
"cancelTimeUnit"])
|
if min_space < 60:
|
L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间小于60s,撤单数据-{}",
|
json.dumps(cancel_data))
|
return True, cancel_data
|
else:
|
# 如果间隔时间大于等于60s,这判断小群撤事件
|
L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间大于60s,撤单数据-{}",
|
json.dumps(cancel_data))
|
return False, cancel_data
|
return False, None
|
else:
|
return True, None
|
|
# 计算数量最大的涨停买/涨停撤
|
@classmethod
|
def __compute_max_num(cls, code, start_index, end_index, max_num_info, buy_exec_time):
|
new_max_info = max_num_info
|
max_num = 0
|
if max_num_info is not None:
|
max_num = int(max_num_info["val"]["num"])
|
# 计算大单
|
total_data = local_today_datas[code]
|
for i in range(start_index, end_index + 1):
|
data = total_data[i]
|
val = data["val"]
|
if not L2DataUtil.is_limit_up_price_buy(val) and not L2DataUtil.is_limit_up_price_buy_cancel(
|
val):
|
continue
|
|
# 下单时间与买入执行时间之差大于60s的不做处理
|
if l2_data_util.get_time_as_seconds(val["time"]) - l2_data_util.get_time_as_seconds(buy_exec_time) > 1:
|
continue
|
|
if L2DataUtil.is_limit_up_price_buy(val):
|
pass
|
elif L2DataUtil.is_limit_up_price_buy_cancel(val):
|
min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
|
val["cancelTimeUnit"])
|
# 只能处理1s内的撤单
|
if min_space > 1:
|
continue
|
|
# buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
|
# local_today_num_operate_map.get(code))
|
# if buy_index is None:
|
# continue
|
# if l2_data_util.get_time_as_seconds(buy_data["val"]["time"]) - l2_data_util.get_time_as_seconds(
|
# buy_exec_time) > 1:
|
# continue
|
|
num = int(total_data[i]["val"]["num"])
|
if num > max_num:
|
max_num = num
|
new_max_info = data
|
return new_max_info
|
|
@classmethod
|
def __save_big_num_pos(cls, code, index):
|
redis = _redisManager.getRedis()
|
redis.setex("big_num_pos-{}".format(code), tool.get_expire(), index)
|
|
@classmethod
|
def __get_big_num_pos(cls, code):
|
redis = _redisManager.getRedis()
|
index = redis.get("big_num_pos-{}".format(code))
|
if index is not None:
|
return int(index)
|
return index
|
|
@classmethod
|
def del_big_num_pos(cls, code):
|
redis = _redisManager.getRedis()
|
redis.delete("big_num_pos-{}".format(code))
|
|
@classmethod
|
def __cancel_buy(cls, code, index):
|
L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index])
|
L2TradeDataProcessor.cancel_buy(code)
|
|
# 处理数据中的大单,返回是否已经撤单和撤单数据的时间
|
@classmethod
|
def process_cancel_with_big_num(cls, code, start_index, end_index):
|
total_data = local_today_datas[code]
|
# 如果无下单信号就无需处理
|
buy_single_index, buy_exec_index, compute_index, nums = TradePointManager.get_buy_compute_start_data(code)
|
if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0:
|
return False, None
|
# 判断是否有大单记录
|
index = cls.__get_big_num_pos(code)
|
# 无大单记录
|
if index is None:
|
# 计算大单
|
new_max_info = cls.__compute_max_num(code, start_index, end_index, None,
|
total_data[buy_exec_index]["val"]["time"])
|
if new_max_info is None:
|
return False, None
|
L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info))
|
index = new_max_info["index"]
|
# 大单是否有撤单信号
|
need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index)
|
if need_cancel:
|
# 需要撤单
|
# 撤单
|
L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"])
|
cls.__cancel_buy(code, new_max_info["index"])
|
return True, cancel_data,
|
|
else:
|
# 无需撤单
|
# 保存大单记录
|
cls.__save_big_num_pos(code, index)
|
return False, None
|
else:
|
# 有大单记录
|
need_cancel = False
|
cancel_index = -1
|
need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index)
|
# 需要撤单
|
if need_cancel:
|
# 撤单
|
cls.__cancel_buy(code, cancel_index)
|
return True, cancel_data
|
# 无需撤单
|
else:
|
# 计算新的大单
|
max_num_data = cls.__compute_max_num(code, start_index, end_index, total_data[index],
|
total_data[buy_exec_index]["val"]["time"])
|
if index == int(max_num_data["index"]):
|
return False, cancel_data
|
L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data))
|
|
# 大单是否有撤单信号
|
need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"],
|
end_index)
|
if need_cancel:
|
# 需要撤单
|
# 撤单
|
cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data)
|
L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index,
|
max_num_data["index"])
|
return True, cancel_data
|
else:
|
# 无需撤单
|
# 保存大单记录
|
cls.__save_big_num_pos(code, max_num_data["index"])
|
return False, cancel_data
|
|
@classmethod
|
def test(cls):
|
code = "000036"
|
load_l2_data(code, True)
|
new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59")
|
print(new_max_info)
|
|
|
# 大群撤大单跟踪
|
class L2BetchCancelBigNumProcessor:
|
@classmethod
|
def __get_recod(cls, code):
|
redis = _redisManager.getRedis()
|
_val = redis.get("betch_cancel_big_num-{}".format(code))
|
if _val is None:
|
return None, None
|
else:
|
datas = json.loads(_val)
|
return datas[0], datas[1]
|
|
@classmethod
|
def del_recod(cls, code):
|
redis = _redisManager.getRedis()
|
key = "betch_cancel_big_num-{}".format(code)
|
redis.delete(key)
|
|
@classmethod
|
def __save_recod(cls, code, max_big_num_info, big_nums_info):
|
redis = _redisManager.getRedis()
|
key = "betch_cancel_big_num-{}".format(code)
|
redis.setex(key, tool.get_expire(), json.dumps((max_big_num_info, big_nums_info)))
|
|
# 暂时弃用
|
@classmethod
|
def need_cancel(cls, code, start_index, end_index):
|
# 是否需要撤单
|
max_big_num_info, big_nums_info = cls.__get_recod(code)
|
if big_nums_info is None:
|
# 无大单信息
|
return True
|
nums_set = set()
|
index_set = set()
|
|
for d in big_nums_info:
|
nums_set.add(d[0])
|
index_set.add(d[1])
|
|
total_datas = local_today_datas[code]
|
|
count = 0
|
latest_buy_index = end_index
|
for index in range(start_index, end_index + 1):
|
if not nums_set.__contains__(total_datas[index]["val"]["num"]):
|
continue
|
buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[index],
|
local_today_num_operate_map[code])
|
if buy_index is None:
|
continue
|
if index_set.__contains__(buy_index):
|
count += buy_data["re"]
|
latest_buy_index = buy_index
|
|
# 获取大单数量
|
total_count = 0
|
for i in index_set:
|
if i <= latest_buy_index:
|
total_count += total_datas[i]["re"]
|
L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
|
# 大单小于5笔无脑撤,后修改为无大单无脑撤
|
if total_count <= 0:
|
return True
|
|
# 大单撤单笔数大于总大单笔数的1/5就撤单
|
if count / total_count >= 0.2:
|
return True
|
else:
|
return False
|
|
pass
|
|
# def need_cancel(cls, code, start_index, end_index):
|
# total_datas = local_today_datas[code]
|
# for index in range(start_index,end_index+1):
|
# price = total_datas[index]["val"]["price"]
|
# num = total_datas[index]["val"]["num"]
|
# if total_datas[index]
|
|
# 过时
|
@classmethod
|
def process(cls, code, start_index, end_index):
|
# 处理大单
|
# 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)])
|
total_datas = local_today_datas[code]
|
max_big_num_info, big_nums_info = cls.__get_recod(code)
|
# 寻找最大值
|
for index in range(start_index, end_index + 1):
|
# 只处理涨停买与涨停买撤
|
if not L2DataUtil.is_limit_up_price_buy(
|
total_datas[index]["val"]):
|
continue
|
if max_big_num_info is None:
|
max_big_num_info = (
|
int(total_datas[start_index]["val"]["num"]), total_datas[start_index]["index"])
|
|
if int(total_datas[index]["val"]["num"]) > max_big_num_info[0]:
|
max_big_num_info = (
|
int(total_datas[index]["val"]["num"]), total_datas[index]["index"])
|
# 将大于最大值90%的数据加入
|
if max_big_num_info is not None:
|
min_num = round(max_big_num_info[0] * 0.9)
|
|
for index in range(start_index, end_index + 1):
|
# 只统计涨停买
|
if not L2DataUtil.is_limit_up_price_buy(
|
total_datas[index]["val"]):
|
continue
|
|
if int(total_datas[index]["val"]["num"]) >= min_num:
|
if big_nums_info is None:
|
big_nums_info = []
|
big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"]))
|
# 移除小于90%的数据
|
big_nums_info_new = []
|
index_set = set()
|
for d in big_nums_info:
|
if d[0] >= min_num:
|
if not index_set.__contains__(d[1]):
|
index_set.add(d[1])
|
big_nums_info_new.append(d)
|
cls.__save_recod(code, max_big_num_info, big_nums_info_new)
|
|
# 最新方法
|
@classmethod
|
def process_new(cls, code, start_index, end_index):
|
# 处理大单
|
# 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)])
|
total_datas = local_today_datas[code]
|
max_big_num_info, big_nums_info = cls.__get_recod(code)
|
# 大于等于8000手或者金额>=300万就是大单
|
|
for index in range(start_index, end_index + 1):
|
# 只统计涨停买
|
if not L2DataUtil.is_limit_up_price_buy(
|
total_datas[index]["val"]):
|
continue
|
# 大于等于8000手或者金额 >= 300
|
# 万就是大单
|
if int(total_datas[index]["val"]["num"]) >= 8000 or int(total_datas[index]["val"]["num"]) * float(
|
total_datas[index]["val"]["price"]) >= 30000:
|
if big_nums_info is None:
|
big_nums_info = []
|
big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"]))
|
# 移除小于90%的数据
|
big_nums_info_new = []
|
index_set = set()
|
if big_nums_info is not None:
|
for d in big_nums_info:
|
if not index_set.__contains__(d[1]):
|
index_set.add(d[1])
|
big_nums_info_new.append(d)
|
cls.__save_recod(code, max_big_num_info, big_nums_info_new)
|
|
|
# 卖跟踪
|
class L2SellProcessor:
|
@classmethod
|
def __get_recod(cls, code):
|
redis = _redisManager.getRedis()
|
_val = redis.get("sell_num-{}".format(code))
|
if _val is None:
|
return None, None
|
else:
|
datas = json.loads(_val)
|
return datas[0], datas[1]
|
|
@classmethod
|
def del_recod(cls, code):
|
redis = _redisManager.getRedis()
|
key = "sell_num-{}".format(code)
|
redis.delete(key)
|
|
@classmethod
|
def __save_recod(cls, code, process_index, count):
|
redis = _redisManager.getRedis()
|
key = "sell_num-{}".format(code)
|
redis.setex(key, tool.get_expire(), json.dumps((process_index, count)))
|
|
# 暂时弃用
|
@classmethod
|
def need_cancel(cls, code, start_index, end_index):
|
# 是否需要撤单
|
process_index, count = cls.__get_recod(code)
|
if process_index is None:
|
# 无卖的信息
|
return False
|
if count is None:
|
count = 0
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is None:
|
return False
|
if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val(
|
global_util.zyltgb_map[code]):
|
return True
|
return False
|
|
@classmethod
|
def process(cls, code, start_index, end_index):
|
# 处理大单
|
# 获取大单列表,大单格式为:((num,index,re),[(num,index,re),(num,index,re)])
|
total_datas = local_today_datas[code]
|
process_index, count = cls.__get_recod(code)
|
# 寻找最大值
|
for index in range(start_index, end_index + 1):
|
# 只处理涨停卖
|
if not L2DataUtil.is_limit_up_price_sell(
|
total_datas[index]["val"]):
|
continue
|
# 不处理历史数据
|
if process_index is not None and process_index >= index:
|
continue
|
if count is None:
|
count = 0
|
count += int(total_datas[index]["val"]["num"])
|
if process_index is None:
|
process_index = end_index
|
cls.__save_recod(code, process_index, count)
|
|
|
|
def __get_time_second(time_str):
|
ts = time_str.split(":")
|
return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
|
|
|
second_930 = 9 * 3600 + 30 * 60 + 0
|
|
|
# 是否是涨停价买
|
def __is_limit_up_price_buy(val):
|
if int(val["limitPrice"]) != 1:
|
return False
|
|
if int(val["operateType"]) != 0:
|
return False
|
|
price = float(val["price"])
|
num = int(val["num"])
|
if price * num * 100 < 50 * 10000:
|
return False
|
return True
|
|
|
def __is_limit_up_price_buy_cancel(val):
|
if int(val["limitPrice"]) != 1:
|
return False
|
|
if int(val["operateType"]) != 1:
|
return False
|
|
price = float(val["price"])
|
num = int(val["num"])
|
if price * num * 100 < 50 * 10000:
|
return False
|
return True
|
|
|
# 获取涨停买入起始点
|
def __get_limit_up_buy_start(code, data_count, __continue_count):
|
# logger.info("__get_limit_up_buy_start:{},{},{}".format(code, data_count, __continue_count))
|
# 倒数100条数据查询
|
datas = local_today_datas[code]
|
__len = len(datas)
|
if __len < __continue_count:
|
return None
|
start_index = 0
|
if data_count > __len:
|
data_count = __len
|
|
if __len > data_count:
|
start_index = __len - data_count
|
__time = None
|
_limit_up_count_1s = 0
|
_limit_up_count_1s_start_index = -1
|
|
for i in range(start_index, __len - (__continue_count - 1)):
|
_val = datas[i]["val"]
|
# 时间要>=09:30:00
|
if __get_time_second(_val["time"]) < second_930:
|
continue
|
|
# 有连续4个涨停买就标记计算起始点
|
if __is_limit_up_price_buy(_val):
|
index_0 = i
|
index_1 = -1
|
index_2 = -1
|
# index_3 = -1
|
for j in range(index_0 + 1, __len):
|
# 涨停买
|
if __is_limit_up_price_buy(datas[j]["val"]):
|
index_1 = j
|
break
|
|
if index_1 > 0:
|
for j in range(index_1 + 1, __len):
|
# 涨停买
|
if __is_limit_up_price_buy(datas[j]["val"]):
|
index_2 = j
|
break
|
# if index_2 > 0:
|
# for j in range(index_2 + 1, __len):
|
# # 涨停买
|
# if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0:
|
# index_3 = j
|
if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1
|
logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
|
return i
|
# 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
|
if __is_limit_up_price_buy(_val):
|
# 涨停买
|
if __time is None:
|
_time = datas[i]["val"]["time"]
|
_limit_up_count_1s = 1
|
_limit_up_count_1s_start_index = i
|
elif _time == _val["time"]:
|
_limit_up_count_1s += 1
|
else:
|
_time = datas[i]["val"]["time"]
|
_limit_up_count_1s = 1
|
_limit_up_count_1s_start_index = i
|
elif _val["operateType"] == 1:
|
# 买撤
|
_time = None
|
_limit_up_count_1s = 0
|
_limit_up_count_1s_start_index = -1
|
|
if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
|
logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
|
return _limit_up_count_1s_start_index
|
|
return None
|
|
|
# 获取涨停撤销起始点
|
def __get_limit_up_buy_cancel_start(code, data_count, __continue_count):
|
# logger.info("__get_limit_up_buy_cancel_start:{},{},{}".format(code, data_count, __continue_count))
|
# 倒数100条数据查询
|
datas = local_today_datas[code]
|
__len = len(datas)
|
if __len < __continue_count:
|
return None
|
start_index = 0
|
if data_count > __len:
|
data_count = __len
|
|
if __len > data_count:
|
start_index = __len - data_count
|
for i in range(start_index, __len - (__continue_count - 1)):
|
_val = datas[i]["val"]
|
if __get_time_second(_val["time"]) < second_930:
|
continue
|
# 有连续3个买撤
|
if __is_limit_up_price_buy_cancel(_val):
|
index_0 = i
|
index_1 = -1
|
index_2 = -1
|
for j in range(index_0 + 1, __len):
|
# 涨停买
|
if __is_limit_up_price_buy_cancel(datas[j]["val"]):
|
index_1 = j
|
break
|
|
if index_1 > 0:
|
for j in range(index_1 + 1, __len):
|
# 涨停买
|
if __is_limit_up_price_buy_cancel(datas[j]["val"]):
|
index_2 = j
|
break
|
if index_1 - index_0 == 1 and index_2 - index_1 == 1:
|
# logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
|
return i
|
return None
|
|
|
# 是否有禁止交易特征
|
def __is_have_forbidden_feature(code, data_count, __continue_count):
|
# 09:30:00出现连续撤销的数量大于一定的值
|
# 倒数100条数据查询
|
datas = local_today_datas[code]
|
__len = len(datas)
|
if __len < __continue_count:
|
return None
|
start_index = 0
|
if data_count > __len:
|
data_count = __len
|
if __len > data_count:
|
start_index = __len - data_count
|
cancel_start = -1
|
cancel_count = 0
|
for i in range(start_index, __len):
|
_val = datas[i]["val"]
|
if _val["time"] == "09:30:00" and i - 1 >= 0 and datas[i - 1]["val"]["time"] != "09:30:00":
|
# 09:30第一条数据
|
if _val["operateType"] == 1:
|
cancel_start = i
|
else:
|
return False
|
elif cancel_start > -1:
|
# 连续撤销
|
if _val["operateType"] == 1 and i - cancel_start == 1:
|
cancel_start = i
|
cancel_count += 1
|
if cancel_count >= __continue_count:
|
return True
|
else:
|
return False
|
return False
|
|
|
# 设置最新的l2数据采集的数量
|
def __set_l2_data_latest_count(code, count):
|
redis = _redisManager.getRedis()
|
key = "latest-l2-count-{}".format(code)
|
redis.setex(key, 2, count)
|
pass
|
|
|
# 获取代码最近的l2数据数量
|
def get_l2_data_latest_count(code):
|
if code is None or len(code) < 1:
|
return 0
|
redis = _redisManager.getRedis()
|
key = "latest-l2-count-{}".format(code)
|
|
result = redis.get(key)
|
if result is None:
|
return 0
|
else:
|
return int(result)
|
|
|
# 初始化l2固定代码库
|
def init_l2_fixed_codes():
|
key = "l2-fixed-codes"
|
redis = _redisManager.getRedis()
|
count = redis.scard(key)
|
if count > 0:
|
redis.delete(key)
|
redis.sadd(key, "000000")
|
redis.expire(key, tool.get_expire())
|
|
|
# 移除l2固定监控代码
|
def remove_from_l2_fixed_codes(code):
|
key = "l2-fixed-codes"
|
redis = _redisManager.getRedis()
|
redis.srem(key, code)
|
|
|
# 添加代码到L2固定监控
|
def add_to_l2_fixed_codes(code):
|
key = "l2-fixed-codes"
|
redis = _redisManager.getRedis()
|
redis.sadd(key, code)
|
redis.expire(key, tool.get_expire())
|
|
|
# 是否在l2固定监控代码中
|
def is_in_l2_fixed_codes(code):
|
key = "l2-fixed-codes"
|
redis = _redisManager.getRedis()
|
return redis.sismember(key, code)
|
|
|
if __name__ == "__main__":
|
# 处理数据
|
code = "002898"
|
load_l2_data(code)
|
L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00")
|