""" GUI管理 """ import psutil from code_attribute import gpcode_manager from log_module import log from log_module.log import logger_l2_trade, logger_system import logging import multiprocessing import os import threading from task import task_manager from third_data import hx_qc_value_util logger_system.info("程序启动Pre:{}", os.getpid()) import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client from huaxin_client import l2_market_client from servers import server_util, huaxin_trade_server, server def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, queue_l1_w_strategy_r_: multiprocessing.Queue, queue_strategy_w_trade_r_: multiprocessing.Queue, queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, trade_ipc_addr): """ 策略进程 @param queue_strategy_r_trade_w_: @param queue_l1_w_strategy_r_: @param queue_strategy_w_trade_r_: @param queue_strategy_w_trade_r_for_read_: @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) @return: """ logger_system.info("策略进程ID:{}", os.getpid()) log.close_print() # 初始化参数 server.global_data_loader.init() # 开启数据服务器 threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() # 运行数据监听服务 threading.Thread(target=task_manager.run_data_listener, name="task_manager", args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), daemon=True).start() # # 启动华鑫交易服务 huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, trade_ipc_addr) # 主服务 def createServer(pipe): logger_system.info("create Server") laddr = "", 9001 try: tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{laddr[1]} 启动失败") if __name__ == '__main__1': huaxin_client.l2_client.test() if __name__ == '__main__': # 可绑定16-31之间的核 try: logger_l2_trade.info("启动程序") logger_system.info("启动程序--------") log.close_print() # L2读其他写 queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000) # l1 queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000) queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000) # 交易读策略写 queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000) queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000) # 策略读交易写 queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000) # 下单,撤单ipc地址 order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" logger_system.info("主进程ID:{}", os.getpid()) # L1订阅数据 l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w, gpcode_manager.BuyOpenLimitUpCodeManager().get_codes(),)) l1Process.start() l2MarketProcess = multiprocessing.Process(target=l2_market_client.run, args=(queue_l1_w_strategy_r,)) l2MarketProcess.start() # 交易进程 tradeProcess = multiprocessing.Process( target=huaxin_client.trade_client.run, args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read)) tradeProcess.start() # 此处将L2的进程与策略进程合并 # L2 # l2Process = multiprocessing.Process( # target=huaxin_client.l2_client.run, # args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback)) # l2Process.start() # 将L2的进程改为线程执行 threading.Thread(target=huaxin_client.l2_client.run, args=( queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() # 运行华鑫增值服务进程 threading.Thread(target=hx_qc_value_util.run, daemon=True).start() # 绑核运行 # psutil.Process(l1Process.pid).cpu_affinity([20]) # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22]) # cpu_indexes = [i for i in range(23, 30)] # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) # 主进程 run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, (order_ipc_addr, cancel_order_ipc_addr)) # 将tradeServer作为主进程 l1Process.join() # l2Process.join() tradeProcess.join() except Exception as e: logging.exception(e) logger_system.exception(e)