import multiprocessing
import threading
import time

import zmq

# Number of IPC endpoints used by the demo server, the demo client and the
# manager smoke test below.
_ENDPOINT_COUNT = 70


def __create_server(context, index):
    """Serve replies forever on the REP endpoint ``ipc://order_{index}.ipc``.

    Each received JSON message must carry a ``time`` key (a ``time.time()``
    stamp written by the sender); the elapsed time in milliseconds is printed
    and the string ``SUCCESS`` is sent back.

    Args:
        context: zmq context used to create the REP socket.
        index: endpoint number, used in the IPC path and in the log output.
    """
    socket = context.socket(zmq.REP)
    socket.bind(f"ipc://order_{index}.ipc")
    print("绑定成功", index)
    while True:
        msg = socket.recv_json()
        use_time = (time.time() - msg['time']) * 1000
        print(f"Got Msg({index}-{time.time()}): 耗时:{use_time}")
        socket.send_string("SUCCESS")


def create_server(count=_ENDPOINT_COUNT):
    """Start one daemon REP server thread per endpoint, then block forever.

    Args:
        count: number of endpoints (``order_0`` .. ``order_{count-1}``) to
            serve. Defaults to the original hard-coded value of 70.
    """
    context = zmq.Context()
    for i in range(count):
        # Pass the index through ``args`` instead of closing over the loop
        # variable: a lambda would read ``i`` when the thread runs, by which
        # time the loop may have advanced, so two threads could bind the same
        # endpoint while another endpoint is never served.
        threading.Thread(
            target=__create_server, args=(context, i), daemon=True
        ).start()
    # The workers are daemon threads, so this thread has to stay alive.
    while True:
        time.sleep(5)


# Demo client sockets, keyed by endpoint index and created on first use.
socket_client_dict = {}


def send_data(context, i):
    """Send one batch of sample rows to endpoint ``i`` and wait for the reply.

    The REQ socket for ``i`` is created on first use and cached in the
    module-level ``socket_client_dict``. The batch holds 100 rows when ``i``
    is a multiple of 3 and 500 rows otherwise.

    Args:
        context: zmq context used to create the REQ socket when needed.
        i: endpoint index; selects ``ipc://order_{i}.ipc``.
    """
    if i not in socket_client_dict:
        socket = context.socket(zmq.REQ)
        socket.connect(f"ipc://order_{i}.ipc")
        socket_client_dict[i] = socket
    print(f"开始发送消息({i}):")
    max_count = 100 if i % 3 == 0 else 500
    row = ('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862,
           9207862, 'A', 1708998873.1507974, 0)
    datas = [row] * max_count
    client = socket_client_dict[i]
    client.send_json({'data': datas, "time": time.time()})
    # The reply content is not used; it is read and discarded.
    client.recv_string()


# NOTE: the guard compares against "__main__1", which never matches, so this
# server/client round-trip demo does not run.
if __name__ == "__main__1":
    serverProcess = multiprocessing.Process(target=create_server)
    serverProcess.start()
    # Give the server process time to bind its endpoints.
    time.sleep(5)
    context = zmq.Context()
    while True:
        print("=======准备发送消息========")
        for i in range(_ENDPOINT_COUNT):
            # ``args`` binds the current ``i``; a lambda closing over the loop
            # variable could send to the wrong endpoint.
            threading.Thread(
                target=send_data, args=(context, i), daemon=True
            ).start()
        time.sleep(5)
        input()


# L2 data upload protocol manager
class L2DataUploadProtocolManager:
    """Hand out pre-connected REQ sockets to codes, one host per code.

    Attributes are plain dictionaries:
      * ``socket_client_dict``: ``{host: socket}`` for every configured host.
      * ``code_socket_client_dict``: ``{code: (host, socket)}`` for the codes
        that currently hold a host.
    """

    def __init__(self, ipchosts):
        """Connect one REQ socket to every endpoint in ``ipchosts``.

        Args:
            ipchosts: iterable of IPC endpoint addresses
                (e.g. ``"ipc://l2order0.ipc"``).
        """
        self.ipchosts = ipchosts
        # All clients: {host: socket}
        self.socket_client_dict = {}
        # Client assigned to each code, format: {code: (host, socket)}
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
            self.socket_client_dict[host] = socket

    def __get_available_ipchost(self):
        """Return ``(host, socket)`` for the first host not assigned to a code.

        Raises:
            Exception: with message "无可用host" when every host is in use.
        """
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = {
            assigned_host
            for assigned_host, _ in self.code_socket_client_dict.values()
        }
        for host, socket in self.socket_client_dict.items():
            if host not in used_hosts:
                return host, socket
        raise Exception("无可用host")

    def distribute_upload_host(self, code):
        """Assign a free host to ``code``, replacing any previous assignment.

        Args:
            code: key under which the ``(host, socket)`` pair is stored.

        Raises:
            Exception: with message "无可用host" when no host is free.
        """
        with self.rlock:
            self.code_socket_client_dict[code] = self.__get_available_ipchost()

    def release_distributed_upload_host(self, code):
        """Drop the host assignment of ``code``; a no-op if it has none."""
        with self.rlock:
            self.code_socket_client_dict.pop(code, None)

    def upload_data_as_json(self, code, data):
        """Send ``data`` as JSON on the socket assigned to ``code``.

        Blocks until the string reply arrives; the reply is discarded.

        Args:
            code: code that was previously given a host through
                ``distribute_upload_host``.
            data: JSON-serialisable payload.

        Raises:
            Exception: with message "尚未分配host" when ``code`` has no host.
        """
        # Single lookup, so a concurrent release cannot slip in between a
        # membership test and the read and surface as a bare KeyError.
        assigned = self.code_socket_client_dict.get(code)
        if assigned is None:
            raise Exception("尚未分配host")
        _host, socket = assigned
        socket.send_json(data)
        socket.recv_string()


if __name__ == "__main__":
    ipclist = [f"ipc://l2order{i}.ipc" for i in range(_ENDPOINT_COUNT)]
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    # for i in range(0, 70):
    #     manager.distribute_upload_host(f"{i}" * 6)
    # manager.distribute_upload_host(code)
    # NOTE: with the distribution call above commented out, ``code`` has no
    # host, so this call raises Exception("尚未分配host").
    manager.upload_data_as_json(code, {"test": "test"})