From c265bb186a6b6e2a31689d599be59b5b6f10cf42 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 30 八月 2023 11:10:32 +0800 Subject: [PATCH] 将交易合并进策略进程 --- huaxin_client/l2_data_manager.py | 29 +++++++++++++++++------------ 1 files changed, 17 insertions(+), 12 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index aa59f02..13ad2a3 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -180,7 +180,7 @@ if udatas: start_time = time.time() # upload_data(code, "l2_order", udatas) - l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) + l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) use_time = int((time.time() - start_time) * 1000) if use_time > 20: @@ -194,7 +194,7 @@ pass -def __run_upload_trans(code): +def __run_upload_trans(code, l2_data_callback: L2DataCallBack): if code not in tmep_transaction_queue_dict: tmep_transaction_queue_dict[code] = queue.Queue() while True: @@ -208,33 +208,38 @@ temp = tmep_transaction_queue_dict[code].get() udatas.append(temp) if udatas: - upload_data(code, "l2_trans", udatas) + # upload_data(code, "l2_trans", udatas) + l2_data_callback.OnL2Transaction(code, udatas) time.sleep(0.01) except Exception as e: logger_local_huaxin_l2_error.error(f"涓婁紶鎴愪氦鏁版嵁鍑洪敊锛歿str(e)}") -def __run_upload_common(): +def __run_upload_common(l2_data_callback: L2DataCallBack): print("__run_upload_common") while True: try: while not common_queue.empty(): temp = common_queue.get() - upload_data(temp[0], temp[1], temp[2]) + if temp[1] == "l2_market_data": + l2_data_callback.OnMarketData(temp[0], temp[2]) + else: + upload_data(temp[0], temp[1], temp[2]) time.sleep(0.01) except Exception as e: logger_local_huaxin_l2_error.exception(e) logger_local_huaxin_l2_error.error(f"涓婁紶鏅�氭暟鎹嚭閿欙細{str(e)}") -def __run_upload_trading_canceled(): +def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack): print("__run_upload_trading_canceled") while True: try: temp = trading_canceled_queue.get() if temp: logger_local_huaxin_g_cancel.info(f"鍑嗗涓婃姤锛歿temp}") - upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) + # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) + l2_data_callback.OnTradingOrderCancel(temp[0], temp[1]) logger_local_huaxin_g_cancel.info(f"涓婃姤鎴愬姛锛歿temp}") except Exception as e: logger_local_huaxin_l2_error.exception(e) @@ -263,17 +268,17 @@ t.start() if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: - t = threading.Thread(target=lambda: __run_upload_trans(code), daemon=True) + t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) t.start() -def run_upload_common(): - t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) +def run_upload_common(l2_data_callback: L2DataCallBack): + t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True) t.start() -def run_upload_trading_canceled(): - t = threading.Thread(target=lambda: __run_upload_trading_canceled(), daemon=True) +def run_upload_trading_canceled(l2_data_callback: L2DataCallBack): + t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True) t.start() -- Gitblit v1.8.0