"""
|
L2的数据处理
|
"""
|
import decimal
|
import json
|
import logging
|
import random
|
import time as t
|
from datetime import datetime
|
|
import big_money_num_manager
|
import code_data_util
|
import constant
|
import global_data_loader
|
import global_util
|
import industry_codes_sort
|
import l2_data_log
|
import l2_data_util
|
|
import gpcode_manager
|
import l2_trade_factor
|
import log
|
|
import redis_manager
|
import ths_industry_util
|
import tool
|
import trade_manager
|
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_data
|
import trade_data_manager
|
import limit_up_time_manager
|
|
# Redis connection manager shared by this module (constructed with argument 1).
_redisManager = redis_manager.RedisManager(1)

# ---- L2 data caches, keyed by stock code ----
# Most recently uploaded batch of L2 data.
local_latest_datas = dict()
# All L2 data collected today.
local_today_datas = dict()
# Temporary lookup built from "num + operate type"; it speeds up data
# processing by trading memory for time.
local_today_num_operate_map = dict()
|
|
|
class L2DataException(Exception):
    """Error raised for invalid L2 data.

    Attributes:
        code: Error code supplied by the raiser (see the ``CODE_*`` constants).
        msg: Description of the problem; also returned by ``str(exc)``.
    """

    # The price does not match.
    CODE_PRICE_ERROR = 1
    # No closing price is available.
    CODE_NO_CLOSE_PRICE = 2

    def __init__(self, code, msg):
        # Hand the message to Exception. Passing ``self`` (as before) stored
        # the exception inside its own ``args``, which breaks repr/pickling.
        super().__init__(msg)
        self.code = code
        self.msg = msg

    def __str__(self):
        return self.msg

    def get_code(self):
        """Return the error code given at construction."""
        return self.code
|
|
|
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
|
class TradePointManager:
    """Redis-backed storage for the trade points of a stock code.

    It keeps the buy point, the buy-cancel point and the running totals that
    are computed from each of them. Every value is stored under a key suffixed
    with the code and is written with the TTL returned by ``tool.get_expire()``.
    """

    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()

    @staticmethod
    def delete_buy_point(code):
        """Delete the stored buy-point data of ``code``."""
        redis = TradePointManager.__get_redis()
        redis.delete("buy_compute_index_info-{}".format(code))

    @staticmethod
    def get_buy_compute_start_data(code):
        """Return the stored buy-point data of ``code``.

        Returns:
            A 6-tuple in the order written by ``set_buy_compute_start_data``:
            ``(buy_single_index, buy_exec_index, compute_index, nums, count,
            max_num_sets)``. When nothing is stored the result is
            ``(None, None, None, 0, 0, [])``.
        """
        redis = TradePointManager.__get_redis()
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0, 0, []
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5]

    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets):
        """Store the buy-point data of ``code``.

        Args:
            code: Stock code.
            buy_single_index: Index of the buy signal. ``None`` keeps the
                signal index that is already stored.
            buy_exec_index: Index at which the buy is executed.
            compute_index: Index up to which data has been computed.
            nums: Accumulated net buy amount.
            count: Stored as the fifth element of the record.
            max_num_sets: Iterable stored as a list (sixth element).
        """
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is None:
            # Keep whatever signal index is currently stored (may be None).
            buy_single_index = TradePointManager.get_buy_compute_start_data(code)[0]
        redis.setex(_key, expire,
                    json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets))))

    @staticmethod
    def get_buy_cancel_single_pos(code):
        """Return the stored buy-cancel signal index as ``int``, or ``None``."""
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        return int(info)

    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
        """Store ``index`` as the buy-cancel signal position of ``code``."""
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)

    @classmethod
    def delete_buy_cancel_point(cls, code):
        """Delete the stored buy-cancel signal position of ``code``."""
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_single_pos-{}".format(code))

    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        """Store ``(index, nums)``: the net buy amount used for buy-cancel."""
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)

    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        """Return the stored ``(index, nums)``, or ``(None, 0)`` if absent."""
        redis = TradePointManager.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        info = json.loads(info)
        return info[0], info[1]

    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        """Delete the stored buy-cancel net buy amount of ``code``."""
        redis = TradePointManager.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))

    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        """Store the limit-up buy / cancel order counts since the buy signal."""
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        # Four values are logged, so four placeholders are needed; with three
        # the cancel count was silently dropped from the message.
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{},{}", code, index, buy_count, cancel_count)

    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        """Return ``(index, buy_count, cancel_count)``, or ``(None, 0, 0)``."""
        redis = TradePointManager.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        info = json.loads(info)
        return info[0], info[1], info[2]

    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        """Delete the stored buy / cancel order counts of ``code``."""
        redis = TradePointManager.__get_redis()
        redis.delete("count_info_for_cancel_buy-{}".format(code))
|
|
|
def load_l2_data(code, force=False):
    """Fill the in-memory L2 caches for ``code``.

    The latest batch is read from Redis and today's data from the log (via
    ``log.load_l2_from_log``). Each cache is only loaded when it holds nothing
    for ``code`` yet, or when ``force`` is true.
    """
    redis = _redisManager.getRedis()

    # Latest uploaded batch.
    if force or local_latest_datas.get(code) is None:
        latest_json = redis.get("l2-data-latest-{}".format(code))
        if latest_json is not None:
            local_latest_datas[code] = json.loads(latest_json)

    # Today's data.
    if force or local_today_datas.get(code) is None:
        today = log.load_l2_from_log().get(code)
        local_today_datas[code] = [] if today is None else today
        # Rebuild the lookup map from today's data.
        # NOTE(review): the original indentation was lost; this call is assumed
        # to run only when today's data was (re)loaded - confirm.
        l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
|
|
|
@tool.async_call
def saveL2Data(code, datas, msg=""):
    """Persist L2 records of ``code`` to Redis, one key per record.

    A record that is not stored yet is written as ``{"index", "re"}``; a stored
    record is rewritten only when its ``re`` value differs. The work is
    skipped when another caller currently holds the ``l2-save-<code>`` lock.

    Args:
        code: Stock code; nothing is done unless it is in the gp pool.
        datas: Records carrying ``key``, ``index`` and ``re``.
        msg: Label included in the timing output.

    Returns:
        ``datas``, or ``None`` when ``code`` is not in the gp pool.
    """
    start_time = round(t.time() * 1000)
    # Only codes that are being monitored are saved.
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    redis_instance = _redisManager.getRedis()
    lock_key = "l2-save-{}".format(code)

    if redis_instance.setnx(lock_key, "1"):
        # The lock is released only by the caller that acquired it. Releasing
        # it unconditionally (as before) removed a lock held by someone else.
        try:
            expire = tool.get_expire()
            for _data in datas:
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # New record. Best effort: one bad record must not stop
                    # the rest of the batch, but the failure is logged with
                    # its traceback.
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except Exception:
                        logging.exception("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
        finally:
            redis_instance.delete(lock_key)

    print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time))
    return datas
|
|
|
def parseL2Data(str):
    """Parse one uploaded L2 message.

    Args:
        str: JSON text with a ``client`` field and a ``data`` object holding
            ``code``, ``channel``, ``captureTime``, ``processTime`` and the
            raw ``data`` items. (The name shadows the builtin; it is kept so
            keyword callers keep working.)

    Returns:
        ``(day, client, channel, code, capture_time, process_time, datas,
        data)`` where ``day`` is today as ``YYYYMMDD``, ``datas`` is the output
        of ``L2DataUtil.format_l2_data`` and ``data`` the raw items.
    """
    day = datetime.now().strftime("%Y%m%d")
    payload = json.loads(str)
    body = payload["data"]
    client = payload["client"]
    code = body["code"]
    channel = body["channel"]
    capture_time = body["captureTime"]
    process_time = body["processTime"]
    raw_items = body["data"]
    # The limit-up price is needed to format the raw items.
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    formatted = L2DataUtil.format_l2_data(raw_items, code, limit_up_price)
    return day, client, channel, code, capture_time, process_time, formatted, raw_items
|
|
|
# 保存l2数据
|
def save_l2_data(code, datas, add_datas):
    """Save the latest batch ``datas`` and persist the new records.

    Nothing is saved when ``add_datas`` is empty.
    NOTE(review): the original indentation was lost; every step below is
    assumed to be guarded by the "has new data" check - confirm.
    """
    redis = _redisManager.getRedis()
    if len(add_datas) < 1:
        return

    # Store the latest batch in Redis and record how long that took.
    began_ms = round(t.time() * 1000)
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    l2_data_log.l2_time(code, round(t.time() * 1000) - began_ms, "保存最近l2数据用时")

    # Mirror it in memory.
    local_latest_datas[code] = datas
    __set_l2_data_latest_count(code, len(datas))

    # A logging failure must not prevent the records from being saved.
    try:
        logger_l2_data.info("{}-{}", code, add_datas)
    except Exception as e:
        logging.exception(e)
    saveL2Data(code, add_datas)
|
|
|
# 清除l2数据
|
def clear_l2_data(code):
    """Delete every ``l2-<code>-*`` key and the latest-batch key of ``code``."""
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    for stale_key in redis_l2.keys("l2-{}-*".format(code)):
        redis_l2.delete(stale_key)

    redis_l2.delete("l2-data-latest-{}".format(code))
|
|
|
class L2DataUtil:
    """Helpers for formatting, diffing and classifying L2 records.

    A formatted record is a dict ``{"key", "val", "re"}`` (plus ``"index"``
    once assigned by ``get_add_data``): ``val`` is the raw item and ``re``
    the number of times the same item occurred in one batch.
    """

    @classmethod
    def is_same_time(cls, time1, time2):
        """Return True when two ``HH:MM:SS`` strings are under 3 seconds apart.

        Always returns True when ``constant.TEST`` is set.
        """
        if constant.TEST:
            return True
        return abs(cls.get_time_as_second(time2) - cls.get_time_as_second(time1)) < 3

    @classmethod
    def get_add_data(cls, code, datas, _start_index):
        """Return the records of ``datas`` that are new for ``code``.

        The last record of the cached latest batch is searched for in
        ``datas`` (from the end); everything after it is new. When it is not
        found, the whole batch is new if it does not start earlier than that
        last record, otherwise nothing is. Each returned record gets
        ``"index"`` set to ``_start_index`` plus its position.

        Args:
            code: Stock code.
            datas: Formatted records of the current batch; ``None`` or empty
                yields ``[]``.
            _start_index: Index assigned to the first new record.
        """
        # ``None`` used to slip past this guard and crash in ``reversed``.
        if not datas:
            return []

        latest_datas_ = local_latest_datas.get(code)
        last_data = latest_datas_[-1] if latest_datas_ else None

        # Position of the last known record inside the new batch, if any.
        start_index = -1
        if last_data is not None:
            last_key = last_data["key"]
            for pos in range(len(datas) - 1, -1, -1):
                if datas[pos]["key"] == last_key:
                    start_index = pos
                    break

        if last_data is None or start_index >= 0:
            # Empty when the known record is the last one of the batch.
            _add_datas = datas[start_index + 1:]
        elif cls.get_time_as_second(datas[0]["val"]["time"]) >= cls.get_time_as_second(
                last_data["val"]["time"]):
            _add_datas = datas
        else:
            _add_datas = []

        for offset, record in enumerate(_add_datas):
            record["index"] = _start_index + offset
        return _add_datas

    @classmethod
    def correct_data(cls, code, _datas):
        """Raise ``re`` of cached records to the larger value seen in ``_datas``.

        Cached records whose ``re`` was raised are handed to ``saveL2Data``.

        Returns:
            ``_datas`` (unchanged).
        """
        latest_data = local_latest_datas.get(code)
        if latest_data is None:
            latest_data = []

        # Index the cached records by key so each new record is matched
        # directly instead of rescanning the whole cache.
        latest_by_key = {}
        for _ldata in latest_data:
            latest_by_key.setdefault(_ldata["key"], []).append(_ldata)

        save_list = []
        for data in _datas:
            for _ldata in latest_by_key.get(data["key"], ()):
                # Only a larger repeat count is taken over.
                if _ldata["re"] < data["re"]:
                    _ldata["re"] = data["re"]
                    save_list.append(_ldata)

        if save_list:
            saveL2Data(code, save_list, "保存纠正数据")
        local_latest_datas[code] = latest_data
        return _datas

    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        """Convert raw L2 items into formatted records.

        When ``limit_up_price`` is given, each item's ``limitPrice`` is
        rewritten to ``"1"``/``"0"`` depending on whether its price equals
        it. Buys and buy-cancels (operate type 0 / 1) that are not at the
        limit-up price are dropped. Identical items are merged into one
        record whose ``re`` counts the occurrences.

        Returns:
            List of ``{"key", "val", "re"}`` dicts in first-seen order.
        """
        datas = []
        dataIndexs = {}
        for item in data:
            time = item["time"]
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                # NOTE(review): assumed to be inside the ``limit_up_price``
                # check; the original indentation was lost.
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # Buys / buy-cancels that are not at the limit-up price are not needed.
            if int(item["limitPrice"]) != 1 and int(operateType) in (0, 1):
                continue

            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # Seen before in this batch: bump the repeat count.
                datas[dataIndexs[key]]['re'] += 1
            else:
                # First occurrence: repeat count starts at 1.
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs[key] = len(datas) - 1
        return datas

    @classmethod
    def get_time_as_second(cls, time_str):
        """Convert ``HH:MM:SS`` into seconds since midnight."""
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])

    @classmethod
    def _is_limit_up_order_of_type(cls, val, operate_type):
        """Shared check behind the ``is_limit_up_price_*`` classifiers.

        True when ``val`` is at the limit-up price, has the given operate
        type and ``price * num * 100`` is at least 500,000.
        """
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != operate_type:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True

    @classmethod
    def is_limit_up_price_buy(cls, val):
        """Return True for a limit-up buy (operate type 0) of at least 500,000."""
        return cls._is_limit_up_order_of_type(val, 0)

    @classmethod
    def is_limit_up_price_sell(cls, val):
        """Return True for a limit-up sell (operate type 2) of at least 500,000."""
        return cls._is_limit_up_order_of_type(val, 2)

    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        """Return True for a limit-up buy-cancel (operate type 1) of at least 500,000."""
        return cls._is_limit_up_order_of_type(val, 1)

    @classmethod
    def is_sell_cancel(cls, val):
        """Return True when ``val`` is a sell-cancel (operate type 3)."""
        return int(val["operateType"]) == 3

    @classmethod
    def is_sell(cls, val):
        """Return True when ``val`` is a sell (operate type 2)."""
        return int(val["operateType"]) == 2
|
|
# 连续涨停买单数最大值管理器
|
class L2ContinueLimitUpCountManager:
    """Tracks the largest number of consecutive limit-up buys in one second.

    Two Redis entries are kept per code: the run that was still open at the
    end of the last processed batch, and the largest completed-or-open run
    count seen so far.
    """

    @classmethod
    def del_data(cls, code):
        """Delete both the open-run record and the stored maximum."""
        cls.__del_last_record(code)
        cls.__del_max(code)

    @classmethod
    def __get_max(cls, code):
        """Return the stored maximum as ``int``, or ``None`` when absent."""
        stored = _redisManager.getRedis().get("max_same_time_buy_count-{}".format(code))
        return None if stored is None else int(stored)

    @classmethod
    def __save_max(cls, code, max_num):
        """Store ``max_num`` as the maximum of ``code``."""
        redis_key = "max_same_time_buy_count-{}".format(code)
        _redisManager.getRedis().setex(redis_key, tool.get_expire(), max_num)

    @classmethod
    def __del_max(cls, code):
        redis_key = "max_same_time_buy_count-{}".format(code)
        _redisManager.getRedis().delete(redis_key)

    @classmethod
    def __save_last_record(cls, code, _time, count, index):
        """Store the open run as ``(_time, count, index)``."""
        redis_key = "same_time_buy_last_count-{}".format(code)
        _redisManager.getRedis().setex(redis_key, tool.get_expire(), json.dumps((_time, count, index)))

    @classmethod
    def __del_last_record(cls, code):
        redis_key = "same_time_buy_last_count-{}".format(code)
        _redisManager.getRedis().delete(redis_key)

    @classmethod
    def __get_last_record(cls, code):
        """Return the open run ``(time, count, index)`` or three ``None``s."""
        stored = _redisManager.getRedis().get("same_time_buy_last_count-{}".format(code))
        if stored is None:
            return None, None, None
        run = json.loads(stored)
        return run[0], run[1], run[2]

    @classmethod
    def process(cls, code, start_index, end_index):
        """Update the run statistics with records ``start_index..end_index``.

        Records at or before the index of the stored open run are skipped.
        Consecutive limit-up buys sharing one time string form a run whose
        size is the sum of their ``re``; any other record closes the run.
        """
        run_time, run_count, run_index = cls.__get_last_record(code)
        records = local_today_datas[code]
        runs_by_time = {}

        for pos in range(start_index, end_index + 1):
            if run_index is not None and run_index >= pos:
                continue

            val = records[pos]["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                # Anything else closes the current run.
                if run_count is not None and run_count > 0:
                    runs_by_time[run_time] = run_count
                run_time, run_count, run_index = None, 0, None
                continue

            if run_count is None:
                # Nothing recorded yet: open a run at this record.
                run_count = 0
                run_time = val["time"]
                run_index = pos

            if run_time == val["time"]:
                run_count += records[pos]["re"]
            else:
                # A different second: close the previous run, open a new one.
                if run_count is not None and run_count > 0:
                    runs_by_time[run_time] = run_count
                run_count = records[pos]["re"]
                run_time = val["time"]
            run_index = pos

        if run_count is not None and run_count > 0:
            runs_by_time[run_time] = run_count
            # Remember the still-open run for the next batch.
            cls.__save_last_record(code, run_time, run_count, run_index)
        else:
            cls.__del_last_record(code)

        # Raise the stored maximum if this batch produced a larger run.
        if runs_by_time:
            batch_max = max(runs_by_time.values())
            old_max = cls.__get_max(code)
            if old_max is None or batch_max > old_max:
                cls.__save_max(code, batch_max)

    @classmethod
    def get_continue_count(cls, code):
        """Return a third of the stored maximum (floor), but at least 15."""
        stored = cls.__get_max(code)
        if stored is None:
            stored = 0
        return max(stored // 3, 15)
|
|
|
# 大单处理器
|
class L2BigNumProcessor:
    """Tracks the largest limit-up order after a buy and decides on cancelling.

    NOTE(review): ``L2TradeDataProcessor`` is used throughout but is neither
    defined nor imported in the visible part of this file - confirm where it
    comes from.
    """

    @classmethod
    def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index):
        """Decide whether the tracked big order requires cancelling the buy.

        Args:
            code: Stock code.
            max_num_info: Record of the tracked big order, or ``None``.
            start_index: First index in which cancel records are searched.
            end_index: Last index in which cancel records are searched.

        Returns:
            ``(need_cancel, cancel_data)``. For a big *buy* order, matching
            cancel records after it are looked up; the first one whose buy
            happened in the same second as the big order decides: cancel when
            its minimal cancel interval is under 60s. A big order that is not
            a buy always yields ``(True, None)``.
        """
        if max_num_info is None:
            return False, None
        if int(max_num_info["val"]["operateType"]) != 0:
            return True, None

        # A buy order: look for a cancel of the same num and price.
        _map = local_today_num_operate_map.get(code)
        if _map is not None:
            cancel_datas = _map.get(
                "{}-{}-{}".format(max_num_info["val"]["num"], "1", max_num_info["val"]["price"]))
            if cancel_datas is not None:
                for cancel_data in cancel_datas:
                    # Stay inside the given range so nothing is handled twice.
                    if cancel_data["index"] < start_index or cancel_data["index"] > end_index:
                        continue
                    if cancel_data["index"] > max_num_info["index"]:
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                            cancel_data, local_today_num_operate_map[code])
                        if buy_index is None:
                            continue
                        if buy_data["val"]["time"] != max_num_info["val"]["time"]:
                            continue

                        min_space, max_space = l2_data_util.compute_time_space_as_second(
                            cancel_data["val"]["cancelTime"], cancel_data["val"]["cancelTimeUnit"])
                        if min_space < 60:
                            L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间小于60s,撤单数据-{}",
                                                              json.dumps(cancel_data))
                            return True, cancel_data
                        # 60s or more: left to the small-group cancel check.
                        L2TradeDataProcessor.cancel_debug(code, "找到大单撤单,但撤单间隔时间大于60s,撤单数据-{}",
                                                          json.dumps(cancel_data))
                        return False, cancel_data
        return False, None

    @classmethod
    def __compute_max_num(cls, code, start_index, end_index, max_num_info, buy_exec_time):
        """Return the limit-up buy / buy-cancel with the largest ``num``.

        Records ``start_index..end_index`` are compared against
        ``max_num_info`` (the current maximum, may be ``None``). Records more
        than 1 second after ``buy_exec_time`` are ignored, as are buy-cancels
        whose minimal cancel interval exceeds 1 second.
        """
        new_max_info = max_num_info
        max_num = 0
        if max_num_info is not None:
            max_num = int(max_num_info["val"]["num"])

        total_data = local_today_datas[code]
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            is_buy = L2DataUtil.is_limit_up_price_buy(val)
            if not is_buy and not L2DataUtil.is_limit_up_price_buy_cancel(val):
                continue

            # Ignore orders placed more than 1s after the buy was executed.
            if l2_data_util.get_time_as_seconds(val["time"]) - l2_data_util.get_time_as_seconds(buy_exec_time) > 1:
                continue

            if not is_buy:
                min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                 val["cancelTimeUnit"])
                # Only cancels within 1s are considered.
                if min_space > 1:
                    continue

            num = int(val["num"])
            if num > max_num:
                max_num = num
                new_max_info = data
        return new_max_info

    @classmethod
    def __save_big_num_pos(cls, code, index):
        redis = _redisManager.getRedis()
        redis.setex("big_num_pos-{}".format(code), tool.get_expire(), index)

    @classmethod
    def __get_big_num_pos(cls, code):
        """Return the stored big-order index as ``int``, or ``None``."""
        redis = _redisManager.getRedis()
        index = redis.get("big_num_pos-{}".format(code))
        if index is not None:
            return int(index)
        return index

    @classmethod
    def del_big_num_pos(cls, code):
        """Delete the stored big-order index of ``code``."""
        redis = _redisManager.getRedis()
        redis.delete("big_num_pos-{}".format(code))

    @classmethod
    def __cancel_buy(cls, code, index):
        """Log the triggering record at ``index`` and cancel the buy."""
        L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index])
        L2TradeDataProcessor.cancel_buy(code)

    @classmethod
    def process_cancel_with_big_num(cls, code, start_index, end_index):
        """Handle the big order inside ``start_index..end_index``.

        Returns:
            ``(cancelled, cancel_data)``: whether the buy was cancelled and
            the cancel record that was found, if any.
        """
        total_data = local_today_datas[code]
        # get_buy_compute_start_data returns six values; unpacking five of
        # them (as before) raised ValueError on every call.
        buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets = \
            TradePointManager.get_buy_compute_start_data(code)
        # Nothing to do without an executed buy.
        if buy_single_index is None or buy_exec_index is None or buy_exec_index < 0:
            return False, None
        buy_exec_time = total_data[buy_exec_index]["val"]["time"]

        index = cls.__get_big_num_pos(code)
        if index is None:
            # No big order tracked yet: look for one in this range.
            new_max_info = cls.__compute_max_num(code, start_index, end_index, None, buy_exec_time)
            if new_max_info is None:
                return False, None
            L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info))
            index = new_max_info["index"]
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index)
            if need_cancel:
                L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"])
                cls.__cancel_buy(code, new_max_info["index"])
                return True, cancel_data
            # No cancel needed: remember the big order.
            cls.__save_big_num_pos(code, index)
            return False, None

        # A big order is already tracked.
        need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index)
        if need_cancel:
            # Report the cancel record when there is one, else the tracked
            # order. The previous hard-coded -1 logged the last record.
            trigger_index = index if cancel_data is None else cancel_data["index"]
            cls.__cancel_buy(code, trigger_index)
            return True, cancel_data

        # See whether a bigger order showed up in this range.
        max_num_data = cls.__compute_max_num(code, start_index, end_index, total_data[index], buy_exec_time)
        if index == int(max_num_data["index"]):
            return False, cancel_data
        L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data))

        need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"],
                                                                  end_index)
        if need_cancel:
            # Pass an index; passing the cancel record itself (as before)
            # failed when it was used to index the data list.
            trigger_index = max_num_data["index"] if cancel_data is None else cancel_data["index"]
            cls.__cancel_buy(code, trigger_index)
            L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index,
                                              max_num_data["index"])
            return True, cancel_data
        # No cancel needed: track the new big order.
        cls.__save_big_num_pos(code, max_num_data["index"])
        return False, cancel_data

    @classmethod
    def test(cls):
        """Manual check: print the big order found for a fixed sample range."""
        code = "000036"
        load_l2_data(code, True)
        new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59")
        print(new_max_info)
|
|
|
# 大群撤大单跟踪
|
class L2BetchCancelBigNumProcessor:
    """Tracks big limit-up buys for the batch-cancel decision.

    The Redis record is ``(max_big_num_info, big_nums_info)`` where
    ``big_nums_info`` is a list of ``(num, index)`` pairs.
    """

    @classmethod
    def __get_recod(cls, code):
        """Return ``(max_big_num_info, big_nums_info)`` or ``(None, None)``."""
        redis = _redisManager.getRedis()
        _val = redis.get("betch_cancel_big_num-{}".format(code))
        if _val is None:
            return None, None
        datas = json.loads(_val)
        return datas[0], datas[1]

    @classmethod
    def del_recod(cls, code):
        """Delete the stored record of ``code``."""
        redis = _redisManager.getRedis()
        key = "betch_cancel_big_num-{}".format(code)
        redis.delete(key)

    @classmethod
    def __save_recod(cls, code, max_big_num_info, big_nums_info):
        redis = _redisManager.getRedis()
        key = "betch_cancel_big_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((max_big_num_info, big_nums_info)))

    # Temporarily unused.
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        """Return True when the buy should be cancelled.

        Cancels when no big orders are recorded, or when the cancelled big
        orders found in ``start_index..end_index`` make up at least 1/5 of
        the recorded big orders up to the latest cancelled one (both counted
        by ``re``).
        """
        max_big_num_info, big_nums_info = cls.__get_recod(code)
        if big_nums_info is None:
            # No big-order information at all.
            return True

        nums_set = set()
        index_set = set()
        for d in big_nums_info:
            nums_set.add(d[0])
            index_set.add(d[1])

        total_datas = local_today_datas[code]
        count = 0
        latest_buy_index = end_index
        for index in range(start_index, end_index + 1):
            # The recorded nums are ints (see process/process_new), so the
            # raw value is converted before the lookup.
            if int(total_datas[index]["val"]["num"]) not in nums_set:
                continue
            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[index],
                                                                             local_today_num_operate_map[code])
            if buy_index is None:
                continue
            if buy_index in index_set:
                count += buy_data["re"]
                latest_buy_index = buy_index

        # Number of big orders up to the latest cancelled one.
        total_count = 0
        for i in index_set:
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)

        # Without any big order the buy is cancelled unconditionally.
        if total_count <= 0:
            return True
        # Cancel once at least 1/5 of the big orders were cancelled.
        return count / total_count >= 0.2

    # Obsolete.
    @classmethod
    def process(cls, code, start_index, end_index):
        """Record limit-up buys of at least 90% of the largest one seen.

        Nothing is saved while no limit-up buy has been seen at all.
        """
        total_datas = local_today_datas[code]
        max_big_num_info, big_nums_info = cls.__get_recod(code)

        # Find the largest limit-up buy.
        for index in range(start_index, end_index + 1):
            if not L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                continue
            num = int(total_datas[index]["val"]["num"])
            # Seed from the current record. Seeding from ``start_index`` (as
            # before) could pick a record that is not a limit-up buy.
            if max_big_num_info is None or num > max_big_num_info[0]:
                max_big_num_info = (num, total_datas[index]["index"])

        if max_big_num_info is None:
            return

        # Add everything that reaches 90% of the maximum.
        min_num = round(max_big_num_info[0] * 0.9)
        if big_nums_info is None:
            big_nums_info = []
        for index in range(start_index, end_index + 1):
            if not L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                continue
            if int(total_datas[index]["val"]["num"]) >= min_num:
                big_nums_info.append((int(total_datas[index]["val"]["num"]), total_datas[index]["index"]))

        # Drop entries below 90% and duplicates of the same index.
        big_nums_info_new = []
        index_set = set()
        for d in big_nums_info:
            if d[0] >= min_num and d[1] not in index_set:
                index_set.add(d[1])
                big_nums_info_new.append(d)
        cls.__save_recod(code, max_big_num_info, big_nums_info_new)

    # Current method.
    @classmethod
    def process_new(cls, code, start_index, end_index):
        """Record the big limit-up buys of ``start_index..end_index``.

        A limit-up buy is big when ``num >= 8000`` or ``num * price >= 30000``.
        """
        total_datas = local_today_datas[code]
        max_big_num_info, big_nums_info = cls.__get_recod(code)

        for index in range(start_index, end_index + 1):
            val = total_datas[index]["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if int(val["num"]) >= 8000 or int(val["num"]) * float(val["price"]) >= 30000:
                if big_nums_info is None:
                    big_nums_info = []
                big_nums_info.append((int(val["num"]), total_datas[index]["index"]))

        # Keep one entry per index.
        big_nums_info_new = []
        index_set = set()
        if big_nums_info is not None:
            for d in big_nums_info:
                if d[1] not in index_set:
                    index_set.add(d[1])
                    big_nums_info_new.append(d)
        cls.__save_recod(code, max_big_num_info, big_nums_info_new)
|
|
|
# 卖跟踪
|
class L2SellProcessor:
    """Accumulates the ``num`` of limit-up sells for a code.

    The Redis record is ``(process_index, count)``: the last processed index
    and the accumulated sell ``num``.
    """

    @classmethod
    def __get_recod(cls, code):
        """Return ``(process_index, count)`` or ``(None, None)``."""
        redis = _redisManager.getRedis()
        _val = redis.get("sell_num-{}".format(code))
        if _val is None:
            return None, None
        datas = json.loads(_val)
        return datas[0], datas[1]

    @classmethod
    def del_recod(cls, code):
        """Delete the stored record of ``code``."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.delete(key)

    @classmethod
    def __save_recod(cls, code, process_index, count):
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((process_index, count)))

    # Temporarily unused.
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        """Return True when the accumulated sells reach the base safe value.

        Compares ``limit_up_price * count * 100`` with
        ``L2TradeFactorUtil.get_base_safe_val(global_util.zyltgb_map[code])``.
        False when there is no sell record or no limit-up price.
        """
        process_index, count = cls.__get_recod(code)
        if process_index is None:
            # No sell information.
            return False
        if count is None:
            count = 0
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            return False
        if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val(
                global_util.zyltgb_map[code]):
            return True
        return False

    @classmethod
    def process(cls, code, start_index, end_index):
        """Add the limit-up sells of ``start_index..end_index`` to the total.

        Records at or before the stored ``process_index`` are skipped.
        """
        total_datas = local_today_datas[code]
        process_index, count = cls.__get_recod(code)
        for index in range(start_index, end_index + 1):
            # Only limit-up sells are counted.
            if not L2DataUtil.is_limit_up_price_sell(total_datas[index]["val"]):
                continue
            # Skip what has been processed before.
            if process_index is not None and process_index >= index:
                continue
            if count is None:
                count = 0
            count += int(total_datas[index]["val"]["num"])
        # Advance the processed position. It used to be set only once, which
        # left the skip above unable to protect later ranges.
        if process_index is None or end_index > process_index:
            process_index = end_index
        cls.__save_recod(code, process_index, count)
|
|
|
def __get_time_second(time_str):
    """Convert an ``HH:MM:SS`` string into seconds since midnight."""
    hours, minutes, seconds = time_str.split(":")[:3]
    return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
|
|
|
# 09:30:00 expressed as seconds since midnight.
second_930 = 9 * 3600 + 30 * 60 + 0
|
|
|
# 是否是涨停价买
|
def __is_limit_up_price_buy(val):
    """Return True for a limit-up buy worth at least 500,000.

    ``val`` must have ``limitPrice`` equal to 1, ``operateType`` equal to 0
    and ``price * num * 100`` not below 500,000.
    """
    at_limit_up = int(val["limitPrice"]) == 1
    if not at_limit_up:
        return False

    is_buy = int(val["operateType"]) == 0
    if not is_buy:
        return False

    amount = float(val["price"]) * int(val["num"]) * 100
    return not amount < 50 * 10000
|
|
|
def __is_limit_up_price_buy_cancel(val):
    """Return True for a limit-up buy-cancel worth at least 500,000.

    ``val`` must have ``limitPrice`` equal to 1, ``operateType`` equal to 1
    and ``price * num * 100`` not below 500,000.
    """
    at_limit_up = int(val["limitPrice"]) == 1
    if not at_limit_up:
        return False

    is_buy_cancel = int(val["operateType"]) == 1
    if not is_buy_cancel:
        return False

    amount = float(val["price"]) * int(val["num"]) * 100
    return not amount < 50 * 10000
|
|
|
# 获取涨停买入起始点
|
def __get_limit_up_buy_start(code, data_count, __continue_count):
    """Find the index at which limit-up buying starts.

    Only the last ``data_count`` records of today's data are scanned, and
    records before 09:30:00 are ignored. A start is found when either

    * three limit-up buys are physically consecutive (returns the first), or
    * four limit-up buys share the same second, possibly with other records
      in between; a buy-cancel (operate type 1) restarts that count (returns
      the first buy of that second).

    Args:
        code: Stock code.
        data_count: Number of trailing records to scan.
        __continue_count: Minimum number of records required; the scan stops
            ``__continue_count - 1`` records before the end.

    Returns:
        The start index, or ``None`` when nothing qualifies.
    """
    datas = local_today_datas[code]
    __len = len(datas)
    if __len < __continue_count:
        return None
    # Start of the trailing window (0 when it covers everything).
    start_index = max(__len - data_count, 0)

    # State of the "same second" counter. The original initialised ``__time``
    # but assigned ``_time``, so the counter restarted at every record and
    # could never reach 4.
    _time = None
    _limit_up_count_1s = 0
    _limit_up_count_1s_start_index = -1

    for i in range(start_index, __len - (__continue_count - 1)):
        _val = datas[i]["val"]
        # Only records at or after 09:30:00 count.
        if __get_time_second(_val["time"]) < second_930:
            continue

        if __is_limit_up_price_buy(_val):
            # Physically consecutive: the next two records are limit-up buys
            # as well. Checked directly instead of scanning forward.
            if (i + 2 < __len and __is_limit_up_price_buy(datas[i + 1]["val"])
                    and __is_limit_up_price_buy(datas[i + 2]["val"])):
                logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                return i

            if _time is not None and _time == _val["time"]:
                _limit_up_count_1s += 1
            else:
                # First buy of a new second.
                _time = _val["time"]
                _limit_up_count_1s = 1
                _limit_up_count_1s_start_index = i
        elif int(_val["operateType"]) == 1:
            # A buy-cancel restarts the same-second count. ``operateType`` is
            # converted like everywhere else in this module.
            _time = None
            _limit_up_count_1s = 0
            _limit_up_count_1s_start_index = -1

        if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
            logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
            return _limit_up_count_1s_start_index

    return None
|
|
|
# 获取涨停撤销起始点
|
def __get_limit_up_buy_cancel_start(code, data_count, __continue_count):
    """Find the first of three physically consecutive limit-up buy-cancels.

    Only the last ``data_count`` records of today's data are scanned, the
    scan stops ``__continue_count - 1`` records before the end, and records
    before 09:30:00 are ignored.

    Returns:
        The index of the first cancel, or ``None`` when nothing qualifies.
    """
    records = local_today_datas[code]
    total = len(records)
    if total < __continue_count:
        return None
    first = max(total - data_count, 0)

    def is_cancel_at(pos):
        # Positions past the end never match.
        return pos < total and __is_limit_up_price_buy_cancel(records[pos]["val"])

    for pos in range(first, total - (__continue_count - 1)):
        val = records[pos]["val"]
        if __get_time_second(val["time"]) < second_930:
            continue
        # This record and the two right after it are all limit-up cancels.
        if __is_limit_up_price_buy_cancel(val) and is_cancel_at(pos + 1) and is_cancel_at(pos + 2):
            return pos
    return None
|
|
|
# 是否有禁止交易特征
|
def __is_have_forbidden_feature(code, data_count, __continue_count):
    """Detect a run of consecutive cancels opening the 09:30:00 data.

    Only the last ``data_count`` records of today's data are scanned. The
    first record stamped ``09:30:00`` (whose predecessor has another time)
    must have ``operateType == 1``; every directly following record with
    ``operateType == 1`` extends the run.

    Returns:
        ``True`` once ``__continue_count`` records have extended the run,
        ``False`` otherwise, and ``None`` when there are fewer than
        ``__continue_count`` records.

    NOTE(review): the original indentation was lost; the ``else: return
    False`` is assumed to pair with the "is a consecutive cancel" test.
    """
    records = local_today_datas[code]
    total = len(records)
    if total < __continue_count:
        return None
    first = max(total - data_count, 0)

    last_cancel = -1
    streak = 0
    for pos in range(first, total):
        val = records[pos]["val"]
        opens_930 = (val["time"] == "09:30:00" and pos >= 1
                     and records[pos - 1]["val"]["time"] != "09:30:00")
        if opens_930:
            # First record of 09:30:00: it has to be a cancel.
            if val["operateType"] != 1:
                return False
            last_cancel = pos
        elif last_cancel > -1:
            # The run only continues with a cancel right after the last one.
            if not (val["operateType"] == 1 and pos - last_cancel == 1):
                return False
            last_cancel = pos
            streak += 1
            if streak >= __continue_count:
                return True
    return False
|
|
|
# 设置最新的l2数据采集的数量
|
def __set_l2_data_latest_count(code, count):
    """Store the size of the latest L2 batch of ``code`` with a TTL of 2."""
    _redisManager.getRedis().setex("latest-l2-count-{}".format(code), 2, count)
|
|
|
# 获取代码最近的l2数据数量
|
def get_l2_data_latest_count(code):
    """Return the stored size of the latest L2 batch of ``code``.

    Returns 0 for a ``None`` or empty code and when nothing is stored.
    """
    if code is None or len(code) == 0:
        return 0
    stored = _redisManager.getRedis().get("latest-l2-count-{}".format(code))
    return 0 if stored is None else int(stored)
|
|
|
# 初始化l2固定代码库
|
def init_l2_fixed_codes():
    """Reset the fixed L2 code set so it only holds the entry ``"000000"``."""
    fixed_key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    # Drop whatever the set currently holds.
    if redis.scard(fixed_key) > 0:
        redis.delete(fixed_key)
    redis.sadd(fixed_key, "000000")
    redis.expire(fixed_key, tool.get_expire())
|
|
|
# 移除l2固定监控代码
|
def remove_from_l2_fixed_codes(code):
    """Remove ``code`` from the fixed L2 code set."""
    _redisManager.getRedis().srem("l2-fixed-codes", code)
|
|
|
# 添加代码到L2固定监控
|
def add_to_l2_fixed_codes(code):
    """Add ``code`` to the fixed L2 code set and refresh the set's expiry."""
    fixed_key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    redis.sadd(fixed_key, code)
    redis.expire(fixed_key, tool.get_expire())
|
|
|
# 是否在l2固定监控代码中
|
def is_in_l2_fixed_codes(code):
    """Return whether ``code`` is a member of the fixed L2 code set."""
    return _redisManager.getRedis().sismember("l2-fixed-codes", code)
|
|
|
if __name__ == "__main__":
|
clear_l2_data("603912")
|