From eb33b717023d9871bd74e6dce47a065228cffefc Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 02 十一月 2023 11:23:09 +0800
Subject: [PATCH] L2进程与策略进程分开

---
 main.py |   51 ++++++++++++++++++++++++++++++++-------------------
 1 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/main.py b/main.py
index 0b12b5f..577c3fc 100644
--- a/main.py
+++ b/main.py
@@ -22,7 +22,9 @@
 # from huaxin_api import trade_client, l2_client, l1_client
 
 
-def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue):
+def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
+                      queue_l1_w_strategy_r_: multiprocessing.Queue,
+                      queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_):
     logger_system.info("绛栫暐杩涚▼ID锛歿}", os.getpid())
     log.close_print()
     # 鍒濆鍖栧弬鏁�
@@ -33,7 +35,8 @@
     t1.start()
     #
     # 浜ゆ槗鎺ュ彛鏈嶅姟
-    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2),
+    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
+                          args=(pipe_server, queue_other_w_l2_r),
                           daemon=True)
     t1.start()
     #
@@ -41,14 +44,9 @@
     t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
     t1.start()
     #
-    # 鍚姩L2璁㈤槄鏈嶅姟
-    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
-                          args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
-                          daemon=True)
-    t1.start()
-    #
     # 鍚姩鍗庨懌浜ゆ槗鏈嶅姟
-    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
+    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_,
+                            transaction_queues_, market_queue_)
 
 
 # 涓绘湇鍔�
@@ -83,13 +81,10 @@
         # 绛栫暐涓巗erver闂寸殑閫氫俊
         pss_server, pss_strategy = multiprocessing.Pipe()
 
-        # 浜ゆ槗鍐橪2璇�
-        queue_trade_w_l2_r = multiprocessing.Queue()
-        # 绛栫暐涓巐2涔嬮棿鐨勯�氫俊
-        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
-
-        # l1涓庣瓥鐣ラ棿鐨勯�氫俊
-        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
+        # L2璇诲叾浠栧啓
+        queue_other_w_l2_r = multiprocessing.Queue()
+        #
+        queue_l1_w_strategy_r = multiprocessing.Queue()
 
         # 浜ゆ槗璇荤瓥鐣ュ啓
         queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -102,19 +97,37 @@
         logger_system.info("涓昏繘绋婭D锛歿}", os.getpid())
 
         # L1璁㈤槄鏁版嵁
-        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
+        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r,))
         l1Process.start()
 
         # 浜ゆ槗杩涚▼
         tradeProcess = multiprocessing.Process(
-            target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
+            target=huaxin_client.trade_client.run,
+            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,))
         tradeProcess.start()
 
+        # 鍒涘缓L2閫氫俊闃熷垪
+        order_queues = []
+        transaction_queues = []
+        market_queue = multiprocessing.Queue()
+        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
+            order_queues.append(multiprocessing.Queue())
+        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
+            transaction_queues.append(multiprocessing.Queue())
+
+        # L2
+        l2Process = multiprocessing.Process(
+            target=huaxin_client.l2_client.run,
+            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
+        l2Process.start()
+
         # 涓昏繘绋�
-        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)
+        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
+                          order_queues, transaction_queues, market_queue)
 
         # 灏唗radeServer浣滀负涓昏繘绋�
         l1Process.join()
+        l2Process.join()
         tradeProcess.join()
     except Exception as e:
         logging.exception(e)

--
Gitblit v1.8.0