Administrator
2023-07-24 0e68e24f54db11d340785b17570fff2bc5fc7ac6
l2/l2_data_util.py
@@ -12,12 +12,11 @@
import numpy
import constant
import gpcode_manager
import l2_data_util
from code_attribute import gpcode_manager
from l2 import l2_data_log, l2_data_source_util
import log
from log_module import log
from db import redis_manager
import tool
from utils import tool
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
@@ -29,18 +28,22 @@
# 用于加快数据处理,用空换时间
local_today_num_operate_map = {}
# 买入订单号映射,只有原生的L2数据才有
local_today_buyno_map = {}
def load_l2_data(code, force=False):
def load_l2_data(code, load_latest=True, force=False):
    redis = _redisManager.getRedis()
    # 加载最近的l2数据
    if local_latest_datas.get(code) is None or force:
        # 获取最近的数据
        _data = redis.get("l2-data-latest-{}".format(code))
        if _data is not None:
            if code in local_latest_datas:
                local_latest_datas[code] = json.loads(_data)
            else:
                local_latest_datas.setdefault(code, json.loads(_data))
    if load_latest :
        if local_latest_datas.get(code) is None or force:
            # 获取最近的数据
            _data = redis.get("l2-data-latest-{}".format(code))
            if _data is not None:
                if code in local_latest_datas:
                    local_latest_datas[code] = json.loads(_data)
                else:
                    local_latest_datas.setdefault(code, json.loads(_data))
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
@@ -49,6 +52,9 @@
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        data_normal = True
        if datas and len(datas) < datas[-1]["index"] + 1:
            data_normal = False
        # 从数据库加载
        # datas = []
@@ -63,6 +69,9 @@
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        return data_normal
    return True
# 将数据根据num-operate分类
@@ -77,6 +86,25 @@
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
# 将数据根据orderNo分类,原生数据才有
def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
    # 只有原生L2数据才会有此操作
    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return
    if local_today_buyno_map.get(code) is None:
        local_today_buyno_map[code] = {}
    if clear:
        local_today_buyno_map[code] = {}
    for data in source_datas:
        if data["val"]["operateType"] != 0:
            continue
        # 只填充买入数据
        key = "{}".format(data["val"]["orderNo"])
        if local_today_buyno_map[code].get(key) is None:
            local_today_buyno_map[code].setdefault(key, data)
@tool.async_call
@@ -124,20 +152,22 @@
    if len(add_datas) > 0:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        if datas:
            redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
            local_latest_datas[code] = datas
            set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
        # 暂时不将数据保存到redis
        # saveL2Data(code, add_datas)
# 设置最新的l2数据采集的数量
def __set_l2_data_latest_count(code, count):
def set_l2_data_latest_count(code, count):
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    redis.setex(key, 2, count)
@@ -167,12 +197,25 @@
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    count = data["count"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
    return day, client, channel, code, capture_time, process_time, data, count
# 元数据是否有差异
def is_origin_data_diffrent(data1, data2):
    if data1 is None or data2 is None:
        return True
    if len(data1) != len(data2):
        return True
    # 比较
    data_length = len(data1)
    step = len(data1) // 10
    for i in range(0, data_length, step):
        if json.dumps(data1[i]) != json.dumps(data2[i]):
            return True
    return False
# 是否为大单
@@ -262,7 +305,8 @@
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            # 暂时不将数据保存到redis
            # saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
@@ -435,6 +479,10 @@
                return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)]
            return None, None
        # 3个数据以上的不需要判断最近的一次未涨停时间
        if len(queueList) >= 3:
            latest_not_limit_up_time = None
        # 判断匹配的位置是否可信
        def is_trust(indexes):
            cha = []
@@ -488,6 +536,4 @@
if __name__ == "__main__":
    cha = [0, 2, 4]
    std_result = numpy.std(cha)
    print(std_result)
    print(load_l2_data("002235"))