From 4a2ac13b0ac30601fbd0fcf63ac533011d17dfb7 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 07 三月 2025 18:24:38 +0800
Subject: [PATCH] bug修复/新版L2订阅初步改造

---
 huaxin_client/code_queue_distribute_manager.py |   87 +++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 87 insertions(+), 0 deletions(-)

diff --git a/huaxin_client/code_queue_distribute_manager.py b/huaxin_client/code_queue_distribute_manager.py
index 782cdbf..67d6fae 100644
--- a/huaxin_client/code_queue_distribute_manager.py
+++ b/huaxin_client/code_queue_distribute_manager.py
@@ -3,6 +3,8 @@
 """
 import copy
 
+import zmq
+
 from log_module.log import logger_local_huaxin_l2_error
 
 
@@ -113,3 +115,88 @@
         """
         codes = self.distibuted_code_callback_dict.keys()
         return copy.deepcopy(codes)
+
+
+# 鏁版嵁閫氶亾鍒嗛厤
+class CodeDataChannelDistributeManager:
+
+    def __init__(self, channel_list: list):
+        """
+        # [(閫愮瑪濮旀墭鍏变韩鍐呭瓨, 閫愮瑪鎴愪氦鍏变韩鍐呭瓨, 閫氱煡鍦板潃)]
+        @param channel_list:[((缂栧彿,multiprocessing.Array, zmq_address),(缂栧彿, multiprocessing.Array, zmq_address))]
+        """
+        flist = []
+        for channel_data in channel_list:
+            flist.append((id(channel_data), channel_data))
+        self.channel_list = flist
+        self.distibuted_code_channel_dict = {}
+        self.zmq_host_socket_dict = {}
+        self.zmq_context = zmq.Context()
+        for channel in channel_list:
+            socket = self.zmq_context.socket(zmq.REQ)
+            host = channel[0][2]
+            socket.connect(host)
+            self.zmq_host_socket_dict[host] = socket
+            socket = self.zmq_context.socket(zmq.REQ)
+            host = channel[1][2]
+            socket.connect(host)
+            self.zmq_host_socket_dict[host] = socket
+
+    def get_zmq_socket(self, host):
+        return self.zmq_host_socket_dict.get(host)
+
+    # 鑾峰彇鍙敤鐨勯槦鍒�
+    def get_available_channel(self):
+        # 宸茬粡鍒嗛厤鐨勫洖璋僆D
+        distibuted_channels_ids = set()
+        for code in self.distibuted_code_channel_dict:
+            distibuted_channels_ids.add(self.distibuted_code_channel_dict[code][0])
+        for channel in self.channel_list:
+            if channel[0] not in distibuted_channels_ids:
+                return channel
+        return None
+
+    # 涓轰唬鐮佸垎閰嶉槦鍒�
+    def distribute_channel(self, code, target_codes=None):
+        if code in self.distibuted_code_channel_dict:
+            return self.distibuted_code_channel_dict.get(code)
+        callback_info = self.get_available_channel()
+        if not callback_info:
+            distibuted_callbacks_ids = set()
+            need_release_codes = set()
+            for code in self.distibuted_code_channel_dict:
+                distibuted_callbacks_ids.add(self.distibuted_code_channel_dict[code][0])
+                # 濡傛灉浠g爜娌″湪鐩爣浠g爜涓氨绉婚櫎
+                if target_codes and code not in target_codes:
+                    need_release_codes.add(code)
+            for c in need_release_codes:
+                self.release_distribute_channel(c)
+            logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨勪唬鐮侊細{self.distibuted_code_channel_dict.keys()}")
+            logger_local_huaxin_l2_error.info(f"宸茬粡鍒嗛厤鐨刢allbackid锛歿distibuted_callbacks_ids}")
+            # 鍒犻櫎宸茬粡娌″湪鐩爣浠g爜涓殑鍒嗛厤
+            raise Exception("鏃犲彲鐢ㄧ殑鍥炶皟瀵硅薄")
+        self.distibuted_code_channel_dict[code] = callback_info
+        return callback_info
+
+    # 鑾峰彇浠g爜鍒嗛厤鐨勯槦鍒�
+    def get_distributed_channel(self, code):
+        if code in self.distibuted_code_channel_dict:
+            return self.distibuted_code_channel_dict.get(code)[1]
+        else:
+            return None
+
+    def release_distribute_channel(self, code):
+        if code in self.distibuted_code_channel_dict:
+            self.distibuted_code_channel_dict.pop(code)
+
+    # 鑾峰彇绌洪棽鐨勪綅缃暟閲�
+    def get_free_channel_count(self):
+        return len(self.channel_list) - len(self.distibuted_code_channel_dict.keys())
+
+    def get_distributed_codes(self):
+        """
+        鑾峰彇宸茬粡鍒嗛厤鐨勪唬鐮�
+        @return:
+        """
+        codes = self.distibuted_code_channel_dict.keys()
+        return copy.deepcopy(codes)

--
Gitblit v1.8.0