Administrator
2024-11-21 a0f4a1d5bed0b4be8be122e90d2f95b76f178a94
main.py
@@ -11,35 +11,29 @@
import os
import threading
from task import task_manager
logger_system.info("程序启动Pre:{}", os.getpid())
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client
# 交易服务
# from huaxin_api import trade_client, l2_client, l1_client
from servers import server_util, huaxin_trade_api_server, huaxin_trade_server, server
from servers import server_util, huaxin_trade_server, server
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_,
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr):
def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
                 queue_l1_w_strategy_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
                 trade_ipc_addr):
    """
    策略进程
    @param pipe_server:
    @param queue_strategy_r_trade_w_:
    @param queue_l1_w_strategy_r_:
    @param queue_strategy_w_trade_r_:
    @param queue_strategy_w_trade_r_for_read_:
    @param queue_l1_trade_r_strategy_w_:
    @param queue_l1_trade_w_strategy_r_:
    @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
    @return:
    """
@@ -48,25 +42,18 @@
    # 初始化参数
    server.global_data_loader.init()
    # 数据服务
    t1 = threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True)
    t1.start()
    # 开启数据服务器
    threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start()
    # 交易接口服务
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                          args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_),
                          daemon=True)
    t1.start()
    #
    # redis后台服务
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    t1.start()
    # 运行数据监听服务
    threading.Thread(target=task_manager.run_data_listener, name="task_manager",
                     args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
                     daemon=True).start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr)
                            trade_ipc_addr)
# 主服务
@@ -91,17 +78,12 @@
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # L2读其他写
        queue_other_w_l2_r = multiprocessing.Queue()
        # l1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        # l1交易
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -112,9 +94,6 @@
        # 下单,撤单ipc地址
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())
        # L1订阅数据
@@ -149,9 +128,9 @@
        # cpu_indexes = [i for i in range(23, 30)]
        # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr))
        run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                     queue_strategy_w_trade_r_for_read,
                     (order_ipc_addr, cancel_order_ipc_addr))
        # 将tradeServer作为主进程
        l1Process.join()