From 298be0f7e061200e76eeaa8c662b6c116c4f00f4 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 25 八月 2025 15:39:37 +0800 Subject: [PATCH] 委托记录接口修改 --- huaxin_client/code_queue_distribute_manager.py | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 103 insertions(+), 1 deletions(-) diff --git a/huaxin_client/code_queue_distribute_manager.py b/huaxin_client/code_queue_distribute_manager.py index b91cbc6..67d6fae 100644 --- a/huaxin_client/code_queue_distribute_manager.py +++ b/huaxin_client/code_queue_distribute_manager.py @@ -3,6 +3,10 @@ """ import copy +import zmq + +from log_module.log import logger_local_huaxin_l2_error + class CodeQueueDistributeManager: # queue_list @@ -58,6 +62,7 @@ # 鑾峰彇鍙敤鐨勯槦鍒� def get_available_callback(self): + # 宸茬粡鍒嗛厤鐨勫洖璋僆D distibuted_callbacks_ids = set() for code in self.distibuted_code_callback_dict: distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) @@ -67,11 +72,23 @@ return None # 涓轰唬鐮佸垎閰嶉槦鍒� - def distribute_callback(self, code): + def distribute_callback(self, code, target_codes=None): if code in self.distibuted_code_callback_dict: return self.distibuted_code_callback_dict.get(code) callback_info = self.get_available_callback() if not callback_info: + distibuted_callbacks_ids = set() + need_release_codes = set() + for code in self.distibuted_code_callback_dict: + distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) + # 濡傛灉浠g爜娌″湪鐩爣浠g爜涓氨绉婚櫎 + if target_codes and code not in target_codes: + need_release_codes.add(code) + for c in need_release_codes: + self.release_distribute_callback(c) + logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨勪唬鐮侊細{self.distibuted_code_callback_dict.keys()}") + logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨刢allbackid锛歿distibuted_callbacks_ids}") + # 鍒犻櫎宸茬粡娌″湪鐩爣浠g爜涓殑鍒嗛厤 raise Exception("鏃犲彲鐢ㄧ殑鍥炶皟瀵硅薄") self.distibuted_code_callback_dict[code] = callback_info return callback_info @@ -98,3 +115,88 @@ """ codes = self.distibuted_code_callback_dict.keys() return copy.deepcopy(codes) + + +# 鏁版嵁閫氶亾鍒嗛厤 +class CodeDataChannelDistributeManager: + + def __init__(self, channel_list: list): + """ + # [(閫愮瑪濮旀墭鍏变韩鍐呭瓨, 閫愮瑪鎴愪氦鍏变韩鍐呭瓨, 閫氱煡鍦板潃)] + @param channel_list:[((缂栧彿,multiprocessing.Array, zmq_address),(缂栧彿, multiprocessing.Array, zmq_address))] + """ + flist = [] + for channel_data in channel_list: + flist.append((id(channel_data), channel_data)) + self.channel_list = flist + self.distibuted_code_channel_dict = {} + self.zmq_host_socket_dict = {} + self.zmq_context = zmq.Context() + for channel in channel_list: + socket = self.zmq_context.socket(zmq.REQ) + host = channel[0][2] + socket.connect(host) + self.zmq_host_socket_dict[host] = socket + socket = self.zmq_context.socket(zmq.REQ) + host = channel[1][2] + socket.connect(host) + self.zmq_host_socket_dict[host] = socket + + def get_zmq_socket(self, host): + return self.zmq_host_socket_dict.get(host) + + # 鑾峰彇鍙敤鐨勯槦鍒� + def get_available_channel(self): + # 宸茬粡鍒嗛厤鐨勫洖璋僆D + distibuted_channels_ids = set() + for code in self.distibuted_code_channel_dict: + distibuted_channels_ids.add(self.distibuted_code_channel_dict[code][0]) + for channel in self.channel_list: + if channel[0] not in distibuted_channels_ids: + return channel + return None + + # 涓轰唬鐮佸垎閰嶉槦鍒� + def distribute_channel(self, code, target_codes=None): + if code in self.distibuted_code_channel_dict: + return self.distibuted_code_channel_dict.get(code) + callback_info = self.get_available_channel() + if not callback_info: + distibuted_callbacks_ids = set() + need_release_codes = set() + for code in self.distibuted_code_channel_dict: + distibuted_callbacks_ids.add(self.distibuted_code_channel_dict[code][0]) + # 濡傛灉浠g爜娌″湪鐩爣浠g爜涓氨绉婚櫎 + if target_codes and code not in target_codes: + need_release_codes.add(code) + for c in need_release_codes: + self.release_distribute_channel(c) + logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨勪唬鐮侊細{self.distibuted_code_channel_dict.keys()}") + logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨刢allbackid锛歿distibuted_callbacks_ids}") + # 鍒犻櫎宸茬粡娌″湪鐩爣浠g爜涓殑鍒嗛厤 + raise Exception("鏃犲彲鐢ㄧ殑鍥炶皟瀵硅薄") + self.distibuted_code_channel_dict[code] = callback_info + return callback_info + + # 鑾峰彇浠g爜鍒嗛厤鐨勯槦鍒� + def get_distributed_channel(self, code): + if code in self.distibuted_code_channel_dict: + return self.distibuted_code_channel_dict.get(code)[1] + else: + return None + + def release_distribute_channel(self, code): + if code in self.distibuted_code_channel_dict: + self.distibuted_code_channel_dict.pop(code) + + # 鑾峰彇绌洪棽鐨勪綅缃暟閲� + def get_free_channel_count(self): + return len(self.channel_list) - len(self.distibuted_code_channel_dict.keys()) + + def get_distributed_codes(self): + """ + 鑾峰彇宸茬粡鍒嗛厤鐨勪唬鐮� + @return: + """ + codes = self.distibuted_code_channel_dict.keys() + return copy.deepcopy(codes) -- Gitblit v1.8.0