package com.ks.push.manager.rabbitmq; import com.ks.push.manager.rabbitmq.consumer.PushMessageListener; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; import java.util.HashMap; import java.util.Map; /** * @author hxh * @title: RabbitmqConfig * @description: Rabbitmq配置 * @date 2024/10/16 11:27 */ @Configuration public class RabbitmqConfig { private Queue createQueue(String queueName) { Map arguments = new HashMap<>(); // 设置消息的有效时间 arguments.put("x-message-ttl", 1000 * 60 * 60 * 24L); return new Queue(queueName, true, false, false, arguments); } @Bean public Queue tokenInvalidQueue() { return createQueue("bpush-token-invalid"); } @Bean public Queue xmPushQueue() { return createQueue("bpush-xm"); } @Bean public Queue vivoPushQueue() { return createQueue("bpush-vivo"); } @Bean public Queue oppoPushQueue() { return createQueue("bpush-oppo"); } @Bean public Queue mzPushQueue() { return createQueue("bpush-mz"); } @Bean public Queue ipushPushQueue() { return createQueue("bpush-jpush"); } @Bean public Queue huaweiPushQueue() { return createQueue("bpush-huawei"); } @Bean public Queue testQueue() { return createQueue("test"); } @Bean("pluginDelayExchange") public CustomExchange pluginDelayExchange() { Map argMap = new HashMap<>(); argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout //第二个参数必须为x-delayed-message return new CustomExchange(RabbitmqSenderUtil.DELAY_EXCHANGE_NAME, "x-delayed-message", false, false, argMap); } @Bean("pluginDelayQueue") public Queue pluginDelayQueue() { Map headers = new HashMap<>(); headers.put("x-message-ttl", 24 * 60 * 60 * 1000); return new Queue(RabbitmqSenderUtil.DELAY_QUEUE_NAME, true, false, false, headers); } @Bean("pluginDelayConsumeFailQueue") public Queue pluginDelayConsumeFailQueue() { Map headers = new HashMap<>(); headers.put("x-message-ttl", 24 * 60 * 60 * 1000); return new Queue(RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, true, false, false, headers); } @Bean public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_COMMON_DELAY).noargs(); } @Bean public Binding pluginDelayConsumeFailBinding(@Qualifier("pluginDelayConsumeFailQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_CONSUME_FAIL_DELAY).noargs(); } // 多创建几个消费者 @Autowired private PushMessageListener pushMessageListener1; @Autowired private PushMessageListener pushMessageListener2; }