""" L2订阅管理 """ import math import multiprocessing import random import threading import time import msgpack import zmq from huaxin_client import l2_data_transform_protocol from log_module import async_log_util from log_module.log import logger_debug from utils import shared_memery_util process_manager = None class TargetCodeProcessManager: """ 目标代码的进程管理 """ def __init__(self, com_queues: list, max_code_count_per_queue_list): """ 初始化 @param com_queues:list 通信队列 @param max_code_count_per_queue: 每个队列最大的代码数量 """ self.__com_queues = com_queues self.__max_code_count_per_queue_dict = {id(com_queues[i]): max_code_count_per_queue_list[i] for i in range(len(com_queues))} # 队列ID与队列对象的映射 self.__com_queue_id_object_dict = {id(q): q for q in com_queues} # 队列ID对应的代码,格式:{"队列ID":{"代码1","代码2"}} self.__queue_codes = {} for q in com_queues: self.__queue_codes[id(q)] = set() # 代码所在队列ID self.__code_queue_dict = {} def set_codes(self, codes: set): """ 设置订阅代码 @param codes: @return: 返回队列与对应分配的代码:[(队列对象, {"代码1","代码2"}),...] """ add_codes = codes - self.__code_queue_dict.keys() del_codes = self.__code_queue_dict.keys() - codes # 删除代码 if del_codes: for code in del_codes: if code in self.__code_queue_dict: queue_id = self.__code_queue_dict[code] self.__queue_codes[queue_id].discard(code) self.__code_queue_dict.pop(code) # 为新增代码分配队列 for code in add_codes: # 寻找未满的队列 for queue_id in self.__queue_codes: count_per_process = self.__max_code_count_per_queue_dict.get(queue_id) if len(self.__queue_codes[queue_id]) >= count_per_process: # 队列已满 continue # 队列未满,分配代码 self.__queue_codes[queue_id].add(code) self.__code_queue_dict[code] = queue_id break # 分配订阅信息 logger_debug.info(f"订阅通道分配:{self.__code_queue_dict}") return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in self.__queue_codes] def get_queues_with_codes(self): """ 获取队列分配的代码 @return: [(队列对象,{代码集合})] """ results = [] for queue_id in self.__queue_codes: results.append((self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes.get(queue_id))) return results class L2DataListener: """ L2数据监听 """ last_log_time = time.time() def __init__(self, channel_list): """ @param channel_list:channel_list:[((共享内存编号,委托共享内存数组, zmq地址),(共享内存编号,成交共享内存数组, zmq地址))] """ self.channel_list = channel_list # 设置共享内存编号与共享内存数组映射 self.shared_memery_num_object_dict = {} for channel in self.channel_list: self.shared_memery_num_object_dict[channel[0][0]] = channel[0][1] self.shared_memery_num_object_dict[channel[1][0]] = channel[1][1] def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack): """ 创建数据监听器 @param @return: """ for channel in self.channel_list: channel_delegate = channel[0] channel_deal = channel[1] threading.Thread(target=self.__create_l2_zmq_server, args=(channel_delegate[2], l2_data_callback,), daemon=True).start() threading.Thread(target=self.__create_l2_zmq_server, args=(channel_deal[2], l2_data_callback,), daemon=True).start() def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack): """ 创建L2逐笔委托/成交zmq服务 @param ipc_addr: @return: """ context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(ipc_addr) while True: data = socket.recv() try: # 接收数据 data = msgpack.unpackb(data) shared_memery_id = data["data"]["memery_number"] datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id)) if time.time() - self.last_log_time > 10: async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}") self.last_log_time = time.time() if data["type"] == 1: # 委托 code, data_list, timestamp = datas[0], datas[1], datas[2] l2_data_callback.OnL2Order(code, data_list, timestamp) elif data["type"] == 2: # 成交 code, data_list = datas[0], datas[1] l2_data_callback.OnL2Transaction(code, data_list) except Exception as e: pass finally: socket.send_string("SUCCESS") if __name__ == "__main__": queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)] manager = TargetCodeProcessManager(queues, 10) counts = [70, 60, 50, 10] for i in range(4): codes = set() for i in range(counts[i]): code = random.randint(1, 1000000) code = str(code).zfill(6) codes.add(code) print(codes) manager.add_codes(codes) results = manager.get_queues_with_codes() fcodes = set() for r in results: fcodes |= r[1] if codes - fcodes or fcodes - codes: print("订阅出错") print(results)