package com.ks.push.manager.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Static helpers for publishing RabbitMQ messages through a {@link RabbitTemplate}.
 *
 * <p>Three kinds of sends are supported: plain sends (to a queue name or to an
 * exchange/routing key), sends with custom headers, and delayed sends. Delayed sends
 * never go to the caller's target directly: the payload and its intended target are
 * wrapped in a {@link DelayMsgInfo}, serialized to JSON, and published to
 * {@link #DELAY_EXCHANGE_NAME} with an {@code x-delay} header.
 *
 * <p>NOTE(review): the {@code x-delay} header is presumably interpreted by the RabbitMQ
 * delayed-message-exchange plugin, and a consumer on the delay queue presumably forwards
 * the wrapped message to its real target -- neither is visible in this file; confirm
 * against the exchange/queue configuration class.
 *
 * @author hxh
 * @date 2024/10/12 13:04
 */
public class RabbitmqSenderUtil {

    /** Name of the delay exchange. */
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange";

    /** Name of the delay queue. */
    public static final String DELAY_QUEUE_NAME = "delay_queue";

    /** Routing key used for regular delayed messages. */
    public static final String ROUTING_KEY_COMMON_DELAY = "delay";

    /** Routing key used for delayed messages that are being retried after a consume failure. */
    public static final String ROUTING_KEY_CONSUME_FAIL_DELAY = "consume_fail_delay";

    /** Name of the queue that handles consume failures. */
    public static final String CONSUME_FAIL_QUEUE_NAME = "consume_fail_queue";

    /** Header carrying the delay, in milliseconds, on messages sent to the delay exchange. */
    private static final String HEADER_DELAY = "x-delay";

    /** Header from which {@link #retryConsume} reads how often a message was already retried. */
    private static final String HEADER_RETRY_COUNT = "retry_count";

    /** A message whose retry count has reached this value is not scheduled again. */
    private static final int MAX_RETRY_COUNT = 5;

    /** Delay applied before a failed message is delivered again: five minutes. */
    private static final int RETRY_DELAY_MS = 5 * 60 * 1000;

    /** Shared serializer; Gson instances are thread-safe, so one instance serves all calls. */
    private static final Gson GSON = new Gson();

    /**
     * Sends a text message using {@code queueName} as the routing key.
     *
     * <p>Delegates to {@link RabbitTemplate#convertAndSend(String, Object)}, so the message
     * goes through the template's configured exchange and message converter.
     *
     * @param rabbitTemplate template used to publish
     * @param queueName      target queue name, passed as the routing key
     * @param message        message payload
     */
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String message) {
        rabbitTemplate.convertAndSend(queueName, message);
    }

    /**
     * Sends a UTF-8 encoded text message with custom headers, using {@code queueName} as
     * the routing key.
     *
     * @param rabbitTemplate template used to publish
     * @param queueName      target queue name, passed as the routing key
     * @param msg            message payload; must not be {@code null}
     * @param headers        headers to copy onto the message; {@code null} means no headers
     */
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg,
                                    Map<String, ?> headers) {
        MessageProperties messageProperties = new MessageProperties();
        if (headers != null) {
            for (Map.Entry<String, ?> header : headers.entrySet()) {
                messageProperties.setHeader(header.getKey(), header.getValue());
            }
        }
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(queueName, message);
    }

    /**
     * Sends a delayed message whose final target is a queue.
     *
     * <p>The payload, the target queue name and the delay are wrapped in a
     * {@link DelayMsgInfo} and published to {@link #DELAY_EXCHANGE_NAME} with routing key
     * {@link #ROUTING_KEY_COMMON_DELAY}; both must match the exchange/queue configuration.
     *
     * @param rabbitTemplate template used to publish
     * @param queueName      queue the message is ultimately intended for
     * @param msg            message payload
     * @param delayMS        delay in milliseconds
     */
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, int delayMS) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMS);
        msgInfo.setQueueName(queueName);
        msgInfo.setMsg(msg);
        publishDelayed(rabbitTemplate, ROUTING_KEY_COMMON_DELAY, msgInfo, delayMS);
    }

    /**
     * Sends a text message to an exchange with the given routing key.
     *
     * @param rabbitTemplate template used to publish
     * @param exchangeName   target exchange
     * @param routingKey     routing key
     * @param message        message payload
     */
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey,
                                       String message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }

    /**
     * Sends a delayed message whose final target is an exchange/routing key pair.
     *
     * <p>The payload, the target exchange, the routing key and the delay are wrapped in a
     * {@link DelayMsgInfo} and published to {@link #DELAY_EXCHANGE_NAME} with routing key
     * {@link #ROUTING_KEY_COMMON_DELAY}.
     *
     * @param rabbitTemplate template used to publish
     * @param exchangeName   exchange the message is ultimately intended for
     * @param routingKey     routing key the message is ultimately intended for
     * @param msg            message payload
     * @param delayMs        delay in milliseconds
     */
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey,
                                       String msg, int delayMs) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMs);
        msgInfo.setExchangeName(exchangeName);
        msgInfo.setMsg(msg);
        msgInfo.setRoutingKey(routingKey);
        publishDelayed(rabbitTemplate, ROUTING_KEY_COMMON_DELAY, msgInfo, delayMs);
    }

    /**
     * Schedules a message that failed to be consumed for another delivery attempt.
     *
     * <p>The retry count is read from the {@code retry_count} header of {@code message}
     * (absent or {@code null} counts as 0). Once it has reached 5 nothing is sent. Otherwise
     * the body, consumer queue, received routing key and current retry count are wrapped in
     * a {@link DelayMsgInfo} and published to {@link #DELAY_EXCHANGE_NAME} with routing key
     * {@link #ROUTING_KEY_CONSUME_FAIL_DELAY} and a five minute delay.
     *
     * <p>NOTE(review): the republished message carries the count only as
     * {@code DelayMsgInfo.sendCount}; no {@code retry_count} header is set here. Presumably
     * the consumer of the retry route increments the count and restores the header when it
     * forwards the message -- confirm, otherwise the limit is never reached.
     *
     * @param rabbitTemplate template used to publish
     * @param message        the message whose consumption failed
     * @return {@code true} if the message was scheduled for retry, {@code false} if the
     *         retry limit has been reached
     * @throws NumberFormatException if the {@code retry_count} header is a non-numeric string
     */
    public static boolean retryConsume(RabbitTemplate rabbitTemplate, Message message) {
        MessageProperties received = message.getMessageProperties();
        int retryCount = readRetryCount(received.getHeaders());
        if (retryCount >= MAX_RETRY_COUNT) {
            return false;
        }

        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setQueueName(received.getConsumerQueue());
        msgInfo.setMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        msgInfo.setRoutingKey(received.getReceivedRoutingKey());
        msgInfo.setSendCount(retryCount);
        publishDelayed(rabbitTemplate, ROUTING_KEY_CONSUME_FAIL_DELAY, msgInfo, RETRY_DELAY_MS);
        return true;
    }

    /** Serializes {@code msgInfo} to JSON and publishes it to the delay exchange with an {@code x-delay} header. */
    private static void publishDelayed(RabbitTemplate rabbitTemplate, String routingKey, DelayMsgInfo msgInfo,
                                       int delayMs) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader(HEADER_DELAY, delayMs);
        byte[] body = GSON.toJson(msgInfo).getBytes(StandardCharsets.UTF_8);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, routingKey, new Message(body, messageProperties));
    }

    /** Reads the {@code retry_count} header as an int; an absent or {@code null} header yields 0. */
    private static int readRetryCount(Map<String, Object> headers) {
        Object raw = headers.get(HEADER_RETRY_COUNT);
        if (raw == null) {
            return 0;
        }
        if (raw instanceof Number) {
            // Numeric headers need no string round trip.
            return ((Number) raw).intValue();
        }
        return Integer.parseInt(raw.toString());
    }
}