Administrator
2024-05-09 77fea830c2e11f94c364a37b8bda792a51d11eec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import logging
import multiprocessing
import random
import threading
import time
 
import zmq
 
import constant
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
from log_module.log import logger_debug
 
 
class L2Strategy:
    """Receiving side of the L2 channel test: one REP socket per channel.

    For every entry in ``commu_params`` two daemon threads are started, one for
    the delegate (order) channel and one for the deal (transaction) channel.
    Each thread waits for a JSON notification, reads the payload from the
    channel's shared memory and answers ``"SUCCESS"``.
    """

    def __init__(self, commu_params):
        """Store the channel parameters; no ZeroMQ resources are created here.

        :param commu_params: iterable of objects exposing ``delegate_ipc_addr``,
            ``deal_ipc_addr``, ``delegate_data_shared_memory`` and
            ``deal_data_shared_memory``.
        """
        # The context is created lazily (see ``context``) so that it is built in
        # the process that actually uses it.  This object is passed to
        # ``multiprocessing.Process(target=self.run)``; an eagerly created
        # zmq.Context would either have to be pickled (spawn start method) or
        # would be inherited from the parent process (fork start method).
        self._context = None
        self.commu_params = commu_params

    @property
    def context(self):
        """Return this process's ``zmq.Context``, creating it on first access."""
        if self._context is None:
            self._context = zmq.Context()
        return self._context

    def _serve_channel(self, ipc_addr, shared_memory, received_prefix, latency_label):
        """Bind a REP socket on *ipc_addr* and answer notifications forever.

        :param ipc_addr: endpoint the REP socket binds to.
        :param shared_memory: handle passed to ``L2SharedMemoryDataUtil.get_data``.
        :param received_prefix: text printed in front of the per-message summary.
        :param latency_label: text used in the debug log for slow messages.
        """
        socket = self.context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            msg = socket.recv_json()
            try:
                # Transit time in milliseconds, based on the sender's timestamp.
                use_time = (time.time() - msg['time']) * 1000
                # get_data yields the payload plus a second value that is
                # reported below as the decode duration.
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"{received_prefix}数据长度:{len(decoded_data)} 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
                if use_time > 1:
                    logger_debug.info(f"{latency_label}:{use_time}")
            except Exception as e:
                logging.exception(e)
            finally:
                # Always reply, even after a processing error, so the REQ/REP
                # exchange completes and the next recv is possible.
                socket.send_string("SUCCESS")

    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Serve the delegate (order) channel bound to *delegate_ipc_addr*; never returns."""
        self._serve_channel(delegate_ipc_addr, shared_memory,
                            "获取到逐笔委托数据: ", "委托通信耗时")

    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        """Serve the deal (transaction) channel bound to *deal_ipc_addr*; never returns."""
        self._serve_channel(deal_ipc_addr, shared_memory,
                            "获取到逐笔成交数据:  ", "成交通信耗时")

    def run(self):
        """Start two receiver threads per channel, then block forever."""
        # Create the context once, in this process, before any thread needs it,
        # so all receiver threads share a single context.
        _ = self.context
        for param in self.commu_params:
            threading.Thread(target=self.rec_delegate_data,
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,), daemon=True).start()
            threading.Thread(target=self.rec_deal_data, args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
        # The workers are daemon threads, so keep the calling thread alive.
        while True:
            time.sleep(100)
 
 
class L2DataCollector:
    """Sending side of the L2 channel test: one REQ socket per channel.

    For every entry in ``commu_params`` two daemon threads are started.  Each
    thread repeatedly writes a fixed sample payload into the channel's shared
    memory, sends a timestamped JSON notification and waits for the reply.
    """

    def __init__(self, commu_params):
        """Store the channel parameters; no ZeroMQ resources are created here.

        :param commu_params: iterable of objects exposing ``delegate_ipc_addr``,
            ``deal_ipc_addr``, ``delegate_data_shared_memory`` and
            ``deal_data_shared_memory``.
        """
        # Created lazily (see ``context``) so the context is built in the
        # process that uses it: this object is handed to
        # ``multiprocessing.Process(target=self.run)``, which pickles it under
        # the spawn start method and copies it under fork.
        self._context = None
        self.commu_params = commu_params

    @property
    def context(self):
        """Return this process's ``zmq.Context``, creating it on first access."""
        if self._context is None:
            self._context = zmq.Context()
        return self._context

    def _publish_channel(self, ipc_addr, shared_memory, sample_code, log_load_time):
        """Connect a REQ socket to *ipc_addr* and publish sample data forever.

        :param ipc_addr: endpoint the REQ socket connects to.
        :param shared_memory: handle passed to ``L2SharedMemoryDataUtil.set_data``.
        :param sample_code: first field of the repeated sample record
            (presumably a security code -- confirm against the data format).
        :param log_load_time: when true, log ``set_data``'s return value if it
            exceeds 0.0005.
        """
        socket = self.context.socket(zmq.REQ)
        socket.connect(ipc_addr)
        # The payload never changes, so build the tuple once instead of on
        # every iteration.
        datas = (sample_code, 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 150
        while True:
            try:
                use_time = L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                if log_load_time and use_time > 0.0005:
                    logger_debug.info(f"数据装载耗时:{use_time}")

                socket.send_json({'data': [], "time": time.time()})
                # The reply content is not used; receiving it completes the
                # REQ/REP exchange so the next send is allowed.
                socket.recv_string()
            except Exception as e:
                logging.exception(e)
            finally:
                # Random pause of 0-100 ms between messages.
                t = random.randint(0, 100)
                time.sleep(t / 1000)

    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Publish sample delegate (order) data on *delegate_ipc_addr*; never returns."""
        self._publish_channel(delegate_ipc_addr, shared_memory, '000990', False)

    def send_deal_data(self, deal_ipc_addr, shared_memory):
        """Publish sample deal (transaction) data on *deal_ipc_addr*; never returns.

        Unlike the delegate sender, this one logs slow shared-memory loads.
        """
        self._publish_channel(deal_ipc_addr, shared_memory, '888888', True)

    def run(self):
        """Start two sender threads per channel, then block forever."""
        # Create the context once, in this process, before any thread needs it,
        # so all sender threads share a single context.
        _ = self.context
        for param in self.commu_params:
            threading.Thread(target=self.send_delegate_data,
                             args=(param.delegate_ipc_addr, param.delegate_data_shared_memory,),
                             daemon=True).start()
            threading.Thread(target=self.send_deal_data, args=(param.deal_ipc_addr, param.deal_data_shared_memory,),
                             daemon=True).start()
        # The workers are daemon threads, so keep the calling thread alive.
        while True:
            time.sleep(100)
 
 
if __name__ == "__main__":
    # Benchmark entry point: build the per-channel parameters, then run the
    # receiver (L2Strategy) and the sender (L2DataCollector) in two separate
    # processes that share those parameters.
    channel_count = 100
    params = []
    for channel_index in range(channel_count):
        channel_param = L2ChannelCommunicationParams(
            delegate_ipc_addr=f"ipc://order_{channel_index}.ipc",
            deal_ipc_addr=f"ipc://deal_{channel_index}.ipc",
            delegate_data_shared_memory=L2SharedMemoryDataUtil.create_shared_memory(),
            deal_data_shared_memory=L2SharedMemoryDataUtil.create_shared_memory())
        if constant.is_windows():
            # On Windows the ipc:// endpoints are replaced by loopback TCP
            # ports, one port per channel and data kind.
            channel_param.delegate_ipc_addr = f"tcp://127.0.0.1:{15000 + channel_index}"
            channel_param.deal_ipc_addr = f"tcp://127.0.0.1:{16000 + channel_index}"
        params.append(channel_param)

    strategy = L2Strategy(params)
    data_collector = L2DataCollector(params)
    strategy_process = multiprocessing.Process(target=strategy.run)
    collector_process = multiprocessing.Process(target=data_collector.run)
    strategy_process.start()
    collector_process.start()

    # Keep the parent process alive while the two children run.
    while True:
        time.sleep(100)
 
# Scratch snippet kept for manual experiments: the guard compares against
# "__main__1" rather than "__main__", so running this file as a script does
# not execute it.
if __name__ == "__main__1":
    sample_record = ('000990', 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1')
    datas = sample_record * 100
    print(datas)