From c265bb186a6b6e2a31689d599be59b5b6f10cf42 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 30 八月 2023 11:10:32 +0800
Subject: [PATCH] 将交易合并进策略进程

---
 huaxin_client/l2_data_manager.py |   29 +++++++++++++++++------------
 1 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index aa59f02..13ad2a3 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -180,7 +180,7 @@
                 if udatas:
                     start_time = time.time()
                     # upload_data(code, "l2_order", udatas)
-                    l2_data_callback.OnL2Order(code,  udatas, int(time.time() * 1000))
+                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
                     # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                     use_time = int((time.time() - start_time) * 1000)
                     if use_time > 20:
@@ -194,7 +194,7 @@
                 pass
 
 
-def __run_upload_trans(code):
+def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
     if code not in tmep_transaction_queue_dict:
         tmep_transaction_queue_dict[code] = queue.Queue()
     while True:
@@ -208,33 +208,38 @@
                 temp = tmep_transaction_queue_dict[code].get()
                 udatas.append(temp)
             if udatas:
-                upload_data(code, "l2_trans", udatas)
+                # upload_data(code, "l2_trans", udatas)
+                l2_data_callback.OnL2Transaction(code, udatas)
             time.sleep(0.01)
         except Exception as e:
             logger_local_huaxin_l2_error.error(f"涓婁紶鎴愪氦鏁版嵁鍑洪敊锛歿str(e)}")
 
 
-def __run_upload_common():
+def __run_upload_common(l2_data_callback: L2DataCallBack):
     print("__run_upload_common")
     while True:
         try:
             while not common_queue.empty():
                 temp = common_queue.get()
-                upload_data(temp[0], temp[1], temp[2])
+                if temp[1] == "l2_market_data":
+                    l2_data_callback.OnMarketData(temp[0], temp[2])
+                else:
+                    upload_data(temp[0], temp[1], temp[2])
             time.sleep(0.01)
         except Exception as e:
             logger_local_huaxin_l2_error.exception(e)
             logger_local_huaxin_l2_error.error(f"涓婁紶鏅�氭暟鎹嚭閿欙細{str(e)}")
 
 
-def __run_upload_trading_canceled():
+def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
     print("__run_upload_trading_canceled")
     while True:
         try:
             temp = trading_canceled_queue.get()
             if temp:
                 logger_local_huaxin_g_cancel.info(f"鍑嗗涓婃姤锛歿temp}")
-                upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
+                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
+                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
                 logger_local_huaxin_g_cancel.info(f"涓婃姤鎴愬姛锛歿temp}")
         except Exception as e:
             logger_local_huaxin_l2_error.exception(e)
@@ -263,17 +268,17 @@
         t.start()
 
     if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
-        t = threading.Thread(target=lambda: __run_upload_trans(code), daemon=True)
+        t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
         t.start()
 
 
-def run_upload_common():
-    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
+def run_upload_common(l2_data_callback: L2DataCallBack):
+    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
     t.start()
 
 
-def run_upload_trading_canceled():
-    t = threading.Thread(target=lambda: __run_upload_trading_canceled(), daemon=True)
+def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
+    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
     t.start()
 
 

--
Gitblit v1.8.0