Administrator
2025-06-05 48e24816c88f722e40b43187fa63a8334196f5ae
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import multiprocessing
import threading
import time
import zmq
 
 
def __create_server(context, index):
    socket = context.socket(zmq.REP)
    socket.bind(f"ipc://order_{index}.ipc")
    print("绑定成功", index)
    while True:
        msg = socket.recv_json()
        use_time = (time.time() - msg['time']) * 1000
        print(f"Got Msg({index}-{time.time()}): 耗时:{use_time}")
        socket.send_string("SUCCESS")
 
 
def create_server():
    context = zmq.Context()
    for i in range(70):
        threading.Thread(target=lambda: __create_server(context, i), daemon=True).start()
    while True:
        time.sleep(5)
 
 
socket_client_dict = {}
 
 
def send_data(context, i):
    if i not in socket_client_dict:
        socket = context.socket(zmq.REQ)
        socket.connect(f"ipc://order_{i}.ipc")
        socket_client_dict[i] = socket
    print(f"开始发送消息({i}):")
    datas = []
    max_count = 500
    if i % 3 == 0:
        max_count = 100
    for j in range(max_count):
        datas.append(('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862, 9207862, 'A', 1708998873.1507974, 0))
 
    socket_client_dict[i].send_json({'data': datas, "time": time.time()})
    response = socket_client_dict[i].recv_string()
 
 
if __name__ == "__main__1":
 
    serverProcess = multiprocessing.Process(
        target=create_server)
    serverProcess.start()
 
    time.sleep(5)
    context = zmq.Context()
    while True:
        print("=======准备发送消息========")
        for i in range(70):
            threading.Thread(target=lambda: send_data(context, i), daemon=True).start()
        time.sleep(5)
    input()
 
 
# L2上传数据协议管理器
class L2DataUploadProtocolManager:
 
    # ipchosts IPC协议
    def __init__(self, ipchosts):
        self.ipchosts = ipchosts
        # 所有的client
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
            self.socket_client_dict[host] = socket
 
    # 获取
    def __get_available_ipchost(self):
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
        for host in self.socket_client_dict:
            if host not in used_hosts:
                return host, self.socket_client_dict[host]
        raise Exception("无可用host")
 
    # 分配HOST
    def distribute_upload_host(self, code):
        self.rlock.acquire()
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.rlock.release()
 
    def release_distributed_upload_host(self, code):
        self.rlock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.rlock.release()
 
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        host, socket = self.code_socket_client_dict[code]
        socket.send_json(data)
        socket.recv_string()
 
 
if __name__ == "__main__":
    ipclist = []
    for i in range(0, 70):
        ipclist.append(f"ipc://l2order{i}.ipc")
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    # for i in range(0, 70):
    #     manager.distribute_upload_host(f"{i}" * 6)
    # manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})