admin
2024-10-17 b30fb8afd3cd6228bda9b182dc412bb3c8daf69c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package com.yeshi.buwan.util.mq.rabbit;
 
import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
import java.nio.charset.StandardCharsets;
import java.util.Map;
 
/**
 * @author hxh
 * @title: RabbitmqUtil
 * @description: mq消息发送工具
 * @date 2024/10/12 13:04
 */
public class RabbitmqSenderUtil {
 
    // 延迟交换机
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
    // 延迟队列
    public static final String DELAY_QUEUE_NAME = "delay_queue";
 
    //常规延迟队列的routingKey
    public static final String ROUTING_KEY_COMMON_DELAY = "delay";
    // 消费失败的延迟消息
    public static final String ROUTING_KEY_CONSUME_FAIL_DELAY = "consume_fail_delay";
    // 消费失败处理队列
    public static final String CONSUME_FAIL_QUEUE_NAME = "consume_fail_queue";
 
 
    /**
     * @return void
     * @author hxh
     * @description 发送队列消息
     * @date 13:06 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: message
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String message) {
        rabbitTemplate.convertAndSend(queueName, message);
    }
 
    /**
     * @return void
     * @author hxh
     * @description 发送带头部的消息
     * @date 13:54 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: msg
     * @param: headers
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, Map<String, String> headers) {
        MessageProperties messageProperties = new MessageProperties();
        if (headers != null) {
            for (String key : headers.keySet()) {
                messageProperties.setHeader(key, headers.get(key));//延迟5秒被删除
            }
        }
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(queueName, message);//交换机和路由键必须和配置文件类中保持一致
    }
 
 
    /**
     * @return void
     * @author hxh
     * @description 发送队列延迟消息
     * @date 13:07 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: msg
     * @param: delayMS
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, int delayMS) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMS);
        msgInfo.setQueueName(queueName);
        msgInfo.setMsg(msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayMS);//延迟5秒被删除
        Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);//交换机和路由键必须和配置文件类中保持一致
    }
 
    /**
     * @return void
     * @author hxh
     * @description 发送交换机消息
     * @date 13:13 2024/10/12
     * @param: rabbitTemplate
     * @param: exchangeName
     * @param: routingKey
     * @param: message
     **/
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
 
 
    /**
     * @return void
     * @author hxh
     * @description 发送延迟交换机消息
     * @date 13:39 2024/10/12
     * @param: rabbitTemplate
     * @param: exchangeName
     * @param: routingKey
     * @param: msg
     * @param: delayMs
     **/
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String msg, int delayMs) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMs);
        msgInfo.setExchangeName(exchangeName);
        msgInfo.setMsg(msg);
        msgInfo.setRoutingKey(routingKey);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayMs);
        Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);
    }
 
 
    /**
     * @author hxh 
     * @description 添加消息到重新消费队列
     * @date 14:10 2024/10/12
     * @param: rabbitTemplate
     * @param: message
     * @return boolean 加入重新消费队列
     **/
    public static boolean retryConsume(RabbitTemplate rabbitTemplate, Message message) {
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        int retryCount = 0;
        if (messageHeaders.containsKey("retry_count")) {
            retryCount = Integer.parseInt(messageHeaders.get("retry_count") + "");
        }
        // 重试5次
        if (retryCount >= 5) {
            return false;
        }
 
        // 发送延迟消息
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setQueueName(message.getMessageProperties().getConsumerQueue());
        msgInfo.setMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        msgInfo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        msgInfo.setSendCount(retryCount);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", 5*60*1000);
//        messageProperties.setHeader("x-delay", 5*1000);// 测试为5s
        Message messageNew = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_CONSUME_FAIL_DELAY, messageNew);
        return true;
    }
 
 
}