Administrator
2025-06-11 01696a5d8c2c3cf3062aa6a8ccbf123547c2dbf0
l2/l2_data_source_util.py
@@ -1,7 +1,9 @@
"""
L2数据溯源
"""
import tool
import constant
from log_module.log import logger_l2_error
from utils import tool
class L2DataSourceUtils(object):
@@ -19,13 +21,13 @@
        cls.__buy_and_cancel_map[code][buy_index] = cancel_index
    @classmethod
    def __get_cancel_index(cls, code, buy_index):
    def __get_cancel_index_cache(cls, code, buy_index):
        if code not in cls.__buy_and_cancel_map:
            return None
        return cls.__buy_and_cancel_map[code].get(buy_index)
    @classmethod
    def __get_buy_index(cls, code, cancel_index):
    def __get_buy_index_cache(cls, code, cancel_index):
        if code not in cls.__cancel_and_buy_map:
            return None
        return cls.__cancel_and_buy_map[code].get(cancel_index)
@@ -52,35 +54,33 @@
            # 小时
            return __time * 3600, (__time + 1) * 3600
    # 根据买撤数据(与今日总的数据)计算买入数据
    # 华鑫渠道的L2,根据买撤数据查找买入数据
    @classmethod
    def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"])
        cancel_datas = local_today_num_operate_map.get(key)
        try:
            cancel_datas.sort(key=lambda t: t["index"])
        except Exception as e:
            print("测试")
        for item in cancel_datas:
            cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)
        return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
    @classmethod
    def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        buy_index = cls.__get_buy_index(code, cancel_data["index"])
        if buy_index is not None:
            return buy_index
        min_space, max_space = cls.__compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                                  cancel_data["val"]["cancelTimeUnit"])
        max_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - min_space)
        min_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - max_space)
    def __get_buy_index_with_cancel_data_by_huaxin_l2(cls, code, cancel_data, local_today_num_operate_map):
        buy_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
        if buy_datas is None:
            # 无数据
            return None
        # orderNo
        for bd in buy_datas:
            # 根据订单号做匹配
            if bd["val"]["orderNo"] == cancel_data["val"]["orderNo"]:
                return bd["index"]
        return None
    # 同花顺渠道的L2,根据买撤数据查找买入数据
    @classmethod
    def __get_buy_index_with_cancel_data_by_ths_l2(cls, code, cancel_data, local_today_num_operate_map):
        buy_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
        if buy_datas is None:
            # 无数据
            return None
        min_space, max_space = cls.__compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                                  cancel_data["val"]["cancelTimeUnit"])
        max_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - min_space)
        min_time = tool.trade_time_add_second(cancel_data["val"]["time"], 0 - max_space)
        # 匹配到的index
        suit_indexes = []
        for i in range(0, len(buy_datas)):
@@ -91,7 +91,7 @@
            if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
                continue
                # 如果能找到对应的买撤就需要返回
            cancel_index = cls.__get_cancel_index(code, data["index"])
            cancel_index = cls.__get_cancel_index_cache(code, data["index"])
            if cancel_index is not None and cancel_index != cancel_data["index"]:
                continue
@@ -110,6 +110,104 @@
            return suit_indexes[0]
        return None
    @classmethod
    def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        buy_index = cls.__get_buy_index_cache(code, cancel_data["index"])
        if buy_index is not None:
            return buy_index
        if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
            return cls.__get_buy_index_with_cancel_data_by_huaxin_l2(code, cancel_data, local_today_num_operate_map)
        else:
            return cls.__get_buy_index_with_cancel_data_by_ths_l2(code, cancel_data, local_today_num_operate_map)
    # 根据买撤数据(与今日总的数据)计算买入数据
    @classmethod
    def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
        key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"])
        cancel_datas = local_today_num_operate_map.get(key)
        if cancel_datas:
            try:
                cancel_datas.sort(key=lambda t: t["index"])
            except Exception as e:
                # print("测试")
                pass
            for item in cancel_datas:
                # 提前做计算
                cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)
        return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
    # 根据买撤数据计算买入数据(华鑫原生L2数据适用)
    @classmethod
    def get_buy_index_with_cancel_data_v2(cls, cancel_data, buyno_map):
        order_no = str(cancel_data["val"]["orderNo"])
        buy_data = buyno_map.get(order_no)
        if buy_data:
            return buy_data["index"]
        return None
    # 获取没撤的笔数
    @classmethod
    def get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
        data = None
        try:
            data = total_data[index]
        except Exception as e:
            logger_l2_error.error(f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        val = data["val"]
        # 判断当前买是否已经买撤
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        canceled = False
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = cls.get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
                if buy_index == index:
                    canceled = True
                    count = data["re"] - cancel_data["re"]
                    if count > 0:
                        return count
                    break
        if not canceled:
            count = data["re"]
            return count
        return 0
        # 获取没撤的笔数
    # 获取涨停买没有撤单的数量
    @classmethod
    def get_limit_up_buy_no_canceled_count_v2(cls, code, index, total_data, canceled_buyno_map):
        data = None
        try:
            data = total_data[index]
        except Exception as e:
            logger_l2_error.error(
                f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        val = data["val"]
        order_no = str(val["orderNo"])
        canceled_data = canceled_buyno_map.get(order_no)
        if canceled_data:
            return data["re"] - canceled_data["re"]
        else:
            return data["re"]
    @classmethod
    def get_limit_up_buy_canceled_data_v2(cls, code, index, total_data, canceled_buyno_map):
        data = None
        try:
            data = total_data[index]
        except Exception as e:
            logger_l2_error.error(
                f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        val = data["val"]
        order_no = str(val["orderNo"])
        canceled_data = canceled_buyno_map.get(order_no)
        if canceled_data:
            return canceled_data
        else:
            return None
# if __name__ == "__main__":
#     code = "000925"