From a0f4a1d5bed0b4be8be122e90d2f95b76f178a94 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 21 十一月 2024 17:41:22 +0800 Subject: [PATCH] 精简代码/代码归类 --- main.py | 59 +++++++++++++++++++---------------------------------------- 1 files changed, 19 insertions(+), 40 deletions(-) diff --git a/main.py b/main.py index f463ac8..1a98d6f 100644 --- a/main.py +++ b/main.py @@ -11,35 +11,29 @@ import os import threading +from task import task_manager + logger_system.info("绋嬪簭鍚姩Pre锛歿}", os.getpid()) -from db import redis_manager_delegate as redis_manager import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client from huaxin_client import l2_market_client -# 浜ゆ槗鏈嶅姟 - -# from huaxin_api import trade_client, l2_client, l1_client -from servers import server_util, huaxin_trade_api_server, huaxin_trade_server, server +from servers import server_util, huaxin_trade_server, server -def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, - queue_l1_w_strategy_r_: multiprocessing.Queue, - queue_strategy_w_trade_r_: multiprocessing.Queue, - queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, - queue_l1_trade_w_strategy_r_, trade_ipc_addr): +def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, + queue_l1_w_strategy_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, + trade_ipc_addr): """ 绛栫暐杩涚▼ - - @param pipe_server: @param queue_strategy_r_trade_w_: @param queue_l1_w_strategy_r_: @param queue_strategy_w_trade_r_: @param queue_strategy_w_trade_r_for_read_: - @param queue_l1_trade_r_strategy_w_: - @param queue_l1_trade_w_strategy_r_: @param trade_ipc_addr: 浜ゆ槗ipc鍦板潃(涓嬪崟鍦板潃, 鎾ゅ崟鍦板潃) @return: """ @@ -48,25 +42,18 @@ # 鍒濆鍖栧弬鏁� server.global_data_loader.init() - # 鏁版嵁鏈嶅姟 - t1 = threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True) - t1.start() + # 寮�鍚暟鎹湇鍔″櫒 + threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() - # 浜ゆ槗鎺ュ彛鏈嶅姟 - t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", - args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_), - daemon=True) - t1.start() - # - # redis鍚庡彴鏈嶅姟 - t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) - t1.start() - + # 杩愯鏁版嵁鐩戝惉鏈嶅姟 + threading.Thread(target=task_manager.run_data_listener, name="task_manager", + args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), + daemon=True).start() # # 鍚姩鍗庨懌浜ゆ槗鏈嶅姟 - huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, + huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, - queue_l1_trade_w_strategy_r_, trade_ipc_addr) + trade_ipc_addr) # 涓绘湇鍔� @@ -91,17 +78,12 @@ logger_l2_trade.info("鍚姩绋嬪簭") logger_system.info("鍚姩绋嬪簭--------") log.close_print() - # 绛栫暐涓巗erver闂寸殑閫氫俊 - pss_server, pss_strategy = multiprocessing.Pipe() # L2璇诲叾浠栧啓 queue_other_w_l2_r = multiprocessing.Queue() # l1 queue_l1_w_strategy_r = multiprocessing.Queue() queue_l1_r_strategy_w = multiprocessing.Queue() - # l1浜ゆ槗 - queue_l1_trade_w_strategy_r = multiprocessing.Queue() - queue_l1_trade_r_strategy_w = multiprocessing.Queue() # 浜ゆ槗璇荤瓥鐣ュ啓 queue_strategy_w_trade_r = multiprocessing.Queue() @@ -112,9 +94,6 @@ # 涓嬪崟,鎾ゅ崟ipc鍦板潃 order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" - # 鎵樼鐜涓嬩笉鍒涘缓 - # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) - # serverProcess.start() logger_system.info("涓昏繘绋婭D锛歿}", os.getpid()) # L1璁㈤槄鏁版嵁 @@ -149,9 +128,9 @@ # cpu_indexes = [i for i in range(23, 30)] # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) # 涓昏繘绋� - createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, - queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, - queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr)) + run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, + queue_strategy_w_trade_r_for_read, + (order_ipc_addr, cancel_order_ipc_addr)) # 灏唗radeServer浣滀负涓昏繘绋� l1Process.join() -- Gitblit v1.8.0