Administrator
2023-02-05 1252c9489b631905fbce608109260760537b224f
l2_data_manager_new.py
@@ -23,6 +23,8 @@
import trade_manager
import trade_queue_manager
import trade_data_manager
from l2 import safe_count_manager
from l2.cancel_buy_strategy import SecondCancelBigNumComputer
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
@@ -159,6 +161,8 @@
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    @classmethod
    def debug(cls, code, content, *args):
@@ -178,8 +182,8 @@
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
    def process(cls, code, datas, capture_timestamp, do_id):
        cls.random_key[code] = do_id
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
@@ -201,8 +205,10 @@
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    l2_data_manager.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                    __start_time = round(t.time() * 1000)
                    l2_data_manager.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
            if code in cls.unreal_buy_dict:
@@ -216,31 +222,31 @@
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
            # ---------- 判断是否需要计算大单 -----------
            try:
                average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # 计算平均大单
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                  end_index)
            except Exception as e:
                logging.exception(e)
            # try:
            #     average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
            #         code, local_today_datas[code][-1])
            #     # 计算平均大单
            #     if average_need:
            #         end_index = local_today_datas[code][-1]["index"]
            #         if len(add_datas) > 0:
            #             end_index = add_datas[-1]["index"]
            #         AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
            #                                                       end_index)
            # except Exception as e:
            #     logging.exception(e)
            try:
                average_need, buy_single_index, buy_exec_index = SecondAverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # 计算平均大单
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    SecondAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                        end_index)
            except Exception as e:
                logging.exception(e)
            # try:
            #     average_need, buy_single_index, buy_exec_index = SecondCancelBigNumComputer.is_need_compute_average(
            #         code, local_today_datas[code][-1])
            #     # 计算平均大单
            #     if average_need:
            #         end_index = local_today_datas[code][-1]["index"]
            #         if len(add_datas) > 0:
            #             end_index = add_datas[-1]["index"]
            #         SecondCancelBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
            #                                                             end_index)
            # except Exception as e:
            #     logging.exception(e)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
@@ -253,7 +259,8 @@
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
        __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
                                           "l2数据预处理时间")
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
@@ -275,7 +282,8 @@
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
            __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
                                               "l2数据处理时间")
    # 处理未挂单
    @classmethod
@@ -284,7 +292,8 @@
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "获取m值数据耗时")
            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
@@ -300,24 +309,37 @@
        if end_index < start_index:
            return
        total_data = local_today_datas.get(code)
        _start_time = round(t.time() * 1000)
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                      local_today_num_operate_map.get(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-获取买入信息耗时")
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index, buy_exec_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-买1统计耗时")
        # 撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   buy_exec_index, start_index,
                                                                                   end_index)
            b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  buy_exec_index, start_index,
                                                                                  end_index, total_data)
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "申报时间截至大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-s级大单估算")
        # 撤单计算,看分钟级大单撤单
        try:
@@ -336,13 +358,17 @@
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-板上卖耗时")
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-m值大单计算")
        if cancel_data:
            if cancel_data["index"] == 175:
                print("进入调试")
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            if cls.cancel_buy(code, cancel_msg):
@@ -351,6 +377,8 @@
            else:
                # 撤单尚未成功
                pass
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "已下单-撤单+处理剩余数据")
        else:
            # 如果有虚拟下单需要真实下单
@@ -361,12 +389,15 @@
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "已下单-真实下单")
        # 判断是否需要计算长大单的信息
        try:
            LongAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_exec_index)
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-计算长大单")
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
@@ -388,11 +419,16 @@
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 下单成功,需要删除最大买1
                cls.__thsBuy1VolumnManager.clear_max_buy1_volume(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                        code)
                    SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,None)
                    SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                except Exception as e:
@@ -494,7 +530,7 @@
            # 当老大老二当前没涨停
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code,total_datas[-1]["val"]["time"]):
        if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
            # 水下捞且板块中的票小于16不能买
            # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
            #         industry) <= 16:
@@ -562,11 +598,11 @@
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
        # 是否是交易队列触发
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        total_datas = local_today_datas[code]
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                code)
            total_datas = local_today_datas[code]
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = tool.get_now_time_str()
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
@@ -588,6 +624,8 @@
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
            # 撤单成功
            cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"])
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
@@ -597,7 +635,7 @@
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # 删除之前的板上卖信息
        L2LimitUpSellStatisticUtil.delete(code)
@@ -622,7 +660,8 @@
                continue_count = 2
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                compute_start_index - 2 if new_add else compute_start_index, 0), continue_count, compute_end_index)
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                               compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
@@ -632,7 +671,7 @@
                # 如果是今天第一次有下单开始信号,需要设置大单起始点
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "下单信号计算时间")
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "下单信号计算时间")
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
@@ -642,12 +681,14 @@
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "计算m值大单")
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "计算m值大单")
        threshold_money, msg = cls.__get_threshmoney(code)
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,compute_start_index),compute_end_index,num,count,threshold_money,buy_single_index,max_num_set)
        _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "纯买额统计时间")
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(
            buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money, buy_single_index,
                                                                                                             max_num_set)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "纯买额统计时间")
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
@@ -675,14 +716,18 @@
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index,
                                                     buy_exec_index, False)
            _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据")
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                              "记录执行买入数据", force=True)
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   compute_index,
                                                                                   buy_single_index, compute_index,
                                                                                   True)
                need_cancel, cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  compute_index,
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas,
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 分钟级大单计算
                # need_cancel, cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                                              buy_single_index, compute_index, True)
@@ -697,13 +742,18 @@
            else:
                # AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                   buy_single_index, compute_index, False)
                SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                                                        buy_single_index, compute_index, False)
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "处理撤单步骤耗时", force=True)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
@@ -773,15 +823,6 @@
    @classmethod
    def __get_threshmoney(cls, code):
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 是否为万手哥
    @classmethod
    def __is_big_money(cls, limit_up_price, val):
        if int(val["num"]) >= constant.BIG_MONEY_NUM:
            return True
        if int(val["num"]) * limit_up_price >= constant.BIG_MONEY_AMOUNT:
            return True
        return False
    # 计算万手哥笔数
    @classmethod
@@ -868,7 +909,7 @@
                            return None, buy_nums, buy_count, ii, max_buy_num_set
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                if cls.__is_big_money(limit_up_price, _val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count += int(total_datas[i]["re"])
                    max_buy_num_set.add(i)
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
@@ -882,7 +923,7 @@
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if cls.__is_big_money(limit_up_price, _val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count -= int(total_datas[i]["re"])
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    # 只统计59万以上的金额
@@ -911,8 +952,15 @@
                        buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 需要的最小大单笔数
            big_num_count = 2
            if place_order_count > 1:
                # 第一次下单需要大单最少2笔,以后只需要1笔
                big_num_count = 1
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(max_buy_num_set)>1:
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
@@ -1140,35 +1188,68 @@
        logger_buy_1_volumn.info("涨停封单量矫正:代码-{} 量-{} 时间-{}", code, num, time_str)
        time_ = time_str.replace(":", "")
        key = None
        for i in range(4, -2, -2):
            # 获取本(分钟/小时/天)内秒分布数据
            time_regex = "{}*".format(time_[:i])
            keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
            if keys_ and len(keys_) > 1:
                # 需要排序
                keys = []
                for k in keys_:
                    keys.append(k)
                keys.sort(key=lambda tup: int(tup.split("-")[-1]))
                # 有2个元素
                for index in range(0, len(keys) - 1):
                    time_1 = keys[index].split("-")[-1]
                    time_2 = keys[index + 1].split("-")[-1]
                    if int(time_1) <= int(time_) <= int(time_2):
                        # 在此时间范围内
                        if time_ == time_2:
                            key = keys[index + 1]
                        else:
                            key = keys[index]
                        break
            if key:
                val = cls.__get_redis().get(key)
                old_num, old_from, old_to = cls.__format_second_money_record_val(val)
                end_index = old_to
                # 保存最近的数据
                cls.__set_l2_latest_money_record(code, end_index, num)
                logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
        # 获取矫正时间前1分钟的数据
        keys = []
        for i in range(0, 3600):
            temp_time = tool.trade_time_add_second(time_str, 0 - i)
            # 只处理9:30后的数据
            if int(temp_time.replace(":", "")) < int("093000"):
                break
            keys_ = cls.__get_l2_second_money_record_keys(code, temp_time.replace(":", ""))
            if len(keys_) > 0:
                keys.append(keys_[0])
            if len(keys) >= 1:
                break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
            key = keys[0]
            val = cls.__get_redis().get(key)
            old_num, old_from, old_to = cls.__format_second_money_record_val(val)
            end_index = old_to
            # 保存最近的数据
            cls.__set_l2_latest_money_record(code, end_index, num)
            logger_buy_1_volumn.info("涨停封单量矫正成功:代码-{} 位置-{} 量-{}", code, end_index, num)
        else:
            logger_buy_1_volumn.info("涨停封单量矫正失败:代码-{} 时间-{} 量-{}", code, time_str, num)
        # 取消此种方法
        #
        # for i in range(4, -2, -2):
        #     # 获取本(分钟/小时/天)内秒分布数据
        #     time_regex = "{}*".format(time_[:i])
        #     keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
        #     if keys_ and len(keys_) > 1:
        #         # 需要排序
        #         keys = []
        #         for k in keys_:
        #             keys.append(k)
        #         keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        #         # if i == 4:
        #         #    keys=keys[:5]
        #         # 有2个元素
        #         for index in range(0, len(keys) - 1):
        #             time_1 = keys[index].split("-")[-1]
        #             time_2 = keys[index + 1].split("-")[-1]
        #             if int(time_1) <= int(time_) <= int(time_2):
        #                 # 在此时间范围内
        #                 if time_ == time_2:
        #                     key = keys[index + 1]
        #                 else:
        #                     key = keys[index]
        #                 break
        #         if key:
        #             break
        # # 如果没有找到匹配的区间
        # if not key:
        #     # 最后一条数据的时间为相应的区间
        #     total_datas = local_today_datas[code]
        #
        # if key:
        #     val = cls.__get_redis().get(key)
        #     old_num, old_from, old_to = cls.__format_second_money_record_val(val)
        #     end_index = old_to
        #     # 保存最近的数据
        #     cls.__set_l2_latest_money_record(code, end_index, num)
        #     logger_buy_1_volumn.info("涨停封单量矫正结果:代码-{} 位置-{} 量-{}", code, end_index, num)
    # 计算量,用于涨停封单量的计算
    @classmethod
@@ -1385,7 +1466,8 @@
            process_end_index = cancel_index
        # 保存最新累计金额
        # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
        l2_data_log.l2_time(code, round(t.time() * 1000) - start_time, "l2数据封单额计算时间",
        l2_data_log.l2_time(code, L2TradeDataProcessor.random_key[code], round(t.time() * 1000) - start_time,
                            "l2数据封单额计算时间",
                            False)
        if cancel_index:
            L2TradeDataProcessor.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
@@ -1486,313 +1568,6 @@
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        cls.process(code, 126, 171, 126)
# s级平均大单计算
# 计算范围到申报时间的那一秒
class SecondAverageBigNumComputer:
    """Second-level ("S-level") big-order cancel check for a placed buy order.

    After an order is placed, the class records a big-order size threshold and
    the number of limit-up buy orders at or above that threshold between the
    buy-signal index and the apply (declaration) time.  ``need_cancel`` then
    collects limit-up buy cancellations that trace back into that range and
    reports a cancel once the cancelled big orders exceed a ratio of the
    counted big orders.

    State is kept in Redis under ``s_average_big_num*`` keys (per stock code)
    plus one in-process dict holding the latest placement info.

    NOTE(review): ``data["re"]`` is used as a multiplicity for each L2 row;
    presumably it is the repeat count of identical rows - confirm against the
    L2 data loader.
    """
    __redis_manager = redis_manager.RedisManager(0)
    # code -> (wall-clock time of placement, buy_single_index, buy_exec_index)
    __place_order_time_dict = {}

    @classmethod
    def __getRedis(cls):
        """Return the Redis client from the shared manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        """Store the threshold, the big-order count and the computed range (2000 s TTL)."""
        key = "s_average_big_num-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((average_num, average_up_count, start_index, end_index)))
        L2TradeDataProcessor.cancel_debug(code, "保存秒级大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                 average_up_count,
                                                                                                 start_index,
                                                                                                 end_index))

    @classmethod
    def __get_average_data(cls, code):
        """Return ``(average_num, average_up_count, start_index, end_index)``.

        All four values are ``None`` when nothing is stored for ``code``.
        """
        key = "s_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    # Save the index of a buy-cancel row
    @classmethod
    def __save_cancel_data(cls, code, cancel_index):
        key = "s_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().sadd(key, cancel_index)

    # Get the saved buy-cancel row indexes
    @classmethod
    def __get_cancel_datas(cls, code):
        key = "s_average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().smembers(key)
        return val

    # Save the apply (declaration) time
    @classmethod
    def __save_apply_time(cls, code, time_str):
        key = "s_average_big_num_apply_time-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), time_str)

    # Get the apply (declaration) time; None when it has not been saved
    @classmethod
    def __get_apply_time(cls, code):
        key = "s_average_big_num_apply_time-{}".format(code)
        val = cls.__getRedis().get(key)
        return val

    # Save a computation end index
    @classmethod
    def __save_end_index(cls, code, end_index):
        key = "s_average_big_num_end_index_set-{}".format(code)
        cls.__getRedis().sadd(key, end_index)

    @classmethod
    def __list_end_indexs(cls, code):
        """Return every saved end index as a sorted list of ints."""
        key = "s_average_big_num_end_index_set-{}".format(code)
        vals = cls.__getRedis().smembers(key)
        if vals is None:
            return None
        return sorted(int(val) for val in vals)

    @classmethod
    def __clear_data(cls, code):
        """Delete the cancel set, the average data and the end-index set of one code.

        The apply-time key is not touched here.
        """
        ks = ["s_average_big_num_comput_info-{}".format(code), "s_average_big_num-{}".format(code),
              "s_average_big_num_end_index_set-{}".format(code)]
        for key in ks:
            cls.__getRedis().delete(key)

    @classmethod
    def clear_data(cls):
        """Delete the cancel sets, average data and end-index sets of all codes."""
        ks = ["s_average_big_num_comput_info-*", "s_average_big_num-*", "s_average_big_num_end_index_set-*"]
        for key in ks:
            keys = cls.__getRedis().keys(key)
            for k in keys:
                cls.__getRedis().delete(k)

    # Compute the big-order threshold and count the big orders.
    # Range: from the buy-signal start index up to the apply time.
    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        """Count limit-up buy orders at or above the big-order threshold and save the result.

        :param code: stock code
        :param buy_single_index: index of the buy signal (only printed here)
        :param start_index: first row index of the computation range
        :param end_index: candidate last row index; it is recorded, and the
            smallest recorded end index whose row time is not earlier than the
            apply time is used instead when one exists
        """
        cls.__save_end_index(code, end_index)
        end_indexs = cls.__list_end_indexs(code)
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]

        apply_time = cls.get_apply_time(code)
        apply_time_second = int(apply_time.replace(":", ""))
        for ei in end_indexs:
            if int(total_data[ei]["val"]["time"].replace(":", "")) >= apply_time_second:
                end_index = ei
                break

        # The threshold comes only from the configured constants.  The former
        # ``num // count`` average was overwritten before any use and raised
        # ZeroDivisionError when no limit-up buy was in range, so it is gone.
        average_num = min(constant.BIG_MONEY_NUM,
                          round(constant.BIG_MONEY_AMOUNT / gpcode_manager.get_limit_up_price(code)))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                # Limit-up buys after the apply time end the count
                if int(val["time"].replace(":", "")) > apply_time_second:
                    break
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
        print("平均手数:", average_num, "大单总数:", average_up_count)
        # Save the result
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)

    # Whether the order needs to be cancelled
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        """Record cancellations in ``start_index..end_index`` and decide whether to cancel.

        :param code: stock code
        :param buy_single_index: index of the buy signal
        :param buy_exec_index: index at which the buy was executed
        :param start_index: first row index to scan for cancellations
        :param end_index: last row index to scan (inclusive)
        :param need_cancel: when False, cancellations are only recorded and the
            ratio is not evaluated
        :return: ``(True, row)`` with the row that pushed the cancelled big-order
            ratio over the threshold, otherwise ``(False, None)``
        """
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        L2TradeDataProcessor.cancel_debug(code, "s级是否需要撤单,数据范围:{}-{}  平均大单信息-({},{},{},{})", start_index, end_index,
                                          average_num, average_up_count, a_start_index, a_end_index)
        if average_num is None:
            return False, None
        total_data = local_today_datas[code]
        # Only guard for 30s after the execution row
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        # start_index == buy_single_index means this is the first computation
        # after the order was placed: also look at rows before the buy signal
        # that share its second for limit-up cancellations.
        if buy_single_index == start_index:
            # Stop at -1 so that row 0 is inspected too (it used to be skipped)
            for i in range(buy_single_index - 1, -1, -1):
                data = total_data[i]
                val = data["val"]
                if val["time"] != total_data[buy_single_index]["val"]["time"]:
                    break
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                    # Limit-up buy cancel with a cancel interval of 0.
                    # Look up the matching buy; it only counts when found
                    # inside the computed range.
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        cls.__save_cancel_data(code, i)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy_cancel(val):
                # Look up the position of the matching buy
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                    cls.__save_cancel_data(code, i)
                else:
                    # A partial cancel may make the buy untraceable; then check
                    # whether the estimated buy time lies within the time span
                    # of a_start_index..a_end_index.
                    min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                     val["cancelTimeUnit"])
                    # Only second-level cancels can satisfy this
                    if max_space - min_space <= 1:
                        buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                        if int(total_data[a_start_index]["val"]["time"].replace(":", "")) <= int(
                                buy_time.replace(":", "")) <= int(
                            total_data[a_end_index]["val"]["time"].replace(":", "")):
                            cls.__save_cancel_data(code, i)
        if need_cancel:
            # Ratio of cancelled big orders to counted big orders
            cancel_datas = cls.__get_cancel_datas(code)
            if cancel_datas is not None and len(cancel_datas) > 0:
                L2TradeDataProcessor.cancel_debug(code, "s级大单 取消数量:{}", len(cancel_datas))
                # The threshold gets looser with every further placement
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                if place_order_count <= 1:
                    cancel_rate_threshold = 0.49
                elif place_order_count <= 2:
                    cancel_rate_threshold = 0.59
                else:
                    cancel_rate_threshold = 0.69
                cancel_indexs = sorted(int(index) for index in cancel_datas)
                cancel_count = 0
                for index in cancel_indexs:
                    data = total_data[index]
                    if int(data["val"]["num"]) >= average_num:
                        cancel_count += data["re"]
                        # With zero counted big orders the ratio is undefined;
                        # skip the check instead of raising ZeroDivisionError.
                        if average_up_count > 0 and cancel_count / average_up_count > cancel_rate_threshold:
                            return True, total_data[index]
        return False, None

    # Whether the average still needs to be (re)computed
    @classmethod
    def is_need_compute_average(cls, code, latest_data):
        """Tell whether the average should be recomputed for the latest row.

        :return: ``(True, buy_single_index, buy_exec_index)`` while the latest
            row is less than 5s after the apply time; otherwise
            ``(False, None, None)`` and the placement record is dropped
        """
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        if tool.trade_time_sub(latest_data["val"]["time"], cls.get_apply_time(code)) < 5:
            # The apply time may still be uploaded within 5s
            return True, data[1], data[2]
        cls.__place_order_time_dict.pop(code)
        return False, None, None

    # Set the apply (declaration) time
    @classmethod
    def set_apply_time(cls, code, time_str, force=False):
        """Save the apply time.

        Unless ``force`` is set, a new value is ignored when an old value
        exists and the new one is not later than it or is more than 4s later.
        """
        if not force:
            old_time_str = cls.get_apply_time(code)
            if old_time_str is not None:
                sub_time = tool.trade_time_sub(time_str, old_time_str)
                if sub_time <= 0 or sub_time > 4:
                    # The apply time must not be more than 4s after the order time
                    return
        cls.__save_apply_time(code, time_str)

    @classmethod
    def get_apply_time(cls, code):
        """Return the saved apply time, or ``None`` when there is none."""
        return cls.__get_apply_time(code)

    # Order placed successfully
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset the per-code state and compute the big-order data for a new order."""
        cls.__clear_data(code)
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # Just in case, use the execution row's time as the apply time for now
        total_data = local_today_datas[code]
        cls.set_apply_time(code, total_data[buy_exec_index]["val"]["time"], True)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"])

    @classmethod
    def __test(cls, datas):
        """Replay one scenario.

        ``datas`` is ``(code, buy signal start index, buy execution index,
        computation end index, farthest index to process)``.
        """
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        # Place the order first
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # Compute the average
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])
        cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, buy_single_index, buy_exec_index,
                                              False)
        # Then check row by row whether a cancel is required
        for i in range(buy_exec_index + 1, datas[4]):
            cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, cancel_data["index"])
                break

    @classmethod
    def test(cls):
        """Manual check: save a few end indexes and print them sorted."""
        # cls.__test(("000909", 607, 646, 646, 694))
        # code, buy signal start index, buy execution index, computation end, farthest index
        # cls.__test(("002793", 292, 308, 314, 410))
        cls.__save_end_index("000333", 200)
        cls.__save_end_index("000333", 101)
        cls.__save_end_index("000333", 99)
        cls.__save_end_index("000333", 120)
        cls.__save_end_index("000333", 126)
        cls.__save_end_index("000333", 126)
        print(cls.__list_end_indexs("000333"))
# 平均大单计算
@@ -2102,26 +1877,27 @@
                    if count >= constant.H_CANCEL_BUY_COUNT:
                        end_index = i
                        break
        # logging.info(f"H撤大单笔数,{count}")
        # 获取大单数量
        average_up_count = 0
        average_up_total_num = 0
        average_num = round(num / count)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if int(val["num"]) >= average_num:
                average_up_count += data["re"]
                average_up_total_num += data["re"] * int(val["num"])
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, count, start_index, end_index)
        cls.__save_average_data(code, average_num, average_up_total_num, count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        average_num, average_up_count, total_count, a_start_index, a_end_index = cls.__get_average_data(code)
        average_num, average_up_total_num, total_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        cancel_num, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        # 14:30过后不再守护
        if int(total_data[end_index]["val"]["time"].replace(":", "")) > int("143000"):
@@ -2141,14 +1917,13 @@
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # 买入位置要在平均值计算范围内
                        cancel_count += data["re"]
                        cancel_num += data["re"] * int(val["num"])
                        process_index = i
                        sj = 0  # 5 * tool.trade_time_sub(val["time"],total_data[buy_exec_index]["val"]["time"])
                        print("h平均大单计算结果:", "取消数量", cancel_count, "大单总数", average_up_count, sj)
                        if cancel_count / (average_up_count - sj) >= 0.75:
                        print("h平均大单计算结果:", "取消手数", cancel_num, "大单手数", average_up_total_num)
                        if cancel_num / average_up_total_num >= constant.H_CANCEL_RATE:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
            cls.__save_compute_info(code, cancel_num, process_index)
        return False, None
    # 下单成功
@@ -2190,7 +1965,7 @@
    # AverageBigNumComputer.test()
    # LongAverageBigNumComputer.test()
    # L2TradeDataProcessor.test()
    SecondAverageBigNumComputer.test()
    L2LimitUpMoneyStatisticUtil.verify_num("601958", 89178, "13:22:45")
    # load_l2_data("600213")
    #
    # buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84],