package com.ks.push.manager.rabbitmq;
|
|
import com.ks.push.manager.rabbitmq.consumer.PushMessageListener;
|
import org.springframework.amqp.core.Binding;
|
import org.springframework.amqp.core.BindingBuilder;
|
import org.springframework.amqp.core.CustomExchange;
|
import org.springframework.amqp.core.Queue;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.lang.Nullable;
|
|
import java.util.HashMap;
|
import java.util.Map;
|
|
/**
|
* @author hxh
|
* @title: RabbitmqConfig
|
* @description: Rabbitmq配置
|
* @date 2024/10/16 11:27
|
*/
|
@Configuration
|
public class RabbitmqConfig {
|
|
|
private Queue createQueue(String queueName) {
|
Map<String, Object> arguments = new HashMap<>();
|
// 设置消息的有效时间
|
arguments.put("x-message-ttl", 1000 * 60 * 60 * 24L);
|
return new Queue(queueName, true, false, false, arguments);
|
}
|
|
@Bean
|
public Queue tokenInvalidQueue() {
|
return createQueue("bpush-token-invalid");
|
}
|
|
|
|
@Bean
|
public Queue xmPushQueue() {
|
return createQueue("bpush-xm");
|
}
|
|
@Bean
|
public Queue vivoPushQueue() {
|
return createQueue("bpush-vivo");
|
}
|
|
@Bean
|
public Queue oppoPushQueue() {
|
return createQueue("bpush-oppo");
|
}
|
|
@Bean
|
public Queue mzPushQueue() {
|
return createQueue("bpush-mz");
|
}
|
|
@Bean
|
public Queue ipushPushQueue() {
|
return createQueue("bpush-jpush");
|
}
|
|
@Bean
|
public Queue huaweiPushQueue() {
|
return createQueue("bpush-huawei");
|
}
|
|
@Bean
|
public Queue testQueue() {
|
return createQueue("test");
|
}
|
|
@Bean("pluginDelayExchange")
|
public CustomExchange pluginDelayExchange() {
|
Map<String, Object> argMap = new HashMap<>();
|
argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
|
//第二个参数必须为x-delayed-message
|
return new CustomExchange(RabbitmqSenderUtil.DELAY_EXCHANGE_NAME, "x-delayed-message", false, false, argMap);
|
}
|
|
|
@Bean("pluginDelayQueue")
|
public Queue pluginDelayQueue() {
|
Map<String, Object> headers = new HashMap<>();
|
headers.put("x-message-ttl", 24 * 60 * 60 * 1000);
|
return new Queue(RabbitmqSenderUtil.DELAY_QUEUE_NAME, true, false, false, headers);
|
}
|
|
@Bean("pluginDelayConsumeFailQueue")
|
public Queue pluginDelayConsumeFailQueue() {
|
Map<String, Object> headers = new HashMap<>();
|
headers.put("x-message-ttl", 24 * 60 * 60 * 1000);
|
return new Queue(RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, true, false, false, headers);
|
}
|
|
@Bean
|
public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
|
return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_COMMON_DELAY).noargs();
|
}
|
|
@Bean
|
public Binding pluginDelayConsumeFailBinding(@Qualifier("pluginDelayConsumeFailQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
|
return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_CONSUME_FAIL_DELAY).noargs();
|
}
|
|
// 多创建几个消费者
|
@Autowired
|
private PushMessageListener pushMessageListener1;
|
@Autowired
|
private PushMessageListener pushMessageListener2;
|
}
|