import multiprocessing
|
import threading
|
import time
|
import zmq
|
|
|
def __create_server(context, index):
    """Serve REP requests forever on the IPC endpoint ``order_<index>.ipc``.

    A REP socket is created from *context* and bound to the endpoint. Every
    incoming JSON message must carry a ``'time'`` entry; the difference
    between the current ``time.time()`` and that value is printed in
    milliseconds, and the fixed reply ``"SUCCESS"`` is sent back.

    Args:
        context: object whose ``socket(zmq.REP)`` yields the server socket.
        index: value used to build the endpoint name and to tag log lines.

    This function does not return.
    """
    rep_socket = context.socket(zmq.REP)
    endpoint = f"ipc://order_{index}.ipc"
    rep_socket.bind(endpoint)
    print("绑定成功", index)
    while True:
        request = rep_socket.recv_json()
        # Latency in milliseconds, measured against the stamp in the request.
        elapsed_ms = (time.time() - request['time']) * 1000
        print(f"Got Msg({index}-{time.time()}): 耗时:{elapsed_ms}")
        rep_socket.send_string("SUCCESS")
|
|
|
def create_server():
    """Start 70 REP server threads (indexes 0..69) and block forever.

    One ``zmq.Context`` is created and shared by all server threads; each
    thread runs ``__create_server(context, i)``. The threads are daemons, so
    the calling thread sleeps in an endless loop to keep them alive.

    This function does not return.
    """
    context = zmq.Context()
    for i in range(70):
        # Hand the index over through ``args`` so it is fixed when the thread
        # is created. A ``lambda`` closing over ``i`` reads the variable only
        # when the thread actually runs and can observe a later loop value,
        # binding one endpoint twice and leaving another unbound.
        threading.Thread(
            target=__create_server, args=(context, i), daemon=True
        ).start()
    # Daemon threads die with the main thread, so park it here.
    while True:
        time.sleep(5)
|
|
|
# Cache of client sockets created by send_data, keyed by endpoint index.
socket_client_dict = {}


def send_data(context, i):
    """Send one sample JSON request to endpoint ``order_<i>.ipc`` and wait for the reply.

    The client socket for *i* is created from *context* (REQ type) and
    connected on first use, then cached in ``socket_client_dict``. The request
    is ``{'data': [...], 'time': time.time()}`` where ``data`` holds 100
    copies of a fixed sample tuple when ``i`` is a multiple of 3 and 500
    copies otherwise. The string reply is read and discarded.

    Args:
        context: object whose ``socket(zmq.REQ)`` yields a client socket;
            only used when no socket is cached for *i*.
        i: endpoint index.
    """
    try:
        client = socket_client_dict[i]
    except KeyError:
        client = context.socket(zmq.REQ)
        client.connect(f"ipc://order_{i}.ipc")
        socket_client_dict[i] = client
    print(f"开始发送消息({i}):")
    sample_row = ('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862, 9207862, 'A', 1708998873.1507974, 0)
    row_count = 100 if i % 3 == 0 else 500
    payload = {'data': [sample_row] * row_count, "time": time.time()}
    client.send_json(payload)
    client.recv_string()
|
|
|
# Client/server round-trip demo. The guard compares against "__main__1", so
# this block does not run even when the file is executed as a script.
if __name__ == "__main__1":
    serverProcess = multiprocessing.Process(target=create_server)
    serverProcess.start()

    # Pause before connecting clients; presumably this gives the server
    # process time to bind its endpoints.
    time.sleep(5)
    context = zmq.Context()
    while True:
        print("=======准备发送消息========")
        for i in range(70):
            # Pass the index through ``args`` so every thread keeps its own
            # value; a ``lambda`` closing over ``i`` may see a later index by
            # the time the thread runs and send to the wrong endpoint.
            threading.Thread(
                target=send_data, args=(context, i), daemon=True
            ).start()
        time.sleep(5)
    # NOTE(review): the original indentation was lost; this call is assumed to
    # sit after the endless loop (and is therefore never reached) - confirm.
    input()
|
|
|
# L2 data upload protocol manager
class L2DataUploadProtocolManager:
    """Assign REQ client sockets (one per host) to codes and upload JSON through them.

    Every host given to the constructor gets its own connected REQ socket.
    A code must be given a host with ``distribute_upload_host`` before
    ``upload_data_as_json`` can be used for it; each host serves at most one
    code at a time and becomes free again after
    ``release_distributed_upload_host``.
    """

    def __init__(self, ipchosts):
        """Create and connect one REQ socket per entry of *ipchosts*.

        Args:
            ipchosts: iterable of endpoint strings handed to ``socket.connect``.
        """
        self.ipchosts = ipchosts
        # All clients, format: {host: socket}
        self.socket_client_dict = {}
        # Client assigned to each code, format: {code: (host, socket)}
        self.code_socket_client_dict = {}
        # Guards changes to code_socket_client_dict.
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
            self.socket_client_dict[host] = socket

    def __get_available_ipchost(self):
        """Return ``(host, socket)`` for the first host not assigned to any code.

        Raises:
            Exception: when every host is already assigned.
        """
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = {host for host, _ in self.code_socket_client_dict.values()}
        for host, socket in self.socket_client_dict.items():
            if host not in used_hosts:
                return host, socket
        raise Exception("无可用host")

    def distribute_upload_host(self, code):
        """Assign a free host to *code*.

        If *code* already holds a host the call is a no-op. Previously it
        moved the code to another host, or raised "no available host" when
        the pool was full even though the code already had one.

        Raises:
            Exception: when *code* has no host yet and none is free.
        """
        with self.rlock:
            if code in self.code_socket_client_dict:
                return
            self.code_socket_client_dict[code] = self.__get_available_ipchost()

    def release_distributed_upload_host(self, code):
        """Free the host assigned to *code*; do nothing if it has none."""
        with self.rlock:
            self.code_socket_client_dict.pop(code, None)

    def upload_data_as_json(self, code, data):
        """Send *data* as JSON over the socket assigned to *code* and wait for the reply.

        The string reply is read and discarded.

        Raises:
            Exception: when no host has been assigned to *code*.
        """
        # One lookup instead of "check, then index": a release happening in
        # between would otherwise surface as KeyError rather than the
        # documented exception.
        host_info = self.code_socket_client_dict.get(code)
        if host_info is None:
            raise Exception("尚未分配host")
        _, socket = host_info
        socket.send_json(data)
        socket.recv_string()
|
|
|
if __name__ == "__main__":
    # Connect a manager to the 70 endpoints l2order0.ipc .. l2order69.ipc.
    ipclist = [f"ipc://l2order{i}.ipc" for i in range(0, 70)]
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    # Disabled host-distribution experiments:
    # for i in range(0, 70):
    #     manager.distribute_upload_host(f"{i}" * 6)
    # manager.distribute_upload_host(code)
    # NOTE: with the calls above disabled no host is assigned to `code`, so
    # the upload below raises instead of sending.
    manager.upload_data_as_json(code, {"test": "test"})
|