Administrator
7 天以前 d2a4dd9c837f8df2a19e58f7fb4c81a91c114b67
l2/l2_transaction_data_processor.py
@@ -22,12 +22,12 @@
from log_module import async_log_util
from log_module.log import hx_logger_l2_debug, logger_l2_trade_buy_queue, logger_debug, hx_logger_l2_upload, \
    logger_trade, logger_l2_trade
from trade import current_price_process_manager, trade_constant
from trade import current_price_process_manager, trade_constant, trade_manager
import concurrent.futures
from trade.buy_radical import radical_buy_strategy
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, EveryLimitupBigDealOrderManager
from utils import tool
from utils import tool, trade_util
class HuaXinTransactionDatasProcessor:
@@ -35,6 +35,8 @@
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    # 非涨停成交时间
    __not_limit_up_time_dict = {}
    # 最近成交数据字典
    __latest_transaction_data_dict = {}
    # 计算成交进度
    @classmethod
@@ -58,7 +60,7 @@
                if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]):
                    buy_progress_index = buyno_map[buy_no]["index"]
                break
        if buy_progress_index is None and buy_exec_index is not None:
        if buy_progress_index is None and buy_exec_index is not None and buy_exec_index >= 0:
            # 没有找到真实成交进度位且有买入执行位置
            # 根据最近的成交买单号计算真实成交位置
            try:
@@ -178,6 +180,8 @@
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #                   data['SellNo'], data['ExecType']))
        if o_datas:
            cls.__latest_transaction_data_dict[code] = o_datas[-1]
        fdatas = [
            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
            for d in o_datas]
@@ -288,12 +292,17 @@
            # if big_money_count > 0:
            #     LCancelRateManager.compute_big_num_deal_rate(code)
            buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas)
            trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
            # 计算成交进度
            buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas,
                                                                                 order_begin_pos.buy_exec_index if trade_util.is_delegated(
                                                                                     trade_state) else -1)
            if buy_progress_index is not None:
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                  total_datas)
                l2_log.info(code, logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code, buy_progress_index)
                l2_log.info(code, logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{} is_similar-{}", code,
                            buy_progress_index, is_similar)
                if is_placed_order:
                    # NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                    #                                               buy_progress_index)
@@ -375,9 +384,11 @@
                L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
            # 统计涨停主动卖成交,为了F撤准备数据
            HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price)
            trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
            # 计算成交进度
            _buy_progress_index, _is_similar = cls.__compute_latest_trade_progress(code, fdatas,
                                                                                   order_begin_pos.buy_exec_index)
                                                                                   order_begin_pos.buy_exec_index if trade_util.is_delegated(
                                                                                       trade_state) else -1)
            if _buy_progress_index is not None:
                total_datas = l2_data_util.local_today_datas.get(code)
                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, _buy_progress_index,
@@ -393,6 +404,8 @@
                        L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}",
                                                        cancel_type=trade_constant.CANCEL_TYPE_F)
        if o_datas:
            cls.__latest_transaction_data_dict[code] = o_datas[-1]
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # =====格式化数据=====
        # 整形数据,格式:[(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)]
@@ -456,7 +469,11 @@
                # 如果是被动买就更新成交进度
                if not fdatas[-1][1]:
                    buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas)
                    trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                    # 计算成交进度
                    buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas,
                                                                                         order_begin_pos.buy_exec_index if trade_util.is_delegated(
                                                                                             trade_state) else -1)
                    if buy_progress_index is not None:
                        total_datas = l2_data_util.local_today_datas.get(code)
                        cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
@@ -476,3 +493,7 @@
            if _start_time - __start_time > 5:
                l2_log.info(code, hx_logger_l2_upload,
                            f"{code}处理成交用时:{_start_time - __start_time} 数据数量:{len(fdatas)}  详情:{use_time_list}")
    @classmethod
    def get_latest_transaction_data(cls, code):
        return cls.__latest_transaction_data_dict.get(code)