Administrator
2024-02-27 70dd500bca31be56b2c2c98a2f9b967f001e9153
zeromq集成测试
1个文件已添加
48 ■■■■■ 已修改文件
test/test_pyzmq.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_pyzmq.py
New file
@@ -0,0 +1,48 @@
import multiprocessing
import threading
import time
import zmq
def __create_server(context, port):
    socket = context.socket(zmq.REP)
    socket.bind(f"ipc://order{port}.ipc")
    while True:
        msg = socket.recv_json()
        print(f"Got Msg({port}): {msg}")
        socket.send_string("SUCCESS")
def create_server():
    context = zmq.Context()
    for i in range(50):
        threading.Thread(target=lambda: __create_server(context, 10000 + i), daemon=True).start()
socket_client_dict = {}
def send_data(context, i):
    if i not in socket_client_dict:
        socket = context.socket(zmq.REQ)
        socket.connect(f"ipc://order{10000 + i}.ipc")
        socket_client_dict[i] = socket
    socket_client_dict[i].send_json({'msg': i})
    response = socket_client_dict[i].recv_string()
if __name__ == "__main__":
    serverProcess = multiprocessing.Process(
        target=create_server)
    serverProcess.start()
    time.sleep(5)
    context = zmq.Context()
    while True:
        for i in range(50):
            threading.Thread(target=lambda: send_data(context, i), daemon=True).start()
        time.sleep(5)
    input()