"""
|
L2相关数据处理
|
"""
|
|
# L2交易队列
|
import datetime
|
import decimal
|
import json
|
import logging
|
import time
|
|
import numpy
|
|
import constant
|
import gpcode_manager
|
import l2_data_util
|
from l2 import l2_data_log, l2_data_source_util
|
import log
|
from db import redis_manager
|
import tool
|
|
# Shared Redis connection manager used by every function in this module.
# NOTE(review): the argument 1 is presumably the Redis db index — confirm against RedisManager.
_redisManager = redis_manager.RedisManager(1)

# ---- In-memory L2 data caches ----

# Most recently uploaded L2 snapshot, keyed by stock code.
local_latest_datas = {}

# All of today's L2 records, keyed by stock code.
local_today_datas = {}

# Lookup table trading memory for speed:
# code -> {"<num>-<operateType>-<price>": [records]} (filled by load_num_operate_map).
local_today_num_operate_map = {}

# Buy-order lookup: code -> {"<orderNo>": [buy records]} (filled by load_buy_no_map).
# Only populated when the L2 source is the native (HUAXIN) feed.
local_today_buyno_map = {}
|
|
|
def load_l2_data(code, force=False):
    """Fill the in-memory L2 caches for ``code``.

    The latest snapshot is read from Redis and today's records are read from
    the L2 log; each is loaded only when it is not cached yet or when
    ``force`` is set. After today's records are (re)loaded, the
    num/operate lookup and the buy-order lookup are rebuilt from them.

    :param code: stock code whose data should be loaded
    :param force: reload even when the caches already hold data
    :return: ``False`` when today's records were reloaded and there are fewer
        of them than the last record's ``index`` + 1 implies; ``True``
        otherwise (including when nothing had to be reloaded)
    """
    redis = _redisManager.getRedis()

    # Latest snapshot, stored in Redis as one JSON document per code.
    if local_latest_datas.get(code) is None or force:
        raw_latest = redis.get("l2-data-latest-{}".format(code))
        if raw_latest is not None:
            local_latest_datas[code] = json.loads(raw_latest)

    # Today's records: nothing more to do when they are cached and no reload is forced.
    reload_today = local_today_datas.get(code) is None or force
    if not reload_today:
        return True

    today_records = log.load_l2_from_log().get(code)
    if today_records is None:
        today_records = []
    local_today_datas[code] = today_records

    # The record count should cover the last record's index; fewer means the log has gaps.
    if today_records and len(today_records) < today_records[-1]["index"] + 1:
        data_normal = False
    else:
        data_normal = True

    # Rebuild the derived lookup tables from today's records.
    load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
    load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
    return data_normal
|
|
|
# 将数据根据num-operate分类
|
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    """Group records by ``"<num>-<operateType>-<price>"`` under ``code``.

    :param local_today_num_operate_map: mapping ``code -> {key: [records]}``,
        updated in place
    :param code: stock code whose bucket table is updated
    :param source_datas: iterable of records, each with a ``"val"`` dict
        holding ``num``, ``operateType`` and ``price``
    :param clear: start from an empty table for ``code`` instead of appending
        to the existing one
    """
    if clear or local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}

    buckets = local_today_num_operate_map[code]
    for record in source_datas:
        val = record["val"]
        bucket_key = "{}-{}-{}".format(val["num"], val["operateType"], val["price"])
        buckets.setdefault(bucket_key, []).append(record)
|
|
|
# 将数据根据orderNo分类,原生数据才有
|
def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
    """Group buy records (``operateType == 0``) by order number under ``code``.

    Does nothing unless ``constant.L2_SOURCE_TYPE`` is the HUAXIN source,
    because only that feed is expected to carry ``orderNo``.

    :param local_today_buyno_map: mapping ``code -> {orderNo: [records]}``,
        updated in place
    :param code: stock code whose table is updated
    :param source_datas: iterable of records, each with a ``"val"`` dict
    :param clear: start from an empty table for ``code`` instead of appending
    """
    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return

    if clear or local_today_buyno_map.get(code) is None:
        local_today_buyno_map[code] = {}

    orders = local_today_buyno_map[code]
    for record in source_datas:
        val = record["val"]
        # Only buy records are indexed.
        if val["operateType"] == 0:
            orders.setdefault("{}".format(val["orderNo"]), []).append(record)
|
|
|
@tool.async_call
def saveL2Data(code, datas, msg=""):
    """Persist L2 records to Redis under ``l2-<record key>``.

    New records are stored as ``{"index": ..., "re": ...}``; an existing
    record is rewritten only when its ``re`` (repeat count) changed. The work
    is guarded by the Redis key ``l2-save-<code>`` so that only one save per
    code runs at a time; a call that fails to take the lock skips the save.

    :param code: stock code; ignored unless it is in the monitored pool
    :param datas: iterable of records with ``key``, ``index`` and ``re``
    :param msg: free-form label included in the timing printout
    :return: ``None`` when ``code`` is not monitored, otherwise ``datas``
    """
    start_time = round(time.time() * 1000)
    # Only codes in the monitored pool are saved.
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    redis_instance = _redisManager.getRedis()
    lock_key = "l2-save-{}".format(code)

    # Release the lock only if this call acquired it. Deleting it
    # unconditionally would free a lock held by another in-flight save.
    # NOTE(review): the lock has no TTL, so a process killed mid-save leaves it behind.
    if redis_instance.setnx(lock_key, "1"):
        try:
            # Retention time for the stored records.
            expire = tool.get_expire()
            for _data in datas:
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # New record.
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except Exception:
                        # Best effort: log (with traceback) and continue with the next record.
                        logging.exception("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    # Existing record: only rewrite when the repeat count changed.
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
        finally:
            redis_instance.delete(lock_key)

    print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time))
    return datas
|
|
|
# 保存l2数据
|
def save_l2_data(code, datas, add_datas):
    """Store the latest L2 snapshot for ``code`` when new records arrived.

    When ``add_datas`` is non-empty, the full snapshot ``datas`` is written to
    Redis as JSON, mirrored into ``local_latest_datas``, its length is
    recorded via ``set_l2_data_latest_count`` and the new records are logged.

    :param code: stock code
    :param datas: complete latest snapshot
    :param add_datas: records that are new compared to the previous snapshot
    """
    redis = _redisManager.getRedis()
    # Nothing new means nothing to save.
    if len(add_datas) <= 0:
        return

    began_ms = round(time.time() * 1000)
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    elapsed_ms = round(time.time() * 1000) - began_ms
    l2_data_log.l2_time(code, elapsed_ms, "保存最近l2数据用时")

    # Keep the in-memory copy and the published count in step with Redis.
    local_latest_datas[code] = datas
    set_l2_data_latest_count(code, len(datas))

    # Logging the new records must never break the save path.
    try:
        log.logger_l2_data.info("{}-{}", code, add_datas)
    except Exception as e:
        logging.exception(e)
    # Per-record persistence to Redis (saveL2Data) is currently disabled.
|
|
|
# 设置最新的l2数据采集的数量
|
def set_l2_data_latest_count(code, count):
    """Publish the size of the latest L2 snapshot for ``code`` in Redis.

    The value is written under ``latest-l2-count-<code>`` with an expiry
    argument of 2, so it disappears quickly unless it is refreshed.

    :param code: stock code
    :param count: number of records in the latest snapshot
    """
    _redisManager.getRedis().setex("latest-l2-count-{}".format(code), 2, count)
|
|
|
# 获取代码最近的l2数据数量
|
def get_l2_data_latest_count(code):
    """Return the most recently published L2 record count for ``code``.

    :param code: stock code; ``None`` or an empty value yields 0
    :return: the stored count as ``int``, or 0 when no value is stored
    """
    if code is None or len(code) < 1:
        return 0

    stored = _redisManager.getRedis().get("latest-l2-count-{}".format(code))
    return 0 if stored is None else int(stored)
|
|
|
def parseL2Data(str):
    """Decode one uploaded L2 message.

    :param str: JSON text with a top-level ``client`` and a ``data`` object
        holding ``code``, ``channel``, ``captureTime``, ``processTime``,
        ``count`` and the record list ``data``
    :return: tuple ``(day, client, channel, code, capture_time,
        process_time, records, count)`` where ``day`` is today's date
        formatted as ``YYYYMMDD``
    :raises KeyError: when a required field is missing
    """
    today = datetime.datetime.now().strftime("%Y%m%d")
    message = json.loads(str)
    body = message["data"]
    client = message["client"]
    code, channel = body["code"], body["channel"]
    capture_time, process_time = body["captureTime"], body["processTime"]
    count = body["count"]
    records = body["data"]
    return today, client, channel, code, capture_time, process_time, records, count
|
|
|
# 元数据是否有差异
|
def is_origin_data_diffrent(data1, data2):
    """Cheaply decide whether two raw record lists differ.

    The lists are considered different when either is ``None`` or their
    lengths differ. Otherwise roughly ten evenly spaced positions are sampled
    and compared by their JSON serialisation; lists shorter than ten records
    are compared element by element.

    :param data1: first record list (or ``None``)
    :param data2: second record list (or ``None``)
    :return: ``True`` when a difference was detected, ``False`` otherwise
    """
    if data1 is None or data2 is None:
        return True
    if len(data1) != len(data2):
        return True

    data_length = len(data1)
    # The stride must be at least 1: range() raises ValueError for step 0,
    # which used to happen for every list with fewer than 10 records.
    step = max(1, data_length // 10)
    return any(
        json.dumps(data1[i]) != json.dumps(data2[i])
        for i in range(0, data_length, step)
    )
|
|
|
# 是否为大单
|
def is_big_money(val):
    """Tell whether a record counts as a big order.

    ``money`` is ``price * num``. For prices above 3.0 the order is big when
    ``money`` reaches 30000; for lower prices it is big when ``money``
    reaches 95% of ``price * 10000``.

    :param val: record value dict with ``price`` and ``num``
    :return: ``True`` for a big order, else ``False``
    """
    price = float(val["price"])
    money = price * int(val["num"])
    if price > 3.0:
        return money >= 30000
    # Cheap stocks: scale the threshold with the price.
    return money >= price * 10000 * 0.95
|
|
|
class L2DataUtil:
    """Stateless helpers for diffing, de-duplicating and classifying L2 records.

    A record is a dict with ``key`` (identity string), ``val`` (the raw item),
    ``re`` (how many identical items were seen) and, once assigned, ``index``.
    """

    @classmethod
    def is_same_time(cls, time1, time2):
        """Return ``True`` when two ``HH:MM:SS`` times are less than 3 seconds apart.

        Always returns ``True`` when ``constant.TEST`` is truthy.
        """
        if constant.TEST:
            return True
        return abs(cls.get_time_as_second(time2) - cls.get_time_as_second(time1)) < 3

    @classmethod
    def get_add_data(cls, code, latest_datas, datas, _start_index):
        """Return the records in ``datas`` that come after the last known record.

        The last record of ``latest_datas`` is searched for in ``datas`` (by
        ``key``, last occurrence wins); everything after it is new. If it is
        not found, ``datas`` as a whole is treated as new only when its first
        record is not older than the last known record. The returned records
        get consecutive ``index`` values starting at ``_start_index``
        (written into the record dicts in place).

        :param code: stock code (unused; kept for interface compatibility)
        :param latest_datas: previously known records, may be ``None`` or empty
        :param datas: freshly formatted records, may be ``None`` or empty
        :param _start_index: index assigned to the first new record
        :return: list of new records (possibly ``datas`` itself)
        """
        # None is handled like an empty batch; it used to crash in reversed().
        if not datas:
            return []

        last_data = latest_datas[-1] if latest_datas else None
        last_key = last_data["key"] if last_data is not None else ""

        # Position of the last occurrence of the last known record, or -1.
        start_index = -1
        for position in range(len(datas) - 1, -1, -1):
            if datas[position]["key"] == last_key:
                start_index = position
                break

        if last_data is None or start_index >= 0:
            # Everything after the match (or everything, when nothing was known before).
            _add_datas = datas[start_index + 1:]
        elif cls.get_time_as_second(datas[0]["val"]["time"]) >= cls.get_time_as_second(
                last_data["val"]["time"]):
            # No overlap, but the batch is not older than what we have: all of it is new.
            _add_datas = datas
        else:
            # No overlap and the batch is older: ignore it.
            _add_datas = []

        for offset, record in enumerate(_add_datas):
            record["index"] = _start_index + offset
        return _add_datas

    @classmethod
    def correct_data(cls, code, latest_datas, _datas):
        """Raise ``re`` of already known records to the larger value seen in ``_datas``.

        For every pair of records with the same ``key`` where the new record
        has the larger ``re``, the known record is updated in place. When at
        least one record was corrected, ``latest_datas`` is stored in
        ``local_latest_datas[code]``.

        :param code: stock code
        :param latest_datas: previously known records, may be ``None``
        :param _datas: freshly formatted records
        :return: ``_datas``
        """
        latest_data = [] if latest_datas is None else latest_datas
        corrected = False
        for data in _datas:
            for _ldata in latest_data:
                # Only a larger repeat count in the new data triggers a correction.
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    _ldata["re"] = data["re"]
                    corrected = True
        if corrected:
            # Persisting corrected records to Redis (saveL2Data) is currently disabled.
            local_latest_datas[code] = latest_data
        return _datas

    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        """Turn raw L2 items into de-duplicated records.

        When ``limit_up_price`` is given, each item's ``limitPrice`` is
        rewritten to ``"1"`` or ``"0"`` depending on whether its price equals
        the limit-up price. Buys and buy-cancels (``operateType`` 0/1) that
        are not at the limit-up price are dropped. Items that produce the
        same key are merged into one record whose ``re`` counts them.

        :param data: iterable of raw item dicts (mutated in place as described)
        :param code: stock code, used as the key prefix
        :param limit_up_price: limit-up price to compare against, or ``None``
            to trust the items' own ``limitPrice``
        :return: list of ``{"key", "val", "re"}`` records in first-seen order
        """
        datas = []
        index_by_key = {}
        # Per-timestamp item count; only consumed by the disabled debug hook below.
        same_time_num = {}
        for item in data:
            time_str = item["time"]
            same_time_num[time_str] = same_time_num.get(time_str, 0) + 1

            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            if limit_up_price is not None:
                # Recompute the limit-up flag from the price instead of trusting the feed.
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # Buys and buy-cancels away from the limit-up price are not needed.
            if int(item["limitPrice"]) != 1 and int(operateType) in (0, 1):
                continue

            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time_str, num, price, limitPrice,
                                                   cancelTime, cancelTimeUnit)
            if key in index_by_key:
                # Same item seen again: bump the repeat count.
                datas[index_by_key[key]]["re"] += 1
            else:
                datas.append({"key": key, "val": item, "re": 1})
                index_by_key[key] = len(datas) - 1
        # TODO: enable while testing to record big-order data
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas

    @classmethod
    def get_time_as_second(cls, time_str):
        """Convert ``HH:MM:SS`` to seconds since midnight."""
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])

    @classmethod
    def is_limit_up_price_buy(cls, val):
        """Return ``True`` for a buy (``operateType`` 0) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 0

    @classmethod
    def is_limit_up_price_sell(cls, val):
        """Return ``True`` for a sell (``operateType`` 2) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 2

    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        """Return ``True`` for a buy-cancel (``operateType`` 1) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 1

    @classmethod
    def is_sell_cancel(cls, val):
        """Return ``True`` for a sell-cancel (``operateType`` 3)."""
        return int(val["operateType"]) == 3

    @classmethod
    def is_sell(cls, val):
        """Return ``True`` for a sell (``operateType`` 2)."""
        return int(val["operateType"]) == 2
|
|
|
class L2TradeQueueUtils(object):
    """Locate the trade (fill) progress in today's L2 records from the buy-1 queue."""

    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        """Return ``True`` when the buy record ``data`` has a matching buy-cancel.

        Buy-cancel candidates are looked up by ``"<num>-1-<price>"``; each one
        is resolved back to its buy index and compared with ``data["index"]``.
        """
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        if not cancel_datas:
            return False
        for cancel_data in cancel_datas:
            buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(
                code, cancel_data, local_today_num_operate_map)
            if buy_index == data["index"]:
                return True
        return False

    @classmethod
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index,
                                   latest_not_limit_up_time=None):
        """Find the record index at which the visible buy-1 queue starts.

        Windows of ``queueList`` (from the full length down to just above half
        of it) are matched as consecutive runs against the order sizes of the
        candidate buy records at ``buy_1_price``. The first match whose
        positions look trustworthy wins.

        :param code: stock code
        :param buy_1_price: buy-1 price; its integer part is zero-padded to
            4 digits to build the lookup key
        :param total_datas: today's records, indexable by record index
        :param local_today_num_operate_map: ``{"<num>-<op>-<price>": [records]}``
            for this code
        :param queueList: order sizes currently visible in the buy-1 queue
        :param last_index: previously found progress index; candidates before
            it are ignored, and it is reset to 0 when that record was cancelled
        :param latest_not_limit_up_time: only consider records at or after
            this time; ignored when the queue has 3 or more entries
        :return: the matching record index, or ``None`` for an empty queue
        :raises Exception: when no trustworthy match is found
        """
        if len(queueList) == 0:
            return None

        # With 3 or more queue entries the time filter is not needed.
        if len(queueList) >= 3:
            latest_not_limit_up_time = None

        # The previous progress record must still be live; start over if it was cancelled.
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0

        # Zero-pad the integer part to 4 digits. Using partition (instead of
        # looping until "." sits at position 4) also terminates for prices
        # without a decimal point.
        int_part, dot, frac_part = f"{buy_1_price}".partition(".")
        buy_1_price_format = int_part.zfill(4) + dot + frac_part

        def find_traded_progress_index_simple(queues):
            """Match ``queues`` as a run of order sizes; return (first index, indexes) or (None, None)."""
            index_set = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(num, "0", buy_1_price_format))
                if not buy_datas:
                    continue
                for data in buy_datas:
                    # Only records at/after the last non-limit-up buy-1 update are valid.
                    if latest_not_limit_up_time is None or tool.trade_time_sub(
                            data["val"]["time"], latest_not_limit_up_time) >= 0:
                        if data["index"] >= last_index:
                            index_set.add(data["index"])

            # Expand every candidate by its repeat count so positions line up with the queue.
            num_tokens = []
            new_index_list = []
            for index in sorted(index_set):
                repeat = total_datas[index]["re"]
                num_tokens.extend([str(total_datas[index]["val"]["num"])] * repeat)
                new_index_list.extend([index] * repeat)

            # Compare whole tokens. A substring search on the comma-joined text
            # would also match inside a number (e.g. "1,11" within "11,11").
            pattern = [str(q) for q in queues]
            width = len(pattern)
            for start in range(len(num_tokens) - width + 1):
                if num_tokens[start:start + width] == pattern:
                    return new_index_list[start], new_index_list[start:start + width]
            return None, None

        def is_trust(indexes):
            """Judge whether the matched positions are plausibly one contiguous queue."""
            # Number of records lying between each pair of neighbouring matches.
            cha = [indexes[i] - indexes[i - 1] - 1 for i in range(1, len(indexes))]
            if len(cha) <= 1:
                return True
            # Evenly spread gaps (standard deviation below 10) are trusted outright.
            if numpy.std(cha) < 10:
                return True

            for i in range(0, len(cha)):
                if abs(cha[i]) > 10:
                    # For a large gap, count the live limit-up buys lying in between.
                    # NOTE(review): the upper bound skips the record right before
                    # indexes[i + 1]; kept as in the original — confirm whether intended.
                    buy_count = 0
                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                            if not cls.__is_cancel(code, total_datas[index], total_datas,
                                                   local_today_num_operate_map):
                                buy_count += total_datas[index]["re"]
                    # Tolerate fewer than 3 unexplained buys per gap.
                    if buy_count >= 3:
                        return False
            return True

        # Factor search: window length from len(queueList) down to just above len(queueList) // 2.
        max_win_len = len(queueList)
        min_win_len = max_win_len // 2
        for win_len in range(max_win_len, min_win_len, -1):
            # Slide the window over the queue.
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                # Index 0 is a valid match, so test against None rather than truthiness.
                if f_start_index is not None and is_trust(f_indexs):
                    return f_start_index

        raise Exception("尚未找到成交进度")
|
|
|
if __name__ == "__main__":
    # Manual smoke run: load the caches for one sample code and show the result.
    load_result = load_l2_data("002235")
    print(load_result)
|