Administrator
2025-04-22 2276eff74f91a07c6a5abd34e295aa83ca3a676e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
GUI管理
"""
import math
 
import psutil
 
import constant
from code_attribute import gpcode_manager
from l2.subscript import l2_subscript_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
import multiprocessing
import os
import threading
 
from task import task_manager
from third_data import hx_qc_value_util
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from utils import shared_memery_util
 
# Record the PID at import time, before the huaxin_client modules below are imported.
logger_system.info("程序启动Pre:{}", os.getpid())
 
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client, l2_client_v2
 
from servers import server_util, huaxin_trade_server, server
 
 
def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
                 queue_l1_w_strategy_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
                 trade_ipc_addr):
    """
    Strategy process entry point.

    Loads the global data, starts the data server and the data listener as
    daemon threads, then hands control to the Huaxin trade server.

    NOTE(review): the data listener is given ``queue_other_w_l2_r``, which is
    not a parameter of this function; it is looked up as a module-level global
    that is only assigned inside the ``if __name__ == '__main__'`` block.

    @param queue_strategy_r_trade_w_: queue read by strategy, written by trade
    @param queue_l1_w_strategy_r_: queue written by L1, read by strategy
    @param queue_strategy_w_trade_r_: queue written by strategy, read by trade
    @param queue_strategy_w_trade_r_for_read_: second strategy-write / trade-read queue
    @param trade_ipc_addr: trade IPC addresses (order address, cancel-order address)
    @return: whatever falls through after ``huaxin_trade_server.run`` returns (None)
    """
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()

    # Initialise parameters.
    server.global_data_loader.init()

    # Data server thread.
    data_server_thread = threading.Thread(
        target=server_util.run_data_server,
        name="createDataServer",
        daemon=True,
    )
    data_server_thread.start()

    # Data listener thread.
    data_listener_thread = threading.Thread(
        target=task_manager.run_data_listener,
        name="task_manager",
        args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
        daemon=True,
    )
    data_listener_thread.start()

    # Start the Huaxin trade service.
    huaxin_trade_server.run(
        queue_strategy_r_trade_w_,
        queue_strategy_w_trade_r_,
        queue_strategy_w_trade_r_for_read_,
        trade_ipc_addr,
    )
 
 
# 主服务
def createServer(pipe):
    """
    Main service: start the threaded TCP server on port 9001.

    Builds ``server.MyThreadingTCPServer`` for the address ``("", 9001)`` with
    ``server.MyBaseRequestHandle`` as the handler and calls ``serve_forever()``.
    Any exception raised while creating or running the server is logged and
    swallowed.

    @param pipe: forwarded to the server as the ``pipe_trade`` keyword argument
    @return: None
    """
    logger_system.info("create Server")
    listen_addr = ("", 9001)
    try:
        # Note: the second argument is the handler class MyBaseRequestHandle.
        request_server = server.MyThreadingTCPServer(
            listen_addr,
            server.MyBaseRequestHandle,
            pipe_trade=pipe,
        )
        request_server.serve_forever()
    except Exception as err:
        logger_system.exception(err)
        logger_system.error(f"端口服务器:{listen_addr[1]} 启动失败")
 
 
# NOTE: when this file is run as a script __name__ is '__main__', never
# '__main__1', so this l2_client test hook is effectively disabled.
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
 
 
def __create_l2_subscript():
    """
    Create the L2 subscription channels and start the L2 worker processes.

    For every index below ``constant.HUAXIN_L2_MAX_CODES_COUNT`` a
    (delegate, deal) channel pair is built; each channel is the list
    ``[number, shared_array, ipc_address]``. The pairs are split as evenly as
    possible over 8 ``l2_client_v2.run`` processes, the first groups taking
    the remainder. Afterwards the process manager is installed, the L2 market
    info receiver is started and the delegate/deal data listener is created.

    @return: None
    """

    def _new_channel(ipc_addr):
        # Allocate the shared array first, then read its number from it.
        shared_array = shared_memery_util.create_array()
        return [shared_memery_util.get_number(shared_array), shared_array, ipc_addr]

    # Shared arrays and ZMQ communication channels for delegate / deal data.
    channel_list = [
        (_new_channel(f"ipc://order_{slot}.ipc"), _new_channel(f"ipc://deal_{slot}.ipc"))
        for slot in range(constant.HUAXIN_L2_MAX_CODES_COUNT)
    ]

    # Number of L2 processes.
    l2_process_count = 8
    base_size, remainder = divmod(len(channel_list), l2_process_count)

    # ====== grouping ======
    channel_count_list = []  # number of channels in each group
    data_callback_queue_list = []  # data callback queues
    sub_single_queue_list = []  # message passing queues

    start = 0
    for process_index in range(l2_process_count):
        # The first `remainder` groups receive one extra channel.
        size = base_size + 1 if process_index < remainder else base_size
        channel_count_list.append(size)

        # Channels owned by this process.
        stop = start + size
        channels = channel_list[start:stop]
        start = stop

        # Subscription signal queue, and data callback queue
        # (used for data with a low callback frequency).
        sub_single_queue = multiprocessing.Queue(maxsize=1024)
        data_callback_queue = multiprocessing.Queue(maxsize=1024)
        sub_single_queue_list.append(sub_single_queue)
        data_callback_queue_list.append(data_callback_queue)

        worker = multiprocessing.Process(
            target=l2_client_v2.run,
            args=(sub_single_queue, data_callback_queue, channels, process_index),
        )
        worker.start()

    l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(
        sub_single_queue_list, channel_count_list)
    # Listen for L2 market data.
    huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list)
    # Start the ZMQ server that listens on the delegate and deal queues.
    listener = l2_subscript_manager.L2DataListener(channel_list)
    listener.create_data_listener(huaxin_trade_server.my_l2_data_callback)
 
 
# Script entry point: creates the inter-process queues, starts the L1, L2-market
# and trade processes, starts L2 (processes or thread, depending on
# constant.IS_L2_NEW), then runs the strategy in this (main) process.
if __name__ == '__main__':
    # CPU cores 16-31 may be used for pinning
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()

        # Read by L2, written by others.
        # NOTE(review): run_strategy reads this name as a module-level global.
        queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000)
        # L1 queues
        queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000)
        queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000)

        # Written by strategy, read by trade
        queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000)
        queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000)
        # Written by trade, read by strategy
        queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000)

        # IPC addresses for placing orders and cancelling orders
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"

        logger_system.info("主进程ID:{}", os.getpid())

        # Codes passed to the L1 process: union of the open-limit-up buy codes
        # and the watched high codes.
        fix_codes = set()
        open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
        if open_buy_codes:
            fix_codes |= set(open_buy_codes)
        # High codes to monitor
        high_codes = KPLPlateForbiddenManager().get_watch_high_codes()
        if high_codes:
            fix_codes |= set(high_codes)
        # L1 data subscription process
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
                                                  fix_codes,))
        l1Process.start()

        # L2 market client process; it is given the same queue the L1 process writes to.
        # NOTE(review): this process is started but never joined below.
        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
                                                  args=(queue_l1_w_strategy_r,))
        l2MarketProcess.start()

        # Trade process
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # The L2 process is merged with the strategy process here

        # Testing L2 in separate processes

        if constant.IS_L2_NEW:
            __create_l2_subscript()
        else:
            # Run L2 as a thread instead of a process.
            # NOTE(review): this branch passes `my_l2_data_callbacks` (plural) while
            # __create_l2_subscript uses `my_l2_data_callback`; confirm both exist.
            threading.Thread(target=huaxin_client.l2_client.run, args=(
                queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()

        # Run the Huaxin value-added service (started here as a daemon thread)
        threading.Thread(target=hx_qc_value_util.run, daemon=True).start()

        # CPU pinning (currently commented out)
        # psutil.Process(l1Process.pid).cpu_affinity([20])
        # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22])
        # cpu_indexes = [i for i in range(23, 30)]
        # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
        # Main process: run the strategy in this process
        run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                     queue_strategy_w_trade_r_for_read,
                     (order_ipc_addr, cancel_order_ipc_addr))

        # tradeServer acts as the main process; the joins below run only after
        # run_strategy returns.
        l1Process.join()
        # l2Process.join()
        tradeProcess.join()
    except Exception as e:
        # Log the failure both through the stdlib root logger and the system logger.
        logging.exception(e)
        logger_system.exception(e)