Administrator
2024-04-18 2811588dbd78314051c60c5c53cfefdd12dddd08
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import logging
import multiprocessing
import threading
import time

import zmq

import constant
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
 
 
class L2Strategy:
    """Receiving side of the L2 channel test.

    For every channel in ``commu_params`` two ZeroMQ REP sockets are bound: one
    for tick-by-tick order ("delegate") notifications and one for tick-by-tick
    trade ("deal") notifications. Each notification triggers a read of the
    channel's shared memory through ``L2SharedMemoryDataUtil.get_data`` and is
    answered with the string ``"SUCCESS"``.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, commu_params):
        """Create the ZeroMQ context and remember the channel parameters.

        Args:
            commu_params: iterable of channel parameter objects exposing
                ``delegate_ipc_addr``, ``deal_ipc_addr``,
                ``delegate_data_shared_memory`` and ``deal_data_shared_memory``.
        """
        self.context = zmq.Context()
        self.commu_params = commu_params

    def _serve(self, ipc_addr, shared_memory, label):
        """Bind a REP socket on ``ipc_addr`` and answer notifications forever.

        Args:
            ipc_addr: ZeroMQ endpoint to bind.
            shared_memory: handle passed to ``L2SharedMemoryDataUtil.get_data``.
            label: data-kind text inserted into the printed timing line.
        """
        sock = self.context.socket(zmq.REP)
        try:
            sock.bind(ipc_addr)
            while True:
                msg = sock.recv_json()
                try:
                    use_time = (time.time() - msg['time']) * 1000
                    _, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                    print(f"获取到{label}数据: 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
                except Exception:
                    # Keep serving, but make the failure visible instead of
                    # silently discarding it.
                    self._logger.exception("Failed to process notification on %s", ipc_addr)
                finally:
                    # A REP socket must answer every request before it can
                    # receive the next one, so reply even when processing failed.
                    sock.send_string("SUCCESS")
        finally:
            # Only reached when bind/recv/send raised; release the socket.
            sock.close()

    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Serve tick-by-tick order notifications on ``delegate_ipc_addr`` (never returns)."""
        self._serve(delegate_ipc_addr, shared_memory, "逐笔委托")

    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        """Serve tick-by-tick trade notifications on ``deal_ipc_addr`` (never returns)."""
        self._serve(deal_ipc_addr, shared_memory, "逐笔成交")

    def run(self):
        """Start two daemon receiver threads per channel and return immediately.

        The threads are daemons, so the caller has to keep the process alive
        for as long as the receivers should run.
        """
        for param in self.commu_params:
            threading.Thread(
                target=self.rec_delegate_data,
                args=(param.delegate_ipc_addr, param.delegate_data_shared_memory),
                daemon=True,
            ).start()
            threading.Thread(
                target=self.rec_deal_data,
                args=(param.deal_ipc_addr, param.deal_data_shared_memory),
                daemon=True,
            ).start()
 
 
class L2DataCollector:
    """Sending side of the L2 channel test.

    For every channel in ``commu_params`` two ZeroMQ REQ sockets are connected.
    Every 10 seconds each sender writes a sample batch into the channel's
    shared memory through ``L2SharedMemoryDataUtil.set_data``, sends a
    timestamped JSON notification and waits for the reply.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, commu_params):
        """Create the ZeroMQ context and remember the channel parameters.

        Args:
            commu_params: iterable of channel parameter objects exposing
                ``delegate_ipc_addr``, ``deal_ipc_addr``,
                ``delegate_data_shared_memory`` and ``deal_data_shared_memory``.
        """
        self.context = zmq.Context()
        self.commu_params = commu_params

    @staticmethod
    def _connect_endpoint(ipc_addr):
        """Return an endpoint that is valid for ``connect``.

        The channel parameters carry one address that both sides use. A
        wildcard TCP host (``tcp://*:port``) is only meaningful for ``bind``,
        so it is mapped to the loopback interface here; every other address is
        returned unchanged.
        """
        wildcard_prefix = "tcp://*:"
        if ipc_addr.startswith(wildcard_prefix):
            return "tcp://127.0.0.1:" + ipc_addr[len(wildcard_prefix):]
        return ipc_addr

    @staticmethod
    def _sample_batch(code, count=100):
        """Return ``count`` copies of a fixed 9-field sample record for ``code``.

        NOTE(review): the original expression multiplied the tuple itself,
        which produced a single 900-field record; a list of ``count`` records
        is assumed to be the intended shape - confirm against ``set_data``.
        """
        record = (code, 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1')
        return [record] * count

    def _publish(self, ipc_addr, shared_memory, code):
        """Connect a REQ socket to ``ipc_addr`` and publish sample data forever.

        Args:
            ipc_addr: ZeroMQ endpoint of the receiving side.
            shared_memory: handle passed to ``L2SharedMemoryDataUtil.set_data``.
            code: security code placed in every sample record.
        """
        sock = self.context.socket(zmq.REQ)
        try:
            sock.connect(self._connect_endpoint(ipc_addr))
            while True:
                try:
                    datas = self._sample_batch(code)
                    L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                    sock.send_json({'data': [], "time": time.time()})
                    # REQ sockets must read the reply before the next send.
                    sock.recv_string()
                except Exception:
                    # Keep the sender running, but report the failure instead
                    # of silently discarding it.
                    self._logger.exception("Failed to publish data to %s", ipc_addr)
                finally:
                    time.sleep(10)
        finally:
            # Only reached when connect raised or the loop was aborted.
            sock.close()

    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Publish sample order records to ``delegate_ipc_addr`` (never returns)."""
        self._publish(delegate_ipc_addr, shared_memory, '000990')

    def send_deal_data(self, deal_ipc_addr, shared_memory):
        """Publish sample trade records to ``deal_ipc_addr`` (never returns)."""
        self._publish(deal_ipc_addr, shared_memory, '888888')

    def run(self):
        """Start two daemon sender threads per channel and return immediately.

        The threads are daemons, so the caller has to keep the process alive
        for as long as the senders should run.
        """
        for param in self.commu_params:
            threading.Thread(
                target=self.send_delegate_data,
                args=(param.delegate_ipc_addr, param.delegate_data_shared_memory),
                daemon=True,
            ).start()
            threading.Thread(
                target=self.send_deal_data,
                args=(param.deal_ipc_addr, param.deal_data_shared_memory),
                daemon=True,
            ).start()
 
 
if __name__ == "__main__":
    channel_count = 100
    params = []
    for i in range(0, channel_count):
        delegate_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        deal_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        param = L2ChannelCommunicationParams(
            delegate_ipc_addr=f"ipc://order_{i}.ipc",
            deal_ipc_addr=f"ipc://deal_{i}.ipc",
            delegate_data_shared_memory=delegate_data_array,
            deal_data_shared_memory=deal_data_array)
        if constant.is_windows():
            param.delegate_ipc_addr = f"tcp://*:{10000 + i}"
            param.deal_ipc_addr = f"tcp://*:{11000 + i}"
        params.append(param)
    strategy = L2Strategy(params)
    dataCollector = L2DataCollector(params)
    strategyProcess = multiprocessing.Process(target=strategy.run, args=())
    l2Process = multiprocessing.Process(target=dataCollector.run, args=())
    strategyProcess.start()
    l2Process.start()
    while True:
        time.sleep(100)