Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
main.py
@@ -1,9 +1,13 @@
"""
GUI管理
"""
import math
import psutil
import constant
from code_attribute import gpcode_manager
from l2.subscript import l2_subscript_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
@@ -14,13 +18,14 @@
from task import task_manager
from third_data import hx_qc_value_util
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from utils import shared_memery_util
# Log the current process id before the huaxin_client modules below are imported.
logger_system.info("程序启动Pre:{}", os.getpid())
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client
from huaxin_client import l2_market_client, l2_client_v2
from servers import server_util, huaxin_trade_server, server
@@ -74,6 +79,54 @@
# Disabled manual test hook: '__main__1' is not the '__main__' name a script is
# run under, so launching this file does not trigger the L2 client test below.
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
def __create_l2_subscript():
    """
    Start the L2 worker processes and connect them to the strategy side.

    One ``(delegate, deal)`` channel pair is built for every index in
    ``range(constant.HUAXIN_L2_MAX_CODES_COUNT)``. Each half of a pair is a list
    ``[number, array, ipc_addr]`` where ``array`` comes from
    ``shared_memery_util.create_array()`` and ``number`` from
    ``shared_memery_util.get_number(array)``.

    The pairs are split into contiguous slices across a fixed number of
    processes; when the split is uneven, the leading processes each take one
    extra pair. Every process runs ``l2_client_v2.run`` with its own two
    bounded queues, its slice of channels and its index.

    Afterwards the process manager is published on ``l2_subscript_manager``,
    the market-info receiver is started on the callback queues, and a data
    listener is created over all channels.
    """

    def build_channel(ipc_addr):
        # The array is created first; its number is then read back from it.
        shared_array = shared_memery_util.create_array()
        return [shared_memery_util.get_number(shared_array), shared_array, ipc_addr]

    # Shared arrays plus ZMQ IPC endpoints for the delegate (order) and deal streams
    channel_list = [
        (build_channel(f"ipc://order_{slot}.ipc"), build_channel(f"ipc://deal_{slot}.ipc"))
        for slot in range(constant.HUAXIN_L2_MAX_CODES_COUNT)
    ]

    # Number of L2 processes
    l2_process_count = 8
    per_process, remainder = divmod(len(channel_list), l2_process_count)

    # ====== Grouping ======
    # Channel count of each group: the first `remainder` groups hold one extra
    channel_count_list = [
        per_process + 1 if process_index < remainder else per_process
        for process_index in range(l2_process_count)
    ]

    # Message-passing (subscription signal) queues, one per process
    sub_single_queue_list = []
    # Data callback queues, one per process
    data_callback_queue_list = []

    offset = 0
    for process_index, channel_count in enumerate(channel_count_list):
        # Channels owned by this process
        process_channels = channel_list[offset:offset + channel_count]
        offset += channel_count

        # Subscription signal queue
        sub_single_queue = multiprocessing.Queue(maxsize=1024)
        # Data callback queue (used for data that is called back infrequently)
        data_callback_queue = multiprocessing.Queue(maxsize=1024)
        sub_single_queue_list.append(sub_single_queue)
        data_callback_queue_list.append(data_callback_queue)

        worker = multiprocessing.Process(
            target=l2_client_v2.run,
            args=(sub_single_queue, data_callback_queue, process_channels, process_index),
        )
        worker.start()

    l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(
        sub_single_queue_list, channel_count_list)

    # Listen for L2 market quotation data
    huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list)

    # Start the ZMQ server that listens on the delegate and deal queues
    data_listener = l2_subscript_manager.L2DataListener(channel_list)
    data_listener.create_data_listener(huaxin_trade_server.my_l2_data_callback)
if __name__ == '__main__':
    # 可绑定16-31之间的核
    try:
@@ -123,14 +176,15 @@
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # 此处将L2的进程与策略进程合并
        # L2
        # l2Process = multiprocessing.Process(
        #     target=huaxin_client.l2_client.run,
        #     args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback))
        # l2Process.start()
        # 将L2的进程改为线程执行
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # 测试L2单独进程
        if constant.IS_L2_NEW:
            __create_l2_subscript()
        else:
            # 将L2的进程改为线程执行
            threading.Thread(target=huaxin_client.l2_client.run, args=(
                queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # 运行华鑫增值服务进程
        threading.Thread(target=hx_qc_value_util.run, daemon=True).start()