Administrator
2023-11-02 eb33b717023d9871bd74e6dce47a065228cffefc
main.py
@@ -22,7 +22,9 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue):
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -33,7 +35,8 @@
    t1.start()
    #
    # 交易接口服务
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2),
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                          args=(pipe_server, queue_other_w_l2_r),
                          daemon=True)
    t1.start()
    #
@@ -41,14 +44,9 @@
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    t1.start()
    #
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
                          args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_,
                            transaction_queues_, market_queue_)
# 主服务
@@ -83,13 +81,10 @@
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # 交易写L2读
        queue_trade_w_l2_r = multiprocessing.Queue()
        # 策略与l2之间的通信
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
        # l1与策略间的通信
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
        # L2读其他写
        queue_other_w_l2_r = multiprocessing.Queue()
        #
        queue_l1_w_strategy_r = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -102,19 +97,37 @@
        logger_system.info("主进程ID:{}", os.getpid())
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r,))
        l1Process.start()
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
            target=huaxin_client.trade_client.run,
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,))
        tradeProcess.start()
        # 创建L2通信队列
        order_queues = []
        transaction_queues = []
        market_queue = multiprocessing.Queue()
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            order_queues.append(multiprocessing.Queue())
        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
            transaction_queues.append(multiprocessing.Queue())
        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
        l2Process.start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          order_queues, transaction_queues, market_queue)
        # 将tradeServer作为主进程
        l1Process.join()
        l2Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)