Administrator
2024-01-30 21c96ed504f93f16ce6f8a3ccf164a87c9edd9c0
l2/l2_data_util.py
@@ -19,6 +19,7 @@
from db import redis_manager_delegate as redis_manager
from utils import tool
# NOTE(review): presumably the Redis database index - it matches the argument
# given to RedisManager below and is passed as the first argument of
# RedisUtils.setex_async in save_l2_data; confirm against RedisUtils.
__db = 1
# Redis manager built with the argument 1 (see the note on __db above the assignment).
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
# 本地最新一次上传的数据
@@ -31,6 +32,9 @@
# Buy order-number map; only available with native L2 data.
# Shape: {code: {str(orderNo): data record}}.
local_today_buyno_map = {}
# Order numbers that have already been cancelled.
# Shape: {code: {str(orderNo): data record}}; filled by load_canceled_buy_no_map.
local_today_canceled_buyno_map = {}
def load_l2_data(code, load_latest=True, force=False):
@@ -70,6 +74,7 @@
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
        return data_normal
    return True
@@ -105,6 +110,26 @@
        key = "{}".format(data["val"]["orderNo"])
        if local_today_buyno_map[code].get(key) is None:
            local_today_buyno_map[code].setdefault(key, data)
# 将数据根据orderNo分类已撤订单,原生数据才有
def load_canceled_buy_no_map(local_today_canceled_buyno_map, code, source_datas, clear=False):
    """Index the buy-cancel records of ``code`` by order number.

    Nothing happens unless ``constant.L2_SOURCE_TYPE`` equals
    ``constant.L2_SOURCE_TYPE_HUAXIN``; for any other source type the map is
    left untouched.

    Args:
        local_today_canceled_buyno_map: dict mutated in place, shaped
            ``{code: {str(orderNo): data}}``.
        code: key whose sub-map is filled.
        source_datas: iterable of records exposing
            ``data["val"]["operateType"]`` and ``data["val"]["orderNo"]``.
            ``None`` is treated as an empty iterable.
        clear: when true, drop the entries already stored for ``code``
            before filling.

    Returns:
        None.
    """
    # Only native L2 data carries order numbers
    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return
    if clear or local_today_canceled_buyno_map.get(code) is None:
        local_today_canceled_buyno_map[code] = {}
    canceled_by_order_no = local_today_canceled_buyno_map[code]
    # load_l2_data passes local_today_datas.get(code), which can be None,
    # so fall back to an empty iterable instead of raising TypeError.
    for data in source_datas or ():
        val = data["val"]
        # Keep buy-cancel records only. int() matches the other operateType
        # checks in this file, so a string "1" is recognised as well.
        if int(val["operateType"]) != 1:
            continue
        # The first record seen for an order number wins
        canceled_by_order_no.setdefault(str(val["orderNo"]), data)
@tool.async_call
@@ -148,11 +173,11 @@
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    # Only save when there is newly added data
    if len(add_datas) > 0:
    if add_datas:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        if datas:
            RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(),
            RedisUtils.setex_async(__db, "l2-data-latest-{}".format(code), tool.get_expire(),
                                   json.dumps(datas))
            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
@@ -162,8 +187,6 @@
            async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
        except Exception as e:
            logging.exception(e)
        # 暂时不将数据保存到redis
        # saveL2Data(code, add_datas)
# 设置最新的l2数据采集的数量
@@ -221,7 +244,7 @@
    price = float(val["price"])
    money = price * int(val["num"])
    if price > 3.0:
        if money >= 30000:
        if money >= 29900:
            return True
        else:
            return False
@@ -385,11 +408,16 @@
        if int(val["operateType"]) != 2:
            return False
        return True
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
    # 涨停卖撤
    @classmethod
    def is_limit_up_price_sell_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 3:
            return False
        return True
    # 是否涨停买撤
@@ -421,6 +449,22 @@
            return True
        return False
    # 是否为买
    @classmethod
    def is_buy(cls, val):
        if int(val["operateType"]) == 0:
            return True
        return False
    # l2时间差值
    @classmethod
    def time_sub_as_ms(cls, val1, val2):
        # 计算时间差值
        sub_s = tool.trade_time_sub(val1["time"], val2["time"])
        sub_ms = int(val1["tms"]) - int(val2["tms"])
        fs = sub_s * 1000 + sub_ms
        return fs
class L2TradeQueueUtils(object):
    # 买入数据是否已撤
@@ -432,8 +476,9 @@
        # 是否有买撤数据
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                 local_today_num_operate_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(cancel_data,
                                                                                                    local_today_buyno_map.get(
                                                                                                        code))
                if buy_index == data["index"]:
                    return True
        return False
@@ -534,4 +579,4 @@
if __name__ == "__main__":
    # Manual smoke run of the loader and the time-difference helper
    print(load_l2_data("002235"))
    # time_sub_as_ms reads the millisecond part from the "tms" key; the
    # former "time_ms" key made this call fail with KeyError.
    print(L2DataUtil.time_sub_as_ms({"time": "13:00:00", "tms": "980"}, {"time": "11:29:59", "tms": "590"}))