"""
|
L2数据溯源
|
"""
|
import constant
|
from log_module.log import logger_l2_error
|
from utils import tool
|
|
|
class L2DataSourceUtils(object):
|
__cancel_and_buy_map = {}
|
__buy_and_cancel_map = {}
|
|
@classmethod
|
def __save_map(cls, code, buy_index, cancel_index):
|
if cls.__cancel_and_buy_map.get(code) is None:
|
cls.__cancel_and_buy_map[code] = {}
|
cls.__cancel_and_buy_map[code][cancel_index] = buy_index
|
|
if cls.__buy_and_cancel_map.get(code) is None:
|
cls.__buy_and_cancel_map[code] = {}
|
cls.__buy_and_cancel_map[code][buy_index] = cancel_index
|
|
@classmethod
|
def __get_cancel_index_cache(cls, code, buy_index):
|
if code not in cls.__buy_and_cancel_map:
|
return None
|
return cls.__buy_and_cancel_map[code].get(buy_index)
|
|
@classmethod
|
def __get_buy_index_cache(cls, code, cancel_index):
|
if code not in cls.__cancel_and_buy_map:
|
return None
|
return cls.__cancel_and_buy_map[code].get(cancel_index)
|
|
@classmethod
|
def __compare_time(cls, time1, time2):
|
result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2))
|
return result
|
|
# 计算时间的区间
|
@classmethod
|
def __compute_time_space_as_second(cls, cancel_time, cancel_time_unit):
|
__time = int(cancel_time)
|
if int(cancel_time) == 0:
|
return 0, 0
|
unit = int(cancel_time_unit)
|
if unit == 0:
|
# 秒
|
return __time, (__time + 1)
|
elif unit == 1:
|
# 分钟
|
return __time * 60, (__time + 1) * 60
|
elif unit == 2:
|
# 小时
|
return __time * 3600, (__time + 1) * 3600
|
|
# 华鑫渠道的L2,根据买撤数据查找买入数据
|
@classmethod
|
def __get_buy_index_with_cancel_data_by_huaxin_l2(cls, code, cancel_data, local_today_num_operate_map):
|
buy_datas = local_today_num_operate_map.get(
|
"{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
|
if buy_datas is None:
|
# 无数据
|
return None
|
# orderNo
|
for bd in buy_datas:
|
# 根据订单号做匹配
|
if bd["val"]["orderNo"] == cancel_data["val"]["orderNo"]:
|
return bd["index"]
|
return None
|
|
# 同花顺渠道的L2,根据买撤数据查找买入数据
|
@classmethod
|
def __get_buy_index_with_cancel_data_by_ths_l2(cls, code, cancel_data, local_today_num_operate_map):
|
buy_datas = local_today_num_operate_map.get(
|
"{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
|
if buy_datas is None:
|
# 无数据
|
return None
|
min_space, max_space = cls.__compute_time_space_as_second(cancel_data["val"]["cancelTime"],
|
cancel_data["val"]["cancelTimeUnit"])
|
max_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - min_space)
|
min_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - max_space)
|
# 匹配到的index
|
suit_indexes = []
|
for i in range(0, len(buy_datas)):
|
data = buy_datas[i]
|
|
if int(data["val"]["operateType"]) != 0:
|
continue
|
if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
|
continue
|
# 如果能找到对应的买撤就需要返回
|
cancel_index = cls.__get_cancel_index_cache(code, data["index"])
|
if cancel_index is not None and cancel_index != cancel_data["index"]:
|
continue
|
|
if min_space == 0 and max_space == 0:
|
if cls.__compare_time(data["val"]["time"], min_time) == 0:
|
suit_indexes.append(data["index"])
|
elif cls.__compare_time(data["val"]["time"], min_time) > 0 and cls.__compare_time(data["val"]["time"],
|
max_time) <= 0:
|
suit_indexes.append(data["index"])
|
if len(suit_indexes) >= 2:
|
# 多个匹配项,优先溯源离取消位置最近的数据
|
suit_indexes.sort(key=lambda t: cancel_data["index"] - t)
|
|
if len(suit_indexes) >= 1:
|
cls.__save_map(code, suit_indexes[0], cancel_data["index"])
|
return suit_indexes[0]
|
return None
|
|
@classmethod
|
def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
|
buy_index = cls.__get_buy_index_cache(code, cancel_data["index"])
|
if buy_index is not None:
|
return buy_index
|
if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
|
return cls.__get_buy_index_with_cancel_data_by_huaxin_l2(code, cancel_data, local_today_num_operate_map)
|
else:
|
return cls.__get_buy_index_with_cancel_data_by_ths_l2(code, cancel_data, local_today_num_operate_map)
|
|
# 根据买撤数据(与今日总的数据)计算买入数据
|
@classmethod
|
def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
|
key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"])
|
cancel_datas = local_today_num_operate_map.get(key)
|
if cancel_datas:
|
try:
|
cancel_datas.sort(key=lambda t: t["index"])
|
except Exception as e:
|
# print("测试")
|
pass
|
for item in cancel_datas:
|
# 提前做计算
|
cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)
|
|
return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
|
|
# 根据买撤数据计算买入数据(华鑫原生L2数据适用)
|
@classmethod
|
def get_buy_index_with_cancel_data_v2(cls, cancel_data, buyno_map):
|
order_no = str(cancel_data["val"]["orderNo"])
|
buy_data = buyno_map.get(order_no)
|
if buy_data:
|
return buy_data["index"]
|
return None
|
|
# 获取没撤的笔数
|
@classmethod
|
def get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
|
data = None
|
try:
|
data = total_data[index]
|
except Exception as e:
|
logger_l2_error.error(f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
|
val = data["val"]
|
# 判断当前买是否已经买撤
|
cancel_datas = local_today_num_operate_map.get(
|
"{}-{}-{}".format(val["num"], "1", val["price"]))
|
canceled = False
|
if cancel_datas:
|
for cancel_data in cancel_datas:
|
buy_index = cls.get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
|
if buy_index == index:
|
canceled = True
|
count = data["re"] - cancel_data["re"]
|
if count > 0:
|
return count
|
break
|
if not canceled:
|
count = data["re"]
|
return count
|
return 0
|
|
# 获取没撤的笔数
|
|
# 获取涨停买没有撤单的数量
|
@classmethod
|
def get_limit_up_buy_no_canceled_count_v2(cls, code, index, total_data, canceled_buyno_map):
|
data = None
|
try:
|
data = total_data[index]
|
except Exception as e:
|
logger_l2_error.error(
|
f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
|
val = data["val"]
|
order_no = str(val["orderNo"])
|
canceled_data = canceled_buyno_map.get(order_no)
|
if canceled_data:
|
return data["re"] - canceled_data["re"]
|
else:
|
return data["re"]
|
|
@classmethod
|
def get_limit_up_buy_canceled_data_v2(cls, code, index, total_data, canceled_buyno_map):
|
data = None
|
try:
|
data = total_data[index]
|
except Exception as e:
|
logger_l2_error.error(
|
f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
|
val = data["val"]
|
order_no = str(val["orderNo"])
|
canceled_data = canceled_buyno_map.get(order_no)
|
if canceled_data:
|
return canceled_data
|
else:
|
return None
|
|
|
# if __name__ == "__main__":
|
# code = "000925"
|
# l2_data_util.load_l2_data(code)
|
# total_datas = l2_data_util.local_today_datas.get(code)
|
# local_today_num_operate_map = l2_data_util.local_today_num_operate_map.get(code)
|
# cancel_index = 900
|
# index = L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[cancel_index],
|
# l2_data_util.local_today_num_operate_map.get(code))
|
# print("溯源位置:", index)
|