From eb33b717023d9871bd74e6dce47a065228cffefc Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 02 十一月 2023 11:23:09 +0800 Subject: [PATCH] L2进程与策略进程分开 --- main.py | 51 ++++++++++++++++++++++++++++++++------------------- 1 files changed, 32 insertions(+), 19 deletions(-) diff --git a/main.py b/main.py index 0b12b5f..577c3fc 100644 --- a/main.py +++ b/main.py @@ -22,7 +22,9 @@ # from huaxin_api import trade_client, l2_client, l1_client -def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue): +def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, + queue_l1_w_strategy_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_): logger_system.info("绛栫暐杩涚▼ID锛歿}", os.getpid()) log.close_print() # 鍒濆鍖栧弬鏁� @@ -33,7 +35,8 @@ t1.start() # # 浜ゆ槗鎺ュ彛鏈嶅姟 - t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2), + t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", + args=(pipe_server, queue_other_w_l2_r), daemon=True) t1.start() # @@ -41,14 +44,9 @@ t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) t1.start() # - # 鍚姩L2璁㈤槄鏈嶅姟 - t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", - args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback), - daemon=True) - t1.start() - # # 鍚姩鍗庨懌浜ゆ槗鏈嶅姟 - huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r) + huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_, + transaction_queues_, market_queue_) # 涓绘湇鍔� @@ -83,13 +81,10 @@ # 绛栫暐涓巗erver闂寸殑閫氫俊 pss_server, pss_strategy = multiprocessing.Pipe() - # 浜ゆ槗鍐橪2璇� - queue_trade_w_l2_r = multiprocessing.Queue() - # 绛栫暐涓巐2涔嬮棿鐨勯�氫俊 - psl2_strategy, psl2_l2 = multiprocessing.Pipe() - - # l1涓庣瓥鐣ラ棿鐨勯�氫俊 - pl1t_l1, pl1t_strategy = multiprocessing.Pipe() + # L2璇诲叾浠栧啓 + queue_other_w_l2_r = multiprocessing.Queue() + # + queue_l1_w_strategy_r = multiprocessing.Queue() # 浜ゆ槗璇荤瓥鐣ュ啓 queue_strategy_w_trade_r = multiprocessing.Queue() @@ -102,19 +97,37 @@ logger_system.info("涓昏繘绋婭D锛歿}", os.getpid()) # L1璁㈤槄鏁版嵁 - l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,)) + l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r,)) l1Process.start() # 浜ゆ槗杩涚▼ tradeProcess = multiprocessing.Process( - target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r)) + target=huaxin_client.trade_client.run, + args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,)) tradeProcess.start() + # 鍒涘缓L2閫氫俊闃熷垪 + order_queues = [] + transaction_queues = [] + market_queue = multiprocessing.Queue() + for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): + order_queues.append(multiprocessing.Queue()) + for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): + transaction_queues.append(multiprocessing.Queue()) + + # L2 + l2Process = multiprocessing.Process( + target=huaxin_client.l2_client.run, + args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue)) + l2Process.start() + # 涓昏繘绋� - createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r) + createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, + order_queues, transaction_queues, market_queue) # 灏唗radeServer浣滀负涓昏繘绋� l1Process.join() + l2Process.join() tradeProcess.join() except Exception as e: logging.exception(e) -- Gitblit v1.8.0