From 4a2ac13b0ac30601fbd0fcf63ac533011d17dfb7 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 07 三月 2025 18:24:38 +0800 Subject: [PATCH] bug修复/新版L2订阅初步改造 --- huaxin_client/code_queue_distribute_manager.py | 87 +++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 87 insertions(+), 0 deletions(-) diff --git a/huaxin_client/code_queue_distribute_manager.py b/huaxin_client/code_queue_distribute_manager.py index 782cdbf..67d6fae 100644 --- a/huaxin_client/code_queue_distribute_manager.py +++ b/huaxin_client/code_queue_distribute_manager.py @@ -3,6 +3,8 @@ """ import copy +import zmq + from log_module.log import logger_local_huaxin_l2_error @@ -113,3 +115,88 @@ """ codes = self.distibuted_code_callback_dict.keys() return copy.deepcopy(codes) + + +# 鏁版嵁閫氶亾鍒嗛厤 +class CodeDataChannelDistributeManager: + + def __init__(self, channel_list: list): + """ + # [(閫愮瑪濮旀墭鍏变韩鍐呭瓨, 閫愮瑪鎴愪氦鍏变韩鍐呭瓨, 閫氱煡鍦板潃)] + @param channel_list:[((缂栧彿,multiprocessing.Array, zmq_address),(缂栧彿, multiprocessing.Array, zmq_address))] + """ + flist = [] + for channel_data in channel_list: + flist.append((id(channel_data), channel_data)) + self.channel_list = flist + self.distibuted_code_channel_dict = {} + self.zmq_host_socket_dict = {} + self.zmq_context = zmq.Context() + for channel in channel_list: + socket = self.zmq_context.socket(zmq.REQ) + host = channel[0][2] + socket.connect(host) + self.zmq_host_socket_dict[host] = socket + socket = self.zmq_context.socket(zmq.REQ) + host = channel[1][2] + socket.connect(host) + self.zmq_host_socket_dict[host] = socket + + def get_zmq_socket(self, host): + return self.zmq_host_socket_dict.get(host) + + # 鑾峰彇鍙敤鐨勯槦鍒� + def get_available_channel(self): + # 宸茬粡鍒嗛厤鐨勫洖璋僆D + distibuted_channels_ids = set() + for code in self.distibuted_code_channel_dict: + distibuted_channels_ids.add(self.distibuted_code_channel_dict[code][0]) + for channel in self.channel_list: + if channel[0] not in distibuted_channels_ids: + return channel + return None + + # 涓轰唬鐮佸垎閰嶉槦鍒� + def distribute_channel(self, code, target_codes=None): + if code in self.distibuted_code_channel_dict: + return self.distibuted_code_channel_dict.get(code) + callback_info = self.get_available_channel() + if not callback_info: + distibuted_callbacks_ids = set() + need_release_codes = set() + for code in self.distibuted_code_channel_dict: + distibuted_callbacks_ids.add(self.distibuted_code_channel_dict[code][0]) + # 濡傛灉浠g爜娌″湪鐩爣浠g爜涓氨绉婚櫎 + if target_codes and code not in target_codes: + need_release_codes.add(code) + for c in need_release_codes: + self.release_distribute_channel(c) + logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨勪唬鐮侊細{self.distibuted_code_channel_dict.keys()}") + logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨刢allbackid锛歿distibuted_callbacks_ids}") + # 鍒犻櫎宸茬粡娌″湪鐩爣浠g爜涓殑鍒嗛厤 + raise Exception("鏃犲彲鐢ㄧ殑鍥炶皟瀵硅薄") + self.distibuted_code_channel_dict[code] = callback_info + return callback_info + + # 鑾峰彇浠g爜鍒嗛厤鐨勯槦鍒� + def get_distributed_channel(self, code): + if code in self.distibuted_code_channel_dict: + return self.distibuted_code_channel_dict.get(code)[1] + else: + return None + + def release_distribute_channel(self, code): + if code in self.distibuted_code_channel_dict: + self.distibuted_code_channel_dict.pop(code) + + # 鑾峰彇绌洪棽鐨勪綅缃暟閲� + def get_free_channel_count(self): + return len(self.channel_list) - len(self.distibuted_code_channel_dict.keys()) + + def get_distributed_codes(self): + """ + 鑾峰彇宸茬粡鍒嗛厤鐨勪唬鐮� + @return: + """ + codes = self.distibuted_code_channel_dict.keys() + return copy.deepcopy(codes) -- Gitblit v1.8.0