""" GUI管理 """ import multiprocessing import os from db import redis_manager_delegate as redis_manager import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client from log_module import log from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1 from server import * # 交易服务 from third_data import data_server from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server # from huaxin_api import trade_client, l2_client, l1_client def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue): logger_system.info("策略进程ID:{}", os.getpid()) log.close_print() # 初始化参数 global_data_loader.init() # # 数据服务 t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True) t1.start() # # 交易接口服务 t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2), daemon=True) t1.start() # # redis后台服务 t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) t1.start() # # 启动L2订阅服务 t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback), daemon=True) t1.start() # # 启动华鑫交易服务 huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r) # 主服务 def createServer(pipe): logger_system.info("create Server") laddr = "", 9001 tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle try: tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{laddr[1]} 启动失败") def createDataServer(): logger_system.info("create DataServer") logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}") tcpserver = data_server.run("", 9004) tcpserver.serve_forever() try: tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{9004} 启动失败") if __name__ == '__main__': try: logger_l2_trade.info("启动程序") logger_system.info("启动程序--------") log.close_print() # 策略与server间的通信 pss_server, pss_strategy = multiprocessing.Pipe() # 交易写L2读 queue_trade_w_l2_r = multiprocessing.Queue() # 策略与l2之间的通信 psl2_strategy, psl2_l2 = multiprocessing.Pipe() # l1与策略间的通信 pl1t_l1, pl1t_strategy = multiprocessing.Pipe() # 交易读策略写 queue_strategy_w_trade_r = multiprocessing.Queue() # 策略读交易写 queue_strategy_r_trade_w = multiprocessing.Queue() # 托管环境下不创建 # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) # serverProcess.start() logger_system.info("主进程ID:{}", os.getpid()) # L1订阅数据 l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,)) l1Process.start() # 交易进程 tradeProcess = multiprocessing.Process( target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r)) tradeProcess.start() # 主进程 createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r) # 将tradeServer作为主进程 l1Process.join() tradeProcess.join() except Exception as e: logging.exception(e) logger_system.exception(e)