Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
test/test_pyzmq.py
@@ -69,7 +69,7 @@
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.lock = threading.Lock()
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
@@ -78,34 +78,31 @@
    # Find a host that has not yet been assigned to any code
    def __get_available_ipchost(self):
        try:
            if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
                raise Exception("无可用host")
            used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
            for host in self.socket_client_dict:
                if host not in used_hosts:
                    return host, self.socket_client_dict[host]
        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
            raise Exception("无可用host")
        finally:
            self.lock.release()
        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
        for host in self.socket_client_dict:
            if host not in used_hosts:
                return host, self.socket_client_dict[host]
        raise Exception("无可用host")
    # Assign an available host to a code
    def distribute_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.lock.release()
            self.rlock.release()
    def release_distributed_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.lock.release()
            self.rlock.release()
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict:
@@ -121,5 +118,7 @@
        ipclist.append(f"ipc://l2order{i}.ipc")
    manager = L2DataUploadProtocolManager(ipclist)
    code = "000333"
    manager.distribute_upload_host(code)
    # for i in range(0, 70):
    #     manager.distribute_upload_host(f"{i}" * 6)
    # manager.distribute_upload_host(code)
    manager.upload_data_as_json(code, {"test": "test"})