admin
2024-10-30 010ef2a907e66efd4702443c06cdd18f8a7ffa5b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.yeshi.buwan.util.mq.rabbit;
 
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
 
/**
 * @author hxh
 * @title: RabbitmqConfig
 * @description: TODO
 * @date 2024/10/12 11:10
 */
@Configuration
//@EnableRabbit
public class RabbitmqConfig {
 
    @Resource
    private RabbitTemplate rabbitTemplate;
 
    private Queue createQueue(String queueName){
        Map<String, Object> headers=new HashMap<>();
        headers.put("x-message-ttl", 24*60*60*1000);
        return new Queue(queueName,true, false, false, headers);
    }
 
    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
        //第二个参数必须为x-delayed-message
        return new CustomExchange(RabbitmqSenderUtil.DELAY_EXCHANGE_NAME, "x-delayed-message", false, false, argMap);
    }
 
 
    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue() {
        return createQueue(RabbitmqSenderUtil.DELAY_QUEUE_NAME);
    }
 
    @Bean("pluginDelayConsumeFailQueue")
    public Queue pluginDelayConsumeFailQueue() {
        return createQueue(RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME);
    }
 
    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_COMMON_DELAY).noargs();
    }
 
    @Bean
    public Binding pluginDelayConsumeFailBinding(@Qualifier("pluginDelayConsumeFailQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_CONSUME_FAIL_DELAY).noargs();
    }
 
    @Bean
    public Queue solrQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_SOLR);
    }
 
    @Bean
    public Queue updateVideoIqiyi2Queue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_UPDATE_IQIYI_2);
    }
 
    @Bean
    public Queue updateVideoFuntv2Queue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_UPDATE_FUNTV_2);
    }
 
    @Bean
    public Queue updateVideoPPTVQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_UPDATE_PPTV);
    }
 
    @Bean
    public Queue videoResourceDeleteQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_RESOURCE_DELETE);
    }
 
    @Bean("updateVideoExtraInfoQueue")
    public Queue updateVideoExtraInfoQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
    }
 
    @Bean
    public Queue updateInternetSearchQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_UPDATE_INTERNET_SEARCH);
    }
 
    @Bean
    public Queue updateResourceVideoQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_UPDATE_RESOURCE_VIDEO);
    }
 
    @Bean("videoSyncDataV2Queue")
    public Queue videoSyncDataV2Queue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_SYNCDATA_V2);
    }
 
    @Bean("videoExtraInfoExchange")
    public FanoutExchange videoExtraInfoExchange() {
        Map<String, Object> headers=new HashMap<>();
        headers.put("x-message-ttl", 24*60*60*1000);
        return new FanoutExchange(RabbitmqManager.TOPIC_VIDEO_EXTRAINFO, true, false, headers);
    }
 
    @Bean
    public Binding videoExtraInfoExchangeBinding(@Qualifier("updateVideoExtraInfoQueue") Queue queue, @Qualifier("videoExtraInfoExchange") FanoutExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange);
    }
 
    @Bean("videoInfoChangeExchange")
    public FanoutExchange videoInfoChangeExchange() {
        Map<String, Object> headers=new HashMap<>();
        headers.put("x-message-ttl", 24*60*60*1000);
        return new FanoutExchange(RabbitmqManager.TOPIC_VIDEO_INFO_CHANGE, true, false, headers);
    }
 
    @Bean
    public Binding videoInfoChangeExchangeBinding(@Qualifier("videoSyncDataV2Queue") Queue queue, @Qualifier("videoInfoChangeExchange") FanoutExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange);
    }
 
 
 
}