Administrator
2023-09-19 6b0a07dd2aeadff744b3d930366a0a7cec6351bf
添加逐笔成交处理日志
1个文件已修改
86 ■■■■ 已修改文件
trade/huaxin/trade_server.py 86 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py
@@ -294,7 +294,8 @@
        try:
            l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
        finally:
            l2_data_log.l2_time_log(code, f"处理L2逐笔委托结束:处理数据数量: {len(_datas)} 最终处理时间:{round(time.time() * 1000) - now_timestamp}")
            l2_data_log.l2_time_log(code,
                                    f"处理L2逐笔委托结束:处理数据数量: {len(_datas)} 最终处理时间:{round(time.time() * 1000) - now_timestamp}")
    @classmethod
    def l2_transaction(cls, code, datas):
@@ -303,6 +304,7 @@
            # 设置成交价
            current_price_process_manager.set_trade_price(code, datas[-1][1])
            total_datas = l2_data_util.local_today_datas.get(code)
            __start_time = time.time()
            try:
                buyno_map = l2_data_util.local_today_buyno_map.get(code)
                if not buyno_map:
@@ -321,51 +323,51 @@
                        buy_progress_index = buyno_map[buy_no]["index"]
                        break
                # 获取执行位时间
                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                    code)
                if True:
                    if buy_progress_index is not None:
                        cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                        num_operate_map = l2_data_util.local_today_num_operate_map.get(
                            code)
                        async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                            buy_progress_index)
                        buy_time = total_datas[buy_progress_index]["val"]["time"]
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if buy_exec_index:
                            m_base_val = L2PlaceOrderParamsManager.get_base_m_val(code)
                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                          buy_progress_index,
                                                                                          buy_exec_index,
                                                                                          total_datas,
                                                                                          num_operate_map,
                                                                                          m_base_val,
                                                                                          limit_up_price)
                            if need_cancel:
                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                if buy_progress_index is not None:
                    # 获取执行位时间
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                        code)
                    cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                    num_operate_map = l2_data_util.local_today_num_operate_map.get(
                        code)
                    async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                        buy_progress_index)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if buy_exec_index:
                        m_base_val = L2PlaceOrderParamsManager.get_base_m_val(code)
                        need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                      buy_progress_index,
                                                                                      total_datas)
                        f3 = dask.delayed(
                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                            code,
                            buy_progress_index,
                            total_datas,
                            num_operate_map)
                                                                                      buy_exec_index,
                                                                                      total_datas,
                                                                                      num_operate_map,
                                                                                      m_base_val,
                                                                                      limit_up_price)
                        if need_cancel:
                            L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                        f4 = dask.delayed(
                            SecondCancelBigNumComputer().set_transaction_index)(
                            code,
                            buy_progress_index)
                    f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                  buy_progress_index,
                                                                                  total_datas)
                    f3 = dask.delayed(
                        deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                        code,
                        buy_progress_index,
                        total_datas,
                        num_operate_map)
                        dask.compute(f2, f3, f4)
                    else:
                        pass
                    f4 = dask.delayed(
                        SecondCancelBigNumComputer().set_transaction_index)(
                        code,
                        buy_progress_index)
                    dask.compute(f2, f3, f4)
                else:
                    pass
            except Exception as e:
                hx_logger_l2_transaction.exception(e)
            finally:
                async_log_util.info(hx_logger_l2_transaction, f"处理用时:{int((time.time() - __start_time) * 1000)}")
    @classmethod
    def l2_market_data(cls, code, data):
@@ -404,7 +406,7 @@
                                                         yesterday_codes,
                                                         block_info.get_before_blocks_dict())
        hx_logger_l2_market_data.info(f"{code}#{data}")
        async_log_util.info(hx_logger_l2_market_data, f"{code}#{data}")
    @classmethod
    def trading_order_canceled(cls, code, order_no):
@@ -458,7 +460,7 @@
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        if time.time() * 1000 - timestamp > 10*1000:
                        if time.time() * 1000 - timestamp > 10 * 1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e: