""" 代码队列管理 """ import copy import zmq from log_module.log import logger_local_huaxin_l2_error class CodeQueueDistributeManager: # queue_list def __init__(self, queue_list: list): flist = [] for i in range(0, len(queue_list)): flist.append((i, queue_list[i])) self.queue_list = flist self.distibuted_code_queue_dict = {} # 获取可用的队列 def get_available_queue(self): distibuted_queue_indexes = set() for code in self.distibuted_code_queue_dict: distibuted_queue_indexes.add(self.distibuted_code_queue_dict[code][0]) for q_info in self.queue_list: if q_info[0] not in distibuted_queue_indexes: return q_info return None # 为代码分配队列 def distribute_queue(self, code): if code in self.distibuted_code_queue_dict: return self.distibuted_code_queue_dict.get(code) q_info = self.get_available_queue() if not q_info: raise Exception("无可用的队列") self.distibuted_code_queue_dict[code] = q_info return q_info # 获取代码分配的队列 def get_distributed_queue(self, code): return self.distibuted_code_queue_dict.get(code) def release_distribute_queue(self, code): if code in self.distibuted_code_queue_dict: self.distibuted_code_queue_dict.pop(code) # 获取空闲的位置数量 def get_free_queue_count(self): return len(self.queue_list) - len(self.distibuted_code_queue_dict.keys()) # 回调对象分配 class CodeDataCallbackDistributeManager: # queue_list def __init__(self, callback_list: list): flist = [] for callback in callback_list: flist.append((id(callback), callback)) self.callback_list = flist self.distibuted_code_callback_dict = {} # 获取可用的队列 def get_available_callback(self): # 已经分配的回调ID distibuted_callbacks_ids = set() for code in self.distibuted_code_callback_dict: distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) for callback in self.callback_list: if callback[0] not in distibuted_callbacks_ids: return callback return None # 为代码分配队列 def distribute_callback(self, code, target_codes=None): if code in self.distibuted_code_callback_dict: return self.distibuted_code_callback_dict.get(code) callback_info = self.get_available_callback() if not callback_info: distibuted_callbacks_ids = set() need_release_codes = set() for code in self.distibuted_code_callback_dict: distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) # 如果代码没在目标代码中就移除 if target_codes and code not in target_codes: need_release_codes.add(code) for c in need_release_codes: self.release_distribute_callback(c) logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}") logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}") # 删除已经没在目标代码中的分配 raise Exception("无可用的回调对象") self.distibuted_code_callback_dict[code] = callback_info return callback_info # 获取代码分配的队列 def get_distributed_callback(self, code): if code in self.distibuted_code_callback_dict: return self.distibuted_code_callback_dict.get(code)[1] else: return None def release_distribute_callback(self, code): if code in self.distibuted_code_callback_dict: self.distibuted_code_callback_dict.pop(code) # 获取空闲的位置数量 def get_free_queue_count(self): return len(self.callback_list) - len(self.distibuted_code_callback_dict.keys()) def get_distributed_codes(self): """ 获取已经分配的代码 @return: """ codes = self.distibuted_code_callback_dict.keys() return copy.deepcopy(codes) # 数据通道分配 class CodeDataChannelDistributeManager: def __init__(self, channel_list: list): """ # [(逐笔委托共享内存, 逐笔成交共享内存, 通知地址)] @param channel_list:[((编号,multiprocessing.Array, zmq_address),(编号, multiprocessing.Array, zmq_address))] """ flist = [] for channel_data in channel_list: flist.append((id(channel_data), channel_data)) self.channel_list = flist self.distibuted_code_channel_dict = {} self.zmq_host_socket_dict = {} self.zmq_context = zmq.Context() for channel in channel_list: socket = self.zmq_context.socket(zmq.REQ) host = channel[0][2] socket.connect(host) self.zmq_host_socket_dict[host] = socket socket = self.zmq_context.socket(zmq.REQ) host = channel[1][2] socket.connect(host) self.zmq_host_socket_dict[host] = socket def get_zmq_socket(self, host): return self.zmq_host_socket_dict.get(host) # 获取可用的队列 def get_available_channel(self): # 已经分配的回调ID distibuted_channels_ids = set() for code in self.distibuted_code_channel_dict: distibuted_channels_ids.add(self.distibuted_code_channel_dict[code][0]) for channel in self.channel_list: if channel[0] not in distibuted_channels_ids: return channel return None # 为代码分配队列 def distribute_channel(self, code, target_codes=None): if code in self.distibuted_code_channel_dict: return self.distibuted_code_channel_dict.get(code) callback_info = self.get_available_channel() if not callback_info: distibuted_callbacks_ids = set() need_release_codes = set() for code in self.distibuted_code_channel_dict: distibuted_callbacks_ids.add(self.distibuted_code_channel_dict[code][0]) # 如果代码没在目标代码中就移除 if target_codes and code not in target_codes: need_release_codes.add(code) for c in need_release_codes: self.release_distribute_channel(c) logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_channel_dict.keys()}") logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}") # 删除已经没在目标代码中的分配 raise Exception("无可用的回调对象") self.distibuted_code_channel_dict[code] = callback_info return callback_info # 获取代码分配的队列 def get_distributed_channel(self, code): if code in self.distibuted_code_channel_dict: return self.distibuted_code_channel_dict.get(code)[1] else: return None def release_distribute_channel(self, code): if code in self.distibuted_code_channel_dict: self.distibuted_code_channel_dict.pop(code) # 获取空闲的位置数量 def get_free_channel_count(self): return len(self.channel_list) - len(self.distibuted_code_channel_dict.keys()) def get_distributed_codes(self): """ 获取已经分配的代码 @return: """ codes = self.distibuted_code_channel_dict.keys() return copy.deepcopy(codes)