Administrator
2023-02-16 92cb2dd75ea37b64b174f42ddd0b5b17d6a4634a
l2/l2_data_util.py
@@ -9,9 +9,12 @@
import logging
import time
import numpy
import constant
import gpcode_manager
from l2 import l2_data_log
import l2_data_util
from l2 import l2_data_log, l2_data_source_util
import log
from db import redis_manager
import tool
@@ -115,7 +118,7 @@
# 保存l2数据
def save_l2_data(code, datas, add_datas, randomKey=None):
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # Only save when there is newly added data
    if len(add_datas) > 0:
@@ -378,47 +381,113 @@
class L2TradeQueueUtils(object):
    """Helpers for locating the traded-progress position of the buy-1 queue.

    Records in ``total_datas`` are dicts that carry at least ``"index"``,
    ``"re"`` (used here as a repeat count) and ``"val"`` (a dict with
    ``"num"``, ``"price"`` and ``"time"``).  ``local_today_num_operate_map``
    maps keys of the form ``"<num>-<operate type>-<price>"`` to lists of such
    records; ``"0"`` is used for buy lookups and ``"1"`` for buy-cancel
    lookups.
    """

    # Whether a buy record has already been cancelled
    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        """Return True if a cancel record resolves back to the buy ``data``.

        Cancel candidates are read from ``local_today_num_operate_map`` under
        ``"<num>-1-<price>"``.  Each candidate is mapped to the index of the
        buy it cancels via
        ``L2DataSourceUtils.get_buy_index_with_cancel_data`` and compared with
        ``data["index"]``.

        ``total_datas`` is not used; it is kept for signature compatibility.
        """
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        # No cancel record with the same volume and price: cannot be cancelled
        if not cancel_datas:
            return False
        resolve_buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data
        return any(
            resolve_buy_index(code, cancel_data, local_today_num_operate_map) == data["index"]
            for cancel_data in cancel_datas
        )

    @staticmethod
    def _find_sequence(haystack, needle):
        """Return the first offset at which ``needle`` occurs as a contiguous
        run of whole elements inside ``haystack``, or -1 if it does not occur
        (an empty ``needle`` never matches)."""
        width = len(needle)
        if width == 0:
            return -1
        for offset in range(len(haystack) - width + 1):
            if haystack[offset:offset + width] == needle:
                return offset
        return -1

    # Locate the traded-progress index
    @classmethod
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index,
                                   latest_not_limit_up_time=None):
        """Find the index in ``total_datas`` where the buy-1 queue starts.

        The queue volumes in ``queueList`` are matched, in order, against the
        buy records at ``buy_1_price`` whose index is ``>= last_index``.  The
        whole queue is tried first; if that fails, progressively shorter
        sliding windows of the queue are tried (down to just over half of its
        length).  A match is only accepted when ``is_trust`` judges the
        spacing of the matched records plausible.

        Args:
            code: Security code, forwarded to the cancel lookup.
            buy_1_price: Buy-1 price; its integer part is left-padded with
                zeros to 4 digits to build the lookup key.
            total_datas: All L2 records, addressable by record index.
            local_today_num_operate_map: Lookup map described on the class.
            queueList: Volumes of the current buy-1 queue, front first.
            last_index: Previously known progress index; reset to 0 when that
                record turns out to be cancelled.
            latest_not_limit_up_time: When given, only records for which
                ``tool.trade_time_sub(record_time, latest_not_limit_up_time)``
                is ``>= 0`` are considered (presumably "not earlier than" -
                confirm against ``tool.trade_time_sub``).

        Returns:
            ``None`` when ``queueList`` is empty, otherwise the index of the
            record matched to the first element of the accepted window.

        Raises:
            Exception: when no trusted match is found.
        """

        def find_traded_progress_index_simple(queues):
            """Return ``(start_index, matched_indexes)`` for ``queues`` or
            ``(None, None)`` when the sequence is not present."""
            candidate_indexes = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(num, "0", buy_1_price_format))
                if not buy_datas:
                    continue
                for data in buy_datas:
                    # Only valid after the latest non-limit-up buy-1 update
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        if data["index"] >= last_index:
                            candidate_indexes.add(data["index"])

            # Expand every candidate by its repeat count, keeping track of
            # which record each expanded entry came from.
            nums = []
            owners = []
            for index in sorted(candidate_indexes):
                record = total_datas[index]
                for _ in range(record["re"]):
                    nums.append(str(record["val"]["num"]))
                    owners.append(index)

            # Compare whole tokens.  Searching the comma-joined text instead
            # would let "1,11" match inside "11,11" and yield a wrong index.
            start = cls._find_sequence(nums, [str(num) for num in queues])
            if start < 0:
                return None, None
            return owners[start], owners[start:start + len(queues)]

        # Decide whether a matched position can be trusted
        def is_trust(indexes):
            # Number of unmatched records between neighbouring matches
            gaps = [indexes[i] - indexes[i - 1] - 1 for i in range(1, len(indexes))]
            if len(gaps) <= 1:
                return True
            # Standard deviation below 10: treated as fully trusted
            if numpy.std(gaps) < 10:
                return True
            for i, gap in enumerate(gaps):
                if abs(gap) <= 10:
                    continue
                # Large gap: count the limit-up buys inside it that have not
                # been cancelled.  The range covers every record strictly
                # between the two matches, including the last one.
                buy_count = 0
                for index in range(indexes[i] + 1, indexes[i + 1]):
                    record = total_datas[index]
                    if L2DataUtil.is_limit_up_price_buy(record["val"]):
                        if not cls.__is_cancel(code, record, total_datas, local_today_num_operate_map):
                            buy_count += record["re"]
                # Provisional tolerance: fewer than 3 such orders
                if buy_count >= 3:
                    return False
            return True

        if len(queueList) == 0:
            return None
        # last_index must not point at a cancelled order; reset it if it does
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0
        # Left-pad the integer part of the price with zeros to 4 digits.
        # partition/zfill also terminates for prices without a decimal point,
        # where a "find('.') < 4" loop would never end.
        integer_part, dot, fraction = f"{buy_1_price}".partition(".")
        buy_1_price_format = integer_part.zfill(4) + dot + fraction

        # Factor search: window sizes from len(queueList) down to just above
        # len(queueList) // 2, sliding each window over the queue.
        max_win_len = len(queueList)
        min_win_len = max_win_len // 2
        for win_len in range(max_win_len, min_win_len, -1):
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                # Index 0 is a valid result, so test against None explicitly
                if f_start_index is not None and is_trust(f_indexs):
                    return f_start_index
        raise Exception("尚未找到成交进度")
if __name__ == "__main__":
    # Manual sanity check: print the standard deviation of a sample gap list.
    print(numpy.std([0, 2, 4]))