| | |
| | | """ |
| | | GUI管理 |
| | | """ |
| | | import math |
| | | |
| | | import psutil |
| | | |
| | | import constant |
| | | from code_attribute import gpcode_manager |
| | | from l2.subscript import l2_subscript_manager |
| | | from log_module import log |
| | | from log_module.log import logger_l2_trade, logger_system |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import sys |
| | | import threading |
| | | |
| | | from db import redis_manager_delegate as redis_manager |
| | | from task import task_manager |
| | | from third_data import hx_qc_value_util |
| | | from third_data.code_plate_key_manager import KPLPlateForbiddenManager |
| | | from utils import shared_memery_util |
| | | |
| | | logger_system.info("程序启动Pre:{}", os.getpid()) |
| | | |
| | | import huaxin_client.trade_client |
| | | import huaxin_client.l2_client |
| | | import huaxin_client.l1_client |
| | | from log_module import log |
| | | from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1 |
| | | from huaxin_client import l2_market_client, l2_client_v2 |
| | | |
| | | from server import * |
| | | |
| | | # 交易服务 |
| | | from third_data import data_server |
| | | from trade.huaxin import trade_server, trade_api_server |
| | | from servers import server_util, huaxin_trade_server, server |
| | | |
| | | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade): |
| | | def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, |
| | | trade_ipc_addr): |
| | | """ |
| | | 策略进程 |
| | | @param queue_strategy_r_trade_w_: |
| | | @param queue_l1_w_strategy_r_: |
| | | @param queue_strategy_w_trade_r_: |
| | | @param queue_strategy_w_trade_r_for_read_: |
| | | @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) |
| | | @return: |
| | | """ |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | | global_data_loader.init() |
| | | server.global_data_loader.init() |
| | | |
| | | # # 数据服务 |
| | | t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True) |
| | | t1.start() |
| | | # |
| | | # 交易接口服务 |
| | | t1 = threading.Thread(target=trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2), |
| | | daemon=True) |
| | | t1.start() |
| | | # |
| | | # redis后台服务 |
| | | t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) |
| | | t1.start() |
| | | # |
| | | # 启动L2订阅服务 |
| | | t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", |
| | | args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), |
| | | daemon=True) |
| | | t1.start() |
| | | # 开启数据服务器 |
| | | threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() |
| | | |
| | | # 运行数据监听服务 |
| | | threading.Thread(target=task_manager.run_data_listener, name="task_manager", |
| | | args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), |
| | | daemon=True).start() |
| | | # |
| | | # 启动华鑫交易服务 |
| | | trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd) |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, |
| | | queue_strategy_w_trade_r_for_read_, |
| | | trade_ipc_addr) |
| | | |
| | | |
| | | # 主服务 |
| | | def createServer(pipe): |
| | | logger_system.info("create Server") |
| | | laddr = "", 9001 |
| | | tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle |
| | | try: |
| | | tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, |
| | | pipe_trade=pipe) # 注意:参数是MyBaseRequestHandle |
| | | tcpserver.serve_forever() |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | logger_system.error(f"端口服务器:{laddr[1]} 启动失败") |
| | | |
| | | |
| | | def createDataServer(): |
| | | logger_system.info("create DataServer") |
| | | logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}") |
| | | tcpserver = data_server.run("", 9004) |
| | | tcpserver.serve_forever() |
| | | try: |
| | | tcpserver.serve_forever() |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | logger_system.error(f"端口服务器:{9004} 启动失败") |
| | | if __name__ == '__main__1': |
| | | huaxin_client.l2_client.test() |
| | | |
| | | |
| | | def __create_l2_subscript(): |
| | | channel_list = [] |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | # 创建委托/成交的共享数组和ZMQ通信通道 |
| | | delegate_ipc_addr = f"ipc://order_{i}.ipc" |
| | | deal_ipc_addr = f"ipc://deal_{i}.ipc" |
| | | delegate = [0, shared_memery_util.create_array(), delegate_ipc_addr] |
| | | delegate[0] = shared_memery_util.get_number(delegate[1]) |
| | | deal = [0, shared_memery_util.create_array(), deal_ipc_addr] |
| | | deal[0] = shared_memery_util.get_number(deal[1]) |
| | | channel_list.append((delegate, deal)) |
| | | |
| | | # L2进程数量 |
| | | l2_process_count = 8 |
| | | |
| | | base_channel_count = len(channel_list) // l2_process_count |
| | | left_count = len(channel_list) % l2_process_count |
| | | index = 0 |
| | | # ======分组====== |
| | | # 记录每个分组的数量 |
| | | channel_count_list = [] |
| | | # 数据回调队列 |
| | | data_callback_queue_list = [] |
| | | # 消息传递队列 |
| | | sub_single_queue_list = [] |
| | | |
| | | for i in range(l2_process_count): |
| | | channel_count = base_channel_count + (1 if i < left_count else 0) |
| | | channel_count_list.append(channel_count) |
| | | # 该进程下的通道 |
| | | channels = channel_list[index:index + channel_count] |
| | | index += channel_count |
| | | # 订阅信号队列, 数据回调队列(回调频次小的数据通过这种回调) |
| | | sub_single_queue, data_callback_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024) |
| | | sub_single_queue_list.append(sub_single_queue) |
| | | data_callback_queue_list.append(data_callback_queue) |
| | | l2_process = multiprocessing.Process(target=l2_client_v2.run, |
| | | args=(sub_single_queue, data_callback_queue, channels, i, )) |
| | | l2_process.start() |
| | | |
| | | l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list) |
| | | # 监听L2市场行情数据 |
| | | huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list) |
| | | # 启动ZMQserver,针对委托队列与成交队列进行监听 |
| | | l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | # 可绑定16-31之间的核 |
| | | try: |
| | | logger_l2_trade.info("启动程序") |
| | | logger_system.info("启动程序--------") |
| | | log.close_print() |
| | | # 策略与server间的通信 |
| | | pss_server, pss_strategy = multiprocessing.Pipe() |
| | | # 策略与交易间的通信 |
| | | pst_trade, pst_strategy = multiprocessing.Pipe() |
| | | # 交易与l2之间的通信 |
| | | ptl2_trade, ptl2_l2 = multiprocessing.Pipe() |
| | | # 策略与l2之间的通信 |
| | | psl2_strategy, psl2_l2 = multiprocessing.Pipe() |
| | | |
| | | # l1与策略间的通信 |
| | | pl1t_l1, pl1t_strategy = multiprocessing.Pipe() |
| | | # L2读其他写 |
| | | queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000) |
| | | # l1 |
| | | queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000) |
| | | queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000) |
| | | |
| | | # 托管环境下不创建 |
| | | # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) |
| | | # serverProcess.start() |
| | | # 交易读策略写 |
| | | queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000) |
| | | queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000) |
| | | # 策略读交易写 |
| | | queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000) |
| | | |
| | | # 下单,撤单ipc地址 |
| | | order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" |
| | | |
| | | logger_system.info("主进程ID:{}", os.getpid()) |
| | | |
| | | fix_codes = set() |
| | | open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() |
| | | if open_buy_codes: |
| | | fix_codes |= set(open_buy_codes) |
| | | # 要监控的高标 |
| | | high_codes = KPLPlateForbiddenManager().get_watch_high_codes() |
| | | if high_codes: |
| | | fix_codes |= set(high_codes) |
| | | # L1订阅数据 |
| | | l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,)) |
| | | l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, |
| | | args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w, |
| | | fix_codes,)) |
| | | l1Process.start() |
| | | |
| | | l2MarketProcess = multiprocessing.Process(target=l2_market_client.run, |
| | | args=(queue_l1_w_strategy_r,)) |
| | | l2MarketProcess.start() |
| | | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade)) |
| | | target=huaxin_client.trade_client.run, |
| | | args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read)) |
| | | tradeProcess.start() |
| | | # 此处将L2的进程与策略进程合并 |
| | | |
| | | # 测试L2单独进程 |
| | | |
| | | if constant.IS_L2_NEW: |
| | | __create_l2_subscript() |
| | | else: |
| | | # 将L2的进程改为线程执行 |
| | | threading.Thread(target=huaxin_client.l2_client.run, args=( |
| | | queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() |
| | | |
| | | # 运行华鑫增值服务进程 |
| | | threading.Thread(target=hx_qc_value_util.run, daemon=True).start() |
| | | |
| | | # 绑核运行 |
| | | # psutil.Process(l1Process.pid).cpu_affinity([20]) |
| | | # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22]) |
| | | # cpu_indexes = [i for i in range(23, 30)] |
| | | # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, |
| | | pst_trade) |
| | | run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, |
| | | (order_ipc_addr, cancel_order_ipc_addr)) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | | # l2Process.join() |
| | | tradeProcess.join() |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | logger_system.exception(e) |