Administrator
2023-02-06 736e61b89e87f7e3c224feca25e94cda459b9ae6
l2/cancel_buy_strategy.py
@@ -6,15 +6,20 @@
# s级平均大单计算
# 计算范围到申报时间的那一秒
import json
import logging
import time
import big_money_num_manager
import constant
import gpcode_manager
import l2_data_log
import l2_data_util
import redis_manager
import tool
import trade_data_manager
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log
from l2_data_manager import L2DataUtil, local_today_num_operate_map, load_l2_data, local_today_datas
from l2.l2_data_manager import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn
class SecondCancelBigNumComputer:
@@ -88,11 +93,12 @@
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, need_cancel=True):
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, threadId,
                    need_cancel=True):
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.cancel_debug(threadId, code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
@@ -171,7 +177,8 @@
                        if cancel_num / buy_num > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num, buy_num)
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num,
                                buy_num)
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
@@ -205,20 +212,40 @@
            return None
        return int(val)
    @classmethod
    def __save_watch_index_set(cls, code, datas):
        key = f"h_cancel_watch_indexs-{code}"
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps(list(datas)))
    # 保存成交进度
    @classmethod
    def __get_watch_index_set(cls, code):
        key = f"h_cancel_watch_indexs-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return None
        val = json.loads(val)
        return val
    # 保存结束位置
    @classmethod
    def __save_compute_data(cls, code, process_index, buy_num, cancel_num):
    def __save_compute_data(cls, code, process_index, cancel_num):
        key = "h_cancel_compute_data-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
        key = "h_cancel_compute_data-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return -1, 0, 0
            return -1, 0
        val = json.loads(val)
        return val[0], val[1], val[2]
        return val[0], val[1]
    @classmethod
    def __del_compute_data(cls, code):
        key = "h_cancel_compute_data-{}".format(code)
        cls.__getRedis().delete(key)
    @classmethod
    def __clear_data(cls, code):
@@ -234,124 +261,543 @@
            for k in keys:
                cls.__getRedis().delete(k)
    # 计算净大单
    @classmethod
    def __compute_left_big_num(cls, code, start_index, end_index, total_data):
        # 获取大单的最小手数
        left_big_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # 去除非大单
            if not l2_data_util.is_big_money(val):
                continue
            if L2DataUtil.is_limit_up_price_buy(val):
                left_big_num += val["num"] * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and start_index <= buy_index <= end_index:
                    left_big_num -= val["num"] * data["re"]
                elif buy_index is None:
                    # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                    min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                     val["cancelTimeUnit"])
                    # 只判断S级撤销,只有s级撤销才有可能相等
                    if max_space - min_space <= 1:
                        buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                        if int(total_data[start_index]["val"]["time"].replace(":", "")) <= int(
                                buy_time.replace(":", "")) <= int(
                            total_data[end_index]["val"]["time"].replace(":", "")):
                            left_big_num -= val["num"] * data["re"]
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, need_cancel=True):
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, threadId):
        # 守护30s以外的数据
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) <= 30:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        watch_indexs = cls.__get_watch_index_set(code)
        watch_indexs_dict = {}
        # 监听的总数
        total_nums = 0
        for indexs in watch_indexs:
            watch_indexs_dict[indexs[0]] = indexs
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[1]
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
            for i in range(end_index, start_index - 1, -1):
                if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
                    end_index = i
                    break
        if watch_indexs is None:
            l2_log.cancel_debug(threadId, code, "H撤没获取到监听范围数据")
            return False, None
        # 获取处理进度
        process_index_old, buy_num, cancel_num = cls.__get_compute_data(code)
        processed_index, cancel_num = cls.__get_compute_data(code)
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = -1
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_exec_index, total_data)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            start_index = end_index + 1
            process_index = end_index
        l2_log.cancel_debug(threadId, code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        process_index = start_index
        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if process_index_old >= i:
                    # 已经处理过的数据不需要处理
                    continue
                if not l2_data_util.is_big_money(val):
                if i <= processed_index:
                    # 已经处理过了
                    continue
                process_index = i
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and buy_single_index <= buy_index:
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                    elif buy_index is None:
                        # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                         val["cancelTimeUnit"])
                        # 只判断S级撤销,只有s级撤销才有可能相等
                        if max_space - min_space <= 1:
                            buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                            if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
                                    buy_time.replace(":", "")):
                                cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                    # 保存数据
                    if need_cancel:
                        cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
                        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        if place_order_count <= 1:
                            cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
                            cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        elif place_order_count <= 2:
                            cancel_rate_threshold = constant.S_CANCEL_SECOND_RATE
                            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
                        else:
                            cancel_rate_threshold = constant.S_CANCEL_THIRD_RATE
                        if cancel_num / buy_num > cancel_rate_threshold:
                            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num, buy_num)
            l2_log.cancel_debug(threadId, code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
    # 下单成功
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data):
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
        cls.set_trade_progress(code, buy_exec_index)
        cls.set_trade_progress(code, buy_exec_index, total_data, local_today_num_operate_map)
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index):
        l2_log.cancel_debug(code, "成交进度:{}", index)
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map):
        l2_log.cancel_debug(0, code, "成交进度:{}", index)
        # 成交进度
        cls.__save_trade_progress(code, index)
        cls.compute_watch_end_index(code, total_data, local_today_num_operate_map)
    @classmethod
    def compute_watch_end_index(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index = cls.__get_trade_progress(code)
        threshold_money = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) * 2
        # 最小值1500万
        if threshold_money < 15000000:
            threshold_money = 15000000
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        if trade_progress_index is None:
            raise Exception("尚未获取到成交进度")
        total_num = 0
        watch_set = set()
        for i in range(trade_progress_index + 1, total_data[-1]["index"] + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                total_num += val["num"] * data["re"]
                # 判断当前买是否已经买撤
                cancel_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(val["num"], "1", val["price"]))
                canceled = False
                if cancel_datas:
                    for cancel_data in cancel_datas:
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                         local_today_num_operate_map)
                        if buy_index == i:
                            # 已经买撤
                            total_num -= buy_data["val"]["num"] * cancel_data["re"]
                            canceled = True
                            if data["re"] - cancel_data["re"] > 0:
                                watch_set.add((i, data["re"] - cancel_data["re"]))
                            break
                if not canceled:
                    watch_set.add((i, data["re"]))
                # 判断是否达到阈值
                if total_num >= threshold_num:
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{}", json.dumps(watch_set))
                    break
        # 保存计算范围
        cls.__save_watch_index_set(code, watch_set)
        # 删除原来的计算数据
        cls.__del_compute_data(code)
    @classmethod
    def get_watch_indexs(cls, code):
        return cls.__get_watch_index_set(code)
# --------------------------------封单额变化撤------------------------
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    _redisManager = redis_manager.RedisManager(1)
    _thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()
    # 设置l2的每一秒涨停封单额数据
    @classmethod
    def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
        old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
        if old_num is None:
            old_num = num
            old_from = from_index
            old_to = to_index
        else:
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))
    @classmethod
    def __get_l2_second_money_record(cls, code, time):
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = cls.__get_redis().get(key)
        return cls.__format_second_money_record_val(val)
    @classmethod
    def __format_second_money_record_val(cls, val):
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    @classmethod
    def __get_l2_second_money_record_keys(cls, code, time_regex):
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = cls.__get_redis().keys(key)
        return keys
    # 设置l2最新的封单额数据
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index)))
    # 返回数量,索引
    @classmethod
    def __get_l2_latest_money_record(cls, code):
        key = "l2_limit_up_money-{}".format(code)
        result = cls.__get_redis().get(key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
        else:
            return 0, -1
    # 矫正数据
    # 矫正方法为取矫正时间两侧的秒分布数据,用于确定计算结束坐标
    @classmethod
    def verify_num(cls, code, num, time_str):
        # 记录买1矫正日志
        logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
        time_ = time_str.replace(":", "")
        key = None
        # 获取矫正时间前1分钟的数据
        keys = []
        for i in range(0, 3600):
            temp_time = tool.trade_time_add_second(time_str, 0 - i)
            # 只处理9:30后的数据
            if int(temp_time.replace(":", "")) < int("093000"):
                break
            keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
            if len(keys_) > 0:
                keys.append(keys_[0])
            if len(keys) >= 1:
                break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
            key = keys[0]
            val = cls.__get_redis().get(key)
            old_num, old_from, old_to = cls.__format_second_money_record_val(val)
            end_index = old_to
            # 保存最近的数据
            cls.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
        else:
            logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
        # 取消此种方法
        #
        # for i in range(4, -2, -2):
        #     # 获取本(分钟/小时/天)内秒分布数据
        #     time_regex = "{}*".format(time_[:i])
        #     keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
        #     if keys_ and len(keys_) > 1:
        #         # 需要排序
        #         keys = []
        #         for k in keys_:
        #             keys.append(k)
        #         keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        #         # if i == 4:
        #         #    keys=keys[:5]
        #         # 有2个元素
        #         for index in range(0, len(keys) - 1):
        #             time_1 = keys[index].split("-")[-1]
        #             time_2 = keys[index + 1].split("-")[-1]
        #             if int(time_1) <= int(time_) <= int(time_2):
        #                 # 在此时间范围内
        #                 if time_ == time_2:
        #                     key = keys[index + 1]
        #                 else:
        #                     key = keys[index]
        #                 break
        #         if key:
        #             break
        # # 如果没有找到匹配的区间
        # if not key:
        #     # 最后一条数据的时间为相应的区间
        #     total_datas = local_today_datas[code]
        #
        # if key:
        #     val = cls.__get_redis().get(key)
        #     old_num, old_from, old_to = cls.__format_second_money_record_val(val)
        #     end_index = old_to
        #     # 保存最近的数据
        #     cls.__set_l2_latest_money_record(code, end_index, num)
        #     logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
    # 计算量,用于涨停封单量的计算
    @classmethod
    def __compute_num(cls, code, data, buy_single_data):
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            # 涨停买撤与卖
            return 0 - int(data["val"]["num"]) * data["re"]
        else:
            # 卖撤
            if L2DataUtil.is_sell_cancel(data["val"]):
                # 卖撤的买数据是否在买入信号之前,如果在之前就不计算,不在之前就计算
                if l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                            local_today_num_operate_map.get(code)):
                    return 0
            return int(data["val"]["num"]) * data["re"]
    @classmethod
    def clear(cls, code):
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().delete(key)
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, random_key, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
                     with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
        start_time = round(time.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
        # 记录计算的坐标
        time_dict_num_index = {}
        # 坐标-量的map
        num_dict = {}
        # 统计时间分布
        time_dict = {}
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            time_ = val["time"]
            if time_ not in time_dict:
                time_dict[time_] = i
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            time_ = val["time"]
            if time_ not in time_dict_num:
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                             time_dict_num_index[t_]["e"])
        print("保存涨停封单额时间:", round(time.time() * 1000) - start_time)
        # 累计最新的金额
        total_num, index = cls.__get_l2_latest_money_record(code)
        record_msg = f"同花顺买1信息 {total_num},{index}"
        if index == -1:
            # 没有获取到最新的矫正封单额,需要从买入信号开始点计算
            index = buy_single_begin_index - 1
            total_num = 0
        cancel_index = None
        cancel_msg = None
        # 待计算量
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        min_volumn = round(10000000 / (limit_up_price * 100))
        min_volumn_big = min_volumn * 5
        # 不同时间的数据开始坐标
        time_start_index_dict = {}
        # 数据时间分布
        time_list = []
        # 到当前时间累积的买1量
        time_total_num_dict = {}
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 获取最大封单额
        max_buy1_volume = cls._thsBuy1VolumnManager.get_max_buy1_volume(code)
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            # 统计撤销数量
            try:
                if big_money_num_manager.is_big_num(data["val"]):
                    if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                        cancel_big_num_count += int(data["re"])
                        # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                        # 获取是否在买入执行信号周围2s
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                         local_today_num_operate_map.get(
                                                                                             code))
                        if buy_index is not None and buy_data is not None:
                            # 相差1s
                            buy_time = buy_data["val"]["time"]
                            if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                                cancel_big_num_count += int(data["re"])
                    elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                        cancel_big_num_count -= int(data["re"])
            except Exception as e:
                logging.exception(e)
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
                if cancel_big_num_count < 10:
                    threshold_rate = threshold_rate - cancel_big_num_count * 0.01
                else:
                    threshold_rate = threshold_rate - 10 * 0.01
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                # 记录每一秒的开始位置
                time_start_index_dict[time_] = i
                # 记录时间分布
                time_list.append(time_)
                # 上一段时间的总数
                time_total_num_dict[time_] = total_num
            exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # 在处理数据的范围内,就需要判断是否要撤单了
            if start_index <= i <= end_index:
                # 如果是减小项
                if val < 0:
                    # 当前量小于最大量的24%则需要取消
                    if exec_time_offset >= 30:
                        if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                            cancel_index = i
                            cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
                            break
                    # 累计封单金额小于1000万
                    if total_num < min_volumn:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                            cancel_index = i
                            cancel_msg = "封单金额小于1000万,为{}".format(total_num)
                            break
                    # 相邻2s内的数据减小50%
                    # 上1s的总数
                    last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                    if last_second_total_volumn > 0 and (
                            last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                            # 相邻2s内的数据减小50%
                            cancel_index = i
                            cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                             total_num)
                        break
                    # 记录中有上2个数据
                    if len(time_list) >= 2:
                        # 倒数第2个数据
                        last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
                        if last_2_second_total_volumn > 0:
                            if last_2_second_total_volumn > last_second_total_volumn > total_num:
                                dif = last_2_second_total_volumn - total_num
                                if dif / last_2_second_total_volumn >= threshold_rate:
                                    # 与执行位相隔>=5s时规则生效
                                    if exec_time_offset >= 5:
                                        cancel_index = i
                                        cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
                                                                                                             last_2_second_total_volumn,
                                                                                                             last_second_total_volumn,
                                                                                                             total_num)
                                    break
        if not with_cancel:
            cancel_index = None
        print("封单额计算时间:", round(time.time() * 1000) - start_time)
        process_end_index = end_index
        if cancel_index:
            process_end_index = cancel_index
        # 保存最新累计金额
        # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
        l2_data_log.l2_time(code, random_key, round(time.time() * 1000) - start_time,
                            "l2数据封单额计算时间",
                            False)
        if cancel_index:
            l2_log.cancel_debug(random_key, code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
                                total_num)
            return total_datas[cancel_index], cancel_msg
        return None, None
# ---------------------------------板上卖-----------------------------
# 涨停卖统计
class L2LimitUpSellStatisticUtil:
    _redisManager = redis_manager.RedisManager(0)
    @classmethod
    def __get_redis(cls):
        return cls._redisManager.getRedis()
    # 新增卖数据
    @classmethod
    def __incre_sell_data(cls, code, num):
        key = "limit_up_sell_num-{}".format(code)
        cls.__get_redis().incrby(key, num)
    @classmethod
    def __get_sell_data(cls, code):
        key = "limit_up_sell_num-{}".format(code)
        val = cls.__get_redis().get(key)
        if val is None:
            return 0
        return int(val)
    @classmethod
    def __save_process_index(cls, code, index):
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), index)
    @classmethod
    def __get_process_index(cls, code):
        key = "limit_up_sell_index-{}".format(code)
        val = cls.__get_redis().get(key)
        if val is None:
            return -1
        return int(val)
    # 清除数据,当取消成功与买入之前需要清除数据
    @classmethod
    def delete(cls, code):
        key = "limit_up_sell_num-{}".format(code)
        cls.__get_redis().delete(key)
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().delete(key)
    @classmethod
    def clear(cls):
        keys = cls.__get_redis().keys("limit_up_sell_num-*")
        for k in keys:
            cls.__get_redis().delete(k)
    # 处理数据,返回是否需要撤单
    # 处理范围:买入执行位-当前最新位置
    @classmethod
    def process(cls, random_key, code, start_index, end_index, buy_exec_index):
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        # 大于自由流通市值的4.8%
        threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
        total_datas = local_today_datas.get(code)
        for i in range(start_index, end_index + 1):
            if i < buy_exec_index:
                continue
            if i <= process_index:
                continue
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]):
                num = int(total_datas[i]["val"]["num"])
                cls.__incre_sell_data(code, num)
                total_num += num
                if total_num > threshold_num:
                    cancel_index = i
                    break
        if cancel_index is not None:
            process_index = cancel_index
        else:
            process_index = end_index
        l2_log.cancel_debug(random_key, code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
                            threshold_num)
        cls.__save_process_index(code, process_index)
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
        return None, ""