| | |
| | | """ |
| | | import copy |
| | | |
| | | import zmq |
| | | |
| | | from log_module.log import logger_local_huaxin_l2_error |
| | | |
| | | |
| | |
| | | callback_info = self.get_available_callback() |
| | | if not callback_info: |
| | | distibuted_callbacks_ids = set() |
| | | need_release_codes = set() |
| | | for code in self.distibuted_code_callback_dict: |
| | | distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) |
| | | # 如果代码没在目标代码中就移除 |
| | | if target_codes and code not in target_codes: |
| | | self.release_distribute_callback(code) |
| | | need_release_codes.add(code) |
| | | for c in need_release_codes: |
| | | self.release_distribute_callback(c) |
| | | logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}") |
| | | logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}") |
| | | # 删除已经没在目标代码中的分配 |
| | |
| | | """ |
| | | codes = self.distibuted_code_callback_dict.keys() |
| | | return copy.deepcopy(codes) |
| | | |
| | | |
| | | # 数据通道分配 |
class CodeDataChannelDistributeManager:
    """Hand out data channels from a fixed pool, at most one code per channel.

    Every pool entry is stored as ``(channel_id, channel_data)`` where
    ``channel_id`` is ``id(channel_data)``.  The mapping ``code -> pool entry``
    lives in ``distibuted_code_channel_dict``.  A zmq REQ socket is connected
    for every notification address found in the channel data and can be looked
    up with :meth:`get_zmq_socket`.
    """

    def __init__(self, channel_list: list):
        """
        Build the channel pool and connect the notification sockets.

        Original note on the layout: each channel pairs the tick-by-tick
        order shared memory with the tick-by-tick trade shared memory, each
        accompanied by its notification address.

        @param channel_list: [((number, multiprocessing.Array, zmq_address),
                               (number, multiprocessing.Array, zmq_address))]
        """
        self.channel_list = [(id(channel_data), channel_data) for channel_data in channel_list]
        self.distibuted_code_channel_dict = {}
        self.zmq_host_socket_dict = {}
        self.zmq_context = zmq.Context()
        for channel in channel_list:
            # Index 2 of each half of the pair is the zmq address to notify.
            for host in (channel[0][2], channel[1][2]):
                if host in self.zmq_host_socket_dict:
                    # A socket for this address already exists; opening another
                    # one would only orphan the first.
                    continue
                socket = self.zmq_context.socket(zmq.REQ)
                socket.connect(host)
                self.zmq_host_socket_dict[host] = socket

    def get_zmq_socket(self, host):
        """Return the socket connected to ``host``, or None if there is none."""
        return self.zmq_host_socket_dict.get(host)

    def get_available_channel(self):
        """Return the first pool entry not assigned to any code, or None."""
        used_channel_ids = {info[0] for info in self.distibuted_code_channel_dict.values()}
        for channel in self.channel_list:
            if channel[0] not in used_channel_ids:
                return channel
        return None

    def distribute_channel(self, code, target_codes=None):
        """
        Assign a channel to ``code`` and return its ``(channel_id, channel_data)``.

        A code that already owns a channel gets the same entry back.  When the
        pool is exhausted, assignments of codes that are not in
        ``target_codes`` are released (only if ``target_codes`` is non-empty),
        the state is logged, and an exception is raised; the released channels
        are available to subsequent calls.

        @param code: code that needs a channel
        @param target_codes: codes that should keep their channels
        @return: (channel_id, channel_data)
        @raise Exception: no free channel was available for this call
        """
        allocations = self.distibuted_code_channel_dict
        if code in allocations:
            return allocations[code]

        channel_info = self.get_available_channel()
        if channel_info is None:
            # Snapshot the ids before releasing so the log shows what was
            # occupying the pool when the request failed.
            used_channel_ids = {info[0] for info in allocations.values()}
            stale_codes = set()
            if target_codes:
                stale_codes = {held for held in allocations if held not in target_codes}
            # Release after the scan: the dict must not change while iterated.
            for stale in stale_codes:
                self.release_distribute_channel(stale)
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_channel_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{used_channel_ids}")
            raise Exception("无可用的回调对象")

        allocations[code] = channel_info
        return channel_info

    def get_distributed_channel(self, code):
        """Return the channel data assigned to ``code``, or None if unassigned."""
        if code not in self.distibuted_code_channel_dict:
            return None
        return self.distibuted_code_channel_dict[code][1]

    def release_distribute_channel(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_channel_dict.pop(code, None)

    def get_free_channel_count(self):
        """Return the pool size minus the number of assigned codes."""
        return len(self.channel_list) - len(self.distibuted_code_channel_dict)

    def get_distributed_codes(self):
        """
        Return the codes that currently own a channel.

        The result is an independent set, so later assignments or releases do
        not affect it.  (``copy.deepcopy`` cannot copy a dict keys view, so a
        plain snapshot is taken instead.)
        @return: set of codes
        """
        return set(self.distibuted_code_channel_dict)