Administrator
2025-04-22 0a99d6aeec309b658786a403774c0e65fd97740e
l2/l2_transaction_data_processor.py
@@ -33,6 +33,8 @@
class HuaXinTransactionDatasProcessor:
    __statistic_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=constant.HUAXIN_L2_MAX_CODES_COUNT + 2)
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    # 非涨停成交时间
    __not_limit_up_time_dict = {}
    # 计算成交进度
    @classmethod
@@ -60,9 +62,11 @@
        @return:
        """
        @dask.delayed
        def statistic_big_buy_data():
            use_time_list = []
            __start_time = time.time()
            buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
            use_time_list.append((time.time() - __start_time, "买单统计"))
            if buy_datas:
                BigOrderDealManager().add_buy_datas(code, buy_datas)
                active_big_buy_orders = []
@@ -72,6 +76,7 @@
                            # (买单号, 成交金额, 最后成交时间)
                            active_big_buy_orders.append((x[0], x[2], x[4]))
                EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
                use_time_list.append((time.time() - __start_time, "买单统计结果处理"))
            try:
                is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
                if is_placed_order:
@@ -89,26 +94,40 @@
                                                                           order_begin_pos.buy_single_index)
            except Exception as e:
                logger_debug.exception(e)
            if use_time_list and use_time_list[-1][0] > 0.005:
                l2_log.info(code, hx_logger_l2_upload,
                            f"买单统计+处理耗时:{use_time_list[-1][0]}  详情:{use_time_list}")
            return buy_datas
        @dask.delayed
        def statistic_big_sell_data():
            use_time_list = []
            __start_time = time.time()
            sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
            if sell_datas:
                BigOrderDealManager().add_sell_datas(code, sell_datas)
            use_time_list.append((time.time() - __start_time, "卖单统计"))
            if use_time_list and use_time_list[-1][0] > 0.005:
                l2_log.info(code, hx_logger_l2_upload,
                            f"卖单统计+处理耗时:{use_time_list[-1][0]}  详情:{use_time_list}")
            return sell_datas
        @dask.delayed
        def statistic_big_data(f1_, f2_):
            temp_data = f1_, f2_
            return temp_data
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 并行处理买单与卖单
        f1 = statistic_big_buy_data()
        f2 = statistic_big_sell_data()
        dask_result = statistic_big_data(f1, f2)
        buy_datas, sell_datas = dask_result.compute()
        if False and len(fdatas) > 100:
            # 并行处理买单与卖单
            # 超过100条数据才需要并行处理
            f1 = dask.delayed(statistic_big_buy_data)()
            f2 = dask.delayed(statistic_big_sell_data)()
            dask_result = dask.delayed(statistic_big_data)(f1, f2)
            buy_datas, sell_datas = dask_result.compute()
        else:
            buy_datas = statistic_big_buy_data()
            sell_datas = statistic_big_sell_data()
        # L撤的比例与买卖大单无直接关系了
        # if buy_datas or sell_datas:
        #     buy_money = BigOrderDealManager().get_total_buy_money(code)
@@ -134,16 +153,23 @@
        temp_time_dict.clear()
        __start_time = time.time()
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 设置成交价
        try:
            current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
            if limit_up_price > fdatas[-1][0][1]:
                # 没有涨停
                EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                radical_buy_strategy.clear_data(code)
        except:
            pass
            if not fdatas[-1][2]:
                if code not in cls.__not_limit_up_time_dict:
                    cls.__not_limit_up_time_dict[code] = fdatas[-1][5]
                last_time = cls.__not_limit_up_time_dict[code]
                # 炸板时间持续500ms以上算炸板
                if tool.trade_time_sub_with_ms(fdatas[-1][5], last_time) > 500:
                    # 没有涨停
                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                    radical_buy_strategy.clear_data(code, msg=f"没有涨停:{fdatas[-1][0]}")
            else:
                if code in cls.__not_limit_up_time_dict:
                    cls.__not_limit_up_time_dict.pop(code)
        except Exception as e:
            async_log_util.error(logger_debug, f"L2成交开板计算错误:{str(e)}")
        total_datas = l2_data_util.local_today_datas.get(code)
        use_time_list = []
@@ -171,21 +197,12 @@
            _start_time = time.time()
            try:
                last_data = fdatas[-1]
                # 统计上板时间
                try:
                    for d in fdatas:
                        if d[1]:
                            # 主动买
                            if d[2]:
                                # 涨停
                                current_price_process_manager.set_latest_not_limit_up_time(code, d[5])
                        else:
                            # 主动卖(板上)
                            if d[2]:
                                L2LimitUpSellDataManager.clear_data(code)
                                break
                except:
                    pass
                if last_data[1] and last_data[2]:
                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                if not last_data[1] and last_data[2]:
                    L2LimitUpSellDataManager.clear_data(code)
                big_sell_order_info = None
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
@@ -239,11 +256,10 @@
            if buy_progress_index is not None:
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                  total_datas)
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                    buy_progress_index)
                l2_log.info(code, logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code, buy_progress_index)
                if is_placed_order:
                    NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                                  buy_progress_index)
                    # NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                    #                                               buy_progress_index)
                    LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                               buy_progress_index,
                                                               total_datas)
@@ -268,11 +284,10 @@
                                                                cancel_type=trade_constant.CANCEL_TYPE_W)
                        except:
                            pass
                    SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
                                                                  buy_progress_index)
                    HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
                                                                     buy_progress_index)
                    # SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
                    #                                               buy_progress_index)
                    # HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
                    #                                                  buy_progress_index)
            else:
                pass
            if is_placed_order:
@@ -344,7 +359,7 @@
        # =====格式化数据=====
        # 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        use_time_list = []
        __start_time = int(time.time()*1000)
        __start_time = int(time.time() * 1000)
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
@@ -356,7 +371,7 @@
            d[4] = d[5][:8]
        temp_time_dict.clear()
        _start_time = int(time.time() * 1000)
        use_time_list.append((_start_time - __start_time , "数据整形"))
        use_time_list.append((_start_time - __start_time, "数据整形"))
        try:
@@ -364,7 +379,7 @@
            # 设置成交价
            try:
                current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
                if limit_up_price > fdatas[-1][0][1]:
                if not fdatas[-1][2]:
                    # 没有涨停
                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                    radical_buy_strategy.clear_data(code)
@@ -379,8 +394,7 @@
                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                elif not last_data[1] and last_data[2]:
                    # 涨停主动卖
                    if last_data[2]:
                        L2LimitUpSellDataManager.clear_data(code)
                    L2LimitUpSellDataManager.clear_data(code)
            except:
                pass
@@ -395,7 +409,7 @@
                _start_time = int(time.time() * 1000)
                use_time_list.append((_start_time - __start_time, "处理涨停卖"))
                # 回调数据
                if filter_datas:
                if filter_datas is not None:
                    l2_log.info(code, logger_l2_trade, f"最后一笔涨停卖被吃:{filter_datas[0]}")
                    data_callback.l2_trade_single_callback.OnLastLimitUpSellDeal(code, filter_datas[0][0])
@@ -423,4 +437,4 @@
            _start_time = int(time.time() * 1000)
            if _start_time - __start_time > 5:
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{_start_time - __start_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")
                            f"{code}处理成交用时:{_start_time - __start_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")