Administrator
2023-10-24 fab36a8ee3f526175e4058eb5aa154ece58508c4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
GUI管理
"""
 
import multiprocessing
import os
 
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from log_module import log
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
 
from server import *
 
# 交易服务
from third_data import data_server
from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
 
 
# from huaxin_api import trade_client, l2_client, l1_client
 
 
def createTradeServer(
        pipe_server,
        queue_strategy_r_trade_w: multiprocessing.Queue,
        pipe_l1,
        pipe_l2,
        queue_trade_w_l2_r: multiprocessing.Queue,
        psl2_l2,
        queue_strategy_w_trade_r: multiprocessing.Queue):
    """Bring up the strategy-side services and then run the Huaxin trade server.

    Four daemon threads are started, in this order: the data server, the
    trade API server, the redis background loop and the L2 subscription
    client. Afterwards ``huaxin_trade_server.run`` is called on the calling
    thread (presumably it blocks for the life of the process -- confirm
    against that module).

    :param pipe_server: pipe end handed to ``huaxin_trade_api_server.run``.
    :param queue_strategy_r_trade_w: queue handed to ``huaxin_trade_server.run``
        (named "strategy reads / trade writes").
    :param pipe_l1: L1 pipe end handed to ``huaxin_trade_server.run``.
    :param pipe_l2: L2 pipe end handed to both the trade API server and
        ``huaxin_trade_server.run``.
    :param queue_trade_w_l2_r: queue handed to the L2 client
        (named "trade writes / L2 reads").
    :param psl2_l2: pipe end handed to the L2 client.
    :param queue_strategy_w_trade_r: queue handed to ``huaxin_trade_server.run``
        (named "strategy writes / trade reads").
    """

    def _launch(worker, label, worker_args=()):
        # Every background service runs as a daemon thread and is started
        # immediately after construction.
        threading.Thread(target=worker, name=label, args=worker_args, daemon=True).start()

    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()

    # Load global parameters before any service thread is started.
    global_data_loader.init()

    # Data server.
    _launch(createDataServer, "createDataServer")

    # Trade API server.
    _launch(huaxin_trade_api_server.run, "trade_api_server", (pipe_server, pipe_l2))

    # Redis background loop.
    _launch(redis_manager.RedisUtils.run_loop, "redis")

    # L2 subscription client.
    _launch(
        huaxin_client.l2_client.run,
        "l2_client",
        (queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
    )

    # Huaxin trade server, run on the calling thread.
    huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r)
 
 
# 主服务
def createServer(pipe):
    """Serve the main TCP endpoint on port 9001 until the server stops.

    Any exception raised while constructing or running the server is logged
    through ``logger_system`` and swallowed.

    :param pipe: passed to ``MyThreadingTCPServer`` as ``pipe_trade``.
    """
    logger_system.info("create Server")
    bind_addr = ("", 9001)
    try:
        # The second argument is the request-handler class, MyBaseRequestHandle.
        MyThreadingTCPServer(bind_addr, MyBaseRequestHandle, pipe_trade=pipe).serve_forever()
    except Exception as err:
        logger_system.exception(err)
        logger_system.error(f"端口服务器:{bind_addr[1]} 启动失败")
 
 
def createDataServer():
    """Create the data server on port 9004 and serve requests until it stops.

    Both server creation (``data_server.run``) and the serving loop run inside
    the ``try`` block, so a startup failure (for example, the port already
    being in use) is logged through ``logger_system`` instead of escaping the
    thread unlogged. ``serve_forever`` is called exactly once.
    """
    port = 9004
    logger_system.info("create DataServer")
    logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}")
    try:
        # NOTE(review): assumes data_server.run returns a socketserver-style
        # object exposing serve_forever() -- confirm against data_server.
        tcpserver = data_server.run("", port)
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{port} 启动失败")
 
 
if __name__ == '__main__':
    # Entry point: create the inter-process channels, start the L1 and trade
    # child processes, then run the trade server in this (main) process.
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()

        # Strategy <-> server communication.
        pss_server, pss_strategy = multiprocessing.Pipe()

        # Written by trade, read by L2.
        queue_trade_w_l2_r = multiprocessing.Queue()
        # Strategy <-> L2 communication.
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()

        # L1 <-> strategy communication.
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()

        # Written by strategy, read by trade.
        queue_strategy_w_trade_r = multiprocessing.Queue()
        # Read by strategy, written by trade.
        queue_strategy_r_trade_w = multiprocessing.Queue()

        # Not created in the hosted environment:
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())

        # L1 data subscription process.
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process.start()

        # Trade process. The target is passed as a plain function with args
        # rather than a lambda: a lambda cannot be pickled, so the process
        # would fail to start under the "spawn" start method (the default on
        # Windows and macOS).
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
        tradeProcess.start()

        # The trade server runs in the main process.
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)

        # Reached only after createTradeServer returns: wait for the children.
        l1Process.join()
        tradeProcess.join()
    except Exception as e:
        logging.exception(e)
        logger_system.exception(e)