Administrator
2025-03-06 3ff0120707ada32fb25012e1d0e8bc9a5c3df07c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
GUI管理
"""
import psutil
 
from code_attribute import gpcode_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
import multiprocessing
import os
import threading
 
from task import task_manager
from third_data import hx_qc_value_util
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
 
logger_system.info("程序启动Pre:{}", os.getpid())
 
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client
 
from servers import server_util, huaxin_trade_server, server
 
 
def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
                 queue_l1_w_strategy_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
                 trade_ipc_addr):
    """
    策略进程
    @param queue_strategy_r_trade_w_:
    @param queue_l1_w_strategy_r_:
    @param queue_strategy_w_trade_r_:
    @param queue_strategy_w_trade_r_for_read_:
    @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
    @return:
    """
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
    server.global_data_loader.init()
 
    # 开启数据服务器
    threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start()
 
    # 运行数据监听服务
    threading.Thread(target=task_manager.run_data_listener, name="task_manager",
                     args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
                     daemon=True).start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            trade_ipc_addr)
 
 
# 主服务
def createServer(pipe):
    logger_system.info("create Server")
    laddr = "", 9001
    try:
        tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle,
                                                pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
 
 
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
 
if __name__ == '__main__':
    # 可绑定16-31之间的核
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
 
        # L2读其他写
        queue_other_w_l2_r = multiprocessing.Queue(maxsize=1000)
        # l1
        queue_l1_w_strategy_r = multiprocessing.Queue(maxsize=1000)
        queue_l1_r_strategy_w = multiprocessing.Queue(maxsize=1000)
 
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue(maxsize=1000)
        queue_strategy_w_trade_r_for_read = multiprocessing.Queue(maxsize=1000)
        # 策略读交易写
        queue_strategy_r_trade_w = multiprocessing.Queue(maxsize=1000)
 
        # 下单,撤单ipc地址
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
 
        logger_system.info("主进程ID:{}", os.getpid())
 
        fix_codes = set()
        open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
        if open_buy_codes:
            fix_codes |= set(open_buy_codes)
        # 要监控的高标
        high_codes = KPLPlateForbiddenManager().get_watch_high_codes()
        if high_codes:
            fix_codes |= set(high_codes)
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
                                                  fix_codes,))
        l1Process.start()
 
        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
                                                  args=(queue_l1_w_strategy_r,))
        l2MarketProcess.start()
 
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # 此处将L2的进程与策略进程合并
        # L2
        # l2Process = multiprocessing.Process(
        #     target=huaxin_client.l2_client.run,
        #     args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback))
        # l2Process.start()
        # 将L2的进程改为线程执行
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
 
        # 运行华鑫增值服务进程
        threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
 
        # 绑核运行
        # psutil.Process(l1Process.pid).cpu_affinity([20])
        # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22])
        # cpu_indexes = [i for i in range(23, 30)]
        # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
        # 主进程
        run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                     queue_strategy_w_trade_r_for_read,
                     (order_ipc_addr, cancel_order_ipc_addr))
 
        # 将tradeServer作为主进程
        l1Process.join()
        # l2Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)
        logger_system.exception(e)