Administrator
2023-01-16 6f324f1471a5e28188e9f4206b46cbafdf09d04c
l2_data_manager.py
@@ -78,7 +78,7 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0, 0, -1
            return None, None, None, 0, 0, []
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5]
@@ -88,18 +88,18 @@
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_index):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            redis.setex(_key, expire,
                        json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, max_num_index)))
                        json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets))))
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire,
                        json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count, max_num_index)))
                        json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets))))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -481,929 +481,6 @@
        if int(val["operateType"]) == 2:
            return True
        return False
# L2交易数据处理器
# 一些常见的概念:
# 买入信号位置(出现下单信号的第一条数据的位置):buy_single_index
# 买入执行位置(符合下单信号的最后一条数据):buy_exec_index
# 计算位置(当前计算的整个计算的位置):compute_index
#
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def buy_debug(cls, code, content, *args):
        logger_l2_trade_buy.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                load_l2_data(code)
                # 纠正数据
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                # 过时 买入确认点处理
                # TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                #                                                    total_datas[-1],
                #                                                    add_datas)
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    # if L2DataUtil.is_same_time(now_time_str, latest_time):
                    #     # 判断是否已经挂单
                    #     state = trade_manager.get_trade_state(code)
                    #     start_index = len(total_datas) - len(add_datas)
                    #     end_index = len(total_datas) - 1
                    #     if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    #         # 已挂单
                    #         cls.__process_order(code, start_index, end_index, capture_timestamp)
                    #     else:
                    #         # 未挂单
                    #         cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def __compute_big_money_data(cls, code, start_index, end_index):
        # 计算大单
        total_datas = local_today_datas[code]
        num = 0
        for index in range(start_index, end_index + 1):
            data = total_datas[index]
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                if int(data["val"]["operateType"]) == 0:
                    num += data["re"]
                elif int(data["val"]["operateType"]) == 1:
                    num -= data["re"]
        big_money_num_manager.add_num(code, num)
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        index, old_buy_count, old_cancel_count = TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
                return i, True
        TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                        old_cancel_count)
        return end_index, False
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return
        # 获取之前是否有记录的撤买信号
        #  cancel_index = TradePointManager.get_buy_cancel_single_pos(code)
        # cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code)
        # if cancel_computed_index is None:
        #     logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index)
        # 统计群撤大单
        L2BetchCancelBigNumProcessor.process_new(code, start_index, end_index)
        # 统计最大连续买单
        L2ContinueLimitUpCountManager.process(code, start_index, end_index)
        # 计算大单撤销
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, start_index, end_index)
        if need_cancel:
            # 已经撤单了
            threshold_money = cls.__get_threshmoney(code)
            # 重新处理下单
            cls.__start_compute_buy(code, cancel_data["index"] + 1, end_index, threshold_money, capture_time)
            return
        # buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        # if cancel_index is None:
        # 无撤单信号起始点记录
        continue_cancel = L2ContinueLimitUpCountManager.get_continue_count(code)
        order_cancel_begin_start = max(start_index - (continue_cancel - 1),
                                       0) if new_add else start_index
        order_cancel_begin_end = end_index
        total_datas = local_today_datas[code]
        little_cancel = False
        # 大单撤单的数据不为空
        if cancel_data is not None:
            # 小群撤事件
            continue_cancel = 5
            cancel_time_seconds = L2DataUtil.get_time_as_second(cancel_data["val"]["time"])
            # 查找上一秒与下一秒
            for i in range(int(cancel_data["index"]), 0, -1):
                # 查找上一秒和下一秒
                if total_datas[i]["val"]["time"] != cancel_data["val"][
                    "time"] and cancel_time_seconds - L2DataUtil.get_time_as_second(total_datas[i]["val"]["time"]) > 1:
                    order_cancel_begin_start = i + 1
                    break
            for i in range(int(cancel_data["index"]), len(local_today_datas[code])):
                # 查找上一秒和下一秒
                if total_datas[i]["val"]["time"] != cancel_data["val"]["time"] and L2DataUtil.get_time_as_second(
                        total_datas[i]["val"]["time"]) - cancel_time_seconds > 1:
                    order_cancel_begin_end = i - 1
                    break
            cls.cancel_debug(code, "小群撤事件计算范围:{},{}", order_cancel_begin_start, order_cancel_begin_end)
            little_cancel = True
        cancel_start_index = None
        cancel_end_index = None
        need_cancel = False
        if little_cancel:
            # 小群撤事件
            cancel_start_index, cancel_end_index = cls.__compute_order_cancel_little_begin_single(code,
                                                                                                  order_cancel_begin_start
                                                                                                  , continue_cancel,
                                                                                                  order_cancel_begin_end)
            if cancel_start_index is not None:
                cls.debug(code, "找到小群撤信号,撤单信号范围:{}-{}", cancel_start_index, cancel_end_index)
                # 有小群撤信号
                need_cancel = True
            else:
                # 不满足小群撤,从小群撤后面一条数据继续处理
                cls.__process_order(code, cancel_data["index"] + 1, end_index, capture_time, False)
                return
        else:
            # 大群撤事件
            cancel_start_index, cancel_end_index = cls.__compute_order_cancel_begin_single(
                code, order_cancel_begin_start
                , continue_cancel, order_cancel_begin_end)
            if cancel_start_index is not None:
                cls.debug(code, "找到大群撤信号,连续笔数阈值:{}, 撤单信号范围:{}-{}", continue_cancel, cancel_start_index,
                          cancel_end_index)
                # 判断是否有大群撤大单撤
                need_cancel = L2BetchCancelBigNumProcessor.need_cancel(code, cancel_start_index, cancel_end_index)
                if need_cancel:
                    cls.debug(code, "大群撤信号有大单撤销")
                else:
                    cls.debug(code, "大群撤信号无大单撤销")
        if need_cancel:
            # 需要撤买
            cls.cancel_buy(code)
            if cancel_end_index >= end_index:
                return
            # 继续处理下单信号
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_buy(code, cancel_end_index + 1, end_index, threshold_money, capture_time, False)
        else:
            # 是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
    # 过时 开始计算撤的信号
    @classmethod
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        cancel_single = cancel_index is not None
        computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index,
                                                                                           origin_num, threshold_money,
                                                                                           cancel_single)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
                      total_datas[computed_index])
            # 发出撤买信号,需要撤买
            if cls.unreal_buy_dict.get(code) is not None:
                # 有虚拟下单
                cls.debug(code, "之前有虚拟下单,执行虚拟撤买")
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # 删除下单标记位置
                TradePointManager.delete_buy_point(code)
            else:
                # 无虚拟下单,需要执行撤单
                cls.debug(code, "之前无虚拟下单,执行真实撤单")
                cls.__cancel_buy(code)
            if computed_index < len(local_today_datas[code]) - 1:
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time, False)
                pass
        else:
            cls.debug(code, "撤买纯买额计算,计算位置:{}-{},目前为止纯买手数:{}", compute_start_index, total_datas[-1]["index"],
                      buy_num_for_cancel)
            # 无需撤买,设置计算信息
            TradePointManager.set_compute_info_for_cancel_buy(code, int(total_datas[-1]["index"]), buy_num_for_cancel)
            # 判断是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单")
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
            else:
                # 终止执行
                pass
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 不能购买
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and L2DataUtil.get_time_as_second(limit_up_time) >= L2DataUtil.get_time_as_second(
                "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        # 13:00后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # 老二,本板块中涨停票数<29 不能买
        if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
                industry) is not None:
            if global_util.industry_hot_num.get(industry) < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code):
        # 删除大群撤事件的大单
        L2BetchCancelBigNumProcessor.del_recod(code)
        L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if buy_single_index is None:
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index)
            buy_single_index = _index
            if has_single:
                num = 0
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"])
                # 重置大单计算
                big_money_num_manager.reset(code)
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # TODO 可能存在问题 计算大单数量
        cls.__compute_big_money_data(code, max(compute_start_index, buy_single_index), compute_end_index)
        # 买入纯买额统计
        compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                           compute_start_index),
                                                                                 compute_end_index, num,
                                                                                 threshold_money, buy_single_index,
                                                                                 capture_time)
        if rebegin_buy_pos is not None:
            # 需要重新计算纯买额
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            return
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
            # 已过时 为买撤保存基础纯买额
            # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, buy_single_index, compute_index)
            TradePointManager.set_count_info_for_cancel_buy(code, compute_index, b_buy_count + buy_count,
                                                            b_buy_cancel_count + buy_cancel_count)
            # 计算大单(从买入信号起始点到挂单执行点),返回是否取消
            cancel_result, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, buy_single_index,
                                                                                       compute_index)
            # 计算大群撤的大单
            L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index)
            # 连续涨停数计算
            L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index)
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,如果还没撤单就实际下单
                if not cancel_result:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 如果还没撤单,就继续处理已下单的步骤
                if not cancel_result:
                    cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                else:
                    cls.__start_compute_buy(code, compute_index + 1, compute_end_index, threshold_money, capture_time,
                                            False)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums)
        pass
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, max_num_index = TradePointManager.get_buy_compute_start_data(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, max_num_index
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None
        __time = None
        last_index = None
        count = 0
        start = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (
                    i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
                if start is None:
                    start = i
                last_index = i
                count += datas[i]["re"]
                if count >= continue_count:
                    return True, start
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                last_index = None
                count = 0
                start = None
        return False, None
    # 大群撤事件,最多相隔1s
    @classmethod
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count, end_index):
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return None, None
        count = 0
        start = -1
        start_time = None
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            _timestamp = L2DataUtil.get_time_as_second(_val["time"])
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
                if start == -1:
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += datas[i]["re"]
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
                count = 0
                start_time = None
        if count >= continue_count:
            return start, end_index
        else:
            return None, None
    # 小群撤事件
    @classmethod
    def __compute_order_cancel_little_begin_single(cls, code, start_index, continue_count, end_index=None):
        # 必须为同一秒的数据
        same_second = True
        datas = local_today_datas[code]
        __len = len(datas)
        if len(datas) - start_index < continue_count:
            return None, None
        count = 0
        start = -1
        start_time = None
        if end_index is None:
            end_index = __len - continue_count
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            _timestamp = L2DataUtil.get_time_as_second(_val["time"])
            if _timestamp < second_930:
                continue
            # 间隔时间不能多于1s
            if L2DataUtil.is_limit_up_price_buy_cancel(_val) and (start_time is None or _timestamp - start_time < 2):
                if start == -1:
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += int(datas[i]["re"])
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
                count = 0
                start_time = None
        if count >= continue_count:
            return start, end_index
        else:
            return None, None
    # 虚拟下单
    def __unreal_order(self):
        pass
    @classmethod
    def __get_threshmoney(cls, code):
        money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return money
    # 获取预估挂买位
    @classmethod
    def __get_sure_order_pos(cls, code):
        index, data = trade_data_manager.TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
            return 1, index, data
    # 过时 统计买入净买量
    @classmethod
    def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        for i in range(compute_start_index, len(total_datas)):
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    cls.debug(code, "获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", i, buy_nums, threshold_num)
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
        cls.debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index, buy_nums,
                  threshold_num)
        return None, buy_nums
    # 过时 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_2(cls, code, compute_start_index, origin_num, threshold_money, buy_single_index):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        property_buy_num_count = 0
        same_time_property = cls.__get_same_time_property(code)
        for i in range(compute_start_index, len(total_datas)):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.buy_debug(code, "买入信号点之前同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
                          buy_nums + property_buy_num, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums + property_buy_num >= threshold_num:
                return i, buy_nums + property_buy_num
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
                      buy_nums + property_buy_num,
                      threshold_num)
        return None, buy_nums + property_buy_num
    # 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, threshold_money,
                                  buy_single_index,
                                  capture_time):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
                    return None, buy_nums, None
                else:
                    # 计算买入信号,不能同一时间开始计算
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, ii
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,当作买入信号之后处理
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num:
                return i, buy_nums, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
                      buy_nums,
                      threshold_num)
        return None, buy_nums, None
    # 计算买入信号之前的且和买入信号数据在同一时间的数量
    @classmethod
    def __count_l2_data_before_for_cancel(cls, code, buy_single_index):
        total_data = local_today_datas[code]
        single_time = total_data[buy_single_index]["val"]["time"]
        buy_count = 0
        cancel_count = 0
        for i in range(buy_single_index, -1, -1):
            if single_time == total_data[i]["val"]["time"]:
                if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                    buy_count += int(total_data[i]["re"])
                elif L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                    cancel_count += int(total_data[i]["re"])
            else:
                break
        return buy_count, cancel_count
    @classmethod
    def __count_l2_data_for_cancel(cls, code, start_index, end_index):
        total_data = local_today_datas[code]
        buy_count = 0
        cancel_count = 0
        for i in range(start_index, end_index + 1):
            if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                buy_count += int(total_data[i]["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(total_data[i]["val"]):
                cancel_count += int(total_data[i]["re"])
        return buy_count, cancel_count
    # 同一时间买入的概率计算
    @classmethod
    def __get_same_time_property(cls, code):
        # 计算板块热度
        industry = global_util.code_industry_map.get(code)
        if industry is not None:
            hot_num = global_util.industry_hot_num.get(industry)
            if hot_num is not None:
                return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
        return 0.5
    # 过时 统计买撤净买量
    @classmethod
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money, cancel_single=True):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code)
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{} 是否有撤单信号:{}", start_index, len(total_datas) - 1, sure_pos,
                         cancel_single)
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if i < sure_pos:
                    buy_nums += int(_val["num"]) * int(data["re"])
                elif sure_data["val"]["time"] == _val["time"]:
                    # 同一秒买入,而且还在预估买入位之后
                    property_buy_num_count += int(_val["num"]) * int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停撤买
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
                        if sure_data["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
                             buy_nums + property_buy_num, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums + property_buy_num <= threshold_num and cancel_single:
                return i, buy_nums + property_buy_num, sure_type
        buy_num_news = buy_nums + round(property_buy_num_count * same_time_property)
        cls.cancel_debug(code, "处理起始位置:{} 最终纯买额:{}", start_index, buy_num_news)
        return None, buy_num_news, sure_type
        # 统计买撤净买量
    @classmethod
    def __count_num_for_cancel_order(cls, code, start_index, origin_buy_num, origin_cancel_num, min_rate,
                                     betch_cancel_single=True):
        buy_nums = origin_buy_num
        buy_cancel_num = origin_cancel_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                buy_cancel_num += int(data["re"])
            # 有撤单信号,且小于阈值
            if (buy_nums - buy_cancel_num) / buy_cancel_num <= min_rate and betch_cancel_single:
                return i, buy_nums, buy_cancel_num
        return None, buy_nums, buy_cancel_num
    @classmethod
    def test(cls):
        code = "000593"
        load_l2_data(code, True)
        if False:
            state = trade_manager.get_trade_state(code)
            cls.random_key[code] = random.randint(0, 100000)
            capture_timestamp = 1999988888
            try:
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    # 已挂单
                    cls.__process_order(code, 201, 237, capture_timestamp)
                else:
                    # 未挂单
                    cls.__process_not_order(code, 201, 237, capture_timestamp)
            except Exception as e:
                logging.exception(e)
            return
        _start = t.time()
        # 按s批量化数据
        total_datas = local_today_datas[code]
        start_time = total_datas[0]["val"]["time"]
        start_index = 0
        for i in range(0, len(total_datas)):
            if total_datas[i]["val"]["time"] != start_time:
                cls.random_key[code] = random.randint(0, 100000)
                # 处理数据
                start = start_index
                # if start != 201:
                #     continue
                end = i - 1
                print("处理进度:{},{}".format(start, end))
                capture_timestamp = 1999999999
                state = trade_manager.get_trade_state(code)
                try:
                    if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        # 已挂单
                        cls.__process_order(code, start, end, capture_timestamp)
                    else:
                        # 未挂单
                        cls.__process_not_order(code, start, end, capture_timestamp)
                except Exception as e:
                    logging.exception(e)
                # t.sleep(1)
                start_index = i
                start_time = total_datas[i]["val"]["time"]
        print("时间花费:", round((t.time() - _start) * 1000))
    @classmethod
    def test1(cls):
        code = "000593"
        load_l2_data(code, True)
        print(cls.__compute_order_begin_pos(code, 232, 3, 239))
    @classmethod
    def test2(cls):
        code = "600082"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, 121, 123)
    @classmethod
    def test_can_order(cls):
        code = "000948"
        global_data_loader.load_industry()
        limit_up_time_manager.load_limit_up_time()
        print(cls.__can_buy(code))
# 连续涨停买单数最大值管理器
class L2ContinueLimitUpCountManager: