Administrator
2025-03-13 984e59be6787f06b927d5ec612f443f54e145044
l2/l2_transaction_data_processor.py
@@ -1,6 +1,8 @@
import logging
import time
import dask
import constant
from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer, LCancelRateManager
@@ -32,11 +34,11 @@
    # 计算成交进度
    @classmethod
    def __compute_latest_trade_progress(cls, code, buyno_map, datas):
    def __compute_latest_trade_progress(cls, code, buyno_map, fdatas):
        buy_progress_index = None
        for i in range(len(datas) - 1, -1, -1):
            d = datas[i]
            buy_no = f"{d[6]}"
        for i in range(len(fdatas) - 1, -1, -1):
            d = fdatas[i]
            buy_no = f"{d[0][6]}"
            if buyno_map and buy_no in buyno_map:
                # 成交进度位必须是涨停买
                if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]):
@@ -45,61 +47,94 @@
        return buy_progress_index
    @classmethod
    def statistic_big_order_infos(cls, code, datas, order_begin_pos: OrderBeginPosInfo):
    def statistic_big_order_infos(cls, code, fdatas, order_begin_pos: OrderBeginPosInfo):
        """
        统计大单成交
        @param code:
        @param datas:
        @param fdatas: 格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        @return:
        """
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, datas, limit_up_price)
        if buy_datas:
            BigOrderDealManager().add_buy_datas(code, buy_datas)
            active_big_buy_orders = []
        @dask.delayed
        def statistic_big_buy_data():
            buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
            if buy_datas:
                for x in buy_datas:
                    if x[0] > x[6]:
                        # (买单号, 成交金额, 最后成交时间)
                        active_big_buy_orders.append((x[0], x[2], x[4]))
            EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
        try:
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            if is_placed_order:
                if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
                    RadicalBuyDataManager.big_order_deal(code)
                BigOrderDealManager().add_buy_datas(code, buy_datas)
                active_big_buy_orders = []
                if buy_datas:
                    for x in buy_datas:
                        if x[0] > x[6]:
                            # (买单号, 成交金额, 最后成交时间)
                            active_big_buy_orders.append((x[0], x[2], x[4]))
                EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
            try:
                is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
                if is_placed_order:
                    if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
                        RadicalBuyDataManager.big_order_deal(code)
                if bigger_buy_datas:
                    # 有大于50w的大单成交
                    buyno_map = l2_data_util.local_today_buyno_map.get(code)
                    if buyno_map:
                        for buy_data in bigger_buy_datas:
                            order_no = f"{buy_data[0]}"
                            if order_no in buyno_map:
                                LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
                                                                       order_begin_pos.buy_single_index)
        except Exception as e:
            logger_debug.exception(e)
                    if bigger_buy_datas:
                        # 有大于50w的大单成交
                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
                        if buyno_map:
                            for buy_data in bigger_buy_datas:
                                order_no = f"{buy_data[0]}"
                                if order_no in buyno_map:
                                    LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
                                                                           order_begin_pos.buy_single_index)
            except Exception as e:
                logger_debug.exception(e)
            return buy_datas
        sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, datas)
        if sell_datas:
            BigOrderDealManager().add_sell_datas(code, sell_datas)
        @dask.delayed
        def statistic_big_sell_data():
            sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
            if sell_datas:
                BigOrderDealManager().add_sell_datas(code, sell_datas)
            return sell_datas
        @dask.delayed
        def statistic_big_data(f1_, f2_):
            temp_data = f1_, f2_
            return temp_data
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 并行处理买单与卖单
        f1 = statistic_big_buy_data()
        f2 = statistic_big_sell_data()
        dask_result = statistic_big_data(f1, f2)
        buy_datas, sell_datas = dask_result.compute()
        if buy_datas or sell_datas:
            buy_money = BigOrderDealManager().get_total_buy_money(code)
            sell_money = BigOrderDealManager().get_total_sell_money(code)
            LCancelRateManager.set_big_num_deal_info(code, buy_money, sell_money)
    @classmethod
    def process_huaxin_transaction_datas(cls, code, datas):
    def process_huaxin_transaction_datas(cls, code, o_datas):
        # TODO 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #                   data['SellNo'], data['ExecType']))
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
        temp_time_dict = {}
        for d in fdatas:
            if d[3] not in temp_time_dict:
                temp_time_dict[d[3]] = l2_huaxin_util.convert_time(d[3], with_ms=True)
            d[5] = temp_time_dict.get(d[3])
            d[4] = d[5][:8]
        temp_time_dict.clear()
        __start_time = time.time()
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 设置成交价
        try:
            current_price_process_manager.set_trade_price(code, datas[-1][1])
            if limit_up_price > datas[-1][1]:
            current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
            if limit_up_price > fdatas[-1][0][1]:
                # 没有涨停
                EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{datas[-1][1]}")
                EveryLimitupBigDealOrderManager.open_limit_up(code, f"最新成交价:{fdatas[-1][0][1]}")
                radical_buy_strategy.clear_data(code)
        except:
            pass
@@ -116,13 +151,13 @@
            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
            _start_time = time.time()
            L2LimitUpSellDataManager.set_deal_datas(code, datas)
            L2LimitUpSellDataManager.set_deal_datas(code, fdatas)
            use_time_list.append(("统计涨停卖成交", time.time() - _start_time))
            _start_time = time.time()
            #  大单统计
            # cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, datas, order_begin_pos)
            try:
                cls.statistic_big_order_infos(code, datas, order_begin_pos)
                cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
            except Exception as e:
                async_log_util.error(hx_logger_l2_debug, f"统计大单出错:{str(e)}")
            use_time_list.append(("统计大单数据", time.time() - _start_time))
@@ -132,24 +167,22 @@
            try:
                # 统计上板时间
                try:
                    for d in datas:
                        if d[6] > d[7]:
                    for d in fdatas:
                        if d[1]:
                            # 主动买
                            if d[1] == limit_up_price:
                            if d[2]:
                                # 涨停
                                current_price_process_manager.set_latest_not_limit_up_time(code,
                                                                                           l2_huaxin_util.convert_time(
                                                                                               d[3], with_ms=True))
                                current_price_process_manager.set_latest_not_limit_up_time(code, d[5])
                        else:
                            # 主动卖(板上)
                            if d[1] == limit_up_price:
                            if d[2]:
                                L2LimitUpSellDataManager.clear_data(code)
                                break
                except:
                    pass
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas, limit_up_price)
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, fdatas, limit_up_price)
                use_time_list.append(("处理卖单成交数据", time.time() - _start_time))
                _start_time = time.time()
@@ -171,10 +204,10 @@
                                                                                            order_begin_pos)
                        cancel_type = trade_constant.CANCEL_TYPE_P
                    # 判断时间是否与本地时间相差5s以上
                    if tool.trade_time_sub(tool.get_now_time_str(), l2_huaxin_util.convert_time(datas[-1][3])) > 10:
                    if tool.trade_time_sub(tool.get_now_time_str(), fdatas[-1][4]) > 10:
                        now_seconds = int(tool.get_now_time_str().replace(":", ""))
                        if now_seconds < int("093100"):  # or int("130000") <= now_seconds < int("130200"):
                            need_cancel, cancel_msg = True, f"成交时间与本地时间相差10S以上,{l2_huaxin_util.convert_time(datas[-1][3])}"
                            need_cancel, cancel_msg = True, f"成交时间与本地时间相差10S以上,{fdatas[-1][4]}"
                            cancel_type = trade_constant.CANCEL_TYPE_L2_DELAY
                    if need_cancel:
                        L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
@@ -183,7 +216,7 @@
                    use_time_list.append(("处理卖单相关撤数据", time.time() - _start_time))
                    _start_time = time.time()
                # 统计涨停卖成交
                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, datas, limit_up_price)
                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, fdatas, limit_up_price)
                use_time_list.append(("统计成交量数据", time.time() - _start_time))
            except Exception as e:
                async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}")
@@ -193,7 +226,7 @@
            # if big_money_count > 0:
            #     LCancelRateManager.compute_big_num_deal_rate(code)
            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, datas)
            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, fdatas)
            if buy_progress_index is not None:
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
@@ -246,4 +279,5 @@
            use_time = int((time.time() - __start_time) * 1000)
            if use_time > 5:
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{use_time} 数据数量:{len(datas)}  详情:{use_time_list}")
                            f"{code}处理成交用时:{use_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")