Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
test/test_pyzmq.py
@@ -1,8 +1,6 @@
import json
import multiprocessing
import threading
import time
import zmq
@@ -34,15 +32,18 @@
        socket.connect(f"ipc://order_{i}.ipc")
        socket_client_dict[i] = socket
    print(f"开始发送消息({i}):")
    msg = "test"
    for i in range(1000):
        msg += "test123123"
    datas = []
    max_count = 500
    if i % 3 == 0:
        max_count = 100
    for j in range(max_count):
        datas.append(('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862, 9207862, 'A', 1708998873.1507974, 0))
    socket_client_dict[i].send_json({'msg': msg, "time": time.time()})
    socket_client_dict[i].send_json({'data': datas, "time": time.time()})
    response = socket_client_dict[i].recv_string()
if __name__ == "__main__":
if __name__ == "__main__1":
    serverProcess = multiprocessing.Process(
        target=create_server)
@@ -56,3 +57,68 @@
            threading.Thread(target=lambda: send_data(context, i), daemon=True).start()
        time.sleep(5)
    input()
# L2上传数据协议管理器
class L2DataUploadProtocolManager:
    # ipchosts IPC协议
    def __init__(self, ipchosts):
        self.ipchosts = ipchosts
        # 所有的client
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)
            self.socket_client_dict[host] = socket
    # 获取
    def __get_available_ipchost(self):
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
        for host in self.socket_client_dict:
            if host not in used_hosts:
                return host, self.socket_client_dict[host]
        raise Exception("无可用host")
    # 分配HOST
    def distribute_upload_host(self, code):
        self.rlock.acquire()
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.rlock.release()
    def release_distributed_upload_host(self, code):
        self.rlock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.rlock.release()
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        host, socket = self.code_socket_client_dict[code]
        socket.send_json(data)
        socket.recv_string()
if __name__ == "__main__":
    ipclist = []
    for i in range(0, 70):
        ipclist.append(f"ipc://l2order{i}.ipc")
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    # for i in range(0, 70):
    #     manager.distribute_upload_host(f"{i}" * 6)
    # manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})