Administrator
2024-04-02 1176a493039ded49d888b451793bc272e18a9b5a
l2/l2_data_manager_new.py
@@ -10,6 +10,7 @@
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager
from l2.l2_sell_manager import L2MarketSellManager, L2LimitUpSellManager
from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager
from l2.transaction_progress import TradeBuyQueue
from log_module import async_log_util, log_export
from third_data import kpl_data_manager, block_info, history_k_data_util
@@ -677,6 +678,7 @@
                    info = cls.__trade_log_placr_order_info_dict[code]
                    info.mode = order_begin_pos.mode
                    info.set_buy_index(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
                    info.set_sell_info(order_begin_pos.sell_info)
                    if jx_blocks:
                        info.set_kpl_blocks(list(jx_blocks))
                    elif jx_blocks_by:
@@ -690,7 +692,11 @@
                        elif not can_buy_result[0] and not can_buy_result[1]:
                            info.set_kpl_match_blocks(["非独苗不满足身位"])
                        else:
                            info.set_kpl_match_blocks(can_buy_result[0])
                            temps = []
                            temps.extend(can_buy_result[0])
                            if can_buy_result[5]:
                                temps.append(f"激进买入:{can_buy_result[5]}")
                            info.set_kpl_match_blocks(temps)
                    trade_record_log_util.add_place_order_log(code, info)
                except Exception as e:
                    async_log_util.error(logger_l2_error, f"加入买入记录日志出错:{str(e)}")
@@ -1295,24 +1301,36 @@
        new_get_single = False
        buy_single_index = order_begin_pos.buy_single_index
        if buy_single_index is None:
            # 尝试计算快速成交信号
            has_single, _index, sell_info = cls.__compute_fast_order_begin_pos(code, compute_start_index,
                                                                               compute_end_index)
            # ------------------------------确定信号种类----------------------------------
            # 第一步:获取激进下单信号
            continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count()
            has_single, _index, sell_info = cls.__compute_active_order_begin_pos(code, continue_count, max(
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), compute_end_index)
            fast_msg = None
            if has_single:
                order_begin_pos.mode = OrderBeginPosInfo.MODE_FAST
                order_begin_pos.mode = OrderBeginPosInfo.MODE_ACTIVE
                order_begin_pos.sell_info = sell_info
            elif _index is not None and _index < 0:
                fast_msg = sell_info
                continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count()
                # 有买入信号
                has_single, _index = cls.__compute_order_begin_pos(code, max(
                    (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                                   compute_end_index)
                order_begin_pos.mode = OrderBeginPosInfo.MODE_NORMAL
                # 获取最新的买信息
                sell_info = cls.__L2MarketSellManager.get_current_total_sell_data(code)
                order_begin_pos.sell_info = sell_info
            if not has_single:
                # 第二步:计算闪电下单信号
                has_single, _index, sell_info = cls.__compute_fast_order_begin_pos(code, compute_start_index,
                                                                                   compute_end_index)
                fast_msg = None
                if has_single:
                    order_begin_pos.mode = OrderBeginPosInfo.MODE_FAST
                    order_begin_pos.sell_info = sell_info
                elif _index is not None and _index < 0:
                    fast_msg = sell_info
                    # 第三步: 计算常规买入信号
                    has_single, _index = cls.__compute_order_begin_pos(code, max(
                        (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0),
                                                                       continue_count,
                                                                       compute_end_index)
                    order_begin_pos.mode = OrderBeginPosInfo.MODE_NORMAL
                    # 获取最新的买信息
                    sell_info = cls.__L2MarketSellManager.get_current_total_sell_data(code)
                    order_begin_pos.sell_info = sell_info
            # 如果买入信号与上次的买入信号一样就不能算新的信号
            if cls.__last_buy_single_dict.get(code) == _index:
                has_single = None
@@ -1350,20 +1368,14 @@
                                         f"获取的信号位无效(板上买,范围:{trade_index + 1}-{order_begin_pos.buy_single_index} 未成交总手{total_num}/阈值{thresh_hold_num})")
                            return None
                # -----------------------------买入计算初始化,计算纯买额阈值-----------------------
                cls.__last_buy_single_dict[code] = buy_single_index
                new_get_single = True
                order_begin_pos.num = 0
                order_begin_pos.count = 0
                order_begin_pos.buy_single_index = buy_single_index
                # 调整闪电下单的买入阈值
                if order_begin_pos.sell_info and order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
                    # k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
                    # if k_format and (k_format[1][0] or k_format[3][0]):
                    #     # 股价新高或者逼近前高
                    #     order_begin_pos.threshold_money = int(sell_info[1])
                    # else:
                    # if float(total_datas[buy_single_index]["val"]["price"]) >= 3 and cls.volume_rate_info[code][
                    #     0] > 0.3 and sell_info[1] > 2000 * 10000 and int(
                    #     tool.get_now_time_str().replace(":", "")) < int("100000"):
                    situation = cls.__MarketSituationManager.get_situation_cache()
                    if sell_info[1] > 2000 * 10000 and situation != MarketSituationManager.SITUATION_GOOD:
                        # 市场行情好时不打折
@@ -1379,6 +1391,10 @@
                            order_begin_pos.threshold_money = int(sell_info[1] * 0.8)
                    else:
                        order_begin_pos.threshold_money = int(sell_info[1])
                if order_begin_pos.sell_info and order_begin_pos.mode == OrderBeginPosInfo.MODE_ACTIVE:
                    # 总卖额的1.5倍
                    order_begin_pos.threshold_money = int(sell_info[1] * 1.5)
                l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,量比:{},是否板上买:{},数据:{} 模式:{}({})", buy_single_index,
                             compute_start_index,
                             compute_end_index, cls.volume_rate_info[code], order_begin_pos.at_limit_up,
@@ -1404,7 +1420,19 @@
        # 买入纯买额统计
        new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new, not_buy_msg = None, None, None, None, [], ""
        if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
        if order_begin_pos.mode == OrderBeginPosInfo.MODE_ACTIVE:
            threshold_money = order_begin_pos.threshold_money
            new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, threshold_money_new, max_num_set_new, not_buy_msg = cls.__sum_buy_num_for_order_active(
                code,
                start_process_index,
                compute_end_index,
                order_begin_pos.num,
                order_begin_pos.count,
                threshold_money,
                order_begin_pos.buy_single_index, order_begin_pos.max_num_set)
            threshold_money = threshold_money_new
            order_begin_pos.threshold_money = threshold_money
        elif order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
            threshold_money = order_begin_pos.threshold_money
            new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, threshold_money_new, max_num_set_new, not_buy_msg = cls.__sum_buy_num_for_order_fast(
@@ -1604,6 +1632,71 @@
            if is_first_limit_up:
                return True, i, [refer_sell_data[0], threshold_money]
        return False, None, None
    # 计算激进买的下单信号
    @classmethod
    def __compute_active_order_begin_pos(cls, code, continue_count, start_index, end_index):
        total_datas = local_today_datas[code]
        start_time_str = total_datas[start_index]["val"]["time"]
        # 获取最近的总卖信息
        refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, start_time_str)
        if refer_sell_data is None:
            return False, -1, "总卖为空"
        # 获取当前是否可激进买
        # 获取板块是否可以激进买
        # 可买的板块, 是否独苗, 消息, 可买的强势板块, 关键词, 激进买的板块
        can_buy_result = CodePlateKeyBuyManager.can_buy(code)
        if can_buy_result and can_buy_result[0] and can_buy_result[5]:
            # 有可买板块,有激进买板块
            # 第一步: 计算总卖额
            threshold_money = refer_sell_data[1]
            for i in range(start_index - 1, -1, -1):
                val = total_datas[i]["val"]
                if tool.compare_time(val["time"], refer_sell_data[0]) <= 0:
                    break
                if L2DataUtil.is_sell(val):
                    threshold_money += val["num"] * int(float(val["price"]) * 100)
                elif L2DataUtil.is_sell_cancel(val):
                    threshold_money -= val["num"] * int(float(val["price"]) * 100)
            # 第二步:计算起始信号
            second_930 = 9 * 3600 + 30 * 60 + 0
            total_datas = local_today_datas.get(code)
            if end_index - start_index + 1 < continue_count:
                return False, -1, "信号不连续"
            last_index = None
            count = 0
            start = None
            for i in range(start_index, end_index + 1):
                _val = total_datas[i]["val"]
                time_s = L2DataUtil.get_time_as_second(_val["time"])
                # 时间要>=09:30:00
                if time_s < second_930:
                    continue
                if L2DataUtil.is_limit_up_price_buy(_val):
                    # 金额要大于50万
                    if _val["num"] * float(_val["price"]) < 5000:
                        continue
                    if last_index is None or (total_datas[last_index]["val"]["time"] == total_datas[i]["val"]["time"]):
                        if start is None:
                            start = i
                        last_index = i
                        count += total_datas[i]["re"]
                        if count >= continue_count:
                            return True, start, [refer_sell_data[0], threshold_money]
                    else:
                        # 本条数据作为起点
                        last_index = i
                        count = datas[i]["re"]
                        start = i
                elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val):
                    # 剔除卖与卖撤
                    last_index = None
                    count = 0
                    start = None
            return False, -1, "未获取到激进买的起始信号"
        else:
            return False, -1, "不可激进买"
    @classmethod
    def __get_threshmoney(cls, code):
@@ -1949,6 +2042,134 @@
        return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, not_buy_msg
    # 返回(买入执行点, 总手, 总笔数, 从新计算起点, 纯买额阈值)
    # 计算激进买入
    @classmethod
    def __sum_buy_num_for_order_active(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                       threshold_money_origin, buy_single_index, max_num_set):
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code)
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        limit_up_price = float(limit_up_price)
        threshold_money = threshold_money_origin
        # 目标手数
        threshold_num = round(threshold_money / (limit_up_price * 100))
        # buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买,当有涨停买信号时才会触发买
        trigger_buy = True
        # 间隔最大时间为3s
        max_space_time_ms = 3 * 1000
        # 不下单的信息
        not_buy_msg = ""
        max_buy_num_set = set(max_num_set)
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # 必须为连续2秒内的数据
            if L2DataUtil.time_sub_as_ms(_val, total_datas[buy_single_index]["val"]) > max_space_time_ms:
                cls.__TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
                    return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, f"【{i}】信号不连续,囊括时间-{max_space_time_ms}ms"
                else:
                    # 计算买入信号,不能同一时间开始计算
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, buy_count, ii, threshold_money, max_buy_num_set, f"【{i}】信号不连续,囊括时间-{max_space_time_ms}ms"
            if L2DataUtil.is_sell(_val):
                threshold_money += _val["num"] * int(float(_val["price"]) * 100)
                threshold_num = round(threshold_money / (limit_up_price * 100))
            elif L2DataUtil.is_sell_cancel(_val):
                threshold_money -= _val["num"] * int(float(_val["price"]) * 100)
                threshold_num = round(threshold_money / (limit_up_price * 100))
            # 涨停买
            elif L2DataUtil.is_limit_up_price_buy(_val):
                if l2_data_util.is_big_money(_val):
                    max_buy_num_set.add(i)
                trigger_buy = True
                # 只统计59万以上的金额
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    l2_log.info(code, logger_l2_trade_buy,
                                f"{code}获取到买入执行点(激进下单):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 判断买入位置是否在买入信号之前
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i],
                                                                                                    local_today_buyno_map.get(
                                                                                                        code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        max_buy_num_set.discard(buy_index)
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]:
                            # 同一秒,当作买入信号之后处理
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            max_buy_num_set.discard(buy_index)
                            # 大单撤销
                            l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count -= int(total_datas[i]["re"])
            l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                             buy_nums, threshold_num)
            # 计算信号位置之后的主动卖,加入到阈值之中
            sell_orders = HuaXinSellOrderStatisticManager.get_latest_transaction_datas(code, min_sell_order_no=int(
                total_datas[buy_single_index]['val']['orderNo']))
            sell_order_num = sum([x[1] for x in sell_orders]) // 100
            # 纯买额足够,且笔数大于2笔
            if buy_nums < threshold_num + sell_order_num:
                not_buy_msg = f"【{i}】纯买额不够,{buy_nums}/{threshold_num}"
                continue
            if not trigger_buy:
                not_buy_msg = f"【{i}】没有买单触发"
                continue
            if buy_count < 2:
                not_buy_msg = f"【{i}】安全笔数不足,{buy_count}/{2}"
                continue
            # max_buy_num_set = cls.__filter_not_deal_indexes(code, max_buy_num_set)
            # big_money_count_threshhold = cls.__l2PlaceOrderParamsManagerDict[code].get_big_num_count()
            # if len(max_buy_num_set) < big_money_count_threshhold:
            #     not_buy_msg = f"【{i}】大单不满足要求,需要:{big_money_count_threshhold} 总:{len(max_buy_num_set)}"
            #     continue
            try:
                info = cls.__trade_log_placr_order_info_dict[code]
                info.set_trade_factor(threshold_money, 0, [])
            except Exception as e:
                async_log_util.error(logger_l2_error, f"记录交易因子出错:{str(e)}")
            l2_log.buy_debug(code, f"激进买主动卖手数:{sell_order_num}")
            return i, buy_nums, buy_count, None, threshold_money, max_buy_num_set, "可以下单"
        l2_log.buy_debug(code, "尚未获取到买入执行点(激进买入),起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} ",
                         compute_start_index,
                         buy_nums,
                         threshold_num, buy_count)
        return None, buy_nums, buy_count, None, threshold_money, max_buy_num_set, not_buy_msg
def test_trade_record():
    code = "000333"