""" L2订阅管理 """ import math import multiprocessing import random class TargetCodeProcessManager: """ 目标代码的进程管理 """ def __init__(self, com_queues: list, max_code_count_per_queue_list): """ 初始化 @param com_queues:list 通信队列 @param max_code_count_per_queue: 每个队列最大的代码数量 """ self.__com_queues = com_queues self.__max_code_count_per_queue_dict = {id(com_queues[i]): max_code_count_per_queue_list[i] for i in range(len(com_queues))} # 队列ID与队列对象的映射 self.__com_queue_id_object_dict = {id(q): q for q in com_queues} # 队列ID对应的代码,格式:{"队列ID":{"代码1","代码2"}} self.__queue_codes = {} for q in com_queues: self.__queue_codes[id(q)] = set() # 代码所在队列ID self.__code_queue_dict = {} def add_codes(self, codes: set): add_codes = codes - self.__code_queue_dict.keys() del_codes = self.__code_queue_dict.keys() - codes # 删除代码 if del_codes: for code in del_codes: if code in self.__code_queue_dict: queue_id = self.__code_queue_dict[code] self.__queue_codes[queue_id].discard(code) self.__code_queue_dict.pop(code) # 为新增代码分配队列 for code in add_codes: # 寻找未满的队列 for queue_id in self.__queue_codes: count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues))) if len(self.__queue_codes[queue_id]) >= count_per_process: # 队列已满 continue # 队列未满,分配代码 self.__queue_codes[queue_id].add(code) self.__code_queue_dict[code] = queue_id break def get_queues_with_codes(self): """ 获取队列分配的代码 @return: [(队列对象,{代码集合})] """ results = [] for queue_id in self.__queue_codes: results.append((self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes.get(queue_id))) return results if __name__ == "__main__": queues = [multiprocessing.Queue() for i in range(7)] manager = TargetCodeProcessManager(queues, 10) counts = [70, 60, 50, 10] for i in range(4): codes = set() for i in range(counts[i]): code = random.randint(1, 1000000) code = str(code).zfill(6) codes.add(code) print(codes) manager.add_codes(codes) results = manager.get_queues_with_codes() fcodes = set() for r in results: fcodes |= r[1] if codes - fcodes or fcodes - codes: print("订阅出错") print(results)