"""
|
GUI管理
|
"""
|
import logging
|
import multiprocessing
|
import os
|
import threading
|
|
import constant
|
from db import redis_manager_delegate as redis_manager
|
import huaxin_client.trade_client
|
import huaxin_client.l2_client
|
import huaxin_client.l1_client
|
import huaxin_client.l1_client_for_trade
|
from log_module import log
|
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
|
|
import server
|
|
# 交易服务
|
from third_data import data_server
|
from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
|
|
# from huaxin_api import trade_client, l2_client, l1_client
|
from utils import tool
|
|
|
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_,
                      queue_other_w_l2_r_=None):
    """Start the auxiliary service threads, then run the Huaxin trade server.

    The function logs the current process id, disables console printing via
    ``log.close_print()``, initialises ``server.global_data_loader`` and then
    starts three daemon threads (data server, trade API server, redis loop)
    before handing control to ``huaxin_trade_server.run``.

    Args:
        pipe_server: Forwarded as the first argument of
            ``huaxin_trade_api_server.run``.
        queue_strategy_r_trade_w_: Forwarded to ``huaxin_trade_server.run``.
        queue_l1_w_strategy_r_: Forwarded to ``huaxin_trade_server.run``.
        queue_strategy_w_trade_r_: Forwarded to ``huaxin_trade_server.run``.
        queue_strategy_w_trade_r_for_read_: Forwarded to
            ``huaxin_trade_server.run``.
        order_queues_: Forwarded to ``huaxin_trade_server.run``.
        transaction_queues_: Forwarded to ``huaxin_trade_server.run``.
        market_queue_: Forwarded to ``huaxin_trade_server.run``.
        queue_l1_trade_r_strategy_w_: Forwarded to
            ``huaxin_trade_api_server.run``.
        queue_l1_trade_w_strategy_r_: Forwarded to ``huaxin_trade_server.run``.
        queue_other_w_l2_r_: Optional queue forwarded to
            ``huaxin_trade_api_server.run``. When omitted, the module-level
            ``queue_other_w_l2_r`` is used, which only exists when this file
            is executed as ``__main__``.

    Raises:
        NameError: If ``queue_other_w_l2_r_`` is omitted and the module-level
            ``queue_other_w_l2_r`` has not been created.
    """
    # Resolve the queue before any thread is started, so a missing global
    # fails immediately instead of after the data server is already running.
    if queue_other_w_l2_r_ is None:
        queue_other_w_l2_r_ = queue_other_w_l2_r

    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # Initialise global parameters
    server.global_data_loader.init()

    # Data service
    data_thread = threading.Thread(target=createDataServer, name="createDataServer", daemon=True)
    data_thread.start()

    # Trade API service
    trade_api_thread = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                                        args=(pipe_server, queue_other_w_l2_r_, queue_l1_trade_r_strategy_w_),
                                        daemon=True)
    trade_api_thread.start()

    # Redis background service
    redis_thread = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    redis_thread.start()

    # Start the Huaxin trade service in the calling thread
    # (presumably blocks until the trade server stops -- TODO confirm).
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_,
                            queue_l1_trade_w_strategy_r_)
|
|
|
# 主服务
|
def createServer(pipe):
    """Create the threading TCP server on port 9001 and serve requests.

    Args:
        pipe: Passed to ``server.MyThreadingTCPServer`` as ``pipe_trade``.

    Any exception raised while constructing or running the server is logged
    through ``logger_system`` and is not re-raised.
    """
    logger_system.info("create Server")
    laddr = ("", 9001)
    try:
        # NOTE: the handler argument is the MyBaseRequestHandle class itself.
        handler_cls = server.MyBaseRequestHandle
        tcp_server = server.MyThreadingTCPServer(laddr, handler_cls, pipe_trade=pipe)
        tcp_server.serve_forever()
    except Exception as exc:
        logger_system.exception(exc)
        logger_system.error(f"端口服务器：{laddr[1]} 启动失败")
|
|
|
def createDataServer():
    """Create the data server on port 9004 and serve requests.

    The server object is obtained from ``data_server.run("", 9004)`` and
    ``serve_forever()`` is called on it exactly once. Both steps run inside
    the ``try`` block, so a failure while creating the server or while
    serving is logged through ``logger_system`` instead of escaping the
    thread unlogged.
    """
    logger_system.info("create DataServer")
    logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}")
    try:
        tcpserver = data_server.run("", 9004)
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器：{9004} 启动失败")
|
|
|
# Manual debug entry point: the guard compares against '__main__1', which
# __name__ never equals when the file is run or imported normally, so the
# L2 client test below stays disabled unless the guard string is edited.
if __name__ == "__main__1":
    huaxin_client.l2_client.test()
|
|
# Script entry point: create the inter-process queues, start the L1, L1-trade,
# trade and L2 child processes, then run the trade server in this process.
if __name__ == '__main__':
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()

        # Communication between the strategy and the server
        pss_server, pss_strategy = multiprocessing.Pipe()

        # Read by L2, written by others.
        # NOTE: createTradeServer falls back to this module-level name, so it
        # must keep this exact name.
        queue_other_w_l2_r = multiprocessing.Queue()

        # L1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()

        # L1 trade
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()

        # Read by trade, written by strategy
        queue_strategy_w_trade_r = multiprocessing.Queue()
        queue_strategy_w_trade_r_for_read = multiprocessing.Queue()

        # Read by strategy, written by trade
        queue_strategy_r_trade_w = multiprocessing.Queue()

        # Not created in the hosted environment:
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())

        # L1 subscription data
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,))
        l1Process.start()

        l1TradeProcess = multiprocessing.Process(target=huaxin_client.l1_client_for_trade.run,
                                                 args=(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w,))
        l1TradeProcess.start()

        # Trade process
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()

        # Create the L2 communication queues: one market queue plus
        # HUAXIN_L2_MAX_CODES_COUNT order queues and as many transaction queues.
        market_queue = multiprocessing.Queue()
        order_queues = [multiprocessing.Queue() for _ in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]
        transaction_queues = [multiprocessing.Queue() for _ in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]

        # L2
        l2Process = multiprocessing.Process(
            target=huaxin_client.l2_client.run,
            args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue))
        l2Process.start()

        # The trade server runs in the main process
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r)

        # Once the trade server returns, wait for every child process that
        # was started above (the L1 trade process was previously left out).
        l1Process.join()
        l1TradeProcess.join()
        l2Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)
        logger_system.exception(e)
|