Administrator
2025-03-14 c170fbd2d1cbc15d5b07cc5cdffb3e8c43901785
l2/l2_transaction_data_processor.py
@@ -8,7 +8,7 @@
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer, LCancelRateManager
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
from code_attribute import gpcode_manager
from l2 import l2_data_util, l2_data_manager, transaction_progress, l2_log
from l2 import l2_data_util, l2_data_manager, transaction_progress, l2_log, data_callback
from l2.cancel_buy_strategy import FCancelBigNumComputer, \
    NewGCancelBigNumComputer, \
    NBCancelBigNumComputer
@@ -18,6 +18,7 @@
from l2.l2_data_util import L2DataUtil
from l2.l2_limitup_sell_data_manager import L2LimitUpSellDataManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, HuaXinSellOrderStatisticManager, BigOrderDealManager
from l2.place_order_single_data_manager import L2TradeSingleDataProcessor
from log_module import async_log_util
from log_module.log import hx_logger_l2_debug, logger_l2_trade_buy_queue, logger_debug, hx_logger_l2_upload
from trade import current_price_process_manager, trade_constant
@@ -34,7 +35,10 @@
    # 计算成交进度
    @classmethod
    def __compute_latest_trade_progress(cls, code, buyno_map, fdatas):
    def __compute_latest_trade_progress(cls, code, fdatas):
        buyno_map = l2_data_util.local_today_buyno_map.get(code)
        if not buyno_map:
            return None
        buy_progress_index = None
        for i in range(len(fdatas) - 1, -1, -1):
            d = fdatas[i]
@@ -104,14 +108,15 @@
        f2 = statistic_big_sell_data()
        dask_result = statistic_big_data(f1, f2)
        buy_datas, sell_datas = dask_result.compute()
        if buy_datas or sell_datas:
            buy_money = BigOrderDealManager().get_total_buy_money(code)
            sell_money = BigOrderDealManager().get_total_sell_money(code)
            LCancelRateManager.set_big_num_deal_info(code, buy_money, sell_money)
        # L撤的比例与买卖大单无直接关系了
        # if buy_datas or sell_datas:
        #     buy_money = BigOrderDealManager().get_total_buy_money(code)
        #     sell_money = BigOrderDealManager().get_total_sell_money(code)
        #     LCancelRateManager.set_big_num_deal_info(code, buy_money, sell_money)
    @classmethod
    def process_huaxin_transaction_datas(cls, code, o_datas):
        # TODO 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        # 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
@@ -151,6 +156,7 @@
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            _start_time = time.time()
            # 设置涨停卖成交数据
            L2LimitUpSellDataManager.set_deal_datas(code, fdatas)
            use_time_list.append(("统计涨停卖成交", time.time() - _start_time))
            _start_time = time.time()
@@ -163,7 +169,6 @@
            use_time_list.append(("统计大单数据", time.time() - _start_time))
            _start_time = time.time()
            big_sell_order_info = None
            try:
                # 统计上板时间
                try:
@@ -180,9 +185,11 @@
                                break
                except:
                    pass
                big_sell_order_info = None
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, fdatas, limit_up_price)
                big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
                    code, fdatas,
                    limit_up_price)
                use_time_list.append(("处理卖单成交数据", time.time() - _start_time))
                _start_time = time.time()
@@ -216,7 +223,7 @@
                    use_time_list.append(("处理卖单相关撤数据", time.time() - _start_time))
                    _start_time = time.time()
                # 统计涨停卖成交
                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, fdatas, limit_up_price)
                HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price)
                use_time_list.append(("统计成交量数据", time.time() - _start_time))
            except Exception as e:
                async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}")
@@ -226,7 +233,7 @@
            # if big_money_count > 0:
            #     LCancelRateManager.compute_big_num_deal_rate(code)
            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, fdatas)
            buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
            if buy_progress_index is not None:
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
@@ -281,3 +288,120 @@
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{use_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")
    @classmethod
    def process_huaxin_transaction_datas_v2(cls, code, o_datas):
        """
        新版处理华鑫成交数据:
        尚未下单的时候异步统计成交,同步遍历获取最后一个涨停卖委托数据,当最后一个涨停卖成交的时候就是下单时机
        @param code:
        @param o_datas:
        @return:
        """
        def __process_placed_order():
            """
            处理处于下单状态的数据
            @return:
            """
            try:
                cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
            except Exception as e:
                async_log_util.error(hx_logger_l2_debug, f"统计大单出错:{str(e)}")
            # 统计连续的卖单数据,用于撤单,只有当下单之后才会执行
            big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
                code, fdatas,
                limit_up_price)
            LCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info)
            need_cancel, cancel_msg = False, ""
            cancel_type = None
            if not need_cancel:
                need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code,
                                                                                    order_begin_pos)
                cancel_type = trade_constant.CANCEL_TYPE_P
            if need_cancel:
                L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
            # 统计涨停主动卖成交,为了F撤准备数据
            HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price)
            # 计算成交进度
            buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
            if buy_progress_index is not None:
                total_datas = l2_data_util.local_today_datas.get(code)
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                  total_datas)
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                    buy_progress_index)
                if is_placed_order:
                    LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                               buy_progress_index,
                                                               total_datas)
                    cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index)
                    if cancel_result[0]:
                        L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}",
                                                        cancel_type=trade_constant.CANCEL_TYPE_F)
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # =====格式化数据=====
        # 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
        temp_time_dict = {}
        for d in fdatas:
            if d[0][3] not in temp_time_dict:
                temp_time_dict[d[0][3]] = l2_huaxin_util.convert_time(d[0][3], with_ms=True)
            d[5] = temp_time_dict.get(d[0][3])
            d[4] = d[5][:8]
        temp_time_dict.clear()
        try:
            # ======需要同步处理的数据========
            # 设置成交价
            try:
                current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
                if limit_up_price > fdatas[-1][0][1]:
                    # 没有涨停
                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                    radical_buy_strategy.clear_data(code)
            except:
                pass
            # 统计上板时间
            try:
                last_data = fdatas[-1]
                if last_data[1] and last_data[2]:
                    # 涨停主动买
                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                elif not last_data[1] and last_data[2]:
                    # 涨停主动卖
                    if last_data[2]:
                        L2LimitUpSellDataManager.clear_data(code)
            except:
                pass
            # ==========处于委托状态就同步处理数据,没有下过单就异步处理数据==========
            order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            if is_placed_order:
                # 下过单了
                __process_placed_order()
            else:
                filter_datas = L2TradeSingleDataProcessor.filter_last_limit_up_sell_data(code, fdatas)
                # 回调数据
                if filter_datas:
                    data_callback.l2_trade_single_callback.OnLastLimitUpSellDeal(code, filter_datas[0])
                # 如果是被动买就更新成交进度
                if not fdatas[-1][1]:
                    buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
                    if buy_progress_index is not None:
                        total_datas = l2_data_util.local_today_datas.get(code)
                        cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                             total_datas)
                # 如果数据量大于20条就采用线程池更新数据
                if len(fdatas) >= 20:
                    cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, fdatas, order_begin_pos)
                else:
                    cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
        except Exception as e:
            hx_logger_l2_debug.exception(e)