Administrator
2022-12-18 86e0061f9cf211b98252a9e6b71d6c9801e4a16b
l2_data_manager_new.py
@@ -8,6 +8,7 @@
import code_data_util
import global_util
import gpcode_manager
import industry_codes_sort
import l2_data_log
import l2_data_manager
import l2_data_util
@@ -17,12 +18,12 @@
import redis_manager
import ths_industry_util
import tool
import trade_data_manager
import trade_manager
import trade_queue_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
    logger_l2_error
# TODO l2数据管理
from trade_data_manager import CodeActualPriceProcessor
@@ -154,6 +155,7 @@
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    @classmethod
    def debug(cls, code, content, *args):
@@ -192,6 +194,18 @@
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
                # ---------- 判断是否需要计算大单 -----------
                try:
                    average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(code)
                    # 计算平均大单
                    if average_need:
                        end_index = local_today_datas[code][-1]["index"]
                        if len(add_datas) > 0:
                            end_index = add_datas[-1]["index"]
                        AverageBigNumComputer.compute_average_big_num(code, buy_exec_index, buy_single_index, end_index)
                except Exception as e:
                    logging.exception(e)
                # -------------数据增量处理------------
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
@@ -208,6 +222,7 @@
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
                try:
                if len(add_datas) > 0:
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
@@ -223,10 +238,12 @@
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                           capture_timestamp)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                finally:
                # 保存数据
                l2_data_manager.save_l2_data(code, datas, add_datas)
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
@@ -239,13 +256,18 @@
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        __start_time = t.time()
        __start_time = round(t.time() * 1000)
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
    @classmethod
    def process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        cls.__process_order(code, start_index, end_index, capture_time, new_add)
    # 处理已挂单
    @classmethod
@@ -260,6 +282,14 @@
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index, buy_exec_index)
        if not cancel_data:
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index, end_index,
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
@@ -281,6 +311,12 @@
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
        # 判断是否需要计算长大单的信息
        try:
            LongAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_exec_index)
        except Exception as e:
            logging.exception(e)
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
@@ -300,6 +336,14 @@
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                except Exception as e:
                    logging.exception(e)
                    logger_l2_error.exception(e)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                cls.debug(code, "执行买入成功")
            except Exception as e:
@@ -317,7 +361,7 @@
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
            codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None:
                # 同一板块中老二后面的不能买
                if codes_index.get(code) == 0:
@@ -348,11 +392,33 @@
        #     return False, "尚未获取到涨停价"
        # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
        #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
        # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
        try:
            sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
            cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
            if sell1_time is not None and sell1_volumn > 0:
                # 获取执行位信息
                total_datas = local_today_datas[code]
                buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                buy_nums = num
                for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                    _val = total_datas[i]["val"]
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(_val):
                        # 涨停买
                        buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums < sell1_volumn:
                    return False, "纯买量({})小于卖1量{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
        except Exception as e:
            logging.exception(e)
        # 量比超过1.3的不能买
        # 量比超过1.1的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
        if volumn_rate >= 1.3:
            return False, "最大量比超过1.3不能买"
        if volumn_rate >= 1.1:
            return False, "最大量比超过1.1不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
@@ -364,8 +430,11 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            # 当老大老二当前没涨停
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
@@ -378,14 +447,13 @@
                return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))
        # 13:30后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 16:
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # 如果老大已经买成功了,老二就不需要买了
            # 如果老大已经买成功了, 老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
@@ -396,15 +464,15 @@
                if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 老大已经买成功了
                    return False, "老大{}已经买成功,老二无需购买".format(key)
            # 有9点半涨停的老大才能买老二,不然不能买
            # 获取老大的涨停时间
            for key in first_codes:
                # 找到了老大
                time_ = limit_up_time_manager.get_limit_up_time(key)
                if time_ == "09:30:00":
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
            #
            # # 有9点半涨停的老大才能买老二,不然不能买
            # # 获取老大的涨停时间
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
        # 过时  老二,本板块中涨停票数<29 不能买
        # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
@@ -452,12 +520,18 @@
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
    # 虚拟下单
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        _start_time = t.time()
        _start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
@@ -514,7 +588,7 @@
            # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间)
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            # 删除之前的所有撤单信号
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
@@ -632,8 +706,8 @@
                count = 3
            count = round(count * buy1_factor)
            # 最高30笔,最低8笔
            if count > 30:
                count = 30
            if count > 21:
                count = 21
            if count < 8:
                count = 8
            return count
@@ -812,7 +886,7 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
@@ -833,37 +907,52 @@
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # ----此条注释-----
            # 如果老大已经买成功了,老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
                    first_codes.append(key)
            # first_codes = []
            # for key in codes_index:
            #     if codes_index.get(key) == 0:
            #         first_codes.append(key)
            #
            # for key in first_codes:
            #     state = trade_manager.get_trade_state(key)
            #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            #         # 老大已经买成功了
            #         return False, "老大{}已经买成功,老二无需购买".format(key)
            # ----此条注释-----
            for key in first_codes:
                state = trade_manager.get_trade_state(key)
                if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 老大已经买成功了
                    return False, "老大{}已经买成功,老二无需购买".format(key)
            # ----此条注释-----
            # 有9点半涨停的老大才能买老二,不然不能买
            # 获取老大的涨停时间
            for key in first_codes:
                # 找到了老大
                time_ = limit_up_time_manager.get_limit_up_time(key)
                if time_ == "09:30:00":
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
            # ----此条注释-----
            return True, "老二可以下单"
    @classmethod
    def test3(cls):
        code = "002693"
        code = "002094"
        load_l2_data(code, True)
        start_index = 334
        end_index = 341
        buy_single_index = 152
        cls.random_key[code] = random.randint(0, 100000)
        L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                 buy_single_index)
        buy_single_begin_index, buy_exec_index = 426, 479
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
    @classmethod
    def test_can_buy(cls):
        code = "002923"
        load_l2_data(code, True)
        limit_up_time_manager.load_limit_up_time()
        can, msg = cls.__can_buy(code)
        print(can, msg)
# 涨停封单额统计
@@ -989,6 +1078,8 @@
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
@@ -1036,6 +1127,7 @@
        # 待计算量
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        min_volumn = round(10000000 / (limit_up_price * 100))
        min_volumn_big = min_volumn * 5
        # 不同时间的数据开始坐标
        time_start_index_dict = {}
        # 数据时间分布
@@ -1045,18 +1137,19 @@
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index])
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            # 统计撤销数量
            try:
            if big_money_num_manager.is_big_num(data["val"]):
                if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                    cancel_big_num_count += int(data["re"])
                    # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                    # 获取是否在买入执行信号周围2s
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data["val"],
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and buy_data is not None:
@@ -1067,6 +1160,8 @@
                elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                    cancel_big_num_count -= int(data["re"])
            except Exception as e:
                logging.exception(e)
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
@@ -1084,14 +1179,20 @@
                # 上一段时间的总数
                time_total_num_dict[time_] = total_num
            exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # 如果是减小项,且在处理数据的范围内,就需要判断是否要撤单了
            if val < 0 and start_index <= i <= end_index:
            # 在处理数据的范围内,就需要判断是否要撤单了
            if start_index <= i <= end_index:
                # 如果是减小项
                if val < 0:
                # 累计封单金额小于1000万
                if total_num < min_volumn:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                    cancel_index = i
                    cancel_msg = "封单金额小于1000万"
                    break
@@ -1100,6 +1201,8 @@
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                    # 相邻2s内的数据减小50%
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
@@ -1113,13 +1216,38 @@
                        if last_2_second_total_volumn > last_second_total_volumn > total_num:
                            dif = last_2_second_total_volumn - total_num
                            if dif / last_2_second_total_volumn >= threshold_rate:
                                    # 与执行位相隔>=5s时规则生效
                                    if exec_time_offset >= 5:
                                cancel_index = i
                                cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
                                                                                                     last_2_second_total_volumn,
                                                                                                     last_second_total_volumn,
                                                                                                     total_num)
                                break
                # ------大单撤处理-------
                # if total_num < min_volumn_big:
                if exec_time_offset < 1800:
                    try:
                        b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, buy_exec_index, i, i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "1分钟内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
                # 30分钟外才执行
                elif 1800 <= exec_time_offset <= 5400:
                    try:
                        b_need_cancel, b_cancel_index = LongAverageBigNumComputer.need_cancel(code, buy_exec_index, i,
                                                                                              i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "60s-1h内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
                # ------大单撤处理结束-------
        if not with_cancel:
            cancel_index = None
@@ -1173,7 +1301,7 @@
            return -1
        return int(val)
    # 清除数据
    # 清除数据,当取消成功与买入之前需要清除数据
    @classmethod
    def delete(cls, code):
        key = "limit_up_sell_num-{}".format(code)
@@ -1181,22 +1309,29 @@
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().delete(key)
    @classmethod
    def clear(cls):
        keys = cls.__get_redis().keys("limit_up_sell_num-*")
        for k in keys:
            cls.__get_redis().delete(k)
    # 处理数据,返回是否需要撤单
    # 处理范围:买入执行位-当前最新位置
    @classmethod
    def process(cls, code, start_index, end_index, buy_exec_index):
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        threshold_num = zyltgb * 0.015 // (limit_up_price * 100)
        threshold_num = int(zyltgb * 0.015) // (limit_up_price * 100)
        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
        total_datas = local_today_datas.get(code)
        for i in range(start_index, end_index + 1):
            if i < buy_exec_index:
                continue
            if i <= process_index:
                continue
            total_datas = local_today_datas.get(code)
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]):
                num = int(total_datas[i]["val"]["num"])
                cls.__incre_sell_data(code, num)
@@ -1210,10 +1345,330 @@
            process_index = end_index
        # 保存处理的位置
        cls.__save_process_index(code, process_index)
        return cancel_index
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
        return None, ""
    @classmethod
    def test(cls):
        code = "003005"
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        cls.process(code, 126, 171, 126)
# Short-window average big-order computation
class AverageBigNumComputer:
    """Track cancellations of above-average buy orders around a placed order.

    Two Redis entries are kept per code, both written with a 2000 second expiry:

    * ``average_big_num-<code>``: average order size, number of buys at or
      above that size, and the index range the average was computed over.
    * ``average_big_num_comput_info-<code>``: accumulated cancel count and
      the last index already examined by ``need_cancel``.
    """

    __redis_manager = redis_manager.RedisManager(0)
    # code -> (t.time() at order placement, buy_single_index, buy_exec_index)
    __place_order_time_dict = {}

    @classmethod
    def __getRedis(cls):
        """Return the Redis client from the class-level manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        """Persist the average statistics for ``code`` and log them."""
        key = "average_big_num-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((average_num, average_up_count, start_index, end_index)))
        L2TradeDataProcessor.cancel_debug(code, "保存短大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                average_up_count,
                                                                                                start_index,
                                                                                                end_index))

    @classmethod
    def __get_average_data(cls, code):
        """Return ``(average_num, average_up_count, start_index, end_index)``.

        All four values are ``None`` when nothing is stored for ``code``.
        """
        key = "average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
        """Persist the running cancel count and the last processed index."""
        key = "average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((cancel_count, process_index)))

    @classmethod
    def __get_compute_info(cls, code):
        """Return ``(cancel_count, process_index)``, or ``(None, None)`` if absent."""
        key = "average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    @classmethod
    def __clear_data(cls, code):
        """Delete both Redis entries of ``code``."""
        key = "average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "average_big_num-{}".format(code)
        cls.__getRedis().delete(key)

    # Compute the average order size.
    # Range: from the buy-signal start up to the end of the batch that follows the exec index.
    @classmethod
    def compute_average_big_num(cls, code, buy_exec_index, start_index, end_index):
        """Compute and store the average limit-up buy size in ``[start_index, end_index]``.

        :param code: stock code
        :param buy_exec_index: index at which the buy was triggered; stored as
            the initial "already processed" position
        :param start_index: first index included in the average
        :param end_index: last index included in the average

        Nothing is stored when the range contains no limit-up buy record.
        """
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        # Collect the limit-up buy records once; both passes below use them.
        buys = []
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            if L2DataUtil.is_limit_up_price_buy(data["val"]):
                buys.append(data)
        # "re" is wrapped in int() as it is elsewhere in this file.
        count = sum(int(data["re"]) for data in buys)
        # NOTE(review): the size is summed once per record while the count is
        # weighted by "re"; confirm whether the size should be weighted too.
        num = sum(int(data["val"]["num"]) for data in buys)
        if count <= 0:
            # No limit-up buy in range: an average is undefined, so skip saving
            # instead of failing with a division by zero.
            L2TradeDataProcessor.cancel_debug(code, "短大单计算失败:范围{}-{}内没有涨停买单".format(start_index, end_index))
            return
        average_num = num // count
        average_up_count = 0
        for data in buys:
            if int(data["val"]["num"]) >= average_num:
                average_up_count += int(data["re"])
        print("平均大单:", average_num, average_up_count)
        # Persist the result and reset the cancel bookkeeping.
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)

    # Whether the order should be cancelled
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        """Accumulate big-order cancellations and report whether to cancel.

        Only records after ``buy_exec_index`` and after the stored processed
        position are examined. A cancel record counts when its size is at
        least the stored average and its matching buy lies inside the range
        the average was computed over.

        :return: ``(True, index)`` once the counted cancellations reach 49% of
            the stored number of at-or-above-average buys, otherwise
            ``(False, None)``
        """
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        # The bookkeeping entry may be missing while the average still exists;
        # restart from the same state compute_average_big_num initialises.
        if cancel_count is None:
            cancel_count = 0
        if process_index is None:
            process_index = buy_exec_index
        total_data = local_today_datas[code]
        try:
            for i in range(start_index, end_index + 1):
                if i <= buy_exec_index:
                    continue
                if process_index >= i:
                    continue
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num:
                    # Locate the buy record this cancellation belongs to.
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # The buy must lie inside the range used for the average.
                        cancel_count += int(data["re"])
                        process_index = i
                        print("撤销大单", cancel_count)
                        # Guard the ratio against a zero denominator.
                        if average_up_count and cancel_count / average_up_count >= 0.49:
                            return True, i
        finally:
            # Always persist progress, including when returning early.
            cls.__save_compute_info(code, cancel_count, process_index)
        return False, None

    # Whether the average needs to be (re)computed
    @classmethod
    def is_need_compute_average(cls, code):
        """Return ``(need, buy_single_index, buy_exec_index)`` for ``code``.

        ``need`` is true only when an order was registered through
        ``place_order_success`` less than 0.5 seconds ago; the registration is
        consumed in that case. Otherwise ``(False, None, None)`` is returned.
        Note that the result is a tuple and therefore always truthy.
        """
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        if t.time() - data[0] < 0.5:
            # Only data arriving within 500 ms of placement triggers a recompute.
            cls.__place_order_time_dict.pop(code)
            return True, data[1], data[2]
        return False, None, None

    # Order placed successfully
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset stored state for ``code`` and compute the average immediately.

        The placement time is remembered so that ``is_need_compute_average``
        can request one recomputation shortly afterwards.
        """
        cls.__clear_data(code)
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # Just in case, store the order information first using the data available so far.
        total_data = local_today_datas[code]
        cls.compute_average_big_num(code, buy_exec_index, buy_single_index, total_data[-1]["index"])

    @classmethod
    def __test(cls, datas):
        """Replay one scenario.

        ``datas`` is ``(code, buy_single_index, buy_exec_index,
        average_end_index, data_length)``.
        """
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        # Place the order first.
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # Recompute the average only when requested; the helper returns a
        # tuple, so its first element has to be checked explicitly.
        need_compute, _, _ = cls.is_need_compute_average(code)
        if need_compute:
            cls.compute_average_big_num(code, buy_exec_index, buy_single_index, datas[3])
        for i in range(buy_exec_index, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                break

    @classmethod
    def test(cls):
        """Manual debugging entry point with a hard-coded scenario."""
        # cls.__test(("601579", 311, 319, 347, 404))
        cls.__test(("601579", 311, 319, 327, 404))
# 平均大单计算
class LongAverageBigNumComputer:
    """Track "average big order" statistics around a buy and decide on cancelling.

    After a buy is executed, ``compute_average_big_num`` scans the L2 rows from
    the buy-signal index up to roughly two seconds after the buy-execution row,
    averages the size of the limit-up-price buy entries found there and counts
    how many of them are at least that large.  ``need_cancel`` then watches
    later rows for large limit-up-price buy cancellations whose original buy
    lies inside that range, and reports that the order should be cancelled once
    the cancelled share reaches a threshold.

    All state is stored in redis under two keys per stock code, each written
    with a one-hour expiry.  Row indexes are used directly as positions into
    ``local_today_datas[code]``.
    """

    __redis_manager = redis_manager.RedisManager(0)

    # Expiry (seconds) applied to every redis key written by this class.
    __KEY_TTL_SECONDS = 3600
    # Minimum time gap between the execution row and the newest row before the
    # average is computed (unit is whatever ``tool.trade_time_sub`` returns,
    # presumably seconds).
    __MIN_ELAPSED_BEFORE_COMPUTE = 3
    # Rows up to this time gap after the execution row are included in the average.
    __AVERAGE_WINDOW_AFTER_EXEC = 2
    # Cancellations later than this time gap after the execution row never trigger.
    __CANCEL_WATCH_WINDOW = 3600
    # Cancelled-count / big-order-count ratio at which a cancel is requested.
    __CANCEL_RATE_THRESHOLD = 0.79

    @classmethod
    def __getRedis(cls):
        """Return the redis client used for all keys of this class."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        """Log and persist the computed average and the index range it covers."""
        L2TradeDataProcessor.cancel_debug(
            code,
            "获取到长大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(
                average_num, average_up_count, start_index, end_index))
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().setex(key, cls.__KEY_TTL_SECONDS,
                               json.dumps((average_num, average_up_count, start_index, end_index)))

    @classmethod
    def __get_average_data(cls, code):
        """Return ``(average_num, average_up_count, start_index, end_index)``.

        All four values are ``None`` when nothing is stored for ``code``.
        """
        key = "l_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
        """Persist the running cancel count and the last row index counted."""
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, cls.__KEY_TTL_SECONDS, json.dumps((cancel_count, process_index)))

    @classmethod
    def __get_compute_info(cls, code):
        """Return ``(cancel_count, process_index)``, or ``(None, None)`` if absent."""
        key = "l_average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    @classmethod
    def __clear_data(cls, code):
        """Delete both redis keys kept for ``code``."""
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().delete(key)

    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, buy_exec_index):
        """Compute and store the average limit-up buy size for ``code``.

        The range starts at ``buy_single_index`` and ends at the last row whose
        time is within the averaging window after the execution row.  Nothing
        is computed when too little time has passed since the execution row,
        when a result is already stored, or when the range contains no
        limit-up-price buy entry.

        Args:
            code: Stock code, key into ``local_today_datas``.
            buy_single_index: Row index of the buy signal (range start).
            buy_exec_index: Row index at which the buy was executed.
        """
        total_data = local_today_datas[code]
        end_index = total_data[-1]["index"]
        start_index = buy_single_index
        exec_time = total_data[buy_exec_index]["val"]["time"]
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], exec_time) < cls.__MIN_ELAPSED_BEFORE_COMPUTE:
            return
        # Only compute once per order: an existing result is left untouched.
        stored_average_num = cls.__get_average_data(code)[0]
        if stored_average_num is not None:
            return
        # Walk backwards to the last row that still falls inside the window
        # after the execution row; if none does, the range keeps its full length.
        for i in range(end_index, buy_exec_index, -1):
            if tool.trade_time_sub(total_data[i]["val"]["time"], exec_time) <= cls.__AVERAGE_WINDOW_AFTER_EXEC:
                end_index = i
                break
        # Collect (size, repeat count) of every limit-up-price buy in the range once.
        buys = []
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                buys.append((int(val["num"]), data["re"]))
        # NOTE(review): sizes are summed once per row while the divisor counts
        # each row ``re`` times, exactly as before — confirm whether the sum
        # should be weighted by ``re`` as well.
        num = sum(size for size, _ in buys)
        count = sum(repeat for _, repeat in buys)
        if count <= 0:
            # No limit-up buy entries in range: an average is undefined, and
            # dividing would raise ZeroDivisionError.  Store nothing so that
            # need_cancel() keeps answering "no cancel".
            return
        average_num = round(num / count)
        average_up_count = sum(repeat for size, repeat in buys if size >= average_num)
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)

    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        """Decide whether the order for ``code`` should be cancelled.

        Scans rows ``start_index..end_index`` (inclusive) that come after both
        the execution row and the last row already counted.  Every limit-up
        buy cancellation at least as large as the stored average, whose
        original buy lies inside the averaged range, adds its repeat count to
        a running total that is persisted across calls.

        Args:
            code: Stock code.
            buy_exec_index: Row index at which the buy was executed.
            start_index: First row index to inspect.
            end_index: Last row index to inspect (inclusive).

        Returns:
            ``(True, index)`` with the row index that pushed the cancelled
            ratio to the threshold, otherwise ``(False, None)``.
        """
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        # The progress key can be absent even though the average exists (it is
        # a separate redis key); restart from the values that
        # compute_average_big_num() initially stores instead of failing on None.
        if cancel_count is None:
            cancel_count = 0
        if process_index is None:
            process_index = buy_exec_index
        total_data = local_today_datas[code]
        # Rows at or before the execution row or the last counted row are skipped.
        first_index = max(start_index, buy_exec_index + 1, process_index + 1)
        try:
            for i in range(first_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if not (L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num):
                    continue
                # Locate the buy row this cancellation refers to.
                buy_index, _ = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                # The original buy must lie inside the averaged range.
                if buy_index is None or not (a_start_index <= buy_index <= a_end_index):
                    continue
                cancel_count += data["re"]
                process_index = i
                # Too long after the execution row: still counted, never triggers.
                if tool.trade_time_sub(val["time"],
                                       total_data[buy_exec_index]["val"]["time"]) > cls.__CANCEL_WATCH_WINDOW:
                    continue
                sj = 0  # time-based adjustment of the denominator, currently disabled
                print("计算结果", cancel_count, average_up_count, sj)
                denominator = average_up_count - sj
                if denominator <= 0:
                    # Ratio is undefined; skip rather than raise ZeroDivisionError.
                    continue
                if cancel_count / denominator >= cls.__CANCEL_RATE_THRESHOLD:
                    return True, i
        finally:
            # Persist progress on every exit path, including early return and errors.
            cls.__save_compute_info(code, cancel_count, process_index)
        return False, None

    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset stored state for ``code`` after an order has been placed.

        ``buy_single_index`` and ``buy_exec_index`` are accepted for interface
        compatibility and are not used.
        """
        cls.__clear_data(code)

    @classmethod
    def __test(cls, datas):
        """Replay one recorded scenario.

        ``datas`` is ``(code, buy_single_index, buy_exec_index, <unused>,
        stop_index)``; rows after the execution row up to ``stop_index``
        (exclusive) are fed to ``need_cancel`` one at a time.
        """
        code = datas[0]
        load_l2_data(code)
        # Simulate a successful order placement first.
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        cls.__clear_data(code)
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # Compute the average, then feed the following rows one by one.
        cls.compute_average_big_num(code, buy_single_index, buy_exec_index)
        for i in range(buy_exec_index + 1, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                break

    @classmethod
    def test(cls):
        """Run the recorded replay scenarios."""
        cls.__test(("002528", 212, 219, 372, 601))
        cls.__test(("003005", 212, 219, 372, 601))
# Manual entry point: runs ad-hoc replay checks when the module is executed
# directly. Both helpers invoked here are defined outside this section.
if __name__ == "__main__":
    L2TradeDataProcessor.test3()
    # Visual separator between the outputs of the two runs.
    print("----------------------")
    L2LimitUpSellStatisticUtil.test()
    # NOTE(review): `t` is presumably the `time` module imported under an alias
    # elsewhere in the file — confirm it is in scope.
    print(t.time())
    # L2TradeDataProcessor.test()