Administrator
2023-01-02 ed9e2367eea9baa6c8bea82e0f81c209ffb2a56f
l2_data_manager_new.py
@@ -6,6 +6,7 @@
import big_money_num_manager
import code_data_util
import constant
import global_util
import gpcode_manager
import industry_codes_sort
@@ -14,12 +15,14 @@
import l2_data_util
import l2_trade_factor
import l2_trade_test
import l2_trade_util
import limit_up_time_manager
import redis_manager
import ths_industry_util
import tool
import trade_manager
import trade_queue_manager
import trade_data_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
@@ -177,7 +180,6 @@
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
@@ -194,64 +196,86 @@
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
                # ---------- 判断是否需要计算大单 -----------
                try:
                    average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(code)
                    # 计算平均大单
                    if average_need:
                        end_index = local_today_datas[code][-1]["index"]
                        if len(add_datas) > 0:
                            end_index = add_datas[-1]["index"]
                        AverageBigNumComputer.compute_average_big_num(code, buy_exec_index, buy_single_index, end_index)
                except Exception as e:
                    logging.exception(e)
                # -------------数据增量处理------------
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
                    # 第1条数据是否为09:30:00
                    if add_datas[0]["val"]["time"] == "09:30:00":
                        if global_util.cuurent_prices.get(code):
                            price_data = global_util.cuurent_prices.get(code)
                            if price_data[1]:
                                # 当前涨停价,设置涨停时间
                                logger_l2_process.info("开盘涨停:{}", code)
                                # 保存涨停时间
                                limit_up_time_manager.save_limit_up_time(code, "09:30:00")
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
                try:
                    if len(add_datas) > 0:
                        latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                        # 时间差不能太大才能处理
                        # TODO 暂时关闭处理
                        if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time):
                            # 判断是否已经挂单
                            state = trade_manager.get_trade_state(code)
                            start_index = len(total_datas) - len(add_datas)
                            end_index = len(total_datas) - 1
                            if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                                # 已挂单
                                cls.__process_order(code, start_index, end_index, capture_timestamp)
                            else:
                                # 未挂单
                                cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                        logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                               add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                               capture_timestamp)
                        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    l2_data_manager.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        if len(add_datas) > 0:
            now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
            # 拼接数据
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
            # ---------- 判断是否需要计算大单 -----------
            try:
                average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # 计算平均大单
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                  end_index)
            except Exception as e:
                logging.exception(e)
            try:
                average_need, buy_single_index, buy_exec_index = SecondAverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # 计算平均大单
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    SecondAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                        end_index)
            except Exception as e:
                logging.exception(e)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
                if global_util.cuurent_prices.get(code):
                    price_data = global_util.cuurent_prices.get(code)
                    if price_data[1]:
                        # 当前涨停价,设置涨停时间
                        logger_l2_process.info("开盘涨停:{}", code)
                        # 保存涨停时间
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            # 时间差不能太大才能处理
            if l2_data_manager.L2DataUtil.is_same_time(now_time_str,
                                                       latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(
                code):
                # 判断是否已经挂单
                state = trade_manager.get_trade_state(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    cls.__process_order(code, start_index, end_index, capture_timestamp)
                else:
                    # 未挂单
                    cls.__process_not_order(code, start_index, end_index, capture_timestamp)
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
    # 处理未挂单
    @classmethod
@@ -260,8 +284,7 @@
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
@@ -279,9 +302,31 @@
            return
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index, buy_exec_index)
        # 撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   buy_exec_index, start_index,
                                                                                   end_index)
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "申报时间截至大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        # 撤单计算,看分钟级大单撤单
        try:
            b_need_cancel, b_cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, buy_exec_index,
                                                                             start_index, end_index)
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "1分钟内大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        if not cancel_data:
            # 统计板上卖
@@ -294,13 +339,18 @@
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        if cancel_data:
            if cancel_data["index"] == 175:
                print("进入调试")
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            cls.cancel_buy(code, cancel_msg)
            # 继续计算下单
            cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
            if cls.cancel_buy(code, cancel_msg):
                # 撤单成功,继续计算下单
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
            else:
                # 撤单尚未成功
                pass
        else:
            # 如果有虚拟下单需要真实下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
@@ -336,9 +386,11 @@
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                    SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                except Exception as e:
@@ -355,28 +407,31 @@
    # Whether the buy order may be cancelled
    @classmethod
    def __can_cancel(cls, code):
        """Decide whether the buy order for ``code`` may be cancelled.

        Returns a ``(can_cancel, reason)`` tuple; ``reason`` is an empty string when
        no specific rule applied.

        Rules as written below:
          * ``constant.TEST`` truthy -> always ``(True, "")``.
          * When the current time string, with ``:`` removed, is >= 140000
            (presumably "HH:MM:SS", i.e. 14:00:00 or later - confirm the format
            returned by ``tool.get_now_time_str``):
              - no industry found -> cancellable;
              - the code's sort index is 0 ("leader" per the original comments)
                -> not cancellable;
              - the code's sort index is 1 and every index-0 code has a limit-up
                time of "09:30:00" -> not cancellable.
          * Otherwise -> ``(True, "")``.
        """
        if constant.TEST:
            return True, ""
        # Original note: "temporarily commented out".
        # NOTE(review): in this view the rule below is live AND an identical commented-out
        # copy follows it - confirm which of the two is the intended state.
        # After 14:00 the sector leader no longer needs to be cancelled
        now_time_str = tool.get_now_time_str()
        if int(now_time_str.replace(":", "")) >= 140000:
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None:
                # Original comment: codes ranked after second place in the same sector must not be bought
                if codes_index.get(code) == 0:
                    return False, "14:00后老大不能撤单"
                elif codes_index.get(code) == 1:
                    # Check whether every leader hit limit-up at 09:30:00.
                    # If the sector leaders hit limit-up at 09:30:00, the runner-up that is
                    # broken open after 14:00 is not cancelled.
                    # first_count ends at 0 when every index-0 code has limit-up time "09:30:00"
                    # (it also stays 0 if no code has index 0).
                    first_count = 0
                    for key in codes_index:
                        if codes_index[key] == 0:
                            first_count += 1
                            if limit_up_time_manager.get_limit_up_time(key) == "09:30:00":
                                first_count -= 1
                    if first_count == 0:
                        return False, "14:00后老大都开盘涨停,老二不能撤单"
        # Disabled copy of the rule above, kept as found:
        # now_time_str = tool.get_now_time_str()
        # if int(now_time_str.replace(":", "")) >= 140000:
        #     industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        #     if industry is None:
        #         return True, "没有获取到行业"
        #     codes_index = industry_codes_sort.sort_codes(codes, code)
        #     if codes_index is not None and codes_index.get(code) is not None:
        #         # Codes ranked after second place in the same sector must not be bought
        #         if codes_index.get(code) == 0:
        #             return False, "14:00后老大不能撤单"
        #         elif codes_index.get(code) == 1:
        #             # Check whether every leader hit limit-up at 09:30:00
        #             # Leaders at limit-up since 09:30:00: runner-up broken open after 14:00 is not cancelled
        #             first_count = 0
        #             for key in codes_index:
        #                 if codes_index[key] == 0:
        #                     first_count += 1
        #                     if limit_up_time_manager.get_limit_up_time(key) == "09:30:00":
        #                         first_count -= 1
        #             if first_count == 0:
        #                 return False, "14:00后老大都开盘涨停,老二不能撤单"
        return True, ""
@@ -414,11 +469,10 @@
        except Exception as e:
            logging.exception(e)
        # 量比超过1.1的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
        if volumn_rate >= 1.1:
            return False, "最大量比超过1.1不能买"
        if volumn_rate >= 1.3:
            return False, "最大量比超过1.3不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
@@ -496,15 +550,20 @@
            l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code, msg=None):
        can_cancel, reason = cls.__can_cancel(code)
        if not can_cancel:
            # 不能取消
            cls.cancel_debug(code, "撤单中断,原因:{}", reason)
            return
    def cancel_buy(cls, code, msg=None, source="l2"):
        # 是否是交易队列触发
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
            total_datas = local_today_datas[code]
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
                    return False
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
@@ -515,16 +574,26 @@
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
                # 不能取消
                cls.cancel_debug(code, "撤单中断,原因:{}", reason)
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
        return True
    # 虚拟下单
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # 删除之前的板上卖信息
        L2LimitUpSellStatisticUtil.delete(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
@@ -600,10 +669,27 @@
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   compute_index,
                                                                                   buy_single_index, compute_index,
                                                                                   True)
                # 分钟级大单计算
                # need_cancel, cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                                              buy_single_index, compute_index, True)
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,如果还没撤单就实际下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
                if need_cancel:
                    if cls.cancel_buy(code, "分钟级大单撤销"):
                        # 执行撤单成功
                        pass
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                   buy_single_index, compute_index, False)
                SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                                                        buy_single_index, compute_index, False)
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
@@ -743,11 +829,23 @@
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买
        trigger_buy = True
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        max_space_time = 2
        if place_order_count <= 0:
            max_space_time = 2
        elif place_order_count <= 1:
            max_space_time = 6 - 1
        else:
            max_space_time = 9 - 1
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # 必须为连续3秒内的数据
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 2:
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
@@ -761,42 +859,48 @@
            if L2DataUtil.is_limit_up_price_buy(_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count += int(total_datas[i]["re"])
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code, i,
                                             buy_nums,
                                             threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    trigger_buy = True
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                        logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                                 i,
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count -= int(total_datas[i]["re"])
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,当作买入信号之后处理
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None:
                        # 找到买撤数据的买入点
                        if buy_index >= buy_single_index:
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count -= int(total_datas[i]["re"])
                            cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                                # 同一秒,当作买入信号之后处理
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
                                cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                    else:
                        # 未找到买撤数据的买入点
                        cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                        buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count():
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy:
                return i, buy_nums, buy_count, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
@@ -1194,7 +1298,7 @@
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                            cancel_index = i
                            cancel_msg = "封单金额小于1000万"
                            cancel_msg = "封单金额小于1000万,为{}".format(total_num)
                            break
                    # 相邻2s内的数据减小50%
                    # 上1s的总数
@@ -1227,14 +1331,15 @@
                # ------大单撤处理-------
                # if total_num < min_volumn_big:
                if exec_time_offset < 1800:
                    try:
                        b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, buy_exec_index, i, i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "1分钟内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
                    pass
                    # try:
                    #     b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, i, i)
                    #     if b_need_cancel:
                    #         cancel_index = b_cancel_index
                    #         cancel_msg = "1分钟内大单撤销比例触发阈值"
                    #         break
                    # except Exception as e:
                    #     logging.exception(e)
                # 30分钟外才执行
                elif 1800 <= exec_time_offset <= 5400:
                    try:
@@ -1322,7 +1427,8 @@
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        threshold_num = int(zyltgb * 0.015) // (limit_up_price * 100)
        # 大于自由流通市值的4.8%
        threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
@@ -1332,7 +1438,7 @@
                continue
            if i <= process_index:
                continue
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]):
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]):
                num = int(total_datas[i]["val"]["num"])
                cls.__incre_sell_data(code, num)
                total_num += num
@@ -1343,7 +1449,9 @@
            process_index = cancel_index
        else:
            process_index = end_index
        # 保存处理的位置
        L2TradeDataProcessor.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
                                          threshold_num)
        cls.__save_process_index(code, process_index)
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
@@ -1355,6 +1463,259 @@
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        cls.process(code, 126, 171, 126)
# s级平均大单计算
# 计算范围到申报时间的那一秒
class SecondAverageBigNumComputer:
    """Second-level big-order cancel tracker for a placed order.

    After an order is placed, the class counts the "big" limit-up buy
    records between the buy signal and the declared apply time, remembers
    which limit-up buy-cancel records undo a buy inside that range, and
    reports that the order should be cancelled once the cancelled big
    orders exceed a ratio of the counted big orders.

    State is kept in Redis under three per-code keys:
    ``s_average_big_num-<code>`` (JSON tuple of threshold, big-order count,
    start index, end index), ``s_average_big_num_comput_info-<code>``
    (set of cancel record indexes) and ``s_average_big_num_apply_time-<code>``
    (apply time string).
    """
    __redis_manager = redis_manager.RedisManager(0)
    # code -> (wall-clock time of the order, buy_single_index, buy_exec_index)
    __place_order_time_dict = {}

    @classmethod
    def __getRedis(cls):
        """Return the Redis client of the shared manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        """Store the big-order statistics for ``code`` (expires after 2000s) and log them."""
        key = "s_average_big_num-{}".format(code)
        payload = json.dumps((average_num, average_up_count, start_index, end_index))
        cls.__getRedis().setex(key, 2000, payload)
        L2TradeDataProcessor.cancel_debug(
            code,
            "保存秒级大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(
                average_num, average_up_count, start_index, end_index))

    @classmethod
    def __get_average_data(cls, code):
        """Return ``(average_num, average_up_count, start_index, end_index)``.

        All four values are ``None`` when nothing is stored for ``code``.
        """
        key = "s_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    @classmethod
    def __save_cancel_data(cls, code, cancel_index):
        """Remember the index of a buy-cancel record that counts against the order."""
        key = "s_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().sadd(key, cancel_index)

    @classmethod
    def __get_cancel_datas(cls, code):
        """Return the stored set of buy-cancel record indexes for ``code``."""
        key = "s_average_big_num_comput_info-{}".format(code)
        return cls.__getRedis().smembers(key)

    @classmethod
    def __save_apply_time(cls, code, time_str):
        """Store the apply (declaration) time string for ``code``."""
        key = "s_average_big_num_apply_time-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), time_str)

    @classmethod
    def __get_apply_time(cls, code):
        """Return the stored apply time for ``code``, or ``None`` if absent."""
        key = "s_average_big_num_apply_time-{}".format(code)
        return cls.__getRedis().get(key)

    @classmethod
    def __clear_data(cls, code):
        """Delete the cancel-index set and the statistics stored for ``code``."""
        cls.__getRedis().delete("s_average_big_num_comput_info-{}".format(code))
        cls.__getRedis().delete("s_average_big_num-{}".format(code))

    @classmethod
    def clear_data(cls):
        """Delete the cancel-index sets and statistics of every code."""
        for pattern in ("s_average_big_num_comput_info-*", "s_average_big_num-*"):
            for k in cls.__getRedis().keys(pattern):
                cls.__getRedis().delete(k)

    @classmethod
    def __cancels_tracked_buy(cls, code, data, a_start_index, a_end_index):
        """Tell whether cancel record ``data`` undoes a buy inside the tracked range.

        The matching buy is looked up through
        ``l2_data_util.get_buy_data_with_cancel_data``; a cancel whose buy
        cannot be found, or lies outside ``[a_start_index, a_end_index]``,
        does not count.
        """
        buy_index, _ = l2_data_util.get_buy_data_with_cancel_data(
            data, local_today_num_operate_map.get(code))
        return buy_index is not None and a_start_index <= buy_index <= a_end_index

    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        """Count the big limit-up buy orders in the range and persist the result.

        The range ``[start_index, end_index]`` is first cut off before the
        first record whose time is later than the stored apply time. A
        limit-up buy counts as big when its ``num`` is at least
        ``round(5900 / limit_up_price)``; each such record contributes its
        ``re`` value to the count.

        :param code: stock code, key into ``local_today_datas``
        :param buy_single_index: buy-signal index (only printed here)
        :param start_index: first record index to inspect
        :param end_index: last record index to inspect (inclusive)

        Assumes an apply time has already been stored for ``code``
        (``place_order_success`` does so before calling this).
        """
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        apply_time_second = int(cls.get_apply_time(code).replace(":", ""))
        # Stop the range at the last record that is not later than the apply time.
        for i in range(start_index, end_index + 1):
            if int(total_data[i]["val"]["time"].replace(":", "")) > apply_time_second:
                end_index = i - 1
                break
        # The threshold is a fixed value derived from the limit-up price. The
        # previous per-range average (num // count) was overwritten by this
        # value anyway and raised ZeroDivisionError when the range held no
        # limit-up buy, so it is no longer computed.
        average_num = round(5900 / gpcode_manager.get_limit_up_price(code))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) >= average_num:
                average_up_count += data["re"]
        print("平均手数:", average_num, "大单总数:", average_up_count)
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)

    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        """Record new buy-cancels and decide whether the order must be cancelled.

        :param code: stock code, key into ``local_today_datas``
        :param buy_single_index: index of the buy signal
        :param buy_exec_index: index where the buy was executed (unused here)
        :param start_index: first new record index to process
        :param end_index: last new record index to process (inclusive)
        :param need_cancel: when false, cancels are only recorded and no
            decision is made
        :return: ``(True, record)`` where ``record`` is the cancel record at
            which the cancelled big-order ratio exceeded the threshold,
            otherwise ``(False, None)``
        """
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        total_data = local_today_datas[code]
        # First run after the order (start_index is the buy signal): also look
        # backwards through records sharing the buy signal's timestamp for
        # limit-up cancels with a zero cancel interval.
        # NOTE(review): the backward scan stops at index 1, index 0 is never inspected.
        if buy_single_index == start_index:
            for i in range(buy_single_index - 1, 0, -1):
                data = total_data[i]
                val = data["val"]
                if val["time"] != total_data[buy_single_index]["val"]["time"]:
                    break
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                    if cls.__cancels_tracked_buy(code, data, a_start_index, a_end_index):
                        cls.__save_cancel_data(code, i)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                if cls.__cancels_tracked_buy(code, data, a_start_index, a_end_index):
                    cls.__save_cancel_data(code, i)
        if not need_cancel:
            return False, None
        cancel_datas = cls.__get_cancel_datas(code)
        if not cancel_datas:
            return False, None
        if not average_up_count:
            # No big order was counted, so no ratio can be formed; this used
            # to fail with ZeroDivisionError.
            return False, None
        # The more often the code has been ordered, the higher the ratio that
        # is tolerated. (The two higher values were previously assigned to a
        # misspelled name and therefore never took effect.)
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count <= 1:
            cancel_rate_threshold = 0.49
        elif place_order_count <= 2:
            cancel_rate_threshold = 0.549
        else:
            cancel_rate_threshold = 0.59
        cancel_count = 0
        # Walk the cancels in record order so the returned record is the one
        # at which the ratio is first exceeded.
        for index in sorted(int(raw_index) for raw_index in cancel_datas):
            data = total_data[index]
            if int(data["val"]["num"]) >= average_num:
                cancel_count += data["re"]
                if cancel_count / average_up_count > cancel_rate_threshold:
                    return True, total_data[index]
        return False, None

    @classmethod
    def is_need_compute_average(cls, code, latest_data):
        """Tell whether the big-order statistics should be recomputed.

        :param code: stock code
        :param latest_data: most recent record; its ``val.time`` is compared
            with the stored apply time
        :return: ``(True, buy_single_index, buy_exec_index)`` while the latest
            record is less than 5 seconds after the apply time, otherwise
            ``(False, None, None)``; in the latter case the pending order
            entry for ``code`` is dropped
        """
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        # Leave 5 seconds for the apply time to be uploaded.
        if tool.trade_time_sub(latest_data["val"]["time"], cls.get_apply_time(code)) < 5:
            return True, data[1], data[2]
        cls.__place_order_time_dict.pop(code)
        return False, None, None

    @classmethod
    def set_apply_time(cls, code, time_str, force=False):
        """Store the apply time of ``code``.

        Unless ``force`` is set, a new value is ignored when an older one
        exists and the new one is not later than it or is more than 4
        seconds later.
        """
        old_time_str = cls.get_apply_time(code)
        if not force and old_time_str is not None:
            sub_time = tool.trade_time_sub(time_str, old_time_str)
            if sub_time <= 0 or sub_time > 4:
                return
        cls.__save_apply_time(code, time_str)

    @classmethod
    def get_apply_time(cls, code):
        """Return the stored apply time of ``code`` (``None`` if absent)."""
        return cls.__get_apply_time(code)

    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset the stored state for ``code`` after an order has been placed.

        The time of the record at ``buy_exec_index`` is stored as a
        provisional apply time, then the statistics are computed from the
        buy signal up to the newest record.
        """
        cls.__clear_data(code)
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        total_data = local_today_datas[code]
        # Store a provisional apply time right away, in case no real one arrives.
        cls.set_apply_time(code, total_data[buy_exec_index]["val"]["time"], True)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"])

    @classmethod
    def __test(cls, datas):
        """Replay one recorded scenario.

        ``datas`` is ``(code, buy signal index, buy exec index,
        last index used for the statistics, end of the replayed data)``.
        """
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        # Place the order first, then recompute the statistics on the fixed range.
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])
        for i in range(buy_single_index, datas[4]):
            cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, cancel_data["index"])
                break

    @classmethod
    def test(cls):
        """Run the built-in replay scenario."""
        # Tuple layout: code, buy signal start index, buy exec index,
        # last index for the statistics, furthest index to replay.
        cls.__test(("000716", 410, 420, 461, 536))
        # Another recorded scenario: ("002793", 292, 308, 314, 410)
# 平均大单计算
@@ -1384,19 +1745,18 @@
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    # 保存买撤数据
    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
    def __save_cancel_data(cls, code, cancel_index):
        key = "average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((cancel_count, process_index)))
        cls.__getRedis().sadd(key, cancel_index)
    # 获取买撤的数据
    @classmethod
    def __get_compute_info(cls, code):
    def __get_cancel_datas(cls, code):
        key = "average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
        val = cls.__getRedis().smembers(key)
        return val
    @classmethod
    def __clear_data(cls, code):
@@ -1405,10 +1765,22 @@
        key = "average_big_num-{}".format(code)
        cls.__getRedis().delete(key)
    @classmethod
    def clear_data(cls):
        key = "average_big_num_comput_info-*"
        keys = cls.__getRedis().keys(key)
        for k in keys:
            cls.__getRedis().delete(k)
        key = "average_big_num-*"
        keys = cls.__getRedis().keys(key)
        for k in keys:
            cls.__getRedis().delete(k)
    # 计算平均手数
    # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止
    @classmethod
    def compute_average_big_num(cls, code, buy_exec_index, start_index, end_index):
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        num = 0
@@ -1416,11 +1788,23 @@
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
            if L2DataUtil.is_limit_up_price_buy(val) and float(val["price"]) * int(val["num"]) >= 5000:
                # 75万以上的才参与计算平均大单
                count += data["re"]
                num += int(val["num"])
        average_num = num // count
        # 如果没有找到75万以上的单就不添加75w的筛选条件
        if count == 0:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy(val):
                    # 75万以上的才参与计算平均大单
                    count += data["re"]
                    num += int(val["num"])
        average_num = num // count
        #average_num = 0
        average_num = round(5900 / gpcode_manager.get_limit_up_price(code))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
@@ -1428,53 +1812,87 @@
            if L2DataUtil.is_limit_up_price_buy(val):
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
        print("平均大单:", average_num, average_up_count)
        print("平均手数:", average_num, "大单总数:", average_up_count)
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        try:
            for i in range(start_index, end_index + 1):
                if i <= buy_exec_index:
                    continue
                if process_index >= i:
                    continue
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        if buy_single_index == start_index:
            for i in range(buy_single_index - 1, 0, -1):
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num:
                    # 查询买入位置
                if val["time"] != total_data[buy_single_index]["val"]["time"]:
                    break
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                    # 涨停买撤销且撤销的间隔时间为0
                    # 查询买入信号,如果无法查询到或者是买入位置比买入信号小就不算
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # 买入位置要在平均值计算范围内
                        # 在买入信号之后
                        cls.__save_cancel_data(code, i)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # print("处理进度", i)
            if L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                    cls.__save_cancel_data(code, i)
        if need_cancel:
            # 计算买撤大单暂比
            cancel_datas = cls.__get_cancel_datas(code)
            if cancel_datas is not None and len(cancel_datas) > 0:
                cancel_rate_threshold = 0.49
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                if place_order_count <=1:
                    cancel_rate_threshold=0.49
                elif place_order_count <=2:
                    ancel_rate_threshold = 0.549
                else:
                    ancel_rate_threshold = 0.59
                cancel_indexs = []
                for index in cancel_datas:
                    cancel_indexs.append(int(index))
                cancel_indexs.sort()
                # print("取消的数据", cancel_indexs)
                cancel_count = 0
                for index in cancel_indexs:
                    data = total_data[index]
                    if int(data["val"]["num"]) >= average_num:
                        cancel_count += data["re"]
                        process_index = i
                        print("撤销大单", cancel_count)
                        if cancel_count / average_up_count >= 0.49:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
                        if cancel_count / average_up_count > cancel_rate_threshold:
                            return True, total_data[index]
        return False, None
    # 是否需要计算
    @classmethod
    def is_need_compute_average(cls, code):
    def is_need_compute_average(cls, code, latest_data):
        total_datas = local_today_datas[code]
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        elif t.time() - data[0] < 0.5:
            # 500ms内的数据才需要计算average
            cls.__place_order_time_dict.pop(code)
        elif tool.trade_time_sub(latest_data["val"]["time"], total_datas[data[2]]["val"]["time"]) < 3:
            # 3s内的数据才需要计算average
            return True, data[1], data[2]
        else:
            cls.__place_order_time_dict.pop(code)
        return False, None, None
    # 下单成功
@@ -1484,7 +1902,7 @@
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # 以防万一,先保存下单信息
        total_data = local_today_datas[code]
        cls.compute_average_big_num(code, buy_exec_index, buy_single_index, total_data[-1]["index"])
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"])
    @classmethod
    def __test(cls, datas):
@@ -1497,18 +1915,18 @@
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # 执行是否需要计算average
        if cls.is_need_compute_average(code):
            cls.compute_average_big_num(code, buy_exec_index, buy_single_index, datas[3])
        for i in range(buy_exec_index, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])
        for i in range(buy_single_index, datas[4]):
            cancel, cancel_data = cls.need_cancel(code, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                print("需要撤单", cancel, cancel_data["index"])
                break
    @classmethod
    def test(cls):
        # cls.__test(("601579", 311, 319, 347, 404))
        cls.__test(("601579", 311, 319, 327, 404))
        cls.__test(("000716", 410, 420, 461, 536))
        # 代码 买入信号起始点  买入信息执行位置  计算末位 最远计算位置
        # cls.__test(("002793", 292, 308, 314, 410))
        # 执行是否需要撤销
@@ -1645,6 +2063,7 @@
    def __test(cls, datas):
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = random.randint(0, 100000)
        # 先执行下单
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
@@ -1662,13 +2081,19 @@
    @classmethod
    def test(cls):
        cls.__test(("002528", 212, 219, 372, 601))
        cls.__test(("003005", 212, 219, 372, 601))
        # 代码 买入信号起始点  买入信息执行位置  计算末位 最远计算位置
        cls.__test(("002793", 292, 308, 332, 410))
        # 执行是否需要撤销
if __name__ == "__main__":
    L2LimitUpSellStatisticUtil.test()
    print(t.time())
    # AverageBigNumComputer.test()
    # LongAverageBigNumComputer.test()
    # L2TradeDataProcessor.test()
    load_l2_data("600213")
    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84],
                                                                     local_today_num_operate_map.get(
                                                                         "600213"))
    print(buy_index, buy_data)