Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
l2/l2_data_manager_new.py
@@ -190,7 +190,8 @@
            # 如果是涨停买撤信号需要看数据位置是否比开始处理时间早
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                # 获取买入信号
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], buyno_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i],
                                                                                                    buyno_map)
                if buy_index is not None and buy_index < begin_pos:
                    continue
@@ -216,6 +217,70 @@
        # print("m值大单计算范围:{}-{}  时间:{}".format(max(start_index, processed_index), end_index,
        #                                      round(t.time() * 1000) - start_time))
class HumanRadicalBuySingleManager:
    """
    Registry of per-code "human radical buy" signals.

    Each stored signal records when it was raised, a spacing value, when it
    expires, and the radical-buy result that produced it.
    """

    # Human order marks: {code: (signal time, spacing time, signal expiry time, radical_result)}
    __human_radical_buy_mark_info = {}

    @classmethod
    def add_single(cls, code, latest_data, radical_result):
        """
        Store a buy signal for the code, overwriting any existing one.
        @param code: stock code used as the registry key
        @param latest_data: L2 record; its "val" entry supplies the signal time
        @param radical_result: radical-buy result kept alongside the signal
        @return: None
        """
        signal_time = L2DataUtil.get_time_with_ms(latest_data["val"])
        # Codes accepted by tool.is_sh_code use spacing 400 and a 3000 validity
        # offset; every other code uses spacing 30 and a 2000 offset.
        spacing, validity_offset = (400, 3000) if tool.is_sh_code(code) else (30, 2000)
        expire_time = tool.trade_time_add_millionsecond(signal_time, validity_offset)
        cls.__human_radical_buy_mark_info[code] = (signal_time, spacing, expire_time, radical_result)

    @classmethod
    def remove_single(cls, code):
        """
        Drop the signal for the code; does nothing when no signal is stored.
        @param code: stock code
        @return: None
        """
        cls.__human_radical_buy_mark_info.pop(code, None)

    @classmethod
    def has_single(cls, code):
        """
        Report whether a signal is currently stored for the code.
        @param code: stock code
        @return: True when a signal exists, otherwise False
        """
        return code in cls.__human_radical_buy_mark_info

    @classmethod
    def is_valid(cls, code, data):
        """
        Check whether the stored signal is still usable at the time of `data`.

        An expired signal is removed from the registry and the expiry is logged.
        @param code: stock code
        @param data: L2 record; its "val" entry supplies the current time
        @return: (False, reason message) when unusable, (True, stored signal tuple) otherwise
        """
        mark = cls.__human_radical_buy_mark_info.get(code)
        if mark is None:
            return False, "没有人买入信号"
        expire_time_ms = mark[2]
        now_time_ms = L2DataUtil.get_time_with_ms(data["val"])
        still_alive = tool.trade_time_sub_with_ms(now_time_ms, expire_time_ms) <= 0
        if still_alive:
            return True, mark
        # Past the expiry time: forget the signal before reporting why.
        cls.__human_radical_buy_mark_info.pop(code)
        async_log_util.info(logger_l2_not_buy_reasons, f"{code}#大单足够,人为下单: 超过信号生效时间-{now_time_ms}/{expire_time_ms}")
        return False, "超过信号生效时间"
class L2TradeDataProcessor:
@@ -328,9 +393,11 @@
    def set_real_place_order_index(cls, code, index, order_begin_pos: OrderBeginPosInfo, last_data):
        trade_record_log_util.add_real_place_order_position_log(code, index, order_begin_pos.buy_single_index)
        total_datas = local_today_datas.get(code)
        use_time = tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(total_datas[index]["val"]) , L2DataUtil.get_time_with_ms(
            total_datas[order_begin_pos.buy_exec_index]["val"]))
        trade_record_log_util.add_place_order_use_time(code, f"执行位时间:{L2DataUtil.get_time_with_ms(total_datas[order_begin_pos.buy_exec_index]['val'])} 耗时:{use_time}")
        use_time = tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(total_datas[index]["val"]),
                                               L2DataUtil.get_time_with_ms(
                                                   total_datas[order_begin_pos.buy_exec_index]["val"]))
        trade_record_log_util.add_place_order_use_time(code,
                                                       f"执行位时间:{L2DataUtil.get_time_with_ms(total_datas[order_begin_pos.buy_exec_index]['val'])} 耗时:{use_time}")
        l2_log.debug(code, "设置真实下单位:{}", index)
        cancel_buy_strategy.set_real_place_position(code, index, order_begin_pos.buy_single_index, is_default=False)
        # 获取真实下单位置之后需要判断F撤
@@ -395,7 +462,7 @@
        finally:
            if datas:
                l2.l2_data_util.save_l2_data(code, None, datas)
            origin_datas.clear()
            # origin_datas.clear()
    @classmethod
    def __recompute_real_order_index(cls, code, pre_real_order_index, order_info, compute_type):
@@ -825,11 +892,12 @@
        average_rate = cls.__Buy1PriceManager.get_average_rate(code)
        if average_rate:
            if tool.is_ge_code(code):
                if average_rate < 0.8:
                if average_rate < 0.08:
                    return False, True, f"均价涨幅({average_rate})小于8%", True
            else:
                if average_rate < 0.04:
                    return False, True, f"均价涨幅({average_rate})小于4%", True
        return True, False, f"", False
    @classmethod
@@ -971,12 +1039,21 @@
                                                             local_today_datas.get(code))
                return False
            else:
                l2_log.debug(code, "可以下单,原因:{}, 下单模式:{}", reason, order_begin_pos.mode)
                try:
                try:
                    # 判断是否为首封下单
                    order_begin_pos.first_limit_up_buy = radical_buy_data_manager.is_first_limit_up_buy(code)
                    if not constant.CAN_BUY_FIRST_LIMIT_UP and order_begin_pos.first_limit_up_buy:
                        reason = "首封不下单"
                        l2_log.debug(code, "不可以下单,原因:{}", reason)
                        trade_record_log_util.add_cant_place_order_log(code, reason)
                        cls.__break_current_batch_data_for_buy_dict[code] = True
                        trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index,
                                                                 order_begin_pos.buy_exec_index,
                                                                 local_today_datas.get(code))
                        return False
                    l2_log.debug(code, "可以下单,原因:{}, 下单模式:{} 最小订单号:{}", reason, order_begin_pos.mode, order_begin_pos.min_order_no)
                    l2_log.debug(code, "开始执行买入")
                    trade_manager.start_buy(code, capture_timestamp, last_data,
                                            last_data_index, order_begin_pos.mode, order_begin_pos.buy_exec_index)
@@ -1444,10 +1521,36 @@
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
        # ---------计算激进买入的信号---------
        radical_result = cls.__compute_radical_order_begin_pos(code, compute_start_index, compute_end_index)
        # 不需要根据人为下单来下单
        # if not HumanRadicalBuySingleManager.has_single(code):
        #     # ---------计算激进买入的信号---------
        #     radical_result = cls.__compute_radical_order_begin_pos(code, compute_start_index, compute_end_index)
        # else:
        #     human_radical_result = cls.__compute_human_radical_order_begin_pos(code, compute_start_index,
        #                                                                        compute_end_index)
        #     l2_log.debug(code, f"大单足够,人为下单计算结果({compute_start_index}-{compute_end_index}):{human_radical_result}")
        #     if human_radical_result[0]:
        #         radical_result = list(human_radical_result[2])
        #         # 改变执行位置
        #         radical_result[1] = human_radical_result[1]["index"]
        #     else:
        #         radical_result = None
        if radical_result[0]:
        radical_result = cls.__compute_radical_order_begin_pos(code, compute_start_index, compute_end_index)
        if radical_result and radical_result[0]:
            # if not HumanRadicalBuySingleManager.has_single(code):
            #     big_order_deal_enough_result = radical_buy_data_manager.is_big_order_deal_enough(code,
            #                                                                                      code_volumn_manager.CodeVolumeManager().get_volume_rate_refer_in_5days(
            #                                                                                          code), 0)
            #     if big_order_deal_enough_result[6] <= 0:
            #         HumanRadicalBuySingleManager.add_single(code, total_datas[-1], radical_result)
            #         async_log_util.info(logger_l2_not_buy_reasons, f"{code}#大单足够,需要根据人为下单({compute_start_index}-{compute_end_index}):{radical_result[1]}")
            #         return
            # #下单前一步,移除人为下单信号
            # is_human_radical_buy = HumanRadicalBuySingleManager.has_single(code)
            # HumanRadicalBuySingleManager.remove_single(code)
            buy_single_index, buy_exec_index = radical_result[1], radical_result[1]
            buy_volume_rate = cls.volume_rate_info[code][0]
            refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, total_datas[buy_single_index]["val"][
@@ -1467,7 +1570,9 @@
                                                     mode=OrderBeginPosInfo.MODE_RADICAL,
                                                     mode_desc=f"大单不足扫入:{radical_result[2]}",
                                                     sell_info=sell_info,
                                                     threshold_money=threshold_money)
                                                     threshold_money=threshold_money,
                                                     min_order_no=radical_result[5]
                                                     )
            order_begin_pos_info.at_limit_up = cls.__is_at_limit_up_buy(code)
            ordered = cls.__process_with_find_exec_index(code, order_begin_pos_info, compute_end_index,
                                                         block_info=radical_result[3])
@@ -1920,25 +2025,26 @@
        @param code:
        @param start_index:
        @param end_index:
        @return: (是否获取到信号, 信号位置, 扫入板块/消息, 扫入板块大单流入信息, 需要监听的大单)
        @return: (是否获取到信号, 信号位置, 扫入板块/消息, 扫入板块大单流入信息, 需要监听的大单, 统计上板大单成交的最小订单号)
        """
        # 激进买信号的时间
        def __can_order():
            # 判断是否是板上放量
            if cls.__is_at_limit_up_buy(code, start_index):
                return False, None, "板上放量", None
            # if cls.__is_at_limit_up_buy(code, start_index):
            #     return False, None, "板上放量", None
            total_datas = local_today_datas[code]
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
            bigger_money = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code))
            min_num = int(bigger_money / limit_up_price / 100)
            bigger_money_num, current_min_num, total_min_num = int(bigger_money / limit_up_price / 100), int(
                bigger_money / limit_up_price / 100), int(5000 / limit_up_price)
            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, radical_data[3])
            # 参考总卖额
            refer_sell_money = 0
            if refer_sell_data:
                refer_sell_money = refer_sell_data[1]
            # 判断还需大单的金额(max(每次上板大单,累计成交大单))
            big_order_deal_enough_result = radical_buy_data_manager.is_big_order_deal_enough(code,
                                                                                             code_volumn_manager.CodeVolumeManager().get_volume_rate_refer_in_5days(
                                                                                                 code),
@@ -1947,7 +2053,8 @@
                                                                                             is_almost_open_limit_up=
                                                                                             radical_data[5])
            # 缺乏的大单金额
            lack_money = big_order_deal_enough_result[3]
            current_lack_money = int(big_order_deal_enough_result[5])
            total_lack_money = int(big_order_deal_enough_result[6])
            # 如果有大单成交就不需要看大单
            if constant.CAN_RADICAL_BUY_NEED_BIG_ORDER_EVERYTIME:
                # 每次下单都需要大单
@@ -1958,10 +2065,14 @@
                    # 60s以上就不下单了
                    return False, None, "距离上次统计大单时间过去60s", set()
            if lack_money == 0:
                if not tool.is_sh_code(code):
                    # 非上证的票看50w
                    min_num = int(5000 / limit_up_price)
            if max(current_lack_money, total_lack_money) <= 0:
                # 已经不缺少大单了
                # if not tool.is_sh_code(code):
                # 非上证的票看50w
                current_min_num = int(5000 / limit_up_price)
            # 如果累计大单成交足够,只需看50w
            # if big_order_deal_enough_result[4]:
            #     min_num = int(5000 / limit_up_price)
            # 需要监听的大单
            watch_indexes = set()
            # 总委托大单金额
@@ -1977,8 +2088,6 @@
                val = data["val"]
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] < min_num:
                    continue
                # 撤单不算
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                         total_datas,
@@ -1993,23 +2102,40 @@
                        # 判断是否为大单
                        order_money = dealing_active_order_info[2] + round(val["price"], 2) * val["num"] * 100
                        if order_money >= bigger_money:
                            lack_money -= order_money
                            watch_indexes.add(i)
                            if lack_money < 0:
                            if val["num"] >= bigger_money_num:
                                watch_indexes.add(i)
                            if val["num"] >= current_min_num:
                                current_lack_money -= order_money
                            if val["num"] >= total_min_num:
                                total_lack_money -= order_money
                            if max(current_lack_money, total_lack_money) < 0:
                                single_index = i
                                break
                if int(val["orderNo"]) <= radical_data[1]:
                    # 主动买单后的数据不算
                    continue
                watch_indexes.add(i)
                lack_money -= round(val["price"], 2) * val["num"] * 100
                if lack_money < 0:
                if val["num"] >= bigger_money_num:
                    watch_indexes.add(i)
                if val["num"] >= current_min_num:
                    current_lack_money -= round(val["price"], 2) * val["num"] * 100
                if val["num"] >= total_min_num:
                    total_lack_money -= round(val["price"], 2) * val["num"] * 100
                if max(current_lack_money, total_lack_money) < 0:
                    single_index = i
                    break
            if single_index is not None:
                return True, single_index, "有大单", watch_indexes
            return False, None, f"大单不足:{trade_index}-{end_index}  缺少的大单-{lack_money}", watch_indexes
                every_time_big_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code)
                if every_time_big_orders:
                    min_order_no = min(min(every_time_big_orders, key=lambda e: e[0])[0], radical_data[1])
                else:
                    min_order_no = radical_data[1]
                return True, single_index, f"有大单,大单情况:{big_order_deal_enough_result[1]}", watch_indexes, min_order_no
            return False, None, f"大单不足:{trade_index}-{end_index}  缺少的大单-{max(current_lack_money, total_lack_money)}  大单情况:{big_order_deal_enough_result[1]}", watch_indexes, None
        radical_data = RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict.get(code)
        record_codes = radical_buy_data_manager.BlockPlaceOrderRecordManager().get_codes()
@@ -2030,7 +2156,7 @@
                return False, None, "超过生效时间"
        result = __can_order()
        l2_log.debug(code, f"L2扫入判断:{result}")
        l2_log.debug(code, f"L2扫入判断({start_index}-{end_index}):{result}")
        if result[0]:
            # 已经扫入下过单且允许板上放量扫入的就需要判断板上放量的距离
            if is_radical_buy and constant.CAN_RADICAL_BUY_AT_LIMIT_UP:
@@ -2053,10 +2179,67 @@
            # 如果板上放量不可买入就需要删除信号
            if not constant.CAN_RADICAL_BUY_AT_LIMIT_UP and code in RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict:
                RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict.pop(code)
            return True, result[1], radical_data[2], radical_data[4], result[3]
            return True, result[1], radical_data[2], radical_data[4], result[3], result[4]
        else:
            async_log_util.info(logger_l2_not_buy_reasons, f"{code}#{result[2]}")
        return result
    @classmethod
    def __compute_human_radical_order_begin_pos(cls, code, start_index, end_index):
        """
        处理跟人买
        @param code:
        @param start_index:
        @param end_index:
        @return:
        """
        total_datas = local_today_datas.get(code)
        result = HumanRadicalBuySingleManager.is_valid(code, total_datas[start_index])
        if not result[0]:
            return False, None, result[1]
        result = result[1]
        single_time_ms, space_time_ms, expire_time_ms, radical_result = result[0], result[1], result[2], result[3]
        bigger_num = l2_data_util.get_big_money_val(gpcode_manager.get_limit_up_price_as_num(code),
                                                    tool.is_ge_code(code)) // (
                             gpcode_manager.get_limit_up_price_as_num(code) * 100)
        canceled_buyno_map = local_today_buyno_map.get(code)
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if val["num"] < bigger_num:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     canceled_buyno_map)
            if left_count == 0:
                continue
            # 判断是否超过生效时间
            if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(total_datas[i]["val"]),
                                           expire_time_ms) > 0:
                HumanRadicalBuySingleManager.remove_single(code)
                return False, None, "超过信号生效时间"
            is_valid = False
            # 判断距离上个50w买单的时间是否超过了space_time_ms
            buy_exec_index = radical_result[1]
            for ii in range(i - 1, buy_exec_index, -1):
                data_child = total_datas[ii]
                val_child = data_child["val"]
                if not L2DataUtil.is_limit_up_price_buy(val_child):
                    continue
                if val_child["num"] * float(val_child["price"]) < 5000:
                    continue
                if tool.trade_time_sub_with_ms(L2DataUtil.get_time_with_ms(val),
                                               L2DataUtil.get_time_with_ms(val_child)) > space_time_ms:
                    is_valid = True
                    break
            if is_valid:
                return True, data, radical_result
        return False, None, "没有有效信号"
    # 总卖额参考时间使用记录
    __refer_sell_used_times = {}
@@ -2070,6 +2253,8 @@
        @param end_index:
        @return: 信号信息(信号位,执行位), 消息, 可买入的板块
        """
        if True:
            return None, "此条不生效", None
        if not tool.is_sz_code(code):
            return None, "非深证的票", None
        # 判断抛压是否大于5000w
@@ -2230,7 +2415,8 @@
                                f"{code}获取到买入执行点(快速买入):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 判断买入位置是否在买入信号之前
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], buy_no_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i],
                                                                                                    buy_no_map)
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
@@ -2370,7 +2556,8 @@
                                f"{code}获取到买入执行点(积极下单):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 判断买入位置是否在买入信号之前
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i], buyno_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i],
                                                                                                    buyno_map)
                if buy_index is not None:
                    # 找到买撤数据的买入点