| | |
| | | |
| | | |
class MsgQueueManager:
    """Per-client queue of short-lived messages stored in Redis.

    Every client owns one Redis list named ``kp_msg_queue-{client_id}``.
    Each entry is a JSON array ``[expire_time, msg]``, where ``expire_time``
    is the Unix timestamp after which the message must be ignored.

    Expiry is NOT enforced by this class: ``read_msg`` hands back the stored
    pair as-is and the caller compares ``expire_time`` with ``time.time()``.

    Redis is the only store. Messages were previously also mirrored into an
    in-memory ``queue.Queue`` per client, but nothing ever consumed that
    mirror, so it only grew; it has been dropped.
    """

    # NOTE(review): the argument 3 is presumably the Redis database index;
    # confirm against RedisManager.
    __redisManager = RedisManager(3)

    # Lifetime of a message, in seconds.
    __MSG_TTL_SECONDS = 2

    def __get_redis(self):
        """Return the Redis connection provided by the shared manager."""
        return self.__redisManager.getRedis()

    @staticmethod
    def __queue_key(client_id):
        """Return the name of the Redis list holding messages for a client."""
        return f"kp_msg_queue-{client_id}"

    def add_msg(self, client_id, msg):
        """Queue ``msg`` for ``client_id``; it is valid for 2 seconds.

        Args:
            client_id: Identifier of the receiving client (used in the key).
            msg: JSON-serialisable message payload.
        """
        # Compute the deadline once so the stored value is unambiguous.
        expire_time = time.time() + self.__MSG_TTL_SECONDS
        RedisUtils.lpush(
            self.__get_redis(),
            self.__queue_key(client_id),
            json.dumps((expire_time, msg)),
        )

    def read_msg(self, client_id):
        """Pop one queued message for ``client_id``.

        Returns:
            The decoded entry ``[expire_time, msg]`` (a list, because JSON
            has no tuple type), or ``None`` when nothing is queued. The entry
            may already be expired; the caller must check ``expire_time``.
        """
        # NOTE(review): add_msg pushes with lpush and this pops with lpop.
        # If RedisUtils maps these onto Redis LPUSH/LPOP, the newest message
        # is returned first (LIFO) - confirm that ordering is intended.
        raw = RedisUtils.lpop(self.__get_redis(), self.__queue_key(client_id))
        if not raw:
            return None
        return json.loads(raw)
| | | |
| | | |
| | | # 运行采集器 |
| | |
| | | msg_data = __MsgQueueManager.read_msg(client_id) |
| | | if not msg_data: |
| | | return None |
| | | expire_time = msg_data[0] |
| | | msg = msg_data[1] |
| | | expire_time = msg_data[1] |
| | | msg = msg_data[2] |
| | | if expire_time < time.time(): |
| | | # 过期 |
| | | return None |