Administrator
2025-06-10 efe62c0c92bee36da5179f34bb73e8ee4db6f814
l2_data_util.py
@@ -7,9 +7,71 @@
import json
import time
import l2_data_manager
import tool
from trade_gui import async_call
from db.redis_manager_delegate import RedisUtils
from utils.tool import async_call
from l2 import l2_data_manager
from utils import tool
def run_time():
    """
    Build a decorator that prints how long each call of the wrapped function
    took, in milliseconds.

    Use it with call parentheses: ``@run_time()``.

    @return: a decorator; the wrapped function receives its positional and
             keyword arguments unchanged and its return value is passed through
    """
    # Imported locally so this block does not rely on the module import list.
    from functools import wraps

    def decorator(func):
        @wraps(func)  # keep the wrapped function's __name__ / __doc__
        def infunc(*args, **kwargs):
            start = round(time.time() * 1000)
            # Unpack the positionals: passing ``args`` itself would hand the
            # wrapped function one tuple instead of its real arguments.
            result = func(*args, **kwargs)
            print("执行时间", round(time.time() * 1000) - start)
            return result

        return infunc

    return decorator
# 是否为大单
def is_big_money(val, is_ge=False):
    """
    Decide whether a single L2 record counts as a big order.

    @param val: L2 record; "price" is converted with float(), "num" is used
                directly in the arithmetic and the comparisons
    @param is_ge: True to apply the ChiNext (GEM board) thresholds
    @return: True when the record reaches the big-order threshold, else False
    """
    unit_price = float(val["price"])
    quantity = val["num"]
    amount = round(unit_price * quantity, 2)

    if is_ge:
        return amount >= 29900 or quantity >= 2999
    if unit_price > 3.0:
        return amount >= 29900 or quantity >= 7999
    # Price of 3.0 or less: the amount must reach 95% of (price * 10000).
    return amount >= unit_price * 10000 * 0.95
# 获取大资金的金额
def get_big_money_val(limit_up_price, is_ge=False):
    """
    Return the big-money amount threshold for the given limit-up price.

    @param limit_up_price: limit-up price of the stock
    @param is_ge: True to apply the ChiNext (GEM board) rule
    @return: the threshold; capped at 299 * 10000 on every branch except the
             one for prices of 3.0 or less
    """
    cap = 299 * 10000
    if is_ge:
        # NOTE(review): is_big_money compares the quantity against 2999 for
        # ChiNext while 2900 is used here - confirm which one is intended.
        return min(cap, round(limit_up_price * 2900 * 100))
    if limit_up_price > 3.0:
        return min(cap, round(limit_up_price * 7999 * 100))
    # Price of 3.0 or less: 95% of (price * 10000 * 100), truncated to int.
    return int(limit_up_price * 10000 * 100 * 0.95)
# if int(val["num"]) >= constant.BIG_MONEY_NUM:
#     return True
# if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
#     return True
# return False_
def compare_time(time1, time2):
@@ -34,20 +96,6 @@
    json_value = json.loads(value)
    _data = {"key": key, "val": item, "re": json_value["re"], "index": int(json_value["index"])}
    return _data
# 将数据根据num-operate分类
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    """
    Group records under the key "num-operateType-price" inside the per-code
    map. The map is modified in place; nothing is returned.

    @param local_today_num_operate_map: dict of code -> {key: [records]}
    @param code: stock code whose sub-map is filled
    @param source_datas: records to add; each needs data["val"] with "num",
                         "operateType" and "price"
    @param clear: when true, the sub-map of ``code`` is emptied first
    """
    if clear or local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    buckets = local_today_num_operate_map[code]
    for record in source_datas:
        fields = record["val"]
        bucket_key = "{}-{}-{}".format(fields["num"], fields["operateType"], fields["price"])
        buckets.setdefault(bucket_key, []).append(record)
# 减去时间
@@ -82,30 +130,42 @@
        return __time * 3600, (__time + 1) * 3600
# 根据买撤数据(与今日总的数据)计算买入数据
def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map):
    """
    Find the buy record that a buy-cancel record refers to.

    NOTE(review): in the source the lines of this function were interleaved
    with those of get_buy_time_range; the two were separated here - confirm
    against version control.

    @param cancel_data: cancel record; reads "cancelTime", "cancelTimeUnit",
                        "time", "num" and "price" from cancel_data["val"]
    @param local_today_num_operate_map: dict of "num-operateType-price" ->
                                        list of records (one code's sub-map)
    @return: (index, record) of the first matching buy record, or
             (None, None) when there is no candidate or no match
    """
    # Time window in which the original order may have been placed.
    min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                        cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    # Candidates share quantity and price; "0" is the operateType looked up
    # (presumably "buy", going by this function's purpose).
    buy_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
    if buy_datas is None:
        # No candidate at all
        return None, None
    for data in buy_datas:
        if int(data["val"]["operateType"]) != 0:
            continue
        if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
            continue
        if min_space == 0 and max_space == 0:
            # Zero-width window: the times must compare equal.
            if compare_time(data["val"]["time"], min_time) == 0:
                return data["index"], data
        elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
            # Inside the window (min_time, max_time]
            return data["index"], data
    return None, None


# Get the time range in which the buy may have happened
def get_buy_time_range(cancel_data):
    """
    Compute the time window derived from a cancel record.

    @param cancel_data: cancel record; reads "cancelTime", "cancelTimeUnit"
                        and "time" from cancel_data["val"]
    @return: (min_time, max_time) as returned by __sub_time
    """
    min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                        cancel_data["val"]["cancelTimeUnit"])
    # The larger offset gives the earlier bound, the smaller the later one.
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    return min_time, max_time
# 判断卖撤的卖信号是否在目标信号之前
def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map):
    """
    Tell whether the sell record behind a sell-cancel record comes before the
    target record.

    @param sell_cancel_data: sell-cancel record; reads "cancelTime",
                             "cancelTimeUnit", "time", "num" and "price"
                             from its "val"
    @param target_data: record to compare with; reads ["val"]["time"] and
                        ["index"]
    @param local_today_num_operate_map: dict of "num-operateType-price" ->
                                        list of records
    @return: True when the target time is later than the latest possible sell
             time, or when the first matching sell record has a smaller index
             than the target; False otherwise
    """
    cancel_val = sell_cancel_data["val"]
    min_space, max_space = compute_time_space_as_second(cancel_val["cancelTime"],
                                                        cancel_val["cancelTimeUnit"])
    latest_time = __sub_time(cancel_val["time"], min_space)
    earliest_time = __sub_time(cancel_val["time"], max_space)

    # If even the latest possible sell time precedes the target, the sell
    # certainly does as well.
    if int(target_data["val"]["time"].replace(":", "")) > int(latest_time.replace(":", "")):
        return True

    lookup_key = "{}-{}-{}".format(cancel_val["num"], "2", cancel_val["price"])
    candidates = local_today_num_operate_map.get(lookup_key)
    if not candidates:
        return False

    zero_width_window = min_space == 0 and max_space == 0
    for candidate in candidates:
        sell_val = candidate["val"]
        if int(sell_val["operateType"]) != 2 or int(sell_val["num"]) != int(cancel_val["num"]):
            continue
        if zero_width_window:
            # Within the same second
            in_window = compare_time(sell_val["time"], earliest_time) == 0
        else:
            # Record lies in the window (earliest_time, latest_time]
            in_window = (compare_time(sell_val["time"], earliest_time) > 0
                         and compare_time(sell_val["time"], latest_time) <= 0)
        if in_window:
            return candidate["index"] < target_data["index"]
    return False
# Module-level dict, empty at import time.
# NOTE(review): its readers/writers are outside this excerpt - presumably it
# keeps the most recent big-data snapshot per stock code; confirm.
__last_big_data = {}
@@ -126,12 +186,150 @@
                # 保存快照
                # logger_l2_big_data.debug("code:{} d1:{}  d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
                break
    time_str = tool.get_now_time_str()
    for key in same_time_nums:
        if same_time_nums[key] > 20:
            redis = l2_data_manager._redisManager.getRedis()
            redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), d1)
    for time_ in same_time_nums:
        # 只保留最近3s内的大数据
        if abs(get_time_as_seconds(time_str) - get_time_as_seconds(time_)) > 3:
            continue
        if same_time_nums[time_] > 20:
            RedisUtils.setex(l2_data_manager._redisManager.getRedis(),
                             "big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(),
                             d1)
            break
# 保存l2最新数据的大小
# @async_call
def save_l2_latest_data_number(code, num):
    """
    Store the size of the latest L2 data batch for ``code`` in Redis under
    "l2_latest_data_num-<code>".

    @param code: stock code used to build the key
    @param num: value to store
    """
    redis_client = l2_data_manager._redisManager.getRedis()
    cache_key = "l2_latest_data_num-{}".format(code)
    # 3 is passed as setex's time argument - presumably an expiry in seconds.
    RedisUtils.setex(redis_client, cache_key, 3, num)
# 获取最新数据条数
def get_l2_latest_data_number(code):
    """
    Read the value stored under "l2_latest_data_num-<code>" from Redis.

    @param code: stock code used to build the key
    @return: the stored value converted with int(), or None when Redis
             returns None for the key
    """
    redis_client = l2_data_manager._redisManager.getRedis()
    cached = RedisUtils.get(redis_client, "l2_latest_data_num-{}".format(code))
    return None if cached is None else int(cached)
# l2数据拼接工具  暂时还未启用
class L2DataConcatUtil:
    """
    L2 data concatenation helper (the original note says it is not enabled
    yet).

    Given the previously received batch and a freshly received batch, it works
    out which records of the fresh batch are new by locating the tail of the
    previous batch (the "feature") inside the fresh one.
    """

    def __init__(self, code, last_datas, datas):
        """
        @param code: stock code (stored, not used by the matching itself)
        @param last_datas: previously received records
        @param datas: freshly received records
        """
        self.last_datas = last_datas
        self.datas = datas
        self.code = code

    def __get_data_identity(self, data_):
        """Build the string used to compare two records: the time, num, price,
        operateType, cancelTime and cancelTimeUnit of data_["val"] (missing
        fields render as "None")."""
        data = data_["val"]
        return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"),
                                          data.get("cancelTime"), data.get("cancelTimeUnit"))

    def __get_concat_feature(self):
        """
        Pick the tail of ``last_datas`` that serves as the concatenation
        feature.

        Walks backwards from the last record until at least 3 records with at
        least 2 distinct identities are collected, or the list is exhausted.

        @return: (start_index, end_index) into ``last_datas``, both inclusive;
                 start_index is -1 when ``last_datas`` is empty
        """
        min_identity = 2
        min_count = 3
        identity_set = set()
        count = 0
        start_index = -1
        for i in range(len(self.last_datas) - 1, -1, -1):
            identity_set.add(self.__get_data_identity(self.last_datas[i]))
            count += 1
            start_index = i
            if count >= min_count and len(identity_set) >= min_identity:
                break
        return start_index, len(self.last_datas) - 1

    def get_add_datas(self):
        """
        Return the records of ``datas`` that come after the previous batch.

        @return: [] when the fresh batch ends earlier than the previous one or
                 contains nothing beyond the feature; the slice after the
                 feature when it is found; the whole of ``datas`` when there
                 is no previous batch or the feature cannot be found
        """
        # A fresh batch that ends before the previous one is stale.
        if self.last_datas and self.datas:
            newest = int(self.datas[-1]["val"]["time"].replace(":", ""))
            previous = int(self.last_datas[-1]["val"]["time"].replace(":", ""))
            if newest - previous < 0:
                return []
        # Locate the concatenation feature in the previous batch.
        start_index, end_index = self.__get_concat_feature()
        if start_index < 0:
            return self.datas
        print("特征位置:", start_index, end_index)
        identity_list = [self.__get_data_identity(item)
                         for item in self.last_datas[start_index:end_index + 1]]
        # Computed once instead of on every comparison.
        new_identities = [self.__get_data_identity(item) for item in self.datas]
        feature_len = len(identity_list)
        for n in range(feature_len):
            # Each round drops one more leading element of the feature.
            remaining = feature_len - n
            # "+ 1" makes the last start position inclusive, so a feature that
            # ends exactly on the last fresh record is found (nothing new)
            # instead of the whole batch being reported as new.
            for i in range(len(new_identities) - remaining + 1):
                if new_identities[i] != identity_list[n]:
                    continue
                # n == 0: full feature match anywhere. Otherwise a partial
                # match is accepted only at the very start of the fresh batch.
                if n != 0 and i != 0:
                    continue
                if new_identities[i:i + remaining] == identity_list[n:]:
                    return self.datas[i + remaining:]
        print("新数据中未找到特征标识")
        return self.datas
def test_add_datas():
    """Print the result of L2DataConcatUtil.get_add_datas() for a fixed list
    of sample batches (manual check, nothing is asserted)."""

    def to_records(times):
        # Wrap each time string in the minimal record shape the util reads.
        return [{"val": {"time": moment}} for moment in times]

    overlapping_batch = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]
    scenarios = (
        # Labelled "no match" in the original
        ([], overlapping_batch),
        (["10:00:02"], overlapping_batch),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"], overlapping_batch),
        # Labelled "match" in the original
        (["10:00:00", "10:00:01", "10:00:02", "10:00:03"],
         ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"]),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"], overlapping_batch),
        (["10:00:00", "10:00:01", "10:00:02", "10:00:02"],
         ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"]),
    )
    for previous_times, incoming_times in scenarios:
        previous_records = to_records(previous_times)
        incoming_records = to_records(incoming_times)
        print(L2DataConcatUtil("000333", previous_records, incoming_records).get_add_datas())
def test(datas):
@@ -139,12 +337,4 @@
if __name__ == "__main__":
    # Manual debugging entry point; runs only when the file is executed
    # directly.
    # Earlier ad-hoc experiment, kept commented out:
    # cancel_data = {"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 1, "time": "09:32:30"}}
    # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}]
    # result= get_buy_data_with_cancel_data(cancel_data,today_datas)
    # print(result)
    # Load the L2 data of one hard-coded stock code, then look up the buy
    # record for the record at position 118 of today's data and print it.
    # NOTE(review): presumably load_l2_data fills local_today_datas and
    # local_today_num_operate_map - confirm in l2_data_manager.
    code = "001209"
    l2_data_manager.load_l2_data(code)
    total_datas = l2_data_manager.local_today_datas[code]
    index, data = get_buy_data_with_cancel_data(total_datas[118], l2_data_manager.local_today_num_operate_map.get(code))
    print(index, data)
    # Finally print the L2DataConcatUtil sample cases.
    test_add_datas()