"""
|
代码队列管理
|
"""
|
import copy
|
|
import zmq
|
|
from log_module.log import logger_local_huaxin_l2_error
|
|
|
class CodeQueueDistributeManager:
    """Hand out queues from a fixed pool so that each queue serves at most one code.

    Pool entries are kept as ``(index, queue)`` tuples, where ``index`` is the
    queue's position in the list passed to the constructor.
    """

    def __init__(self, queue_list: list):
        """Index the pool and start with no assignments.

        @param queue_list: the queues that may be handed out
        """
        # Pair every queue with its position so it can be identified later.
        self.queue_list = list(enumerate(queue_list))
        # code -> (index, queue) currently assigned to that code
        self.distibuted_code_queue_dict = {}

    def get_available_queue(self):
        """Find the first pool entry that is not assigned to any code.

        @return: an ``(index, queue)`` tuple, or None when every queue is taken
        """
        taken = {info[0] for info in self.distibuted_code_queue_dict.values()}
        return next((entry for entry in self.queue_list if entry[0] not in taken), None)

    def distribute_queue(self, code):
        """Assign a queue to ``code``, reusing its existing assignment if any.

        @param code: the code that needs a queue
        @return: the ``(index, queue)`` tuple assigned to ``code``
        @raise Exception: when no queue is free
        """
        assignments = self.distibuted_code_queue_dict
        try:
            return assignments[code]
        except KeyError:
            pass
        q_info = self.get_available_queue()
        if not q_info:
            raise Exception("无可用的队列")
        assignments[code] = q_info
        return q_info

    def get_distributed_queue(self, code):
        """Look up the assignment of ``code``.

        @return: the ``(index, queue)`` tuple, or None when ``code`` has none
        """
        return self.distibuted_code_queue_dict.get(code)

    def release_distribute_queue(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_queue_dict.pop(code, None)

    def get_free_queue_count(self):
        """Return the pool size minus the number of codes holding a queue."""
        return len(self.queue_list) - len(self.distibuted_code_queue_dict)
|
|
|
# 回调对象分配
|
class CodeDataCallbackDistributeManager:
    """Assign callback objects from a fixed pool to codes, one code per callback.

    Pool entries are kept as ``(id(callback), callback)`` tuples, so a callback
    is recognised by object identity.
    """

    def __init__(self, callback_list: list):
        """Index the pool and start with no assignments.

        @param callback_list: the callback objects that may be handed out
        """
        self.callback_list = [(id(callback), callback) for callback in callback_list]
        # code -> (callback id, callback) currently assigned to that code
        self.distibuted_code_callback_dict = {}

    def get_available_callback(self):
        """Find the first pool entry that is not assigned to any code.

        @return: a ``(callback id, callback)`` tuple, or None when all are taken
        """
        # IDs of the callbacks that are already assigned
        used_ids = {info[0] for info in self.distibuted_code_callback_dict.values()}
        for callback_info in self.callback_list:
            if callback_info[0] not in used_ids:
                return callback_info
        return None

    def distribute_callback(self, code, target_codes=None):
        """Assign a callback to ``code``, reusing its existing assignment if any.

        When the pool is exhausted, assignments whose code is not in
        ``target_codes`` are released before the exception is raised. The call
        that finds the pool exhausted still fails; only later calls can use
        the slots that were freed.

        @param code: the code that needs a callback
        @param target_codes: optional collection of codes that should keep
            their assignment; when empty or None nothing is released
        @return: the ``(callback id, callback)`` tuple assigned to ``code``
        @raise Exception: when no callback is free
        """
        if code in self.distibuted_code_callback_dict:
            return self.distibuted_code_callback_dict.get(code)
        callback_info = self.get_available_callback()
        if not callback_info:
            distibuted_callbacks_ids = set()
            need_release_codes = set()
            # A separate loop name keeps the ``code`` parameter intact.
            for assigned_code, assigned_info in self.distibuted_code_callback_dict.items():
                distibuted_callbacks_ids.add(assigned_info[0])
                # Codes that are no longer targets give their callback back.
                if target_codes and assigned_code not in target_codes:
                    need_release_codes.add(assigned_code)
            for stale_code in need_release_codes:
                self.release_distribute_callback(stale_code)
            # Logged after the release: the codes are the survivors, the ids
            # are those that were in use when the pool was found exhausted.
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
            raise Exception("无可用的回调对象")
        self.distibuted_code_callback_dict[code] = callback_info
        return callback_info

    def get_distributed_callback(self, code):
        """Return the callback object assigned to ``code``, or None if it has none."""
        callback_info = self.distibuted_code_callback_dict.get(code)
        if callback_info is None:
            return None
        return callback_info[1]

    def release_distribute_callback(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_callback_dict.pop(code, None)

    def get_free_queue_count(self):
        """Return the pool size minus the number of codes holding a callback."""
        return len(self.callback_list) - len(self.distibuted_code_callback_dict)

    def get_distributed_codes(self):
        """Return the codes that currently hold a callback.

        @return: a new set of codes, unaffected by later assignments/releases
        """
        # copy.deepcopy() cannot copy a dict_keys view (it raises TypeError),
        # so take the snapshot by building a set from the keys instead.
        return set(self.distibuted_code_callback_dict)
|
|
|
# 数据通道分配
|
class CodeDataChannelDistributeManager:
    """Assign data channels from a fixed pool to codes, one code per channel.

    Pool entries are kept as ``(id(channel), channel)`` tuples. A zmq REQ
    socket is connected for the address found at index 2 of both endpoints of
    every channel and can be fetched with ``get_zmq_socket``.
    """

    def __init__(self, channel_list: list):
        """Index the pool and connect one REQ socket per endpoint address.

        According to the original notes each channel is a pair of endpoints,
        ``((number, multiprocessing.Array, zmq_address),
        (number, multiprocessing.Array, zmq_address))``, the first for
        order-by-order data and the second for trade-by-trade data.
        NOTE(review): only index 2 (the address) of each endpoint is read
        here; confirm the rest of the layout against the caller.

        @param channel_list: the channels that may be handed out
        """
        self.channel_list = [(id(channel_data), channel_data) for channel_data in channel_list]
        # code -> (channel id, channel) currently assigned to that code
        self.distibuted_code_channel_dict = {}
        # zmq address -> connected REQ socket
        self.zmq_host_socket_dict = {}
        self.zmq_context = zmq.Context()
        for channel in channel_list:
            # Both endpoints of a channel get their own socket; a repeated
            # address keeps the most recently created socket.
            for endpoint_index in (0, 1):
                host = channel[endpoint_index][2]
                socket = self.zmq_context.socket(zmq.REQ)
                socket.connect(host)
                self.zmq_host_socket_dict[host] = socket

    def get_zmq_socket(self, host):
        """Return the socket connected to ``host``, or None if there is none."""
        return self.zmq_host_socket_dict.get(host)

    def get_available_channel(self):
        """Find the first pool entry that is not assigned to any code.

        @return: a ``(channel id, channel)`` tuple, or None when all are taken
        """
        # IDs of the channels that are already assigned
        used_ids = {info[0] for info in self.distibuted_code_channel_dict.values()}
        for channel_info in self.channel_list:
            if channel_info[0] not in used_ids:
                return channel_info
        return None

    def distribute_channel(self, code, target_codes=None):
        """Assign a channel to ``code``, reusing its existing assignment if any.

        When the pool is exhausted, assignments whose code is not in
        ``target_codes`` are released before the exception is raised. The call
        that finds the pool exhausted still fails; only later calls can use
        the slots that were freed.

        @param code: the code that needs a channel
        @param target_codes: optional collection of codes that should keep
            their assignment; when empty or None nothing is released
        @return: the ``(channel id, channel)`` tuple assigned to ``code``
        @raise Exception: when no channel is free
        """
        if code in self.distibuted_code_channel_dict:
            return self.distibuted_code_channel_dict.get(code)
        channel_info = self.get_available_channel()
        if not channel_info:
            distibuted_callbacks_ids = set()
            need_release_codes = set()
            # A separate loop name keeps the ``code`` parameter intact.
            for assigned_code, assigned_info in self.distibuted_code_channel_dict.items():
                distibuted_callbacks_ids.add(assigned_info[0])
                # Codes that are no longer targets give their channel back.
                if target_codes and assigned_code not in target_codes:
                    need_release_codes.add(assigned_code)
            for stale_code in need_release_codes:
                self.release_distribute_channel(stale_code)
            # Logged after the release: the codes are the survivors, the ids
            # are those that were in use when the pool was found exhausted.
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_channel_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
            raise Exception("无可用的回调对象")
        self.distibuted_code_channel_dict[code] = channel_info
        return channel_info

    def get_distributed_channel(self, code):
        """Return the channel assigned to ``code``, or None if it has none."""
        channel_info = self.distibuted_code_channel_dict.get(code)
        if channel_info is None:
            return None
        return channel_info[1]

    def release_distribute_channel(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_channel_dict.pop(code, None)

    def get_free_channel_count(self):
        """Return the pool size minus the number of codes holding a channel."""
        return len(self.channel_list) - len(self.distibuted_code_channel_dict)

    def get_distributed_codes(self):
        """Return the codes that currently hold a channel.

        @return: a new set of codes, unaffected by later assignments/releases
        """
        # copy.deepcopy() cannot copy a dict_keys view (it raises TypeError),
        # so take the snapshot by building a set from the keys instead.
        return set(self.distibuted_code_channel_dict)
|