Administrator
2024-06-25 42e65f7d1d763c48c26a71d5f461f52255b70f67
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
GUI管理
"""
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
import multiprocessing
import os
import threading
 
logger_system.info("程序启动Pre:{}", os.getpid())
 
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client
 
 
import server
 
# 交易服务
from third_data import data_server
from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
 
# from huaxin_api import trade_client, l2_client, l1_client
from utils import tool
 
 
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_,
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr):
    """
    Entry point of the strategy process.

    Logs the process id, calls ``log.close_print()``, initialises
    ``server.global_data_loader``, starts three daemon threads (data server,
    trade API server, redis loop) and finally calls
    ``huaxin_trade_server.run`` in the calling thread.

    NOTE: ``queue_other_w_l2_r`` is not a parameter; it is looked up as a
    module-level global (assigned in this file's ``__main__`` block).

    @param pipe_server: pipe end handed to the trade API server thread
    @param queue_strategy_r_trade_w_: forwarded to huaxin_trade_server.run
    @param queue_l1_w_strategy_r_: forwarded to huaxin_trade_server.run
    @param queue_strategy_w_trade_r_: forwarded to huaxin_trade_server.run
    @param queue_strategy_w_trade_r_for_read_: forwarded to huaxin_trade_server.run
    @param queue_l1_trade_r_strategy_w_: handed to the trade API server thread
    @param queue_l1_trade_w_strategy_r_: forwarded to huaxin_trade_server.run
    @param trade_ipc_addr: trade ipc addresses (place-order address, cancel-order address)
    @return: whatever huaxin_trade_server.run leaves behind (nothing is returned explicitly)
    """

    def _spawn_daemon(target, name, args=()):
        # Create and immediately start one background daemon thread.
        worker = threading.Thread(target=target, name=name, args=args, daemon=True)
        worker.start()
        return worker

    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # Load global parameters before any service starts.
    server.global_data_loader.init()

    # Data service.
    _spawn_daemon(createDataServer, "createDataServer")

    # Trade API service (reads the module-level queue_other_w_l2_r).
    _spawn_daemon(huaxin_trade_api_server.run, "trade_api_server",
                  (pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_))

    # Redis background service.
    _spawn_daemon(redis_manager.RedisUtils.run_loop, "redis")

    # Start the Huaxin trade server in the current thread.
    trade_server_args = (
        queue_strategy_r_trade_w_,
        queue_l1_w_strategy_r_,
        queue_strategy_w_trade_r_,
        queue_strategy_w_trade_r_for_read_,
        queue_l1_trade_w_strategy_r_,
        trade_ipc_addr,
    )
    huaxin_trade_server.run(*trade_server_args)
 
 
# 主服务
def createServer(pipe):
    """
    Run the main TCP server on port 9001 (all interfaces).

    Builds ``server.MyThreadingTCPServer`` with ``server.MyBaseRequestHandle``
    as the request handler and ``pipe`` as ``pipe_trade``, then serves until
    the loop ends. Any exception is logged through ``logger_system`` and
    swallowed.

    @param pipe: pipe object passed to the server as ``pipe_trade``
    @return: None
    """
    logger_system.info("create Server")
    laddr = ("", 9001)
    try:
        # Note: the second argument is the handler class, MyBaseRequestHandle.
        server.MyThreadingTCPServer(
            laddr,
            server.MyBaseRequestHandle,
            pipe_trade=pipe,
        ).serve_forever()
    except Exception as err:
        logger_system.exception(err)
        logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
 
 
def createDataServer():
    """
    Create the data server on port 9004 and run its serve loop.

    The server object comes from ``data_server.run("", 9004)``. Both the
    creation and the ``serve_forever()`` call sit inside the ``try`` so that
    a failure in either is logged through ``logger_system`` instead of
    escaping the thread unlogged.

    Previously ``serve_forever()`` was also called once before the ``try``;
    that unguarded call did the real serving, so the logging handler only
    covered a second serve loop reached after the first one returned.

    @return: None
    """
    logger_system.info("create DataServer")
    logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}")
    try:
        tcpserver = data_server.run("", 9004)
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{9004} 启动失败")
 
 
# NOTE: when this file is executed as a script `__name__` is '__main__', not
# '__main__1', so this branch does not run; it looks like a manually toggled
# hook for running the L2 client test.
if __name__ == '__main__1':
    huaxin_client.l2_client.test()
 
# Script entry point: creates the inter-process pipe and queues, starts the L1,
# L2-market and trade client processes plus the L2 client thread, then runs the
# trade/strategy server in this (main) process.
# NOTE(review): the names assigned here are module globals; createTradeServer
# reads `queue_other_w_l2_r` directly from them.
if __name__ == '__main__':
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
        # Communication between the strategy and the server
        pss_server, pss_strategy = multiprocessing.Pipe()

        # Read by L2, written by others
        queue_other_w_l2_r = multiprocessing.Queue()
        # L1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        # L1 trade
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()

        # Read by trade, written by strategy
        queue_strategy_w_trade_r = multiprocessing.Queue()
        queue_strategy_w_trade_r_for_read = multiprocessing.Queue()
        # Read by strategy, written by trade
        queue_strategy_r_trade_w = multiprocessing.Queue()

        # IPC addresses for placing and cancelling orders
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"

        # Not created in a hosted environment
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())

        # L1 subscription data
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,))
        l1Process.start()

        # L2 market client process; it is given the same queue_l1_w_strategy_r queue as L1
        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
                                                  args=(queue_l1_w_strategy_r,))
        l2MarketProcess.start()

        # Trade process
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(order_ipc_addr, cancel_order_ipc_addr, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # The L2 process is merged into the strategy process here
        # L2
        # l2Process = multiprocessing.Process(
        #     target=huaxin_client.l2_client.run,
        #     args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback))
        # l2Process.start()
        # The L2 client now runs as a daemon thread inside this process instead of its own process
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()

        # Main process: run the trade/strategy server in this process
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr))

        # tradeServer acts as the main process; once it returns, wait for the child processes
        # NOTE(review): l2MarketProcess is started above but never joined here - confirm this is intended
        l1Process.join()
        # l2Process.join()
        tradeProcess.join()
    except Exception as e:
        # Logged twice: once through the stdlib root logger, once through the project's system logger
        logging.exception(e)
        logger_system.exception(e)