Administrator
2024-02-27 b86a707eb861bf746783f20e197cba40bed0b978
zeromq集成测试
1个文件已修改
8 ■■■■ 已修改文件
test/test_pyzmq.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_pyzmq.py
@@ -1,3 +1,4 @@
import json
import multiprocessing
import threading
import time
@@ -10,7 +11,7 @@
    socket.bind(f"ipc://order_{index}.ipc")
    print("绑定成功", index)
    while True:
        msg = socket.recv_json()
        msg = socket.recv_string()
        print(f"Got Msg({index}): {msg}")
        socket.send_string("SUCCESS")
@@ -27,11 +28,10 @@
def send_data(context, i):
    if i not in socket_client_dict:
        socket = context.socket(zmq.REQ)
        result = socket.connect(f"ipc://order_{i}.ipc")
        print("连接结果:", result)
        socket.connect(f"ipc://order_{i}.ipc")
        socket_client_dict[i] = socket
    socket_client_dict[i].send_json({'msg': i})
    socket_client_dict[i].send_string(json.dumps({'msg': i}))
    response = socket_client_dict[i].recv_string()