Administrator
2023-09-06 3784c8cc817beca104630d6a1e2c2d3fefa44e52
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
GUI管理
"""
 
import multiprocessing
import os
import sys
 
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from log_module import log
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
 
from server import *
 
# 交易服务
from third_data import data_server
from trade.huaxin import trade_server, trade_api_server
 
 
# from huaxin_api import trade_client, l2_client, l1_client
 
 
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
    global_data_loader.init()
 
    # 数据服务
    t1 = threading.Thread(target=createDataServer, daemon=True)
    t1.start()
 
    # 交易接口服务
    t1 = threading.Thread(target=trade_api_server.run, args=(pipe_server, pipe_l2), daemon=True)
    t1.start()
 
    # redis后台服务
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, daemon=True)
    t1.start()
 
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
 
    # 启动华鑫交易服务
    t1 = threading.Thread(target=huaxin_client.trade_client.run,
                          args=(trade_server.my_trade_response, ptl2_trade, pst_trade),
                          daemon=True)
    t1.start()
 
    # 交易服务
    trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd)
 
 
# 主服务
def createServer(pipe):
    logger_system.info("create Server")
    laddr = "", 9001
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
    try:
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
 
 
def createDataServer():
    logger_system.info("create DataServer")
    tcpserver = data_server.run("", 9004)
    tcpserver.serve_forever()
    try:
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{9004} 启动失败")
 
 
if __name__ == '__main__':
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # 策略与交易间的通信
        pst_trade, pst_strategy = multiprocessing.Pipe()
        # 交易与l2之间的通信
        ptl2_trade, ptl2_l2 = multiprocessing.Pipe()
        # 策略与l2之间的通信
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
 
        # l1与策略间的通信
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
 
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())
 
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process.start()
 
        # 主进程
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, pst_trade)
 
        # 将tradeServer作为主进程
        l1Process.join()
    except Exception as e:
        logger_system.exception(e)