# Administrator
# 2024-04-18 eb106c82e31fd1c68bf7c95156940d07ea04b9d6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import logging
import multiprocessing
import threading
import time
 
import zmq
 
import constant
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
 
 
class L2Strategy:
    """Receiving side of the L2 channel demo.

    For every entry in ``commu_params`` two ZeroMQ REP sockets are served:
    one for order ("delegate") notifications and one for deal notifications.
    A notification is a JSON object with a ``time`` field (sender timestamp);
    the actual payload is fetched with ``L2SharedMemoryDataUtil.get_data``.
    """

    def __init__(self, commu_params):
        """Remember the channel parameters and create the ZeroMQ context.

        :param commu_params: iterable of channel parameter objects exposing
            ``delegate_ipc_addr``, ``deal_ipc_addr``,
            ``delegate_data_shared_memory`` and ``deal_data_shared_memory``.
        """
        # NOTE(review): the context is created in the constructing process but
        # used by whichever process later calls run(); confirm this is safe
        # for the multiprocessing start method in use.
        self.context = zmq.Context()
        self.commu_params = commu_params

    def _serve(self, ipc_addr, shared_memory, log_prefix):
        """Bind a REP socket on ``ipc_addr`` and answer requests forever.

        :param ipc_addr: ZeroMQ endpoint to bind.
        :param shared_memory: object handed to
            ``L2SharedMemoryDataUtil.get_data`` to read the payload.
        :param log_prefix: text printed in front of the per-message statistics.
        """
        socket = self.context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            try:
                # Receiving inside the ``try`` keeps the loop alive when a
                # peer sends something that is not valid JSON.
                msg = socket.recv_json()
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"{log_prefix}数据长度:{len(decoded_data)} 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
            except Exception as e:
                logging.exception(e)
            finally:
                # A REP socket has to reply before it can receive again, so
                # the acknowledgement is sent whether or not handling failed.
                socket.send_string("SUCCESS")

    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Serve order ("delegate") notifications on ``delegate_ipc_addr``.

        Never returns; each request is acknowledged with ``"SUCCESS"``.
        """
        self._serve(delegate_ipc_addr, shared_memory, "获取到逐笔委托数据: ")

    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        """Serve deal notifications on ``deal_ipc_addr``.

        Never returns; each request is acknowledged with ``"SUCCESS"``.
        """
        self._serve(deal_ipc_addr, shared_memory, "获取到逐笔成交数据:  ")

    def run(self, block=True):
        """Start one order receiver and one deal receiver thread per channel.

        :param block: when true (default) wait for the receiver threads, so
            the calling process stays alive while they work. The threads are
            daemons; if ``run`` returned right away while being the target of
            a ``multiprocessing.Process``, that process would exit and take
            the receivers down with it. Pass ``False`` to only start the
            threads and return immediately.
        """
        workers = []
        for param in self.commu_params:
            delegate_worker = threading.Thread(
                target=self.rec_delegate_data,
                args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                daemon=True)
            delegate_worker.start()
            deal_worker = threading.Thread(
                target=self.rec_deal_data,
                args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                daemon=True)
            deal_worker.start()
            workers.extend((delegate_worker, deal_worker))
        if block:
            for worker in workers:
                worker.join()
 
 
class L2DataCollector:
    """Sending side of the L2 channel demo.

    For every entry in ``commu_params`` two ZeroMQ REQ sockets are used: one
    announces new order ("delegate") data, the other new deal data. The data
    itself is written with ``L2SharedMemoryDataUtil.set_data``; the ZeroMQ
    message only carries the send timestamp.
    """

    def __init__(self, commu_params):
        """Remember the channel parameters and create the ZeroMQ context.

        :param commu_params: iterable of channel parameter objects exposing
            ``delegate_ipc_addr``, ``deal_ipc_addr``,
            ``delegate_data_shared_memory`` and ``deal_data_shared_memory``.
        """
        # NOTE(review): the context is created in the constructing process but
        # used by whichever process later calls run(); confirm this is safe
        # for the multiprocessing start method in use.
        self.context = zmq.Context()
        self.commu_params = commu_params

    @staticmethod
    def _build_sample_rows(code, count=100):
        """Return ``count`` identical sample records for security ``code``.

        The original expression ``[(...) * 100]`` repeated the tuple's
        fields, yielding a single 900-field record; the multiplication
        belongs outside the list so that ``count`` records are produced.
        """
        row = (code, 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1')
        return [row] * count

    def _publish(self, ipc_addr, shared_memory, code, announce=None):
        """Connect a REQ socket to ``ipc_addr`` and publish sample data forever.

        :param ipc_addr: ZeroMQ endpoint to connect to.
        :param shared_memory: object handed to
            ``L2SharedMemoryDataUtil.set_data`` as the write target.
        :param code: security code placed in every sample record.
        :param announce: optional text printed before each round.
        """
        socket = self.context.socket(zmq.REQ)
        socket.connect(ipc_addr)
        while True:
            try:
                if announce is not None:
                    print(announce)
                # NOTE(review): assumes set_data takes a list of record
                # tuples - confirm against L2SharedMemoryDataUtil.
                datas = self._build_sample_rows(code)
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                socket.send_json({'data': [], "time": time.time()})
                # A REQ socket must read the reply before it may send again;
                # the reply content itself is not used.
                socket.recv_string()
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(10)

    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Publish sample order ("delegate") data every 10 seconds.

        Never returns.
        """
        self._publish(delegate_ipc_addr, shared_memory, '000990', announce="发送委托数据")

    def send_deal_data(self, deal_ipc_addr, shared_memory):
        """Publish sample deal data every 10 seconds.

        Never returns.
        """
        self._publish(deal_ipc_addr, shared_memory, '888888')

    def run(self, block=True):
        """Start one order sender and one deal sender thread per channel.

        :param block: when true (default) wait for the sender threads, so the
            calling process stays alive while they work. The threads are
            daemons; if ``run`` returned right away while being the target of
            a ``multiprocessing.Process``, that process would exit and take
            the senders down with it. Pass ``False`` to only start the
            threads and return immediately.
        """
        workers = []
        for param in self.commu_params:
            delegate_worker = threading.Thread(
                target=self.send_delegate_data,
                args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                daemon=True)
            delegate_worker.start()
            deal_worker = threading.Thread(
                target=self.send_deal_data,
                args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                daemon=True)
            deal_worker.start()
            workers.extend((delegate_worker, deal_worker))
        if block:
            for worker in workers:
                worker.join()
 
 
if __name__ == "__main__":
    # Demo entry point: build the per-channel parameters, then launch the
    # receiving side (L2Strategy) and the sending side (L2DataCollector) in
    # separate processes.
    channel_count = 100
    channel_params = []
    for channel_index in range(channel_count):
        # Each channel gets its own shared-memory area for order data and
        # another one for deal data.
        order_memory = L2SharedMemoryDataUtil.create_shared_memory()
        deal_memory = L2SharedMemoryDataUtil.create_shared_memory()
        channel = L2ChannelCommunicationParams(
            delegate_ipc_addr=f"ipc://order_{channel_index}.ipc",
            deal_ipc_addr=f"ipc://deal_{channel_index}.ipc",
            delegate_data_shared_memory=order_memory,
            deal_data_shared_memory=deal_memory)
        if constant.is_windows():
            # On Windows the ipc:// endpoints are replaced by TCP loopback
            # ports, one port per channel and direction.
            channel.delegate_ipc_addr = f"tcp://127.0.0.1:{15000 + channel_index}"
            channel.deal_ipc_addr = f"tcp://127.0.0.1:{16000 + channel_index}"
        channel_params.append(channel)

    receiver = L2Strategy(channel_params)
    sender = L2DataCollector(channel_params)
    child_processes = [
        multiprocessing.Process(target=endpoint.run)
        for endpoint in (receiver, sender)
    ]
    for child in child_processes:
        child.start()

    # Keep the parent process around indefinitely.
    while True:
        time.sleep(100)