Administrator
2025-08-18 ae8d76a456b64c1c6c4ebf11b6ec33b7df217b1a
l2/subscript/l2_subscript_manager.py
@@ -5,11 +5,13 @@
import multiprocessing
import random
import threading
import time
import msgpack
import zmq
from huaxin_client import l2_data_transform_protocol
from log_module import async_log_util
from log_module.log import logger_debug
from utils import shared_memery_util
# Module-level handle, initialised to None at import time.
# NOTE(review): where this is assigned and read is not visible in this excerpt — confirm its owner.
process_manager = None
@@ -66,6 +68,8 @@
                self.__queue_codes[queue_id].add(code)
                self.__code_queue_dict[code] = queue_id
                break
        # Hand out the subscription assignments
        logger_debug.info(f"订阅通道分配:{self.__code_queue_dict}")
        return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in
                self.__queue_codes]
@@ -84,6 +88,8 @@
    """
    Listener for L2 data.
    """
    # Class-level default: time.time() is evaluated once, when the class body runs.
    # NOTE(review): presumably the timestamp behind the "log at most every 10 s" check
    # in the receive loop, which rebinds it on the instance — confirm.
    last_log_time = time.time()
    def __init__(self, channel_list):
        """
@@ -123,10 +129,14 @@
        while True:
            data = socket.recv()
            try:
                #Receive data
                # Receive data
                data = msgpack.unpackb(data)
                shared_memery_id = data["data"]["memery_number"]
                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
                if time.time() - self.last_log_time > 10:
                    async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}")
                    self.last_log_time = time.time()
                if data["type"] == 1:
                    # Order data (original comment: 委托)
                    code, data_list, timestamp = datas[0], datas[1], datas[2]
@@ -142,7 +152,7 @@
if __name__ == "__main__":
    queues = [multiprocessing.Queue() for i in range(7)]
    queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)]
    manager = TargetCodeProcessManager(queues, 10)
    counts = [70, 60, 50, 10]
    for i in range(4):