admin
2024-10-16 7fa83e5dd03f7896bd1d1e8c47f5e926ff3d4ba0
CMQ改造为rabbitmq
2个文件已删除
9个文件已修改
10个文件已添加
1302 ■■■■■ 已修改文件
service-push/pom.xml 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/PushApplication.java 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/consumer/mq/PushTaskConsumer.java 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/controller/PushCallbackController.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/CMQManager.java 216 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/PushManager.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/DelayMsgInfo.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitDelayConsumeFailConsumer.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitDelayConsumer.java 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqConfig.java 103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqManager.java 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqMsgConsumeUtil.java 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqSenderUtil.java 161 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/consumer/InvalidDeviceTokenListener.java 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/consumer/PushMessageListener.java 115 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/manager/rabbitmq/consumer/TestListener.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/java/com/ks/push/utils/push/JpushUtil.java 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/resources/application-dev.yml 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/resources/application-pro.yml 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/main/resources/application.yml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/src/test/java/com/ks/push/TaskTest.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service-push/pom.xml
@@ -28,7 +28,7 @@
        <dependency>
            <groupId>com.ks</groupId>
            <artifactId>facade-push</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <version>0.0.2-SNAPSHOT</version>
        </dependency>
        <dependency>
@@ -75,6 +75,13 @@
                相当于compile,但是打包阶段做了exclude操作-->
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
@@ -346,6 +353,8 @@
    </dependencies>
    <build>
service-push/src/main/java/com/ks/push/PushApplication.java
@@ -1,11 +1,5 @@
package com.ks.push;
import com.ks.push.consumer.mq.PushTaskConsumer;
import com.ks.push.dto.mq.InvalidDeviceTokenInfo;
import com.ks.push.manager.CMQManager;
import com.ks.push.manager.PushDeviceTokenManager;
import com.ks.push.pojo.DO.BPushDeviceToken;
import com.ks.push.pojo.DO.PushPlatform;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
@@ -15,10 +9,6 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.yeshi.utils.mq.JobThreadExecutorServiceImpl;
import javax.annotation.Resource;
import java.util.List;
@SpringBootApplication
@EnableTransactionManagement
@@ -26,12 +16,6 @@
@EnableDubbo(scanBasePackages = "com.ks.push.service.remote")
public class PushApplication implements ApplicationListener<ContextRefreshedEvent> {
    private final static Logger logger = LoggerFactory.getLogger(PushApplication.class);
    @Resource
    private PushTaskConsumer pushTaskConsumer;
    @Resource
    private PushDeviceTokenManager pushDeviceTokenManager;
    public static void main(String[] args) {
        SpringApplication.run(PushApplication.class, args);
@@ -42,48 +26,5 @@
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        logger.info("容器加载完毕");
        initMQMsgConsumer();
    }
    private void initMQMsgConsumer() {
        final int THREAD_NUM = 3;
        for (PushPlatform pushPlatform : PushPlatform.values()) {
            //创建三条队列处理
            for (int i = 0; i < THREAD_NUM; i++) {
                new JobThreadExecutorServiceImpl().run(new Runnable() {
                    @Override
                    public void run() {
                        pushTaskConsumer.consumeMsg(pushPlatform);
                    }
                });
            }
        }
        //清理无效token
        new JobThreadExecutorServiceImpl().run(new Runnable() {
            @Override
            public void run() {
                try {
                    List<CMQManager.MQMsgConsumeResult> list = CMQManager.getInstance().consumeInvalidDeviceTokenQueue(16);
                    if (list != null) {
                        logger.info("清理无效token数量:" + list.size());
                        for (CMQManager.MQMsgConsumeResult result : list) {
                            InvalidDeviceTokenInfo tokenInfo = (InvalidDeviceTokenInfo) result.getData();
                            List<BPushDeviceToken> tokenList = pushDeviceTokenManager.list(tokenInfo.getAppCode(), tokenInfo.getPushPlatform(), tokenInfo.getToken(), 1, 20);
                            if (tokenList != null) {
                                for (BPushDeviceToken token : tokenList) {
                                    logger.info("清理无效token id-{}" + token.getId());
                                    //删除
                                    pushDeviceTokenManager.deleteByPrimaryKey(token.getId());
                                }
                            }
                            CMQManager.getInstance().deleteMsg(CMQManager.PUSH_TOKEN_INVALID, result.getReceiptHandle());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
service-push/src/main/java/com/ks/push/consumer/mq/PushTaskConsumer.java
File was deleted
service-push/src/main/java/com/ks/push/controller/PushCallbackController.java
@@ -4,7 +4,7 @@
import com.alibaba.fastjson.JSONObject;
import com.ks.push.dto.mq.InvalidDeviceTokenInfo;
import com.ks.push.manager.BPushPlatformAppInfoManager;
import com.ks.push.manager.CMQManager;
import com.ks.push.manager.rabbitmq.RabbitmqManager;
import com.ks.push.pojo.DO.PushPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +30,9 @@
    @Resource
    private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
    @Resource
    private RabbitmqManager rabbitmqManager;
    //回调接口详情:https://developer.huawei.com/consumer/cn/doc/development/HMSCore-Guides/msg-receipt-guide-0000001050040176#ZH-CN_TOPIC_0000001087208860__p121151147184318
@@ -65,7 +68,7 @@
                logger.debug("华为消息回执-推送失败:bigTag-{}  token-{} status-{}", bigTag, token, status);
                if (status == 2 || status == 5 || status == 10) {
                    //删除无效设备
                    CMQManager.getInstance().addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.hw, token));
                   rabbitmqManager.addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.hw, token));
                }
            }
        }
@@ -104,7 +107,7 @@
                    String[] regIds = regIdStr.split(",");
                    for (String rid : regIds) {
                        //删除无效设备
                        CMQManager.getInstance().addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.xm, rid));
                        rabbitmqManager.addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.xm, rid));
                    }
                    break;
            }
service-push/src/main/java/com/ks/push/manager/CMQManager.java
File was deleted
service-push/src/main/java/com/ks/push/manager/PushManager.java
@@ -4,6 +4,7 @@
import com.ks.push.dao.BPushTaskDao;
import com.ks.push.dto.BPushDeviceDataSet;
import com.ks.push.exception.BPushTaskException;
import com.ks.push.manager.rabbitmq.RabbitmqManager;
import com.ks.push.pojo.DO.BPushDeviceToken;
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
import com.ks.push.pojo.DO.BPushTask;
@@ -40,6 +41,9 @@
    @Resource
    private RedisTemplate redisTemplate;
    @Resource
    private RabbitmqManager rabbitmqManager;
    /**
@@ -138,7 +142,7 @@
                        //修改总数
                        pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount);
                    }
                    CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet);
                    rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet);
                }
                logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), result.getPushPlatform().name(), count);
            }
service-push/src/main/java/com/ks/push/manager/rabbitmq/DelayMsgInfo.java
New file
@@ -0,0 +1,65 @@
package com.ks.push.manager.rabbitmq;
/**
 * Envelope for a delayed message: the original message content together with
 * the destination (queue, or exchange plus routing key) it relates to.
 * <p>
 * Instances are serialized to and from JSON by field name, so the field names
 * are part of the wire format.
 *
 * @author hxh
 * @date 2024/10/12 13:31
 */
public class DelayMsgInfo {

    /** Queue name. */
    private String queueName;

    /** Exchange name. */
    private String exchangeName;

    /** Routing key. */
    private String routingKey;

    /** Delay time (sent as the {@code x-delay} header value by the sender utility). */
    private int delayMs;

    /** Number of times the message has been sent. */
    private int sendCount;

    /** Message content. */
    private String msg;

    /** @return the queue name, or {@code null} when not set */
    public String getQueueName() {
        return this.queueName;
    }

    /** @param queueName the queue name */
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    /** @return the exchange name, or {@code null} when not set */
    public String getExchangeName() {
        return this.exchangeName;
    }

    /** @param exchangeName the exchange name */
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    /** @return the routing key, or {@code null} when not set */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /** @param routingKey the routing key */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    /** @return the delay time */
    public int getDelayMs() {
        return this.delayMs;
    }

    /** @param delayMs the delay time */
    public void setDelayMs(int delayMs) {
        this.delayMs = delayMs;
    }

    /** @return the send count */
    public int getSendCount() {
        return this.sendCount;
    }

    /** @param sendCount the send count */
    public void setSendCount(int sendCount) {
        this.sendCount = sendCount;
    }

    /** @return the message content, or {@code null} when not set */
    public String getMsg() {
        return this.msg;
    }

    /** @param msg the message content */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitDelayConsumeFailConsumer.java
New file
@@ -0,0 +1,39 @@
package com.ks.push.manager.rabbitmq;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * @author hxh
 * @title: RabbitDelayConsumer
 * @description: 消费重试队列处理
 * @date 2024/10/12 11:21
 */
@Component
public class RabbitDelayConsumeFailConsumer {
    private Logger logger = LoggerFactory.getLogger("infoLog");
    @Resource
    private RabbitTemplate rabbitTemplate;
    @RabbitListener(queues = RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("RabbitDelayConsumeFailConsumer-{}", msg);
        DelayMsgInfo msgInfo = new Gson().fromJson(msg, DelayMsgInfo.class);
        Map<String, String> headers = new HashMap<>();
        headers.put("retry_count", (msgInfo.getSendCount() + 1) + "");
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, msgInfo.getQueueName(), msgInfo.getMsg(), headers);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitDelayConsumer.java
New file
@@ -0,0 +1,45 @@
package com.ks.push.manager.rabbitmq;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: RabbitDelayConsumer
 * @description: 延时队列消费处理
 * @date 2024/10/12 11:21
 */
@Component
public class RabbitDelayConsumer  {
   private  Logger logger = LoggerFactory.getLogger("infoLog");
    @Resource
    private RabbitTemplate rabbitTemplate;
    @RabbitListener(queues = RabbitmqSenderUtil.DELAY_QUEUE_NAME, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg =  new String( message.getBody(), StandardCharsets.UTF_8);
        logger.info("RabbitDelayConsumer-{}",msg);
        DelayMsgInfo msgInfo =  new Gson().fromJson(msg, DelayMsgInfo.class);
        if(!StringUtil.isNullOrEmpty(msgInfo.getQueueName())){
          // 队列消息
            RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, msgInfo.getQueueName(), msgInfo.getMsg());
        }else{
            // 交换机消息
            RabbitmqSenderUtil.sendExchangeMsg(rabbitTemplate, msgInfo.getExchangeName(), msgInfo.getRoutingKey(), msgInfo.getMsg());
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqConfig.java
New file
@@ -0,0 +1,103 @@
package com.ks.push.manager.rabbitmq;
import com.ks.push.manager.rabbitmq.consumer.PushMessageListener;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @author hxh
 * @title: RabbitmqConfig
 * @description: Rabbitmq配置
 * @date 2024/10/16 11:27
 */
@Configuration
public class RabbitmqConfig {
    @Bean
    public Queue tokenInvalidQueue() {
        return new Queue("bpush-token-invalid");
    }
    @Bean
    public Queue xmPushQueue() {
        return new Queue("bpush-xm");
    }
    @Bean
    public Queue vivoPushQueue() {
        return new Queue("bpush-vivo");
    }
    @Bean
    public Queue oppoPushQueue() {
        return new Queue("bpush-oppo");
    }
    @Bean
    public Queue mzPushQueue() {
        return new Queue("bpush-mz");
    }
    @Bean
    public Queue ipushPushQueue() {
        return new Queue("bpush-jpush");
    }
    @Bean
    public Queue huaweiPushQueue() {
        return new Queue("bpush-huawei");
    }
    @Bean
    public Queue testQueue() {
        return new Queue("test");
    }
    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
        //第二个参数必须为x-delayed-message
        return new CustomExchange(RabbitmqSenderUtil.DELAY_EXCHANGE_NAME, "x-delayed-message", false, false, argMap);
    }
    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-message-ttl", 24 * 60 * 60 * 1000);
        return new Queue(RabbitmqSenderUtil.DELAY_QUEUE_NAME, true, false, false, headers);
    }
    @Bean("pluginDelayConsumeFailQueue")
    public Queue pluginDelayConsumeFailQueue() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-message-ttl", 24 * 60 * 60 * 1000);
        return new Queue(RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, true, false, false, headers);
    }
    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_COMMON_DELAY).noargs();
    }
    @Bean
    public Binding pluginDelayConsumeFailBinding(@Qualifier("pluginDelayConsumeFailQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_CONSUME_FAIL_DELAY).noargs();
    }
    // 多创建几个消费者
    @Autowired
    private PushMessageListener pushMessageListener1;
    @Autowired
    private PushMessageListener pushMessageListener2;
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqManager.java
New file
@@ -0,0 +1,139 @@
package com.ks.push.manager.rabbitmq;
import com.google.gson.Gson;
import com.ks.push.dto.BPushDeviceDataSet;
import com.ks.push.dto.mq.InvalidDeviceTokenInfo;
import com.ks.push.pojo.DO.PushPlatform;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author hxh
 * @title: RabbitmqManager
 * @description: Rabbitmq管理器
 * @date 2024/10/16 13:23
 */
@Component
public class RabbitmqManager {
    /**
     * 小米推送队列
     */
    public static String PUSH_XM = "bpush-xm";
    /**
     * 华为推送队列
     */
    public static String PUSH_HUAWEI = "bpush-huawei";
    /**
     * oppo推送队列
     */
    public static String PUSH_OPPO = "bpush-oppo";
    /**
     * vivo推送队列
     */
    public static String PUSH_VIVO = "bpush-vivo";
    /**
     * 魅族推送队列
     */
    public static String PUSH_MZ = "bpush-mz";
    /**
     * 极光推送队列
     */
    public static String PUSH_JPUSH = "bpush-jpush";
    /**
     * 无效设备队列
     */
    public static String PUSH_TOKEN_INVALID = "bpush-token-invalid";
    @Resource
    private RabbitTemplate rabbitTemplate;
    private String getQueueName(PushPlatform platform) {
        String queueName = null;
        if (platform == PushPlatform.xm) {
            queueName = PUSH_XM;
        } else if (platform == PushPlatform.hw) {
            queueName = PUSH_HUAWEI;
        } else if (platform == PushPlatform.oppo) {
            queueName = PUSH_OPPO;
        } else if (platform == PushPlatform.vivo) {
            queueName = PUSH_VIVO;
        } else if (platform == PushPlatform.mz) {
            queueName = PUSH_MZ;
        }else if (platform == PushPlatform.jpush) {
            queueName = PUSH_JPUSH;
        }
        return queueName;
    }
    /**
     * 添加到推送队列
     *
     * @param platform
     * @param dataSet
     */
    public void addToPushQueue(PushPlatform platform, BPushDeviceDataSet dataSet) {
        String queueName = getQueueName(platform);
        if (queueName == null) {
            return;
        }
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, queueName, new Gson().toJson(dataSet));
    }
    /**
     * 发送无效设备消息
     *
     * @param info
     */
    public void addInvalidDevieToken(InvalidDeviceTokenInfo info) {
        if (info == null) {
            return;
        }
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, PUSH_TOKEN_INVALID, new Gson().toJson(info));
    }
    public static class MQMsgConsumeResult {
        private String queueName;
        private Object data;
        private String receiptHandle;
        public MQMsgConsumeResult(Object data, String queueName, String receiptHandle) {
            this.data = data;
            this.queueName = queueName;
            this.receiptHandle = receiptHandle;
        }
        public Object getData() {
            return data;
        }
        public void setData(Object data) {
            this.data = data;
        }
        public String getReceiptHandle() {
            return receiptHandle;
        }
        public void setReceiptHandle(String receiptHandle) {
            this.receiptHandle = receiptHandle;
        }
        public String getQueueName() {
            return queueName;
        }
        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqMsgConsumeUtil.java
New file
@@ -0,0 +1,35 @@
package com.ks.push.manager.rabbitmq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.io.IOException;
/**
 * @author hxh
 * @title: RabbitmqMsgConsumeUtil
 * @description: 消息消费工具
 * @date 2024/10/12 14:43
 */
public class RabbitmqMsgConsumeUtil {
    public interface  IMessageProcess{
        public void onProcess() throws Exception;
    }
    public static void processMessage(Message message, Channel channel, RabbitTemplate rabbitTemplate, IMessageProcess messageProcessor) throws IOException {
        try {
            messageProcessor.onProcess();
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch(Exception e){
            if(RabbitmqSenderUtil.retryConsume(rabbitTemplate, message)){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }else{
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/RabbitmqSenderUtil.java
New file
@@ -0,0 +1,161 @@
package com.ks.push.manager.rabbitmq;
import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * @author hxh
 * @title: RabbitmqUtil
 * @description: mq消息发送工具
 * @date 2024/10/12 13:04
 */
public class RabbitmqSenderUtil {
    // 延迟交换机
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
    // 延迟队列
    public static final String DELAY_QUEUE_NAME = "delay_queue";
    //常规延迟队列的routingKey
    public static final String ROUTING_KEY_COMMON_DELAY = "delay";
    // 消费失败的延迟消息
    public static final String ROUTING_KEY_CONSUME_FAIL_DELAY = "consume_fail_delay";
    // 消费失败处理队列
    public static final String CONSUME_FAIL_QUEUE_NAME = "consume_fail_queue";
    /**
     * @return void
     * @author hxh
     * @description 发送队列消息
     * @date 13:06 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: message
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String message) {
        rabbitTemplate.convertAndSend(queueName, message);
    }
    /**
     * @return void
     * @author hxh
     * @description 发送带头部的消息
     * @date 13:54 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: msg
     * @param: headers
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, Map<String, String> headers) {
        MessageProperties messageProperties = new MessageProperties();
        if (headers != null) {
            for (String key : headers.keySet()) {
                messageProperties.setHeader(key, headers.get(key));//延迟5秒被删除
            }
        }
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(queueName, message);//交换机和路由键必须和配置文件类中保持一致
    }
    /**
     * @return void
     * @author hxh
     * @description 发送队列延迟消息
     * @date 13:07 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: msg
     * @param: delayMS
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, int delayMS) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMS);
        msgInfo.setQueueName(queueName);
        msgInfo.setMsg(msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayMS);//延迟5秒被删除
        Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);//交换机和路由键必须和配置文件类中保持一致
    }
    /**
     * @return void
     * @author hxh
     * @description 发送交换机消息
     * @date 13:13 2024/10/12
     * @param: rabbitTemplate
     * @param: exchangeName
     * @param: routingKey
     * @param: message
     **/
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
    /**
     * @return void
     * @author hxh
     * @description 发送延迟交换机消息
     * @date 13:39 2024/10/12
     * @param: rabbitTemplate
     * @param: exchangeName
     * @param: routingKey
     * @param: msg
     * @param: delayMs
     **/
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String msg, int delayMs) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMs);
        msgInfo.setExchangeName(exchangeName);
        msgInfo.setMsg(msg);
        msgInfo.setRoutingKey(routingKey);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayMs);
        Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);
    }
    /**
     * @author hxh
     * @description 添加消息到重新消费队列
     * @date 14:10 2024/10/12
     * @param: rabbitTemplate
     * @param: message
     * @return boolean 加入重新消费队列
     **/
    public static boolean retryConsume(RabbitTemplate rabbitTemplate, Message message) {
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        int retryCount = 0;
        if (messageHeaders.containsKey("retry_count")) {
            retryCount = Integer.parseInt(messageHeaders.get("retry_count") + "");
        }
        // 重试5次
        if (retryCount >= 5) {
            return false;
        }
        // 发送延迟消息
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setQueueName(message.getMessageProperties().getConsumerQueue());
        msgInfo.setMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        msgInfo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        msgInfo.setSendCount(retryCount);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", 5*60*1000);
//        messageProperties.setHeader("x-delay", 5*1000);// 测试为5s
        Message messageNew = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_CONSUME_FAIL_DELAY, messageNew);
        return true;
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/consumer/InvalidDeviceTokenListener.java
New file
@@ -0,0 +1,64 @@
package com.ks.push.manager.rabbitmq.consumer;
import com.google.gson.Gson;
import com.ks.push.dto.BPushDeviceDataSet;
import com.ks.push.dto.mq.InvalidDeviceTokenInfo;
import com.ks.push.manager.BPushPlatformAppInfoManager;
import com.ks.push.manager.PushDeviceTokenManager;
import com.ks.push.manager.PushExcuteResultManager;
import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil;
import com.ks.push.pojo.DO.BPushDeviceToken;
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
import com.ks.push.pojo.DO.BPushTask;
import com.ks.push.pojo.DO.PushPlatform;
import com.ks.push.service.BPushTaskService;
import com.ks.push.utils.PushUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class InvalidDeviceTokenListener {
    static Logger logger = LoggerFactory.getLogger(InvalidDeviceTokenListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private PushDeviceTokenManager pushDeviceTokenManager;
    @RabbitListener(queues = {"bpush-xm","bpush-vivo","bpush-oppo","bpush-mz","bpush-jpush","bpush-huawei"}, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                InvalidDeviceTokenInfo tokenInfo = new Gson().fromJson(result, InvalidDeviceTokenInfo.class);
                List<BPushDeviceToken> tokenList = pushDeviceTokenManager.list(tokenInfo.getAppCode(), tokenInfo.getPushPlatform(), tokenInfo.getToken(), 1, 20);
                if (tokenList != null) {
                    for (BPushDeviceToken token : tokenList) {
                        logger.info("清理无效token id-{}" + token.getId());
                        //删除
                        pushDeviceTokenManager.deleteByPrimaryKey(token.getId());
                    }
                }
            }
        });
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/consumer/PushMessageListener.java
New file
@@ -0,0 +1,115 @@
package com.ks.push.manager.rabbitmq.consumer;
import com.google.gson.Gson;
import com.ks.push.dto.BPushDeviceDataSet;
import com.ks.push.manager.BPushPlatformAppInfoManager;
import com.ks.push.manager.PushExcuteResultManager;
import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil;
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
import com.ks.push.pojo.DO.BPushTask;
import com.ks.push.pojo.DO.PushPlatform;
import com.ks.push.service.BPushTaskService;
import com.ks.push.utils.PushUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
@Scope("prototype")
public class PushMessageListener {
    static Logger logger = LoggerFactory.getLogger(PushMessageListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private BPushTaskService bPushTaskService;
    @Resource
    private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
    @Resource
    private PushExcuteResultManager pushExcuteResultManager;
    @RabbitListener(queues = {"bpush-xm","bpush-vivo","bpush-oppo","bpush-mz","bpush-jpush","bpush-huawei"}, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                String queueName = message.getMessageProperties().getConsumerQueue();
                logger.info("队列名称:{} 消息内容:{}", queueName, result);
                BPushDeviceDataSet dataSet = new Gson().fromJson(result, BPushDeviceDataSet.class);
                try {
                    PushPlatform platform=null;
                    switch(queueName){
                        case "bpush-xm":
                            platform = PushPlatform.xm;
                            break;
                        case "bpush-vivo":
                            platform = PushPlatform.vivo;
                            break;
                        case "bpush-oppo":
                            platform = PushPlatform.oppo;
                            break;
                        case "bpush-mz":
                            platform = PushPlatform.mz;
                            break;
                        case "bpush-jpush":
                            platform = PushPlatform.jpush;
                            break;
                        case "bpush-huawei":
                            platform = PushPlatform.hw;
                            break;
                    }
                    BPushTask task = bPushTaskService.getTask(dataSet.getTaskId());
                    if (task != null) {
                        if (task.getState() == BPushTask.STATE_PUSHING) {
                            BPushPlatformAppInfo platformAppInfo = bPushPlatformAppInfoManager.selectByAppCodeAndPlatform(task.getAppCode(), platform);
                            if (platformAppInfo != null) {
                                try {
                                    PushUtil.pushNotifyCation(platform, platformAppInfo.getPushAppInfo(), task.getMessage(), task.getId() + "#" + dataSet.getBatchId(), dataSet.getDeviceTokenList());
                                    logger.info("{}推送任务执行成功,taskId-{},batchId-{}", platform.name(), dataSet.getTaskId(), dataSet.getBatchId());
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    logger.error("推送出错:ttaskId-{},batchId-{},错误原因:{}", task.getId(), dataSet.getBatchId(), e.getMessage());
                                    logger.error("推送出错", e);
                                }
                            }
                        } else {
                            if (task.getState() == BPushTask.STATE_PAUSED) {
                                //任务暂停,不删除消息等待下次消费
                                return;
                            } else if (task.getState() == BPushTask.STATE_CANCELED) {
                                //任务取消,删除消息
                            }
                        }
                    } else {
                        logger.error("任务为空,taskId-{},batchId-{}", dataSet.getTaskId(), dataSet.getBatchId());
                    }
                    pushExcuteResultManager.batchPushFinish(dataSet, platform);
                    logger.info("{}推送任务执行结束,taskId-{},batchId-{}", platform.name(), dataSet.getTaskId(), dataSet.getBatchId());
                } catch (Exception e) {
                    logger.error("推送出错:", e);
                    throw e;
                }
            }
        });
    }
}
service-push/src/main/java/com/ks/push/manager/rabbitmq/consumer/TestListener.java
New file
@@ -0,0 +1,44 @@
package com.ks.push.manager.rabbitmq.consumer;
import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
@Scope("prototype")
public class TestListener {
    static Logger logger = LoggerFactory.getLogger(TestListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @RabbitListener(queues = {"test"}, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                String queueName = message.getMessageProperties().getConsumerQueue();
                logger.info("队列名称:{} 消息内容:{} 线程ID:{}", queueName, result, Thread.currentThread().getId());
            }
        });
    }
}
service-push/src/main/java/com/ks/push/utils/push/JpushUtil.java
@@ -9,9 +9,12 @@
import cn.jpush.api.push.model.Platform;
import cn.jpush.api.push.model.PushPayload;
import cn.jpush.api.push.model.audience.Audience;
import cn.jpush.api.push.model.notification.AndroidNotification;
import cn.jpush.api.push.model.notification.Notification;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.ks.push.utils.JPushUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yeshi.utils.push.entity.PushAppInfo;
@@ -55,20 +58,25 @@
            params = message.getActivityParams();
        }
        PushPayload.Builder payloadBuilder = PushPayload.newBuilder().
                setPlatform(Platform.all())
                .setNotification(Notification.android(message.getContent(), message.getTitle(), params));
        AndroidNotification.Builder androidBuilder = AndroidNotification.newBuilder().setAlert(message.getContent()).setTitle(message.getTitle());
        if (regIds != null) {
            payloadBuilder.setAudience(Audience.registrationId(regIds));
        } else if (aliasList != null) {
            payloadBuilder.setAudience(Audience.alias(aliasList));
        //通过intent推送
        if (message.getActivity().startsWith("intent:#")) {
            JsonObject intent = new JsonObject();
            intent.addProperty("url", message.getActivity());
            androidBuilder = androidBuilder.setIntent(intent).addExtras(message.getActivityParams());
        } else {
            payloadBuilder.setAudience(Audience.all());
            androidBuilder = androidBuilder.addExtras(params);
        }
        PushPayload.Builder payloadBuilder = PushPayload.newBuilder().
                setPlatform(Platform.all())
                .setNotification(Notification.newBuilder()
                        .addPlatformNotification(androidBuilder.build())
                        .build());
        loadTarget(payloadBuilder, regIds, aliasList);
        PushPayload payload = payloadBuilder.build();
        try {
            PushResult result = jpushClient.sendPush(payload);
@@ -83,6 +91,16 @@
            LOG.info("Error Message: " + e.getErrorMessage());
        }
        return null;
    }
    private static void loadTarget(PushPayload.Builder builder, List<String> regIds, List<String> aliasList) {
        if (regIds != null) {
            builder.setAudience(Audience.registrationId(regIds));
        } else if (aliasList != null) {
            builder.setAudience(Audience.alias(aliasList));
        } else {
            builder.setAudience(Audience.all());
        }
    }
@@ -104,13 +122,7 @@
                .setMessage(Message.newBuilder()
                        .setMsgContent(data.toJSONString())
                        .build());
        if (regIds != null) {
            payloadBuilder.setAudience(Audience.registrationId(regIds));
        } else if (aliasList != null) {
            payloadBuilder.setAudience(Audience.alias(aliasList));
        } else {
            payloadBuilder.setAudience(Audience.all());
        }
        loadTarget(payloadBuilder, regIds, aliasList);
        PushPayload payload = payloadBuilder.build();
@@ -131,22 +143,22 @@
    public static void main(String[] args) {
        Map<String, String> activityParams = new HashMap<>();
        activityParams.put("activity", "test");
        activityParams.put("type", "requestLocation");
        activityParams.put("type", "web");
        JSONObject jumpParams = new JSONObject();
        jumpParams.put("id", 123123);
        jumpParams.put("url", "http://www.baidu.com");
        activityParams.put("params", jumpParams.toJSONString());
        PushAppInfo appInfo = new PushAppInfo();
        appInfo.setAppKey("987d3d50f209994fa522d001");
        appInfo.setAppSecret("e4053447d0014576ea2c47ab");
//        PushMessage message = new PushMessage("你好啊", "下午好,hello world", null, null, null, activityParams);
        appInfo.setAppKey("6f219d3bfa78428537090fef");
        appInfo.setAppSecret("c48f35cd1cef4b717f721e4b");
        Map<String,Object> dataMap=new HashMap<>();
        dataMap.put("type","requestLocation");
        JSONObject data = new  JSONObject(dataMap);
        pushMessage(appInfo, data, Arrays.asList(new String[]{"100d8559098624d84e6"}), null);
        PushMessage pushMessage = new PushMessage();
        pushMessage.setActivity(JPushUtil.createIntent("com.yeshi.makemoney.video.app.push.PushOpenClickActivity", "com.yeshi.makemoney.video"));
        pushMessage.setActivityParams(activityParams);
        pushMessage.setTitle("测试标题");
        pushMessage.setContent("测试内容");
        pushNotification(appInfo, pushMessage, Arrays.asList(new String[]{"1507bfd3f737761f2f2"}), null);
    }
service-push/src/main/resources/application-dev.yml
@@ -42,6 +42,12 @@
          test_on_borrow: true
          min-idle: 8
  rabbitmq:
    host: 134.175.68.214
    port: 5672
    username: root
    password: yeshi2014
    virtual-host: /push
  datasource:
    url: jdbc:mysql://gz-cdb-r13d0yi9.sql.tencentcdb.com:62929/bpush?serverTimezone=GMT%2B8
@@ -50,6 +56,8 @@
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      stat-view-servlet:
        enabled: false
      initial-size: 8
      min-idle: 1
      max-active: 20
service-push/src/main/resources/application-pro.yml
@@ -41,7 +41,12 @@
        max-idle: 200
        test_on_borrow: true
        min-idle: 8
  rabbitmq:
    host: 172.16.16.38
    port: 5672
    username: root
    password: yeshi2014
    virtual-host: /push
  datasource:
    #jdbc:mysql://gz-cdb-r13d0yi9.sql.tencentcdb.com:62929/ks_goldcorn?serverTimezone=GMT%2B8
@@ -51,6 +56,8 @@
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      stat-view-servlet:
        enabled: false
      initial-size: 8
      min-idle: 1
      max-active: 20
service-push/src/main/resources/application.yml
@@ -1,3 +1,3 @@
spring:
  profiles:
    active: pro
    active: dev
service-push/src/test/java/com/ks/push/TaskTest.java
@@ -1,11 +1,14 @@
package com.ks.push;
import com.google.gson.Gson;
import com.ks.push.exception.BPushTaskException;
import com.ks.push.manager.rabbitmq.RabbitmqSenderUtil;
import com.ks.push.pojo.DO.BPushFilter;
import com.ks.push.pojo.DO.BPushMessage;
import com.ks.push.pojo.DO.BPushTask;
import com.ks.push.service.BPushTaskService;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
@@ -37,4 +40,14 @@
        bPushTaskService.createTask(task);
    }
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testRabbitMq(){
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, "test", "测试内容");
    }
}