Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
l2/l2_data_util.py
@@ -9,13 +9,17 @@
import logging
import time
import constant
import gpcode_manager
import l2_data_log
import log
import redis_manager
import tool
import numpy
import constant
from code_attribute import gpcode_manager
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_log, l2_data_source_util
from log_module import log, log_export, async_log_util
from db import redis_manager_delegate as redis_manager
from utils import tool
__db = 1
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
# 本地最新一次上传的数据
@@ -26,26 +30,38 @@
# 用于加快数据处理,用空换时间
local_today_num_operate_map = {}
# 买入订单号映射,只有原生的L2数据才有
local_today_buyno_map = {}
def load_l2_data(code, force=False):
    redis = _redisManager.getRedis()
# 卖出订单号映射,只有原生的L2数据才有
local_today_sellno_map = {}
# 已经撤单的订单号
local_today_canceled_buyno_map = {}
def load_l2_data(code, load_latest=True, force=False):
    # 加载最近的l2数据
    if local_latest_datas.get(code) is None or force:
        # 获取最近的数据
        _data = redis.get("l2-data-latest-{}".format(code))
        if _data is not None:
            if code in local_latest_datas:
                local_latest_datas[code] = json.loads(_data)
            else:
                local_latest_datas.setdefault(code, json.loads(_data))
    if load_latest:
        if local_latest_datas.get(code) is None or force:
            # 获取最近的数据
            _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code))
            if _data is not None:
                if code in local_latest_datas:
                    local_latest_datas[code] = json.loads(_data)
                else:
                    local_latest_datas.setdefault(code, json.loads(_data))
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = log.load_l2_from_log()
        datas = log_export.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        data_normal = True
        if datas and len(datas) < datas[-1]["index"] + 1:
            data_normal = False
        # 从数据库加载
        # datas = []
@@ -60,6 +76,44 @@
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        load_sell_no_map(local_today_sellno_map, code, local_today_datas.get(code), force)
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
        return data_normal
    return True
# L2数据是否正常
def l2_data_is_normal(code):
    datas = local_today_datas.get(code)
    if not datas:
        # 初始化
        local_today_datas[code] = []
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code))
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code))
        load_sell_no_map(local_today_sellno_map, code, local_today_datas.get(code))
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code))
    if datas and len(datas) != datas[-1]["index"] + 1:
        return False
    return True
# 加载所有的l2数据
def load_l2_data_all(force=False):
    datas = log_export.load_l2_from_log()
    for code in datas:
        if force:
            local_today_datas[code] = datas[code]
        else:
            if code not in local_today_datas:
                local_today_datas[code] = datas[code]
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        load_sell_no_map(local_today_sellno_map, code, local_today_datas.get(code), force)
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
    constant.L2_DATA_IS_LOADED = True
# 将数据根据num-operate分类
@@ -76,6 +130,64 @@
        local_today_num_operate_map[code].get(key).append(data)
# 将数据根据orderNo分类,原生数据才有
def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
    # 只有原生L2数据才会有此操作
    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return
    if local_today_buyno_map.get(code) is None:
        local_today_buyno_map[code] = {}
    if clear:
        local_today_buyno_map[code] = {}
    for data in source_datas:
        if data["val"]["operateType"] != 0:
            continue
        # 只填充买入数据
        key = "{}".format(data["val"]["orderNo"])
        if local_today_buyno_map[code].get(key) is None:
            local_today_buyno_map[code].setdefault(key, data)
# 将数据根据orderNo分类,原生数据才有
def load_sell_no_map(local_today_sellno_map, code, source_datas, clear=False):
    # 只有原生L2数据才会有此操作
    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return
    if local_today_sellno_map.get(code) is None:
        local_today_sellno_map[code] = {}
    if clear:
        local_today_sellno_map[code] = {}
    for data in source_datas:
        if data["val"]["operateType"] != 2:
            continue
        # 只填充买入数据
        key = "{}".format(data["val"]["orderNo"])
        if local_today_sellno_map[code].get(key) is None:
            local_today_sellno_map[code].setdefault(key, data)
# 将数据根据orderNo分类已撤订单,原生数据才有
def load_canceled_buy_no_map(local_today_canceled_buyno_map, code, source_datas, clear=False):
    # 只有原生L2数据才会有此操作
    if constant.L2_SOURCE_TYPE != constant.L2_SOURCE_TYPE_HUAXIN:
        return
    if local_today_canceled_buyno_map.get(code) is None:
        local_today_canceled_buyno_map[code] = {}
    if clear:
        local_today_canceled_buyno_map[code] = {}
    for data in source_datas:
        # 只留下买撤
        if data["val"]["operateType"] != 1:
            continue
        # 只填充买入数据
        key = "{}".format(data["val"]["orderNo"])
        if local_today_canceled_buyno_map[code].get(key) is None:
            local_today_canceled_buyno_map[code].setdefault(key, data)
@tool.async_call
def saveL2Data(code, datas, msg=""):
    start_time = round(time.time() * 1000)
@@ -86,58 +198,57 @@
    redis_instance = _redisManager.getRedis()
    try:
        if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1", auto_free=False) > 0:
            # 计算保留的时间
            expire = tool.get_expire()
            i = 0
            for _data in datas:
                i += 1
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                value = RedisUtils.get(redis_instance, key, auto_free=False)
                if value is None:
                    # 新增
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                        RedisUtils.setex(redis_instance, key, expire, json.dumps(value), auto_free=False)
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
                        RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value), auto_free=False)
    finally:
        redis_instance.delete("l2-save-{}".format(code))
        RedisUtils.delete(redis_instance, "l2-save-{}".format(code), auto_free=False)
        RedisUtils.realse(redis_instance)
    print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time))
    return datas
# 保存l2数据
def save_l2_data(code, datas, add_datas, randomKey=None):
    redis = _redisManager.getRedis()
def save_l2_data(code, datas, add_datas):
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
    if add_datas:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        if datas:
            RedisUtils.setex_async(__db, "l2-data-latest-{}".format(code), tool.get_expire(),
                                   json.dumps(datas))
            # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
            local_latest_datas[code] = datas
            set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
            async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
# 设置最新的l2数据采集的数量
def __set_l2_data_latest_count(code, count):
    redis = _redisManager.getRedis()
def set_l2_data_latest_count(code, count):
    key = "latest-l2-count-{}".format(code)
    redis.setex(key, 2, count)
    RedisUtils.setex(_redisManager.getRedis(), key, 2, count)
    pass
@@ -145,10 +256,9 @@
def get_l2_data_latest_count(code):
    if code is None or len(code) < 1:
        return 0
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    result = redis.get(key)
    result = RedisUtils.get(_redisManager.getRedis(), key)
    if result is None:
        return 0
    else:
@@ -164,12 +274,25 @@
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    count = data["count"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
    return day, client, channel, code, capture_time, process_time, data, count
# 元数据是否有差异
def is_origin_data_diffrent(data1, data2):
    if data1 is None or data2 is None:
        return True
    if len(data1) != len(data2):
        return True
    # 比较
    data_length = len(data1)
    step = len(data1) // 10
    for i in range(0, data_length, step):
        if json.dumps(data1[i]) != json.dumps(data2[i]):
            return True
    return False
class L2DataUtil:
@@ -177,11 +300,7 @@
    def is_same_time(cls, time1, time2):
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
        if abs(tool.trade_time_sub(time1, time2)) < 3:
            return True
        else:
            return False
@@ -242,7 +361,8 @@
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            # 暂时不将数据保存到redis
            # saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
@@ -286,7 +406,7 @@
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        # TODO 测试的时候开启,方便记录大单数据
        # 测试的时候开启,方便记录大单数据
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
@@ -323,11 +443,16 @@
        if int(val["operateType"]) != 2:
            return False
        return True
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
    # 涨停卖撤
    @classmethod
    def is_limit_up_price_sell_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 3:
            return False
        return True
    # 是否涨停买撤
@@ -339,10 +464,12 @@
        if int(val["operateType"]) != 1:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    @classmethod
    def is_buy_cancel(cls, val):
        if int(val["operateType"]) != 1:
            return False
        return True
    # 是否卖撤
@@ -359,47 +486,138 @@
            return True
        return False
    # 是否为买
    @classmethod
    def is_buy(cls, val):
        if int(val["operateType"]) == 0:
            return True
        return False
    # l2时间差值
    @classmethod
    def time_sub_as_ms(cls, val1, val2):
        # 计算时间差值
        sub_s = tool.trade_time_sub(val1["time"], val2["time"])
        sub_ms = int(val1["tms"]) - int(val2["tms"])
        fs = sub_s * 1000 + sub_ms
        return fs
    @classmethod
    def get_time_with_ms(cls, val):
        return val["time"] + "." + "{:0>3}".format(int(val["tms"]))
class L2TradeQueueUtils(object):
    # 买入数据是否已撤
    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        # 是否有买撤数据
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(cancel_data,
                                                                                                    local_today_buyno_map.get(
                                                                                                        code))
                if buy_index == data["index"]:
                    return True
        return False
    # 获取成交进度索引
    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index,
                                   latest_not_limit_up_time=None):
        def find_traded_progress_index_simple(queues):
            index_set = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(num, "0", buy_1_price_format))
                if buy_datas is not None and len(buy_datas) > 0:
                    for data in buy_datas:
                        # 在最近一次非涨停买1更新的时间之后才有效
                        if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                                   latest_not_limit_up_time) >= 0:
                            if data["index"] >= last_index:
                                index_set.add(data["index"])
            index_list = list(index_set)
            index_list.sort()
            num_list = []
            new_index_list = []
            for index in index_list:
                for i in range(0, total_datas[index]["re"]):
                    num_list.append(total_datas[index]["val"]["num"])
                    new_index_list.append(index)
            index_list_str = ",".join(list(map(str, num_list)))
            queue_list_str = ",".join(list(map(str, queues)))
            find_index = index_list_str.find(queue_list_str)
            if find_index >= 0:
                temp_str = index_list_str[0:find_index]
                if temp_str.endswith(","):
                    temp_str = temp_str[:-1]
                if temp_str == "":
                    return new_index_list[0], new_index_list[0:len(queues)]
                start_index = len(temp_str.split(","))
                return new_index_list[start_index], new_index_list[start_index:start_index + len(queues)]
            return None, None
        # 3个数据以上的不需要判断最近的一次未涨停时间
        if len(queueList) >= 3:
            latest_not_limit_up_time = None
        # 判断匹配的位置是否可信
        def is_trust(indexes):
            cha = []
            for i in range(1, len(indexes)):
                cha.append(indexes[i] - indexes[i - 1] - 1)
            if len(cha) <= 1:
                return True
            # 标准差小于1
            std_result = numpy.std(cha)
            if std_result < 10:
                # 绝对可信
                return True
            for i in range(0, len(cha)):
                if abs(cha[i]) > 10:
                    # 有超过10 的需要判断两个相临数据间的未撤的买入数量
                    buy_count = 0
                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                            if not cls.__is_cancel(code, total_datas[index], total_datas, local_today_num_operate_map):
                                buy_count += total_datas[index]["re"]
                    # 暂定3个误差范围
                    if buy_count >= 3:
                        return False
            return True
        if len(queueList) == 0:
            return None
        # last_index不能撤,如果已撤就清零
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0
        # 补齐整数位5位
        buy_1_price_format = f"{buy_1_price}"
        while buy_1_price_format.find(".") < 4:
            buy_1_price_format = "0" + buy_1_price_format
        index_set = set()
        for num in queueList:
            buy_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(num, "0", buy_1_price_format))
            if buy_datas is not None and len(buy_datas) > 0:
                for data in buy_datas:
                    # 在最近一次非涨停买1更新的时间之后才有效
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        index_set.add(data["index"])
        index_list = list(index_set)
        index_list.sort()
        num_list = []
        new_index_list = []
        for index in index_list:
            for i in range(0, total_datas[index]["re"]):
                num_list.append(total_datas[index]["val"]["num"])
                new_index_list.append(index)
        index_list_str = ",".join(list(map(str, num_list)))
        queue_list_str = ",".join(list(map(str, queueList)))
        find_index = index_list_str.find(queue_list_str)
        if find_index >= 0:
            temp_str = index_list_str[0:find_index]
            if temp_str.endswith(","):
                temp_str = temp_str[:-1]
            return new_index_list[len(temp_str.split(","))]
        # --------因子查找法(因子的窗口最大为:len(queueList) ,最小为:len(queueList)/2)---------
        max_win_len = len(queueList)
        min_win_len = len(queueList) // 2
        if max_win_len == min_win_len:
            min_win_len = max_win_len - 1
        for win_len in range(max_win_len, min_win_len, -1):
            # 窗口移动
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                if f_start_index and is_trust(f_indexs):
                    return f_start_index
        raise Exception("尚未找到成交进度")
if __name__ == "__main__":
    pass
    print(L2DataUtil.time_sub_as_ms({"time": "09:46:05", "tms": 480}, {"time": "09:46:04", "tms": 90}))