"""
|
GUI管理
|
"""
|
import math
|
|
import psutil
|
|
import constant
|
from code_attribute import gpcode_manager
|
from l2.subscript import l2_subscript_manager
|
from log_module import log
|
from log_module.log import logger_l2_trade, logger_system
|
import logging
|
import multiprocessing
|
import os
|
import threading
|
|
from task import task_manager
|
from third_data import hx_qc_value_util
|
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
|
from utils import shared_memery_util
|
|
logger_system.info("程序启动Pre:{}", os.getpid())
|
|
import huaxin_client.trade_client
|
import huaxin_client.l2_client
|
import huaxin_client.l1_client
|
from huaxin_client import l2_market_client, l2_client_v2
|
|
from servers import server_util, huaxin_trade_server, server
|
|
|
def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
|
queue_l1_w_strategy_r_: multiprocessing.Queue,
|
queue_strategy_w_trade_r_: multiprocessing.Queue,
|
queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
|
trade_ipc_addr):
|
"""
|
策略进程
|
@param queue_strategy_r_trade_w_:
|
@param queue_l1_w_strategy_r_:
|
@param queue_strategy_w_trade_r_:
|
@param queue_strategy_w_trade_r_for_read_:
|
@param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
|
@return:
|
"""
|
logger_system.info("策略进程ID:{}", os.getpid())
|
log.close_print()
|
# 初始化参数
|
server.global_data_loader.init()
|
|
# 开启数据服务器
|
threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start()
|
|
# 运行数据监听服务
|
threading.Thread(target=task_manager.run_data_listener, name="task_manager",
|
args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
|
daemon=True).start()
|
#
|
# 启动华鑫交易服务
|
huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_,
|
queue_strategy_w_trade_r_for_read_,
|
trade_ipc_addr)
|
|
|
# 主服务
|
def createServer(pipe):
|
logger_system.info("create Server")
|
laddr = "", 9001
|
try:
|
tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle,
|
pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle
|
tcpserver.serve_forever()
|
except Exception as e:
|
logger_system.exception(e)
|
logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
|
|
|
if __name__ == '__main__1':
|
huaxin_client.l2_client.test()
|
|
|
def __create_l2_subscript():
|
channel_list = []
|
for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
|
# 创建委托/成交的共享数组和ZMQ通信通道
|
delegate_ipc_addr = f"ipc://order_{i}.ipc"
|
deal_ipc_addr = f"ipc://deal_{i}.ipc"
|
delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr]
|
delegate[0] = shared_memery_util.get_number(delegate[1])
|
deal = [0, shared_memery_util.create_array(), deal_ipc_addr]
|
deal[0] = shared_memery_util.get_number(deal[1])
|
channel_list.append((delegate, deal))
|
|
# L2进程数量
|
l2_process_count = 8
|
|
base_channel_count = len(channel_list) // l2_process_count
|
left_count = len(channel_list) % l2_process_count
|
index = 0
|
# ======分组======
|
# 记录每个分组的数量
|
channel_count_list = []
|
# 数据回调队列
|
data_callback_queue_list = []
|
# 消息传递队列
|
sub_single_queue_list = []
|
|
for i in range(l2_process_count):
|
channel_count = base_channel_count + (1 if i < left_count else 0)
|
channel_count_list.append(channel_count)
|
# 该进程下的通道
|
channels = channel_list[index:index + channel_count]
|
index += channel_count
|
# 订阅信号队列, 数据回调队列(回调频次小的数据通过这种回调)
|
sub_single_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue()
|
sub_single_queue_list.append(sub_single_queue)
|
data_callback_queue_list.append(data_callback_queue)
|
l2_process = multiprocessing.Process(target=l2_client_v2.run,
|
args=(sub_single_queue, data_callback_queue, channels, i, ))
|
l2_process.start()
|
|
l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list)
|
# 监听L2市场行情数据
|
huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list)
|
# 启动ZMQserver,针对委托队列与成交队列进行监听
|
l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback)
|
|
|
if __name__ == '__main__':
|
# 可绑定16-31之间的核
|
try:
|
logger_l2_trade.info("启动程序")
|
logger_system.info("启动程序--------")
|
log.close_print()
|
|
# L2读其他写
|
queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000)
|
# l1
|
queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000)
|
queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000)
|
|
# 交易读策略写
|
queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000)
|
queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000)
|
# 策略读交易写
|
queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000)
|
|
# 下单,撤单ipc地址
|
order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
|
|
logger_system.info("主进程ID:{}", os.getpid())
|
|
fix_codes = set()
|
open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
|
if open_buy_codes:
|
fix_codes |= set(open_buy_codes)
|
# 要监控的高标
|
high_codes = KPLPlateForbiddenManager().get_watch_high_codes()
|
if high_codes:
|
fix_codes |= set(high_codes)
|
# L1订阅数据
|
l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
|
args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
|
fix_codes,))
|
l1Process.start()
|
|
l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
|
args=(queue_l1_w_strategy_r,))
|
l2MarketProcess.start()
|
|
# 交易进程
|
tradeProcess = multiprocessing.Process(
|
target=huaxin_client.trade_client.run,
|
args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
|
queue_strategy_w_trade_r_for_read))
|
tradeProcess.start()
|
# 此处将L2的进程与策略进程合并
|
|
# 测试L2单独进程
|
|
if True:
|
__create_l2_subscript()
|
else:
|
# 将L2的进程改为线程执行
|
threading.Thread(target=huaxin_client.l2_client.run, args=(
|
queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
|
|
# 运行华鑫增值服务进程
|
threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
|
|
# 绑核运行
|
# psutil.Process(l1Process.pid).cpu_affinity([20])
|
# psutil.Process(tradeProcess.pid).cpu_affinity([21, 22])
|
# cpu_indexes = [i for i in range(23, 30)]
|
# psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
|
# 主进程
|
run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
|
queue_strategy_w_trade_r_for_read,
|
(order_ipc_addr, cancel_order_ipc_addr))
|
|
# 将tradeServer作为主进程
|
l1Process.join()
|
# l2Process.join()
|
tradeProcess.join()
|
except Exception as e:
|
logging.exception(e)
|
logger_system.exception(e)
|