Administrator
2023-02-06 736e61b89e87f7e3c224feca25e94cda459b9ae6
l2/l2_data_util.py
@@ -2,20 +2,385 @@
L2相关数据处理
"""
# L2交易队列
import datetime
import decimal
import json
import logging
import time
import constant
import gpcode_manager
import l2_data_log
import log
import redis_manager
import tool
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
# 本地最新一次上传的数据
local_latest_datas = {}
# 本地今日数据
local_today_datas = {}
# 本地手数+操作那类型组成的临时变量
# 用于加快数据处理,用空换时间
local_today_num_operate_map = {}
def load_l2_data(code, force=False):
    redis = _redisManager.getRedis()
    # 加载最近的l2数据
    if local_latest_datas.get(code) is None or force:
        # 获取最近的数据
        _data = redis.get("l2-data-latest-{}".format(code))
        if _data is not None:
            if code in local_latest_datas:
                local_latest_datas[code] = json.loads(_data)
            else:
                local_latest_datas.setdefault(code, json.loads(_data))
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas = []
        local_today_datas[code] = datas
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
# 将数据根据num-operate分类
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    if local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}
    if clear:
        local_today_num_operate_map[code] = {}
    for data in source_datas:
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"])
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
@tool.async_call
def saveL2Data(code, datas, msg=""):
    start_time = round(time.time() * 1000)
    # 查询票是否在待监听的票里面
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    # 验证股价的正确性
    redis_instance = _redisManager.getRedis()
    try:
        if redis_instance.setnx("l2-save-{}".format(code), "1") > 0:
            # 计算保留的时间
            expire = tool.get_expire()
            i = 0
            for _data in datas:
                i += 1
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time))
    return datas
# 保存l2数据
def save_l2_data(code, datas, add_datas, randomKey=None):
    redis = _redisManager.getRedis()
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            log.logger_l2_data.info("{}-{}", code, add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
# 设置最新的l2数据采集的数量
def __set_l2_data_latest_count(code, count):
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    redis.setex(key, 2, count)
    pass
# 获取代码最近的l2数据数量
def get_l2_data_latest_count(code):
    if code is None or len(code) < 1:
        return 0
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    result = redis.get(key)
    if result is None:
        return 0
    else:
        return int(result)
def parseL2Data(str):
    day = datetime.datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
    data = dict["data"]
    client = dict["client"]
    code = data["code"]
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
            return True
        else:
            return False
    # 获取增量数据
    @classmethod
    def get_add_data(cls, code, latest_datas, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_data = None
        latest_datas_ = latest_datas
        if latest_datas_ is not None and len(latest_datas_) > 0:
            last_data = latest_datas_[-1]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == (last_data["key"] if last_data is not None else ""):
                start_index = len(datas) - count
                break
        _add_datas = []
        if last_data is not None:
            if start_index < 0:
                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            elif start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
        else:
            _add_datas = datas[start_index + 1:]
        for i in range(0, len(_add_datas)):
            _add_datas[i]["index"] = _start_index + i
        return _add_datas
    # 纠正数据,将re字段替换为较大值
    @classmethod
    def correct_data(cls, code, latest_datas, _datas):
        latest_data = latest_datas
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                # 新数据条数比旧数据多才保存
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
    # 处理l2数据
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        datas = []
        dataIndexs = {}
        same_time_num = {}
        for item in data:
            # 解析数据
            time = item["time"]
            if time in same_time_num:
                same_time_num[time] = same_time_num[time] + 1
            else:
                same_time_num[time] = 1
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # 涨停价
            if limit_up_price is not None:
                if limit_up_price == tool.to_price(decimal.Decimal(price)):
                    limitPrice = 1
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # 数据重复次数+1
                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
            else:
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        # TODO 测试的时候开启,方便记录大单数据
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # @classmethod
    # def get_time_as_str(cls, time_seconds):
    #     ts = time_str.split(":")
    #     return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    @classmethod
    def is_limit_up_price_buy(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 0:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 1:
            return False
        price = float(val["price"])
        num = int(val["num"])
        # if price * num * 100 < 50 * 10000:
        #     return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
class L2TradeQueueUtils(object):
    # 获取成交进度索引
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList):
    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   latest_not_limit_up_time=None):
        if len(queueList) == 0:
            return None
        # 补齐整数位5位
        buy_1_price_format = f"{buy_1_price}"
        while buy_1_price_format.find(".") < 4:
            buy_1_price_format = "0" + buy_1_price_format
        index_set = set()
        for num in queueList:
            buy_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(num, "0", buy_1_price))
                "{}-{}-{}".format(num, "0", buy_1_price_format))
            if buy_datas is not None and len(buy_datas) > 0:
                for data in buy_datas:
                    index_set.add(data["index"])
                    # 在最近一次非涨停买1更新的时间之后才有效
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        index_set.add(data["index"])
        index_list = list(index_set)
        index_list.sort()
        num_list = []
@@ -29,6 +394,9 @@
        find_index = index_list_str.find(queue_list_str)
        if find_index >= 0:
            temp_str = index_list_str[0:find_index]
            if temp_str.endswith(","):
                temp_str = temp_str[:-1]
            return new_index_list[len(temp_str.split(","))]
        raise Exception("尚未找到成交进度")