""" GUI管理 """ import math import psutil import constant from code_attribute import gpcode_manager from l2.subscript import l2_subscript_manager from log_module import log from log_module.log import logger_l2_trade, logger_system import logging import multiprocessing import os import threading from task import task_manager from third_data import hx_qc_value_util from third_data.code_plate_key_manager import KPLPlateForbiddenManager from utils import shared_memery_util logger_system.info("程序启动Pre:{}", os.getpid()) import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client from huaxin_client import l2_market_client, l2_client_v2 from servers import server_util, huaxin_trade_server, server def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, queue_l1_w_strategy_r_: multiprocessing.Queue, queue_strategy_w_trade_r_: multiprocessing.Queue, queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, trade_ipc_addr): """ 策略进程 @param queue_strategy_r_trade_w_: @param queue_l1_w_strategy_r_: @param queue_strategy_w_trade_r_: @param queue_strategy_w_trade_r_for_read_: @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) @return: """ logger_system.info("策略进程ID:{}", os.getpid()) log.close_print() # 初始化参数 server.global_data_loader.init() # 开启数据服务器 threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() # 运行数据监听服务 threading.Thread(target=task_manager.run_data_listener, name="task_manager", args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), daemon=True).start() # # 启动华鑫交易服务 huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, trade_ipc_addr) # 主服务 def createServer(pipe): logger_system.info("create Server") laddr = "", 9001 try: tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{laddr[1]} 启动失败") if __name__ == '__main__1': huaxin_client.l2_client.test() def __create_l2_subscript(): channel_list = [] for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): # 创建委托/成交的共享数组和ZMQ通信通道 delegate_ipc_addr = f"ipc://order_{i}.ipc", deal_ipc_addr = f"ipc://deal_{i}.ipc", delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr] delegate[0] = shared_memery_util.get_number(delegate[1]) deal = [0, shared_memery_util.create_array(), deal_ipc_addr] deal[0] = shared_memery_util.get_number(deal[1]) channel_list.append((delegate, deal)) # L2进程数量 l2_process_count = 8 base_channel_count = len(channel_list) // l2_process_count left_count = len(channel_list) % l2_process_count index = 0 # ======分组====== # 记录每个分组的数量 channel_count_list = [] # 数据回调队列 data_callback_queue_list = [] # 消息传递队列 sub_single_queue_list = [] for i in range(l2_process_count): channel_count = base_channel_count + (1 if i < left_count else 0) channel_count_list.append(channel_count) # 该进程下的通道 channels = channel_list[index:index + channel_count] index += channel_count # 订阅信号队列, 数据回调队列(回调频次小的数据通过这种回调) sub_single_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue() sub_single_queue_list.append(sub_single_queue) data_callback_queue_list.append(data_callback_queue) l2_process = multiprocessing.Process(target=l2_client_v2.run, args=(sub_single_queue, data_callback_queue, channels, i, )) l2_process.start() l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list) # 监听L2市场行情数据 huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list) # 启动ZMQserver,针对委托队列与成交队列进行监听 l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback) if __name__ == '__main__': # 可绑定16-31之间的核 try: logger_l2_trade.info("启动程序") logger_system.info("启动程序--------") log.close_print() # L2读其他写 queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000) # l1 queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000) queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000) # 交易读策略写 queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000) queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000) # 策略读交易写 queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000) # 下单,撤单ipc地址 order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" logger_system.info("主进程ID:{}", os.getpid()) fix_codes = set() open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() if open_buy_codes: fix_codes |= set(open_buy_codes) # 要监控的高标 high_codes = KPLPlateForbiddenManager().get_watch_high_codes() if high_codes: fix_codes |= set(high_codes) # L1订阅数据 l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fix_codes,)) l1Process.start() l2MarketProcess = multiprocessing.Process(target=l2_market_client.run, args=(queue_l1_w_strategy_r,)) l2MarketProcess.start() # 交易进程 tradeProcess = multiprocessing.Process( target=huaxin_client.trade_client.run, args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read)) tradeProcess.start() # 此处将L2的进程与策略进程合并 # 测试L2单独进程 if True: __create_l2_subscript() else: # 将L2的进程改为线程执行 threading.Thread(target=huaxin_client.l2_client.run, args=( queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() # 运行华鑫增值服务进程 threading.Thread(target=hx_qc_value_util.run, daemon=True).start() # 绑核运行 # psutil.Process(l1Process.pid).cpu_affinity([20]) # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22]) # cpu_indexes = [i for i in range(23, 30)] # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) # 主进程 run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, (order_ipc_addr, cancel_order_ipc_addr)) # 将tradeServer作为主进程 l1Process.join() # l2Process.join() tradeProcess.join() except Exception as e: logging.exception(e) logger_system.exception(e)