package com.yeshi.buwan.util.mq.rabbit;
|
|
import com.google.gson.Gson;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.core.MessageProperties;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
import java.nio.charset.StandardCharsets;
|
import java.util.Map;
|
|
/**
|
* @author hxh
|
* @title: RabbitmqUtil
|
* @description: mq消息发送工具
|
* @date 2024/10/12 13:04
|
*/
|
public class RabbitmqSenderUtil {
|
|
// 延迟交换机
|
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
|
// 延迟队列
|
public static final String DELAY_QUEUE_NAME = "delay_queue";
|
|
//常规延迟队列的routingKey
|
public static final String ROUTING_KEY_COMMON_DELAY = "delay";
|
// 消费失败的延迟消息
|
public static final String ROUTING_KEY_CONSUME_FAIL_DELAY = "consume_fail_delay";
|
// 消费失败处理队列
|
public static final String CONSUME_FAIL_QUEUE_NAME = "consume_fail_queue";
|
|
|
/**
|
* @return void
|
* @author hxh
|
* @description 发送队列消息
|
* @date 13:06 2024/10/12
|
* @param: rabbitTemplate
|
* @param: queueName
|
* @param: message
|
**/
|
public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String message) {
|
rabbitTemplate.convertAndSend(queueName, message);
|
}
|
|
/**
|
* @return void
|
* @author hxh
|
* @description 发送带头部的消息
|
* @date 13:54 2024/10/12
|
* @param: rabbitTemplate
|
* @param: queueName
|
* @param: msg
|
* @param: headers
|
**/
|
public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, Map<String, String> headers) {
|
MessageProperties messageProperties = new MessageProperties();
|
if (headers != null) {
|
for (String key : headers.keySet()) {
|
messageProperties.setHeader(key, headers.get(key));//延迟5秒被删除
|
}
|
}
|
Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
|
rabbitTemplate.convertAndSend(queueName, message);//交换机和路由键必须和配置文件类中保持一致
|
}
|
|
|
/**
|
* @return void
|
* @author hxh
|
* @description 发送队列延迟消息
|
* @date 13:07 2024/10/12
|
* @param: rabbitTemplate
|
* @param: queueName
|
* @param: msg
|
* @param: delayMS
|
**/
|
public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, int delayMS) {
|
DelayMsgInfo msgInfo = new DelayMsgInfo();
|
msgInfo.setDelayMs(delayMS);
|
msgInfo.setQueueName(queueName);
|
msgInfo.setMsg(msg);
|
MessageProperties messageProperties = new MessageProperties();
|
messageProperties.setHeader("x-delay", delayMS);//延迟5秒被删除
|
Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
|
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);//交换机和路由键必须和配置文件类中保持一致
|
}
|
|
/**
|
* @return void
|
* @author hxh
|
* @description 发送交换机消息
|
* @date 13:13 2024/10/12
|
* @param: rabbitTemplate
|
* @param: exchangeName
|
* @param: routingKey
|
* @param: message
|
**/
|
public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String message) {
|
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
|
}
|
|
|
/**
|
* @return void
|
* @author hxh
|
* @description 发送延迟交换机消息
|
* @date 13:39 2024/10/12
|
* @param: rabbitTemplate
|
* @param: exchangeName
|
* @param: routingKey
|
* @param: msg
|
* @param: delayMs
|
**/
|
public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String msg, int delayMs) {
|
DelayMsgInfo msgInfo = new DelayMsgInfo();
|
msgInfo.setDelayMs(delayMs);
|
msgInfo.setExchangeName(exchangeName);
|
msgInfo.setMsg(msg);
|
msgInfo.setRoutingKey(routingKey);
|
MessageProperties messageProperties = new MessageProperties();
|
messageProperties.setHeader("x-delay", delayMs);
|
Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
|
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);
|
}
|
|
|
/**
|
* @author hxh
|
* @description 添加消息到重新消费队列
|
* @date 14:10 2024/10/12
|
* @param: rabbitTemplate
|
* @param: message
|
* @return boolean 加入重新消费队列
|
**/
|
public static boolean retryConsume(RabbitTemplate rabbitTemplate, Message message) {
|
Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
|
int retryCount = 0;
|
if (messageHeaders.containsKey("retry_count")) {
|
retryCount = Integer.parseInt(messageHeaders.get("retry_count") + "");
|
}
|
// 重试5次
|
if (retryCount >= 5) {
|
return false;
|
}
|
|
// 发送延迟消息
|
DelayMsgInfo msgInfo = new DelayMsgInfo();
|
msgInfo.setQueueName(message.getMessageProperties().getConsumerQueue());
|
msgInfo.setMsg(new String(message.getBody(), StandardCharsets.UTF_8));
|
msgInfo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
|
msgInfo.setSendCount(retryCount);
|
MessageProperties messageProperties = new MessageProperties();
|
messageProperties.setHeader("x-delay", 5*60*1000);
|
// messageProperties.setHeader("x-delay", 5*1000);// 测试为5s
|
Message messageNew = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
|
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_CONSUME_FAIL_DELAY, messageNew);
|
return true;
|
}
|
|
|
}
|