Administrator
2024-02-27 6980afa11e80996b2155884a77788faf7d78c484
zeromq集成测试
1个文件已修改
25 ■■■■■ 已修改文件
test/test_pyzmq.py 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_pyzmq.py
@@ -69,7 +69,7 @@
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.lock = threading.RLock()
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
@@ -78,35 +78,32 @@
    # 获取
    def __get_available_ipchost(self):
        try:
            if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
                raise Exception("无可用host")
            used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
            for host in self.socket_client_dict:
                if host not in used_hosts:
                    return host, self.socket_client_dict[host]
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        finally:
            self.lock.release()
        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
        for host in self.socket_client_dict:
            if host not in used_hosts:
                return host, self.socket_client_dict[host]
        raise Exception("无可用host")
    # 分配HOST
    def distribute_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        print("锁住")
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.lock.release()
            self.rlock.release()
    def release_distributed_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.lock.release()
            self.rlock.release()
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict: