Administrator
75 分钟以前 2f2516749615da866e96d8d24e499b7ecbb63a3e
huaxin_client/code_queue_distribute_manager.py
@@ -3,6 +3,8 @@
"""
import copy
import zmq
from log_module.log import logger_local_huaxin_l2_error
@@ -113,3 +115,88 @@
        """
        codes = self.distibuted_code_callback_dict.keys()
        return copy.deepcopy(codes)
# 数据通道分配
class CodeDataChannelDistributeManager:
    def __init__(self, channel_list: list):
        """
        # [(逐笔委托共享内存, 逐笔成交共享内存, 通知地址)]
        @param channel_list:[((编号,multiprocessing.Array, zmq_address),(编号, multiprocessing.Array, zmq_address))]
        """
        flist = []
        for channel_data in channel_list:
            flist.append((id(channel_data), channel_data))
        self.channel_list = flist
        self.distibuted_code_channel_dict = {}
        self.zmq_host_socket_dict = {}
        self.zmq_context = zmq.Context()
        for channel in channel_list:
            socket = self.zmq_context.socket(zmq.REQ)
            host = channel[0][2]
            socket.connect(host)
            self.zmq_host_socket_dict[host] = socket
            socket = self.zmq_context.socket(zmq.REQ)
            host = channel[1][2]
            socket.connect(host)
            self.zmq_host_socket_dict[host] = socket
    def get_zmq_socket(self, host):
        return self.zmq_host_socket_dict.get(host)
    # 获取可用的队列
    def get_available_channel(self):
        # 已经分配的回调ID
        distibuted_channels_ids = set()
        for code in self.distibuted_code_channel_dict:
            distibuted_channels_ids.add(self.distibuted_code_channel_dict[code][0])
        for channel in self.channel_list:
            if channel[0] not in distibuted_channels_ids:
                return channel
        return None
    # 为代码分配队列
    def distribute_channel(self, code, target_codes=None):
        if code in self.distibuted_code_channel_dict:
            return self.distibuted_code_channel_dict.get(code)
        callback_info = self.get_available_channel()
        if not callback_info:
            distibuted_callbacks_ids = set()
            need_release_codes = set()
            for code in self.distibuted_code_channel_dict:
                distibuted_callbacks_ids.add(self.distibuted_code_channel_dict[code][0])
                # 如果代码没在目标代码中就移除
                if target_codes and code not in target_codes:
                    need_release_codes.add(code)
            for c in need_release_codes:
                self.release_distribute_channel(c)
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_channel_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
            # 删除已经没在目标代码中的分配
            raise Exception("无可用的回调对象")
        self.distibuted_code_channel_dict[code] = callback_info
        return callback_info
    # 获取代码分配的队列
    def get_distributed_channel(self, code):
        if code in self.distibuted_code_channel_dict:
            return self.distibuted_code_channel_dict.get(code)[1]
        else:
            return None
    def release_distribute_channel(self, code):
        if code in self.distibuted_code_channel_dict:
            self.distibuted_code_channel_dict.pop(code)
    # 获取空闲的位置数量
    def get_free_channel_count(self):
        return len(self.channel_list) - len(self.distibuted_code_channel_dict.keys())
    def get_distributed_codes(self):
        """
        获取已经分配的代码
        @return:
        """
        codes = self.distibuted_code_channel_dict.keys()
        return copy.deepcopy(codes)