Administrator
6 天以前 abd510d66074ac640555c241b6343a53cca8f070
main.py
@@ -1,54 +1,66 @@
"""
GUI管理
"""
import math
import psutil
import constant
from code_attribute import gpcode_manager
from l2.subscript import l2_subscript_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
import multiprocessing
import os
import threading
from db import redis_manager_delegate as redis_manager
from task import task_manager
from third_data import hx_qc_value_util
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from utils import shared_memery_util
logger_system.info("程序启动Pre:{}", os.getpid())
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from log_module import log
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
from huaxin_client import l2_market_client, l2_client_v2
from server import *
# 交易服务
from third_data import data_server
from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
from servers import server_util, huaxin_trade_server, server
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue):
def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
                 queue_l1_w_strategy_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
                 trade_ipc_addr):
    """
    Strategy process entry point.

    Logs the process id, initialises global data, starts a set of daemon
    threads (data server, trade API server, redis loop, L2 client, data
    listener) and finally calls huaxin_trade_server.run.

    @param queue_strategy_r_trade_w_: forwarded to huaxin_trade_server.run
        (by its name: read by strategy, written by trade - TODO confirm)
    @param queue_l1_w_strategy_r_: forwarded to task_manager.run_data_listener
        (by its name: written by L1, read by strategy - TODO confirm)
    @param queue_strategy_w_trade_r_: forwarded to huaxin_trade_server.run
        (by its name: written by strategy, read by trade - TODO confirm)
    @param queue_strategy_w_trade_r_for_read_: forwarded to huaxin_trade_server.run
    @param trade_ipc_addr: trade IPC addresses (order address, cancel-order address)
    @return:
    """
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # Initialise parameters
    # NOTE(review): init() is invoked twice, once through a bare `global_data_loader`
    # name and once through `server.global_data_loader` - confirm which one is intended.
    global_data_loader.init()
    server.global_data_loader.init()
    # Data service
    t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True)
    t1.start()
    #
    # Trade API service
    # NOTE(review): pipe_server and pipe_l2 are not parameters of this function and are
    # not defined anywhere in the visible source - confirm they resolve at runtime.
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2),
                          daemon=True)
    t1.start()
    #
    # Redis background service
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    t1.start()
    #
    # Start the L2 subscription service
    # NOTE(review): queue_trade_w_l2_r and psl2_l2 are not parameters; they are only
    # assigned as module-level names inside the __main__ block.
    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
                          args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
    # Start the data server
    # NOTE(review): a second thread is also named "createDataServer" - confirm both are wanted.
    threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start()
    # Run the data listener service
    # queue_other_w_l2_r is not a parameter; it is a module-level name assigned in the __main__ block.
    threading.Thread(target=task_manager.run_data_listener, name="task_manager",
                     args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
                     daemon=True).start()
    #
    # Start the Huaxin trade service
    # NOTE(review): run() is called twice with different argument lists; the first call
    # uses names without the trailing underscore plus pipe_l1/pipe_l2, which are not
    # parameters here. This looks like two revisions interleaved - confirm against the repository.
    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            trade_ipc_addr)
# 主服务
@@ -56,65 +68,140 @@
    logger_system.info("create Server")
    laddr = "", 9001
    try:
        tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle,
                                                pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
def createDataServer():
    """
    Create the data server on port 9004 and serve requests until it stops.

    The server object is obtained from data_server.run("", 9004). Any exception
    raised while serving is logged through logger_system instead of propagating
    to the caller.

    @return:
    """
    logger_system.info("create DataServer")
    # `tool` is not imported explicitly in the visible source; presumably it is
    # provided by a wildcard import - TODO confirm.
    logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}")
    tcpserver = data_server.run("", 9004)
    try:
        # Serve exactly once, inside the guard. An additional unguarded
        # serve_forever() call before this block would block first and let
        # its exceptions bypass the logging below.
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{9004} 启动失败")
# Manual test hook for the L2 client. The guard compares against '__main__1'
# (not '__main__'), so it does not fire when this file is run as a script.
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
def __create_l2_subscript():
    """
    Build the L2 subscription channels, spawn the L2 worker processes and
    start the listeners that receive their data.

    One (delegate, deal) channel pair is created per index in
    range(constant.HUAXIN_L2_MAX_CODES_COUNT). The pairs are split as evenly
    as possible over 8 processes running l2_client_v2.run; the first
    processes take one extra pair when the division leaves a remainder.

    @return:
    """

    def _new_channel(ipc_addr):
        # A channel is [number of the shared array, shared array, IPC address].
        shared_array = shared_memery_util.create_array()
        return [shared_memery_util.get_number(shared_array), shared_array, ipc_addr]

    # Shared arrays and ZMQ IPC addresses for delegate (order) and deal data.
    channel_list = []
    for code_index in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
        delegate = _new_channel(f"ipc://order_{code_index}.ipc")
        deal = _new_channel(f"ipc://deal_{code_index}.ipc")
        channel_list.append((delegate, deal))

    # Number of L2 processes
    process_total = 8
    per_process, remainder = divmod(len(channel_list), process_total)

    # ====== grouping ======
    # Number of channels handled by each process.
    channel_count_list = [per_process + 1 if n < remainder else per_process
                          for n in range(process_total)]
    # Message queues (subscription signals) and data callback queues, one per process.
    sub_single_queue_list, data_callback_queue_list = [], []

    offset = 0
    for process_index, channel_count in enumerate(channel_count_list):
        # Channels owned by this process.
        process_channels = channel_list[offset:offset + channel_count]
        offset += channel_count
        # Subscription signal queue, then the data callback queue
        # (used for data that is called back at a low frequency).
        subscribe_queue = multiprocessing.Queue(maxsize=1024)
        callback_queue = multiprocessing.Queue(maxsize=1024)
        sub_single_queue_list.append(subscribe_queue)
        data_callback_queue_list.append(callback_queue)
        multiprocessing.Process(
            target=l2_client_v2.run,
            args=(subscribe_queue, callback_queue, process_channels, process_index),
        ).start()

    l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(
        sub_single_queue_list, channel_count_list)
    # Listen for L2 market data.
    huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list)
    # Start the ZMQ server that listens on the delegate and deal queues.
    listener = l2_subscript_manager.L2DataListener(channel_list)
    listener.create_data_listener(huaxin_trade_server.my_l2_data_callback)
# Script entry point: creates the inter-process pipes/queues, starts the L1,
# L2-market and trade processes plus helper threads, then runs the strategy
# in the main process.
if __name__ == '__main__':
    # Cores 16-31 are available for CPU binding
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
        # Communication between strategy and server
        pss_server, pss_strategy = multiprocessing.Pipe()
        # Written by trade, read by L2
        queue_trade_w_l2_r = multiprocessing.Queue()
        # Communication between strategy and L2
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
        # Communication between L1 and strategy
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
        # Read by L2, written by others
        queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000)
        # L1
        queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000)
        queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000)
        # Read by trade, written by strategy
        # NOTE(review): queue_strategy_w_trade_r is assigned twice in a row; the first
        # (unbounded) queue is discarded immediately.
        queue_strategy_w_trade_r = multiprocessing.Queue()
        queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000)
        queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000)
        # Read by strategy, written by trade
        # NOTE(review): same double assignment as above.
        queue_strategy_r_trade_w = multiprocessing.Queue()
        queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000)
        # Not created in the hosted environment
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        # IPC addresses for placing and cancelling orders
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
        logger_system.info("主进程ID:{}", os.getpid())
        # Codes collected here are passed to the L1 process below.
        fix_codes = set()
        open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
        if open_buy_codes:
            fix_codes |= set(open_buy_codes)
        # High-profile codes to watch
        high_codes = KPLPlateForbiddenManager().get_watch_high_codes()
        if high_codes:
            fix_codes |= set(high_codes)
        # L1 subscription data
        # NOTE(review): l1Process is assigned twice; only the second Process object is started.
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
                                                  fix_codes,))
        l1Process.start()
        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
                                                  args=(queue_l1_w_strategy_r,))
        l2MarketProcess.start()
        # Trade process
        # NOTE(review): the lambda line below already closes the Process(...) call, so the
        # following target=/args= lines are not valid as written. This looks like two
        # revisions interleaved - confirm against the repository.
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
            target=huaxin_client.trade_client.run,
            args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # The L2 process is merged with the strategy process here
        # Testing L2 in separate processes
        if constant.IS_L2_NEW:
            __create_l2_subscript()
        else:
            # Run L2 as a thread instead of a process
            # NOTE(review): this uses `my_l2_data_callbacks` (plural) while other call sites
            # use `my_l2_data_callback` - confirm the attribute name.
            threading.Thread(target=huaxin_client.l2_client.run, args=(
                queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # Run the Huaxin value-added service (started as a daemon thread)
        threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
        # CPU pinning
        # psutil.Process(l1Process.pid).cpu_affinity([20])
        # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22])
        # cpu_indexes = [i for i in range(23, 30)]
        # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
        # Main process
        # NOTE(review): createTradeServer has no body of its own in the visible source;
        # this call and the run_strategy call below look like old/new variants of the same step.
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)
        run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                     queue_strategy_w_trade_r_for_read,
                     (order_ipc_addr, cancel_order_ipc_addr))
        # The trade server runs in the main process
        l1Process.join()
        # l2Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)