From 298be0f7e061200e76eeaa8c662b6c116c4f00f4 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 25 八月 2025 15:39:37 +0800
Subject: [PATCH] 委托记录接口修改

---
 huaxin_client/code_queue_distribute_manager.py |  104 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 103 insertions(+), 1 deletions(-)

diff --git a/huaxin_client/code_queue_distribute_manager.py b/huaxin_client/code_queue_distribute_manager.py
index b91cbc6..67d6fae 100644
--- a/huaxin_client/code_queue_distribute_manager.py
+++ b/huaxin_client/code_queue_distribute_manager.py
@@ -3,6 +3,10 @@
 """
 import copy
 
+import zmq
+
+from log_module.log import logger_local_huaxin_l2_error
+
 
 class CodeQueueDistributeManager:
     # queue_list
@@ -58,6 +62,7 @@
 
     # 鑾峰彇鍙敤鐨勯槦鍒�
     def get_available_callback(self):
+        # 宸茬粡鍒嗛厤鐨勫洖璋僆D
         distibuted_callbacks_ids = set()
         for code in self.distibuted_code_callback_dict:
             distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0])
@@ -67,11 +72,23 @@
         return None
 
     # 涓轰唬鐮佸垎閰嶉槦鍒�
-    def distribute_callback(self, code):
+    def distribute_callback(self, code, target_codes=None):
         if code in self.distibuted_code_callback_dict:
             return self.distibuted_code_callback_dict.get(code)
         callback_info = self.get_available_callback()
         if not callback_info:
+            distibuted_callbacks_ids = set()
+            need_release_codes = set()
+            for code in self.distibuted_code_callback_dict:
+                distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0])
+                # 濡傛灉浠g爜娌″湪鐩爣浠g爜涓氨绉婚櫎
+                if target_codes and code not in target_codes:
+                    need_release_codes.add(code)
+            for c in need_release_codes:
+                self.release_distribute_callback(c)
+            logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨勪唬鐮侊細{self.distibuted_code_callback_dict.keys()}")
+            logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨刢allbackid锛歿distibuted_callbacks_ids}")
+            # 鍒犻櫎宸茬粡娌″湪鐩爣浠g爜涓殑鍒嗛厤
             raise Exception("鏃犲彲鐢ㄧ殑鍥炶皟瀵硅薄")
         self.distibuted_code_callback_dict[code] = callback_info
         return callback_info
@@ -98,3 +115,88 @@
         """
         codes = self.distibuted_code_callback_dict.keys()
         return copy.deepcopy(codes)
+
+
+# 鏁版嵁閫氶亾鍒嗛厤
+class CodeDataChannelDistributeManager:
+
+    def __init__(self, channel_list: list):
+        """
+        # [(閫愮瑪濮旀墭鍏变韩鍐呭瓨, 閫愮瑪鎴愪氦鍏变韩鍐呭瓨, 閫氱煡鍦板潃)]
+        @param channel_list:[((缂栧彿,multiprocessing.Array, zmq_address),(缂栧彿, multiprocessing.Array, zmq_address))]
+        """
+        flist = []
+        for channel_data in channel_list:
+            flist.append((id(channel_data), channel_data))
+        self.channel_list = flist
+        self.distibuted_code_channel_dict = {}
+        self.zmq_host_socket_dict = {}
+        self.zmq_context = zmq.Context()
+        for channel in channel_list:
+            socket = self.zmq_context.socket(zmq.REQ)
+            host = channel[0][2]
+            socket.connect(host)
+            self.zmq_host_socket_dict[host] = socket
+            socket = self.zmq_context.socket(zmq.REQ)
+            host = channel[1][2]
+            socket.connect(host)
+            self.zmq_host_socket_dict[host] = socket
+
+    def get_zmq_socket(self, host):
+        return self.zmq_host_socket_dict.get(host)
+
+    # 鑾峰彇鍙敤鐨勯槦鍒�
+    def get_available_channel(self):
+        # 宸茬粡鍒嗛厤鐨勫洖璋僆D
+        distibuted_channels_ids = set()
+        for code in self.distibuted_code_channel_dict:
+            distibuted_channels_ids.add(self.distibuted_code_channel_dict[code][0])
+        for channel in self.channel_list:
+            if channel[0] not in distibuted_channels_ids:
+                return channel
+        return None
+
+    # 涓轰唬鐮佸垎閰嶉槦鍒�
+    def distribute_channel(self, code, target_codes=None):
+        if code in self.distibuted_code_channel_dict:
+            return self.distibuted_code_channel_dict.get(code)
+        callback_info = self.get_available_channel()
+        if not callback_info:
+            distibuted_callbacks_ids = set()
+            need_release_codes = set()
+            for code in self.distibuted_code_channel_dict:
+                distibuted_callbacks_ids.add(self.distibuted_code_channel_dict[code][0])
+                # 濡傛灉浠g爜娌″湪鐩爣浠g爜涓氨绉婚櫎
+                if target_codes and code not in target_codes:
+                    need_release_codes.add(code)
+            for c in need_release_codes:
+                self.release_distribute_channel(c)
+            logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨勪唬鐮侊細{self.distibuted_code_channel_dict.keys()}")
+            logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨刢allbackid锛歿distibuted_callbacks_ids}")
+            # 鍒犻櫎宸茬粡娌″湪鐩爣浠g爜涓殑鍒嗛厤
+            raise Exception("鏃犲彲鐢ㄧ殑鍥炶皟瀵硅薄")
+        self.distibuted_code_channel_dict[code] = callback_info
+        return callback_info
+
+    # 鑾峰彇浠g爜鍒嗛厤鐨勯槦鍒�
+    def get_distributed_channel(self, code):
+        if code in self.distibuted_code_channel_dict:
+            return self.distibuted_code_channel_dict.get(code)[1]
+        else:
+            return None
+
+    def release_distribute_channel(self, code):
+        if code in self.distibuted_code_channel_dict:
+            self.distibuted_code_channel_dict.pop(code)
+
+    # 鑾峰彇绌洪棽鐨勪綅缃暟閲�
+    def get_free_channel_count(self):
+        return len(self.channel_list) - len(self.distibuted_code_channel_dict.keys())
+
+    def get_distributed_codes(self):
+        """
+        鑾峰彇宸茬粡鍒嗛厤鐨勪唬鐮�
+        @return:
+        """
+        codes = self.distibuted_code_channel_dict.keys()
+        return copy.deepcopy(codes)

--
Gitblit v1.8.0