| | |
| | | import multiprocessing |
| | | import random |
| | | import threading |
| | | import time |
| | | |
| | | import msgpack |
| | | import zmq |
| | | |
| | | from huaxin_client import l2_data_transform_protocol |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | from utils import shared_memery_util |
| | | |
| | | process_manager = None |
| | |
| | | self.__queue_codes[queue_id].add(code) |
| | | self.__code_queue_dict[code] = queue_id |
| | | break |
| | | # 分配订阅信息 |
| | | logger_debug.info(f"订阅通道分配:{self.__code_queue_dict}") |
| | | return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in |
| | | self.__queue_codes] |
| | | |
| | |
| | | """ |
| | | L2数据监听 |
| | | """ |
| | | |
| | | last_log_time = time.time() |
| | | |
| | | def __init__(self, channel_list): |
| | | """ |
| | |
| | | while True: |
| | | data = socket.recv() |
| | | try: |
| | | #接收数据 |
| | | # 接收数据 |
| | | data = msgpack.unpackb(data) |
| | | shared_memery_id = data["data"]["memery_number"] |
| | | datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id)) |
| | | |
| | | if time.time() - self.last_log_time > 10: |
| | | async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}") |
| | | self.last_log_time = time.time() |
| | | if data["type"] == 1: |
| | | # 委托 |
| | | code, data_list, timestamp = datas[0], datas[1], datas[2] |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | queues = [multiprocessing.Queue() for i in range(7)] |
| | | queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)] |
| | | manager = TargetCodeProcessManager(queues, 10) |
| | | counts = [70, 60, 50, 10] |
| | | for i in range(4): |