| | |
| | | try: |
| | | l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas) |
| | | finally: |
| | | l2_data_log.l2_time_log(code, f"处理L2逐笔委托结束:处理数据数量: {len(_datas)} 最终处理时间:{round(time.time() * 1000) - now_timestamp}") |
| | | l2_data_log.l2_time_log(code, |
| | | f"处理L2逐笔委托结束:处理数据数量: {len(_datas)} 最终处理时间:{round(time.time() * 1000) - now_timestamp}") |
| | | |
| | | @classmethod |
| | | def l2_transaction(cls, code, datas): |
| | |
| | | # 设置成交价 |
| | | current_price_process_manager.set_trade_price(code, datas[-1][1]) |
| | | total_datas = l2_data_util.local_today_datas.get(code) |
| | | __start_time = time.time() |
| | | try: |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | if not buyno_map: |
| | |
| | | buy_progress_index = buyno_map[buy_no]["index"] |
| | | break |
| | | |
| | | # 获取执行位时间 |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | if True: |
| | | if buy_progress_index is not None: |
| | | cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index) |
| | | |
| | | num_operate_map = l2_data_util.local_today_num_operate_map.get( |
| | | code) |
| | | async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code, |
| | | buy_progress_index) |
| | | buy_time = total_datas[buy_progress_index]["val"]["time"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if buy_exec_index: |
| | | m_base_val = L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, |
| | | buy_progress_index, |
| | | buy_exec_index, |
| | | total_datas, |
| | | num_operate_map, |
| | | m_base_val, |
| | | limit_up_price) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel") |
| | | |
| | | f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, |
| | | if buy_progress_index is not None: |
| | | # 获取执行位时间 |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index) |
| | | num_operate_map = l2_data_util.local_today_num_operate_map.get( |
| | | code) |
| | | async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code, |
| | | buy_progress_index) |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if buy_exec_index: |
| | | m_base_val = L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, |
| | | buy_progress_index, |
| | | total_datas) |
| | | f3 = dask.delayed( |
| | | deal_big_money_manager.DealComputeProgressManager().set_trade_progress)( |
| | | code, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | buy_exec_index, |
| | | total_datas, |
| | | num_operate_map, |
| | | m_base_val, |
| | | limit_up_price) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel") |
| | | |
| | | f4 = dask.delayed( |
| | | SecondCancelBigNumComputer().set_transaction_index)( |
| | | code, |
| | | buy_progress_index) |
| | | f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, |
| | | buy_progress_index, |
| | | total_datas) |
| | | f3 = dask.delayed( |
| | | deal_big_money_manager.DealComputeProgressManager().set_trade_progress)( |
| | | code, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | |
| | | dask.compute(f2, f3, f4) |
| | | else: |
| | | pass |
| | | f4 = dask.delayed( |
| | | SecondCancelBigNumComputer().set_transaction_index)( |
| | | code, |
| | | buy_progress_index) |
| | | |
| | | dask.compute(f2, f3, f4) |
| | | else: |
| | | pass |
| | | except Exception as e: |
| | | hx_logger_l2_transaction.exception(e) |
| | | finally: |
| | | async_log_util.info(hx_logger_l2_transaction, f"处理用时:{int((time.time() - __start_time) * 1000)}") |
| | | |
| | | @classmethod |
| | | def l2_market_data(cls, code, data): |
| | |
| | | yesterday_codes, |
| | | block_info.get_before_blocks_dict()) |
| | | |
| | | hx_logger_l2_market_data.info(f"{code}#{data}") |
| | | async_log_util.info(hx_logger_l2_market_data, f"{code}#{data}") |
| | | |
| | | @classmethod |
| | | def trading_order_canceled(cls, code, order_no): |
| | |
| | | timestamp = val.get("time") |
| | | # 大于10s的数据放弃处理 |
| | | if type_ == "set_target_codes": |
| | | if time.time() * 1000 - timestamp > 10*1000: |
| | | if time.time() * 1000 - timestamp > 10 * 1000: |
| | | continue |
| | | TradeServerProcessor.set_target_codes(val) |
| | | except Exception as e: |