# L2 data utilities
"""
L2 data processing toolkit.
"""
|
|
# 比较时间的大小
|
import datetime
import functools
import json
import time

from tool import async_call
import l2_data_manager
import tool
|
|
|
def run_time():
    """Build a decorator that prints how long each call of a function takes.

    The elapsed time is taken from ``time.time()``, rounded to whole
    milliseconds, and printed after the wrapped function has returned.

    Returns:
        A decorator. The wrapper it creates forwards every positional and
        keyword argument to the wrapped function and returns its result.
    """

    def decorator(func):
        # Keep the wrapped function's name and docstring on the wrapper.
        @functools.wraps(func)
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            # Unpack the positional arguments; passing the tuple itself would
            # hand the wrapped function one tuple instead of its arguments.
            result = func(*args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            return result

        return infunc

    return decorator
|
|
|
def compare_time(time1, time2):
    """Compare two ``HH:MM:SS`` strings.

    Up to two colons are stripped from each string and the remainders are
    compared as integers (``"10:00:01"`` becomes ``100001``).

    Returns:
        ``time1 - time2`` on those integers: negative when ``time1`` is
        earlier, zero when equal, positive when later. Only the sign is
        meaningful; the magnitude is not a number of seconds.
    """
    first, second = (int(value.replace(":", "", 2)) for value in (time1, time2))
    return first - second
|
|
|
# Convert a storage key (plus its JSON value) into an L2 data object
def l2_data_key_2_obj(k, value):
    """Parse an L2 key and its JSON payload into one dict.

    Every ``"l2-"`` occurrence is removed from ``k`` and the remainder is split
    on ``"-"``. The first field is the code (not copied into the result);
    fields 1-7 become the ``"val"`` dict.

    Args:
        k: Key string with at least eight ``-``-separated fields once
            ``"l2-"`` has been removed.
        value: JSON text of an object that has ``"re"`` and ``"index"`` entries.

    Returns:
        ``{"key": <k without "l2-">, "val": {...}, "re": ..., "index": int}``.

    Raises:
        IndexError: If the key has fewer than eight fields.
    """
    key = k.replace("l2-", "")
    fields = key.split("-")
    field_names = ("operateType", "time", "num", "price", "limitPrice", "cancelTime", "cancelTimeUnit")
    # Field 0 is the code; the named fields start at position 1.
    item = {name: fields[position] for position, name in enumerate(field_names, start=1)}
    payload = json.loads(value)
    return {"key": key, "val": item, "re": payload["re"], "index": int(payload["index"])}
|
|
|
# Group rows by num-operateType-price
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    """Append rows to the per-code buckets of ``local_today_num_operate_map``.

    The mapping is modified in place. Each row goes into the list stored under
    ``"{num}-{operateType}-{price}"`` (taken from ``row["val"]``) inside
    ``local_today_num_operate_map[code]``.

    Args:
        local_today_num_operate_map: ``{code: {bucket_key: [rows]}}`` mapping.
        code: Entry of the mapping to fill.
        source_datas: Iterable of rows, each with a ``"val"`` dict holding
            ``"num"``, ``"operateType"`` and ``"price"``.
        clear: When true, the buckets of ``code`` are reset before filling.
    """
    if clear or local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    buckets = local_today_num_operate_map[code]
    for data in source_datas:
        val = data["val"]
        key = "{}-{}-{}".format(val["num"], val["operateType"], val["price"])
        buckets.setdefault(key, []).append(data)
|
|
|
# Subtract seconds from a time
def __sub_time(time_str, seconds):
    """Return ``time_str`` (``HH:MM:SS``) moved back by ``seconds``.

    The result is formatted as ``HH:MM:SS`` with each field left-padded with
    zeros to two characters. Nothing is clamped: when ``seconds`` is larger
    than the time of day, the hour field comes out negative.
    """
    remaining = get_time_as_seconds(time_str) - seconds
    hours, within_hour = divmod(remaining, 3600)
    minutes, secs = divmod(within_hour, 60)
    return "{0:0>2}:{1:0>2}:{2:0>2}".format(hours, minutes, secs)
|
|
|
def get_time_as_seconds(time_str):
    """Convert an ``HH:MM:SS`` string to the number of seconds since ``00:00:00``.

    Only the first three ``:``-separated fields are read.

    Raises:
        IndexError: If there are fewer than three fields.
        ValueError: If a field is not an integer.
    """
    parts = time_str.split(":")
    hours, minutes, seconds = (int(parts[position]) for position in range(3))
    return hours * 3600 + minutes * 60 + seconds
|
|
|
# Compute the time interval
def compute_time_space_as_second(cancel_time, cancel_time_unit):
    """Turn a cancel delay and its unit into a ``(min, max)`` span in seconds.

    Args:
        cancel_time: Delay amount; anything ``int()`` accepts.
        cancel_time_unit: ``0`` for seconds, ``1`` for minutes, ``2`` for hours.

    Returns:
        ``(0, 0)`` when the amount is zero (the unit is not read in that case).
        Otherwise ``(amount * unit_seconds, (amount + 1) * unit_seconds)``.
        ``None`` for any other unit value.
    """
    amount = int(cancel_time)
    if amount == 0:
        return 0, 0
    # Seconds per unit code: 0 = second, 1 = minute, 2 = hour.
    unit_seconds = {0: 1, 1: 60, 2: 3600}.get(int(cancel_time_unit))
    if unit_seconds is None:
        return None
    return amount * unit_seconds, (amount + 1) * unit_seconds
|
|
|
# Get the time range of the buy
def get_buy_time_range(cancel_data):
    """Return the ``(min_time, max_time)`` window that precedes a cancel row.

    The cancel delay span of ``cancel_data["val"]`` (``"cancelTime"`` and
    ``"cancelTimeUnit"``) is subtracted from its ``"time"``: the larger end of
    the span gives ``min_time``, the smaller end gives ``max_time``. Both are
    ``HH:MM:SS`` strings.
    """
    val = cancel_data["val"]
    # Compute the delay span
    shortest, longest = compute_time_space_as_second(val["cancelTime"], val["cancelTimeUnit"])
    latest = __sub_time(val["time"], shortest)
    earliest = __sub_time(val["time"], longest)
    return earliest, latest
|
|
|
# Find the buy row for a buy-cancel row (using today's full data)
def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map):
    """Find the buy row that a cancel row refers to.

    Candidates are read from ``local_today_num_operate_map`` under
    ``"{num}-0-{price}"`` and scanned in order. The first one with
    operateType 0, the same ``num`` and a time inside the window wins.

    The window is ``time == min_time`` when the cancel delay span is
    ``(0, 0)``, otherwise ``min_time < time <= max_time``.

    Args:
        cancel_data: Cancel row with a ``"val"`` dict.
        local_today_num_operate_map: Mapping keyed by
            ``"{num}-{operateType}-{price}"`` to lists of rows.
            NOTE(review): this looks like the per-code mapping, not the
            top-level one; confirm against callers.

    Returns:
        ``(index, row)`` of the match, or ``(None, None)`` when there is none.
    """
    cancel_val = cancel_data["val"]
    min_space, max_space = compute_time_space_as_second(cancel_val["cancelTime"], cancel_val["cancelTimeUnit"])
    max_time = __sub_time(cancel_val["time"], min_space)
    min_time = __sub_time(cancel_val["time"], max_space)
    candidates = local_today_num_operate_map.get("{}-{}-{}".format(cancel_val["num"], "0", cancel_val["price"]))
    if candidates is None:
        # No data
        return None, None
    same_second = min_space == 0 and max_space == 0
    for data in candidates:
        val = data["val"]
        if int(val["operateType"]) != 0 or int(val["num"]) != int(cancel_val["num"]):
            continue
        if same_second:
            in_window = compare_time(val["time"], min_time) == 0
        else:
            in_window = compare_time(val["time"], min_time) > 0 and compare_time(val["time"], max_time) <= 0
        if in_window:
            return data["index"], data
    return None, None
|
|
|
# Decide whether the sell behind a sell-cancel row comes before the target row
def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map):
    """Tell whether the sell row behind a sell-cancel precedes ``target_data``.

    Returns ``True`` at once when the target's time is later than the latest
    possible sell time. Otherwise candidates are read under
    ``"{num}-2-{price}"`` and scanned in order; the first one with
    operateType 2, the same ``num`` and a time inside the window decides the
    result by comparing its ``"index"`` with the target's.

    The window is ``time == min_time`` when the cancel delay span is
    ``(0, 0)``, otherwise ``min_time < time <= max_time``.

    Args:
        sell_cancel_data: Sell-cancel row with a ``"val"`` dict.
        target_data: Row with ``"val"]["time"]`` and ``"index"``.
        local_today_num_operate_map: Mapping keyed by
            ``"{num}-{operateType}-{price}"`` to lists of rows.

    Returns:
        ``True`` or ``False`` as described; ``False`` when no candidate matches.
    """
    cancel_val = sell_cancel_data["val"]
    min_space, max_space = compute_time_space_as_second(cancel_val["cancelTime"], cancel_val["cancelTimeUnit"])
    max_time = __sub_time(cancel_val["time"], min_space)
    min_time = __sub_time(cancel_val["time"], max_space)
    # If even the latest possible sell time is before the target, the sell is before the target
    if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")):
        return True
    sell_datas = local_today_num_operate_map.get("{}-{}-{}".format(cancel_val["num"], "2", cancel_val["price"]))
    if not sell_datas:
        return False
    same_second = min_space == 0 and max_space == 0
    for data in sell_datas:
        val = data["val"]
        if int(val["operateType"]) != 2 or int(val["num"]) != int(cancel_val["num"]):
            continue
        if same_second:
            # Within the same second
            in_window = compare_time(val["time"], min_time) == 0
        else:
            # The row lies in the expected window
            in_window = compare_time(val["time"], min_time) > 0 and compare_time(val["time"], max_time) <= 0
        if in_window:
            return data["index"] < target_data["index"]
    return False
|
|
|
# Last payload passed to save_big_data for each code; used to skip repeats.
__last_big_data = {}


@async_call
def save_big_data(code, same_time_nums, datas):
    """Store a snapshot of ``datas`` when one recent second holds many rows.

    Nothing is done when ``datas`` serialises to the same JSON as the payload
    remembered for ``code``. Otherwise the payload is remembered, and the JSON
    is written once under ``"big_data-{code}-{millisecond timestamp}"`` if any
    entry of ``same_time_nums`` is within 3 seconds of the current local time
    and has a count above 20.

    Args:
        code: Code the payload belongs to.
        same_time_nums: Mapping of ``HH:MM:SS`` strings to counts.
        datas: JSON-serialisable payload.
    """
    latest_datas = __last_big_data.get(code)
    serialized = json.dumps(datas)
    if latest_datas is not None and serialized == json.dumps(latest_datas):
        return None
    __last_big_data[code] = datas

    # The current time does not change meaningfully during the scan; parse it once.
    now_seconds = get_time_as_seconds(datetime.datetime.now().strftime("%H:%M:%S"))
    for time_, count in same_time_nums.items():
        # Only keep big data from the last 3 seconds
        if abs(now_seconds - get_time_as_seconds(time_)) > 3:
            continue
        if count > 20:
            redis = l2_data_manager._redisManager.getRedis()
            # tool.get_expire() is passed as the expiry argument of setex.
            redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), serialized)
            break
|
|
|
# Save the size of the latest L2 data
@async_call
def save_l2_latest_data_number(code, num):
    """Write ``num`` under ``"l2_latest_data_num-{code}"`` with an expiry of 3.

    NOTE(review): the expiry is presumably in seconds (redis-py style
    ``setex(name, time, value)``); confirm against the client in use.
    """
    client = l2_data_manager._redisManager.getRedis()
    key = "l2_latest_data_num-{}".format(code)
    client.setex(key, 3, num)
|
|
|
# Get the number of rows in the latest data
def get_l2_latest_data_number(code):
    """Read the value stored under ``"l2_latest_data_num-{code}"``.

    Returns:
        The stored value converted with ``int()``, or ``None`` when the key
        has no value.
    """
    stored = l2_data_manager._redisManager.getRedis().get("l2_latest_data_num-{}".format(code))
    return None if stored is None else int(stored)
|
|
|
# L2 data concatenation utility. TODO: not enabled yet
class L2DataConcatUtil:
    """Work out which rows of a new L2 snapshot are not in the previous one.

    The last few rows of the previous snapshot form a "feature". The new
    snapshot is searched for that feature, and everything after it is treated
    as newly added data.
    """

    # Initialise
    def __init__(self, code, last_datas, datas):
        """Store the code, the previous snapshot and the new snapshot.

        Args:
            code: Code the snapshots belong to (stored, not otherwise used here).
            last_datas: Rows of the previous snapshot, oldest first.
            datas: Rows of the new snapshot, oldest first.
        """
        self.last_datas = last_datas
        self.datas = datas
        self.code = code

    def __get_data_identity(self, data_):
        """Return a string built from the ``"val"`` fields that identify a row."""
        data = data_["val"]
        return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"),
                                          data.get("operateType"), data.get("cancelTime"),
                                          data.get("cancelTimeUnit"))

    # Get the concat feature from the tail of the previous snapshot
    def __get_concat_feature(self):
        """Pick the tail of ``last_datas`` that serves as the feature.

        Walks backwards from the last row until at least 3 rows with at least
        2 distinct identities have been collected, or the rows run out.

        Returns:
            ``(start_index, end_index)`` into ``last_datas``, both inclusive;
            ``start_index`` is ``-1`` when ``last_datas`` is empty.
        """
        # At least 3 rows, containing at least 2 distinct identities
        min_identity = 2
        min_count = 3

        identity_set = set()
        count = 0
        start_index = -1
        for i in range(len(self.last_datas) - 1, -1, -1):
            identity_set.add(self.__get_data_identity(self.last_datas[i]))
            count += 1
            start_index = i
            if count >= min_count and len(identity_set) >= min_identity:
                break
        return start_index, len(self.last_datas) - 1

    # Get the newly added rows
    def get_add_datas(self):
        """Return the rows of ``datas`` that come after the previous snapshot.

        Returns:
            ``[]`` when the newest row of ``datas`` is older than the newest
            row of ``last_datas``, or when ``datas`` ends exactly where the
            previous snapshot ended. The rows after the feature when it is
            found. All of ``datas`` when there is no previous snapshot or the
            feature is not found.
        """
        # Check that the new snapshot is not older than the previous one
        if self.last_datas and self.datas:
            newest = int(self.datas[-1]["val"]["time"].replace(":", ""))
            previous_newest = int(self.last_datas[-1]["val"]["time"].replace(":", ""))
            if newest < previous_newest:
                return []

        # Locate the feature in the previous snapshot
        start_index, end_index = self.__get_concat_feature()
        if start_index < 0:
            return self.datas
        print("特征位置:", start_index, end_index)

        # Identities of the feature rows and of every new row
        identity_list = [self.__get_data_identity(row) for row in self.last_datas[start_index:end_index + 1]]
        new_identities = [self.__get_data_identity(row) for row in self.datas]
        feature_len = len(identity_list)
        new_len = len(new_identities)

        # The complete feature may sit anywhere in the new data, including
        # flush with its end (then nothing new was added).
        for i in range(new_len - feature_len + 1):
            if new_identities[i:i + feature_len] == identity_list:
                return self.datas[i + feature_len:]

        # Otherwise accept a partial match, but only when the new data begins
        # with a tail of the feature; each pass drops one more leading row.
        for n in range(1, feature_len):
            tail_len = feature_len - n
            if new_len >= tail_len and new_identities[:tail_len] == identity_list[n:]:
                return self.datas[tail_len:]

        print("新数据中未找到特征标识")
        return self.datas
|
|
|
def test_add_datas():
    """Print the result of ``L2DataConcatUtil.get_add_datas`` for six sample inputs.

    Each sample is a pair of time lists (previous snapshot, new snapshot);
    every time is wrapped as ``{"val": {"time": ...}}`` before use.
    """

    def load_data(datas):
        return [{"val": {"time": data}} for data in datas]

    usual_new = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    samples = (
        # Labelled "no match" by the original author
        ([], usual_new),
        # Labelled "no match" by the original author
        (["10:00:02"], usual_new),
        # Labelled "no match" by the original author
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"], usual_new),
        # Labelled "match" by the original author
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"],
         ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"], usual_new),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"],
         ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"]),
    )
    for previous_times, new_times in samples:
        latest_datas = load_data(previous_times)
        datas = load_data(new_times)
        print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas())
|
|
|
def test(datas):
    """Set the ``"code"`` entry of ``datas`` to ``"test"``, in place."""
    datas["code"] = "test"
|
|
|
if __name__ == "__main__":
    # Run the manual concat samples when the module is executed as a script.
    test_add_datas()
|