""" GUI管理 """ import logging import multiprocessing import os import threading import constant from db import redis_manager_delegate as redis_manager import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client import huaxin_client.l1_client_for_trade from huaxin_client import l2_market_client from log_module import log from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1 import server # 交易服务 from third_data import data_server from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server # from huaxin_api import trade_client, l2_client, l1_client from utils import tool def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, queue_l1_w_strategy_r_: multiprocessing.Queue, queue_strategy_w_trade_r_: multiprocessing.Queue, queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, trade_ipc_addr): """ 策略进程 @param pipe_server: @param queue_strategy_r_trade_w_: @param queue_l1_w_strategy_r_: @param queue_strategy_w_trade_r_: @param queue_strategy_w_trade_r_for_read_: @param queue_l1_trade_r_strategy_w_: @param queue_l1_trade_w_strategy_r_: @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) @return: """ logger_system.info("策略进程ID:{}", os.getpid()) log.close_print() # 初始化参数 server.global_data_loader.init() # # 数据服务 t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True) t1.start() # # 交易接口服务 t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_), daemon=True) t1.start() # # redis后台服务 t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) t1.start() # # 启动华鑫交易服务 huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, queue_l1_trade_w_strategy_r_, trade_ipc_addr) # 主服务 def createServer(pipe): logger_system.info("create Server") laddr = "", 9001 try: tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{laddr[1]} 启动失败") def createDataServer(): logger_system.info("create DataServer") logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}") tcpserver = data_server.run("", 9004) tcpserver.serve_forever() try: tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{9004} 启动失败") if __name__ == '__main__1': huaxin_client.l2_client.test() if __name__ == '__main__': try: logger_l2_trade.info("启动程序") logger_system.info("启动程序--------") log.close_print() # 策略与server间的通信 pss_server, pss_strategy = multiprocessing.Pipe() # L2读其他写 queue_other_w_l2_r = multiprocessing.Queue() # l1 queue_l1_w_strategy_r = multiprocessing.Queue() queue_l1_r_strategy_w = multiprocessing.Queue() # l1交易 queue_l1_trade_w_strategy_r = multiprocessing.Queue() queue_l1_trade_r_strategy_w = multiprocessing.Queue() # 交易读策略写 queue_strategy_w_trade_r = multiprocessing.Queue() queue_strategy_w_trade_r_for_read = multiprocessing.Queue() # 策略读交易写 queue_strategy_r_trade_w = multiprocessing.Queue() # 下单,撤单ipc地址 order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" # 托管环境下不创建 # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) # serverProcess.start() logger_system.info("主进程ID:{}", os.getpid()) # L1订阅数据 l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,)) l1Process.start() l1TradeProcess = multiprocessing.Process(target=huaxin_client.l1_client_for_trade.run, args=(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w,)) l1TradeProcess.start() l2MarketProcess = multiprocessing.Process(target=l2_market_client.run, args=(queue_l1_w_strategy_r,)) l2MarketProcess.start() # 交易进程 tradeProcess = multiprocessing.Process( target=huaxin_client.trade_client.run, args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read)) tradeProcess.start() # 此处将L2的进程与策略进程合并 # L2 # l2Process = multiprocessing.Process( # target=huaxin_client.l2_client.run, # args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback)) # l2Process.start() # 将L2的进程改为进程执行 threading.Thread(target=huaxin_client.l2_client.run, args=( queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() # 主进程 createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr)) # 将tradeServer作为主进程 l1Process.join() # l2Process.join() tradeProcess.join() except Exception as e: logging.exception(e) logger_system.exception(e)