| | |
| | | <dependency> |
| | | <groupId>com.ks</groupId> |
| | | <artifactId>facade-push</artifactId> |
| | | <version>0.0.1-SNAPSHOT</version> |
| | | <version>0.0.2-SNAPSHOT</version> |
| | | </dependency> |
| | | |
| | | <dependency> |
| | |
| | | 相当于compile,但是打包阶段做了exclude操作--> |
| | | <!--<scope>provided</scope>--> |
| | | </dependency> |
| | | |
| | | <dependency> |
| | | <groupId>org.springframework.boot</groupId> |
| | | <artifactId>spring-boot-starter-amqp</artifactId> |
| | | </dependency> |
| | | |
| | | |
| | | |
| | | <dependency> |
| | | <groupId>org.springframework.boot</groupId> |
| | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | </dependencies> |
| | | |
| | | <build> |
| | |
| | | package com.ks.push; |
| | | |
| | | import com.ks.push.consumer.mq.PushTaskConsumer; |
| | | import com.ks.push.dto.mq.InvalidDeviceTokenInfo; |
| | | import com.ks.push.manager.CMQManager; |
| | | import com.ks.push.manager.PushDeviceTokenManager; |
| | | import com.ks.push.pojo.DO.BPushDeviceToken; |
| | | import com.ks.push.pojo.DO.PushPlatform; |
| | | import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; |
| | | import org.mybatis.spring.annotation.MapperScan; |
| | | import org.slf4j.Logger; |
| | |
| | | import org.springframework.context.ApplicationListener; |
| | | import org.springframework.context.event.ContextRefreshedEvent; |
| | | import org.springframework.transaction.annotation.EnableTransactionManagement; |
| | | import org.yeshi.utils.mq.JobThreadExecutorServiceImpl; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.List; |
| | | |
| | | @SpringBootApplication |
| | | @EnableTransactionManagement |
| | |
| | | @EnableDubbo(scanBasePackages = "com.ks.push.service.remote") |
| | | public class PushApplication implements ApplicationListener<ContextRefreshedEvent> { |
| | | private final static Logger logger = LoggerFactory.getLogger(PushApplication.class); |
| | | |
| | | @Resource |
| | | private PushTaskConsumer pushTaskConsumer; |
| | | |
| | | @Resource |
| | | private PushDeviceTokenManager pushDeviceTokenManager; |
| | | |
| | | public static void main(String[] args) { |
| | | SpringApplication.run(PushApplication.class, args); |
| | |
| | | @Override |
| | | public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { |
| | | logger.info("容器加载完毕"); |
| | | initMQMsgConsumer(); |
| | | } |
| | | |
| | | private void initMQMsgConsumer() { |
| | | final int THREAD_NUM = 3; |
| | | for (PushPlatform pushPlatform : PushPlatform.values()) { |
| | | //创建三条队列处理 |
| | | for (int i = 0; i < THREAD_NUM; i++) { |
| | | new JobThreadExecutorServiceImpl().run(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | pushTaskConsumer.consumeMsg(pushPlatform); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | //清理无效token |
| | | new JobThreadExecutorServiceImpl().run(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | List<CMQManager.MQMsgConsumeResult> list = CMQManager.getInstance().consumeInvalidDeviceTokenQueue(16); |
| | | if (list != null) { |
| | | logger.info("清理无效token数量:" + list.size()); |
| | | for (CMQManager.MQMsgConsumeResult result : list) { |
| | | InvalidDeviceTokenInfo tokenInfo = (InvalidDeviceTokenInfo) result.getData(); |
| | | List<BPushDeviceToken> tokenList = pushDeviceTokenManager.list(tokenInfo.getAppCode(), tokenInfo.getPushPlatform(), tokenInfo.getToken(), 1, 20); |
| | | if (tokenList != null) { |
| | | for (BPushDeviceToken token : tokenList) { |
| | | logger.info("清理无效token id-{}" + token.getId()); |
| | | //删除 |
| | | pushDeviceTokenManager.deleteByPrimaryKey(token.getId()); |
| | | } |
| | | } |
| | | CMQManager.getInstance().deleteMsg(CMQManager.PUSH_TOKEN_INVALID, result.getReceiptHandle()); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | |
| | | } |
| | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.ks.push.dto.mq.InvalidDeviceTokenInfo; |
| | | import com.ks.push.manager.BPushPlatformAppInfoManager; |
| | | import com.ks.push.manager.CMQManager; |
| | | import com.ks.push.manager.rabbitmq.RabbitmqManager; |
| | | import com.ks.push.pojo.DO.PushPlatform; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | |
| | | |
| | | @Resource |
| | | private BPushPlatformAppInfoManager bPushPlatformAppInfoManager; |
| | | |
| | | @Resource |
| | | private RabbitmqManager rabbitmqManager; |
| | | |
| | | |
| | | //回调接口详情:https://developer.huawei.com/consumer/cn/doc/development/HMSCore-Guides/msg-receipt-guide-0000001050040176#ZH-CN_TOPIC_0000001087208860__p121151147184318 |
| | |
| | | logger.debug("华为消息回执-推送失败:bigTag-{} token-{} status-{}", bigTag, token, status); |
| | | if (status == 2 || status == 5 || status == 10) { |
| | | //删除无效设备 |
| | | CMQManager.getInstance().addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.hw, token)); |
| | | rabbitmqManager.addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.hw, token)); |
| | | } |
| | | } |
| | | } |
| | |
| | | String[] regIds = regIdStr.split(","); |
| | | for (String rid : regIds) { |
| | | //删除无效设备 |
| | | CMQManager.getInstance().addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.xm, rid)); |
| | | rabbitmqManager.addInvalidDevieToken(new InvalidDeviceTokenInfo(appCode, PushPlatform.xm, rid)); |
| | | } |
| | | break; |
| | | } |
| | |
| | | import com.ks.push.dao.BPushTaskDao; |
| | | import com.ks.push.dto.BPushDeviceDataSet; |
| | | import com.ks.push.exception.BPushTaskException; |
| | | import com.ks.push.manager.rabbitmq.RabbitmqManager; |
| | | import com.ks.push.pojo.DO.BPushDeviceToken; |
| | | import com.ks.push.pojo.DO.BPushPlatformAppInfo; |
| | | import com.ks.push.pojo.DO.BPushTask; |
| | |
| | | |
| | | @Resource |
| | | private RedisTemplate redisTemplate; |
| | | |
| | | @Resource |
| | | private RabbitmqManager rabbitmqManager; |
| | | |
| | | |
| | | /** |
| | |
| | | //修改总数 |
| | | pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount); |
| | | } |
| | | CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet); |
| | | rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet); |
| | | } |
| | | logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), result.getPushPlatform().name(), count); |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
/**
 * Payload wrapper for a delayed message.
 *
 * <p>Holds the message text together with the destination it is meant for
 * (a queue name, or an exchange name with a routing key), the requested
 * delay and a send counter.
 *
 * @author hxh
 * @date 2024/10/12 13:31
 */
public class DelayMsgInfo {

    /** Name of the target queue. */
    private String queueName;
    /** Name of the target exchange. */
    private String exchangeName;
    /** Routing key used together with the exchange. */
    private String routingKey;
    /** Delay in milliseconds. */
    private int delayMs;
    /** Number of times the message has been sent. */
    private int sendCount;
    /** Message content. */
    private String msg;

    /** @return the target queue name */
    public String getQueueName() {
        return this.queueName;
    }

    /** @param queueName the target queue name */
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    /** @return the target exchange name */
    public String getExchangeName() {
        return this.exchangeName;
    }

    /** @param exchangeName the target exchange name */
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    /** @return the routing key */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /** @param routingKey the routing key */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    /** @return the delay in milliseconds */
    public int getDelayMs() {
        return this.delayMs;
    }

    /** @param delayMs the delay in milliseconds */
    public void setDelayMs(int delayMs) {
        this.delayMs = delayMs;
    }

    /** @return the send counter */
    public int getSendCount() {
        return this.sendCount;
    }

    /** @param sendCount the send counter */
    public void setSendCount(int sendCount) {
        this.sendCount = sendCount;
    }

    /** @return the message content */
    public String getMsg() {
        return this.msg;
    }

    /** @param msg the message content */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.rabbitmq.client.Channel; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.nio.charset.StandardCharsets; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: RabbitDelayConsumer |
| | | * @description: 消费重试队列处理 |
| | | * @date 2024/10/12 11:21 |
| | | */ |
| | | @Component |
| | | public class RabbitDelayConsumeFailConsumer { |
| | | private Logger logger = LoggerFactory.getLogger("infoLog"); |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | @RabbitListener(queues = RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, ackMode = "MANUAL") |
| | | public void onMessage(Message message, Channel channel) throws Exception { |
| | | String msg = new String(message.getBody(), StandardCharsets.UTF_8); |
| | | logger.info("RabbitDelayConsumeFailConsumer-{}", msg); |
| | | DelayMsgInfo msgInfo = new Gson().fromJson(msg, DelayMsgInfo.class); |
| | | Map<String, String> headers = new HashMap<>(); |
| | | headers.put("retry_count", (msgInfo.getSendCount() + 1) + ""); |
| | | RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, msgInfo.getQueueName(), msgInfo.getMsg(), headers); |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | } |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.rabbitmq.client.Channel; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | import org.yeshi.utils.StringUtil; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.nio.charset.StandardCharsets; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: RabbitDelayConsumer |
| | | * @description: 延时队列消费处理 |
| | | * @date 2024/10/12 11:21 |
| | | */ |
| | | @Component |
| | | public class RabbitDelayConsumer { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger("infoLog"); |
| | | |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | |
| | | @RabbitListener(queues = RabbitmqSenderUtil.DELAY_QUEUE_NAME, ackMode = "MANUAL") |
| | | public void onMessage(Message message, Channel channel) throws Exception { |
| | | String msg = new String( message.getBody(), StandardCharsets.UTF_8); |
| | | logger.info("RabbitDelayConsumer-{}",msg); |
| | | DelayMsgInfo msgInfo = new Gson().fromJson(msg, DelayMsgInfo.class); |
| | | if(!StringUtil.isNullOrEmpty(msgInfo.getQueueName())){ |
| | | // 队列消息 |
| | | RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, msgInfo.getQueueName(), msgInfo.getMsg()); |
| | | }else{ |
| | | // 交换机消息 |
| | | RabbitmqSenderUtil.sendExchangeMsg(rabbitTemplate, msgInfo.getExchangeName(), msgInfo.getRoutingKey(), msgInfo.getMsg()); |
| | | } |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | } |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
| | | import com.ks.push.manager.rabbitmq.consumer.PushMessageListener; |
| | | import org.springframework.amqp.core.Binding; |
| | | import org.springframework.amqp.core.BindingBuilder; |
| | | import org.springframework.amqp.core.CustomExchange; |
| | | import org.springframework.amqp.core.Queue; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: RabbitmqConfig |
| | | * @description: Rabbitmq配置 |
| | | * @date 2024/10/16 11:27 |
| | | */ |
| | | @Configuration |
| | | public class RabbitmqConfig { |
| | | |
| | | @Bean |
| | | public Queue tokenInvalidQueue() { |
| | | return new Queue("bpush-token-invalid"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue xmPushQueue() { |
| | | return new Queue("bpush-xm"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue vivoPushQueue() { |
| | | return new Queue("bpush-vivo"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue oppoPushQueue() { |
| | | return new Queue("bpush-oppo"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue mzPushQueue() { |
| | | return new Queue("bpush-mz"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue ipushPushQueue() { |
| | | return new Queue("bpush-jpush"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue huaweiPushQueue() { |
| | | return new Queue("bpush-huawei"); |
| | | } |
| | | |
| | | @Bean |
| | | public Queue testQueue() { |
| | | return new Queue("test"); |
| | | } |
| | | |
| | | @Bean("pluginDelayExchange") |
| | | public CustomExchange pluginDelayExchange() { |
| | | Map<String, Object> argMap = new HashMap<>(); |
| | | argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout |
| | | //第二个参数必须为x-delayed-message |
| | | return new CustomExchange(RabbitmqSenderUtil.DELAY_EXCHANGE_NAME, "x-delayed-message", false, false, argMap); |
| | | } |
| | | |
| | | |
| | | @Bean("pluginDelayQueue") |
| | | public Queue pluginDelayQueue() { |
| | | Map<String, Object> headers = new HashMap<>(); |
| | | headers.put("x-message-ttl", 24 * 60 * 60 * 1000); |
| | | return new Queue(RabbitmqSenderUtil.DELAY_QUEUE_NAME, true, false, false, headers); |
| | | } |
| | | |
| | | @Bean("pluginDelayConsumeFailQueue") |
| | | public Queue pluginDelayConsumeFailQueue() { |
| | | Map<String, Object> headers = new HashMap<>(); |
| | | headers.put("x-message-ttl", 24 * 60 * 60 * 1000); |
| | | return new Queue(RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, true, false, false, headers); |
| | | } |
| | | |
| | | @Bean |
| | | public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) { |
| | | return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_COMMON_DELAY).noargs(); |
| | | } |
| | | |
| | | @Bean |
| | | public Binding pluginDelayConsumeFailBinding(@Qualifier("pluginDelayConsumeFailQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) { |
| | | return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_CONSUME_FAIL_DELAY).noargs(); |
| | | } |
| | | |
| | | // 多创建几个消费者 |
| | | @Autowired |
| | | private PushMessageListener pushMessageListener1; |
| | | @Autowired |
| | | private PushMessageListener pushMessageListener2; |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.ks.push.dto.BPushDeviceDataSet; |
| | | import com.ks.push.dto.mq.InvalidDeviceTokenInfo; |
| | | import com.ks.push.pojo.DO.PushPlatform; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: RabbitmqManager |
| | | * @description: Rabbitmq管理器 |
| | | * @date 2024/10/16 13:23 |
| | | */ |
| | | @Component |
| | | public class RabbitmqManager { |
| | | |
| | | /** |
| | | * 小米推送队列 |
| | | */ |
| | | public static String PUSH_XM = "bpush-xm"; |
| | | /** |
| | | * 华为推送队列 |
| | | */ |
| | | public static String PUSH_HUAWEI = "bpush-huawei"; |
| | | /** |
| | | * oppo推送队列 |
| | | */ |
| | | public static String PUSH_OPPO = "bpush-oppo"; |
| | | /** |
| | | * vivo推送队列 |
| | | */ |
| | | public static String PUSH_VIVO = "bpush-vivo"; |
| | | /** |
| | | * 魅族推送队列 |
| | | */ |
| | | public static String PUSH_MZ = "bpush-mz"; |
| | | |
| | | /** |
| | | * 极光推送队列 |
| | | */ |
| | | public static String PUSH_JPUSH = "bpush-jpush"; |
| | | |
| | | /** |
| | | * 无效设备队列 |
| | | */ |
| | | public static String PUSH_TOKEN_INVALID = "bpush-token-invalid"; |
| | | |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | |
| | | private String getQueueName(PushPlatform platform) { |
| | | String queueName = null; |
| | | if (platform == PushPlatform.xm) { |
| | | queueName = PUSH_XM; |
| | | } else if (platform == PushPlatform.hw) { |
| | | queueName = PUSH_HUAWEI; |
| | | } else if (platform == PushPlatform.oppo) { |
| | | queueName = PUSH_OPPO; |
| | | } else if (platform == PushPlatform.vivo) { |
| | | queueName = PUSH_VIVO; |
| | | } else if (platform == PushPlatform.mz) { |
| | | queueName = PUSH_MZ; |
| | | }else if (platform == PushPlatform.jpush) { |
| | | queueName = PUSH_JPUSH; |
| | | } |
| | | return queueName; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 添加到推送队列 |
| | | * |
| | | * @param platform |
| | | * @param dataSet |
| | | */ |
| | | public void addToPushQueue(PushPlatform platform, BPushDeviceDataSet dataSet) { |
| | | String queueName = getQueueName(platform); |
| | | if (queueName == null) { |
| | | return; |
| | | } |
| | | RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, queueName, new Gson().toJson(dataSet)); |
| | | } |
| | | |
| | | /** |
| | | * 发送无效设备消息 |
| | | * |
| | | * @param info |
| | | */ |
| | | public void addInvalidDevieToken(InvalidDeviceTokenInfo info) { |
| | | if (info == null) { |
| | | return; |
| | | } |
| | | RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, PUSH_TOKEN_INVALID, new Gson().toJson(info)); |
| | | } |
| | | |
| | | |
| | | public static class MQMsgConsumeResult { |
| | | private String queueName; |
| | | private Object data; |
| | | private String receiptHandle; |
| | | |
| | | public MQMsgConsumeResult(Object data, String queueName, String receiptHandle) { |
| | | this.data = data; |
| | | this.queueName = queueName; |
| | | this.receiptHandle = receiptHandle; |
| | | } |
| | | |
| | | public Object getData() { |
| | | return data; |
| | | } |
| | | |
| | | public void setData(Object data) { |
| | | this.data = data; |
| | | } |
| | | |
| | | public String getReceiptHandle() { |
| | | return receiptHandle; |
| | | } |
| | | |
| | | public void setReceiptHandle(String receiptHandle) { |
| | | this.receiptHandle = receiptHandle; |
| | | } |
| | | |
| | | public String getQueueName() { |
| | | return queueName; |
| | | } |
| | | |
| | | public void setQueueName(String queueName) { |
| | | this.queueName = queueName; |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
| | | import com.rabbitmq.client.Channel; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | |
| | | import java.io.IOException; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: RabbitmqMsgConsumeUtil |
| | | * @description: 消息消费工具 |
| | | * @date 2024/10/12 14:43 |
| | | */ |
| | | public class RabbitmqMsgConsumeUtil { |
| | | |
| | | public interface IMessageProcess{ |
| | | public void onProcess() throws Exception; |
| | | } |
| | | |
| | | public static void processMessage(Message message, Channel channel, RabbitTemplate rabbitTemplate, IMessageProcess messageProcessor) throws IOException { |
| | | try { |
| | | messageProcessor.onProcess(); |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | }catch(Exception e){ |
| | | if(RabbitmqSenderUtil.retryConsume(rabbitTemplate, message)){ |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | }else{ |
| | | channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq; |
| | | |
| | | import com.google.gson.Gson; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.core.MessageProperties; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | |
| | | import java.nio.charset.StandardCharsets; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: RabbitmqUtil |
| | | * @description: mq消息发送工具 |
| | | * @date 2024/10/12 13:04 |
| | | */ |
| | | public class RabbitmqSenderUtil { |
| | | |
| | | // 延迟交换机 |
| | | public static final String DELAY_EXCHANGE_NAME = "delay_exchange"; |
| | | // 延迟队列 |
| | | public static final String DELAY_QUEUE_NAME = "delay_queue"; |
| | | |
| | | //常规延迟队列的routingKey |
| | | public static final String ROUTING_KEY_COMMON_DELAY = "delay"; |
| | | // 消费失败的延迟消息 |
| | | public static final String ROUTING_KEY_CONSUME_FAIL_DELAY = "consume_fail_delay"; |
| | | // 消费失败处理队列 |
| | | public static final String CONSUME_FAIL_QUEUE_NAME = "consume_fail_queue"; |
| | | |
| | | |
| | | /** |
| | | * @return void |
| | | * @author hxh |
| | | * @description 发送队列消息 |
| | | * @date 13:06 2024/10/12 |
| | | * @param: rabbitTemplate |
| | | * @param: queueName |
| | | * @param: message |
| | | **/ |
| | | public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String message) { |
| | | rabbitTemplate.convertAndSend(queueName, message); |
| | | } |
| | | |
| | | /** |
| | | * @return void |
| | | * @author hxh |
| | | * @description 发送带头部的消息 |
| | | * @date 13:54 2024/10/12 |
| | | * @param: rabbitTemplate |
| | | * @param: queueName |
| | | * @param: msg |
| | | * @param: headers |
| | | **/ |
| | | public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, Map<String, String> headers) { |
| | | MessageProperties messageProperties = new MessageProperties(); |
| | | if (headers != null) { |
| | | for (String key : headers.keySet()) { |
| | | messageProperties.setHeader(key, headers.get(key));//延迟5秒被删除 |
| | | } |
| | | } |
| | | Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties); |
| | | rabbitTemplate.convertAndSend(queueName, message);//交换机和路由键必须和配置文件类中保持一致 |
| | | } |
| | | |
| | | |
| | | /** |
| | | * @return void |
| | | * @author hxh |
| | | * @description 发送队列延迟消息 |
| | | * @date 13:07 2024/10/12 |
| | | * @param: rabbitTemplate |
| | | * @param: queueName |
| | | * @param: msg |
| | | * @param: delayMS |
| | | **/ |
| | | public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, int delayMS) { |
| | | DelayMsgInfo msgInfo = new DelayMsgInfo(); |
| | | msgInfo.setDelayMs(delayMS); |
| | | msgInfo.setQueueName(queueName); |
| | | msgInfo.setMsg(msg); |
| | | MessageProperties messageProperties = new MessageProperties(); |
| | | messageProperties.setHeader("x-delay", delayMS);//延迟5秒被删除 |
| | | Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties); |
| | | rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);//交换机和路由键必须和配置文件类中保持一致 |
| | | } |
| | | |
| | | /** |
| | | * @return void |
| | | * @author hxh |
| | | * @description 发送交换机消息 |
| | | * @date 13:13 2024/10/12 |
| | | * @param: rabbitTemplate |
| | | * @param: exchangeName |
| | | * @param: routingKey |
| | | * @param: message |
| | | **/ |
| | | public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String message) { |
| | | rabbitTemplate.convertAndSend(exchangeName, routingKey, message); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * @return void |
| | | * @author hxh |
| | | * @description 发送延迟交换机消息 |
| | | * @date 13:39 2024/10/12 |
| | | * @param: rabbitTemplate |
| | | * @param: exchangeName |
| | | * @param: routingKey |
| | | * @param: msg |
| | | * @param: delayMs |
| | | **/ |
| | | public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String msg, int delayMs) { |
| | | DelayMsgInfo msgInfo = new DelayMsgInfo(); |
| | | msgInfo.setDelayMs(delayMs); |
| | | msgInfo.setExchangeName(exchangeName); |
| | | msgInfo.setMsg(msg); |
| | | msgInfo.setRoutingKey(routingKey); |
| | | MessageProperties messageProperties = new MessageProperties(); |
| | | messageProperties.setHeader("x-delay", delayMs); |
| | | Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties); |
| | | rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @description 添加消息到重新消费队列 |
| | | * @date 14:10 2024/10/12 |
| | | * @param: rabbitTemplate |
| | | * @param: message |
| | | * @return boolean 加入重新消费队列 |
| | | **/ |
| | | public static boolean retryConsume(RabbitTemplate rabbitTemplate, Message message) { |
| | | Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders(); |
| | | int retryCount = 0; |
| | | if (messageHeaders.containsKey("retry_count")) { |
| | | retryCount = Integer.parseInt(messageHeaders.get("retry_count") + ""); |
| | | } |
| | | // 重试5次 |
| | | if (retryCount >= 5) { |
| | | return false; |
| | | } |
| | | |
| | | // 发送延迟消息 |
| | | DelayMsgInfo msgInfo = new DelayMsgInfo(); |
| | | msgInfo.setQueueName(message.getMessageProperties().getConsumerQueue()); |
| | | msgInfo.setMsg(new String(message.getBody(), StandardCharsets.UTF_8)); |
| | | msgInfo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey()); |
| | | msgInfo.setSendCount(retryCount); |
| | | MessageProperties messageProperties = new MessageProperties(); |
| | | messageProperties.setHeader("x-delay", 5*60*1000); |
| | | // messageProperties.setHeader("x-delay", 5*1000);// 测试为5s |
| | | Message messageNew = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties); |
| | | rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_CONSUME_FAIL_DELAY, messageNew); |
| | | return true; |
| | | } |
| | | |
| | | |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq.consumer; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.ks.push.dto.BPushDeviceDataSet; |
| | | import com.ks.push.dto.mq.InvalidDeviceTokenInfo; |
| | | import com.ks.push.manager.BPushPlatformAppInfoManager; |
| | | import com.ks.push.manager.PushDeviceTokenManager; |
| | | import com.ks.push.manager.PushExcuteResultManager; |
| | | import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil; |
| | | import com.ks.push.pojo.DO.BPushDeviceToken; |
| | | import com.ks.push.pojo.DO.BPushPlatformAppInfo; |
| | | import com.ks.push.pojo.DO.BPushTask; |
| | | import com.ks.push.pojo.DO.PushPlatform; |
| | | import com.ks.push.service.BPushTaskService; |
| | | import com.ks.push.utils.PushUtil; |
| | | import com.rabbitmq.client.Channel; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | import org.yeshi.utils.StringUtil; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.nio.charset.StandardCharsets; |
| | | import java.util.List; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: QueueHelloWorldListener |
| | | * @description: |
| | | * @date 2024/9/26 13:47 |
| | | */ |
| | | @Component |
| | | public class InvalidDeviceTokenListener { |
| | | |
| | | static Logger logger = LoggerFactory.getLogger(InvalidDeviceTokenListener.class); |
| | | |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | @Resource |
| | | private PushDeviceTokenManager pushDeviceTokenManager; |
| | | |
| | | |
| | | @RabbitListener(queues = {"bpush-xm","bpush-vivo","bpush-oppo","bpush-mz","bpush-jpush","bpush-huawei"}, ackMode = "MANUAL") |
| | | public void onMessage(Message message, Channel channel) throws Exception { |
| | | RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> { |
| | | String result = new String(message.getBody(), StandardCharsets.UTF_8); |
| | | if (!StringUtil.isNullOrEmpty(result)) { |
| | | InvalidDeviceTokenInfo tokenInfo = new Gson().fromJson(result, InvalidDeviceTokenInfo.class); |
| | | List<BPushDeviceToken> tokenList = pushDeviceTokenManager.list(tokenInfo.getAppCode(), tokenInfo.getPushPlatform(), tokenInfo.getToken(), 1, 20); |
| | | if (tokenList != null) { |
| | | for (BPushDeviceToken token : tokenList) { |
| | | logger.info("清理无效token id-{}" + token.getId()); |
| | | //删除 |
| | | pushDeviceTokenManager.deleteByPrimaryKey(token.getId()); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq.consumer; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.ks.push.dto.BPushDeviceDataSet; |
| | | import com.ks.push.manager.BPushPlatformAppInfoManager; |
| | | import com.ks.push.manager.PushExcuteResultManager; |
| | | import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil; |
| | | import com.ks.push.pojo.DO.BPushPlatformAppInfo; |
| | | import com.ks.push.pojo.DO.BPushTask; |
| | | import com.ks.push.pojo.DO.PushPlatform; |
| | | import com.ks.push.service.BPushTaskService; |
| | | import com.ks.push.utils.PushUtil; |
| | | import com.rabbitmq.client.Channel; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.context.annotation.Scope; |
| | | import org.springframework.stereotype.Component; |
| | | import org.yeshi.utils.StringUtil; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.nio.charset.StandardCharsets; |
| | | |
| | | /** |
| | | * @author hxh |
| | | * @title: QueueHelloWorldListener |
| | | * @description: |
| | | * @date 2024/9/26 13:47 |
| | | */ |
| | | @Component |
| | | @Scope("prototype") |
| | | public class PushMessageListener { |
| | | |
| | | static Logger logger = LoggerFactory.getLogger(PushMessageListener.class); |
| | | |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | @Resource |
| | | private BPushTaskService bPushTaskService; |
| | | |
| | | @Resource |
| | | private BPushPlatformAppInfoManager bPushPlatformAppInfoManager; |
| | | |
| | | @Resource |
| | | private PushExcuteResultManager pushExcuteResultManager; |
| | | |
| | | |
| | | @RabbitListener(queues = {"bpush-xm","bpush-vivo","bpush-oppo","bpush-mz","bpush-jpush","bpush-huawei"}, ackMode = "MANUAL") |
| | | public void onMessage(Message message, Channel channel) throws Exception { |
| | | RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> { |
| | | String result = new String(message.getBody(), StandardCharsets.UTF_8); |
| | | if (!StringUtil.isNullOrEmpty(result)) { |
| | | String queueName = message.getMessageProperties().getConsumerQueue(); |
| | | logger.info("队列名称:{} 消息内容:{}", queueName, result); |
| | | BPushDeviceDataSet dataSet = new Gson().fromJson(result, BPushDeviceDataSet.class); |
| | | try { |
| | | PushPlatform platform=null; |
| | | switch(queueName){ |
| | | case "bpush-xm": |
| | | platform = PushPlatform.xm; |
| | | break; |
| | | case "bpush-vivo": |
| | | platform = PushPlatform.vivo; |
| | | break; |
| | | case "bpush-oppo": |
| | | platform = PushPlatform.oppo; |
| | | break; |
| | | case "bpush-mz": |
| | | platform = PushPlatform.mz; |
| | | break; |
| | | case "bpush-jpush": |
| | | platform = PushPlatform.jpush; |
| | | break; |
| | | case "bpush-huawei": |
| | | platform = PushPlatform.hw; |
| | | break; |
| | | } |
| | | BPushTask task = bPushTaskService.getTask(dataSet.getTaskId()); |
| | | if (task != null) { |
| | | if (task.getState() == BPushTask.STATE_PUSHING) { |
| | | BPushPlatformAppInfo platformAppInfo = bPushPlatformAppInfoManager.selectByAppCodeAndPlatform(task.getAppCode(), platform); |
| | | if (platformAppInfo != null) { |
| | | try { |
| | | PushUtil.pushNotifyCation(platform, platformAppInfo.getPushAppInfo(), task.getMessage(), task.getId() + "#" + dataSet.getBatchId(), dataSet.getDeviceTokenList()); |
| | | logger.info("{}推送任务执行成功,taskId-{},batchId-{}", platform.name(), dataSet.getTaskId(), dataSet.getBatchId()); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | logger.error("推送出错:ttaskId-{},batchId-{},错误原因:{}", task.getId(), dataSet.getBatchId(), e.getMessage()); |
| | | logger.error("推送出错", e); |
| | | } |
| | | } |
| | | } else { |
| | | if (task.getState() == BPushTask.STATE_PAUSED) { |
| | | //任务暂停,不删除消息等待下次消费 |
| | | return; |
| | | } else if (task.getState() == BPushTask.STATE_CANCELED) { |
| | | //任务取消,删除消息 |
| | | } |
| | | } |
| | | } else { |
| | | logger.error("任务为空,taskId-{},batchId-{}", dataSet.getTaskId(), dataSet.getBatchId()); |
| | | } |
| | | pushExcuteResultManager.batchPushFinish(dataSet, platform); |
| | | logger.info("{}推送任务执行结束,taskId-{},batchId-{}", platform.name(), dataSet.getTaskId(), dataSet.getBatchId()); |
| | | } catch (Exception e) { |
| | | logger.error("推送出错:", e); |
| | | throw e; |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | } |
New file |
| | |
| | | package com.ks.push.manager.rabbitmq.consumer; |
| | | |
| | | import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil; |
| | | import com.rabbitmq.client.Channel; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.context.annotation.Scope; |
| | | import org.springframework.stereotype.Component; |
| | | import org.yeshi.utils.StringUtil; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.nio.charset.StandardCharsets; |
| | | |
| | | /** |
| | |  * Diagnostic consumer for the {@code test} queue. |
| | |  * <p> |
| | |  * Every non-empty message is decoded as UTF-8 and logged together with the queue it came |
| | |  * from and the id of the consuming thread. |
| | |  * |
| | |  * @author hxh |
| | |  * @date 2024/9/26 13:47 |
| | |  */ |
| | | @Component |
| | | @Scope("prototype") |
| | | public class TestListener { |
| | | |
| | |     static Logger logger = LoggerFactory.getLogger(TestListener.class); |
| | | |
| | |     @Resource |
| | |     private RabbitTemplate rabbitTemplate; |
| | | |
| | |     /** |
| | |      * Logs one message from the {@code test} queue; empty bodies are ignored. |
| | |      * |
| | |      * @param message the consumed AMQP message |
| | |      * @param channel the channel the message arrived on, handed to the consume utility |
| | |      * @throws Exception whatever {@code RabbitmqMsgConsumeUtil.processMessage} propagates |
| | |      */ |
| | |     @RabbitListener(queues = {"test"}, ackMode = "MANUAL") |
| | |     public void onMessage(Message message, Channel channel) throws Exception { |
| | |         RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> { |
| | |             final String body = new String(message.getBody(), StandardCharsets.UTF_8); |
| | |             if (StringUtil.isNullOrEmpty(body)) { |
| | |                 return; |
| | |             } |
| | |             final String sourceQueue = message.getMessageProperties().getConsumerQueue(); |
| | |             final long threadId = Thread.currentThread().getId(); |
| | |             logger.info("队列名称:{} 消息内容:{} 线程ID:{}", sourceQueue, body, threadId); |
| | |         }); |
| | |     } |
| | | } |
| | |
| | | import cn.jpush.api.push.model.Platform; |
| | | import cn.jpush.api.push.model.PushPayload; |
| | | import cn.jpush.api.push.model.audience.Audience; |
| | | import cn.jpush.api.push.model.notification.AndroidNotification; |
| | | import cn.jpush.api.push.model.notification.Notification; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.JsonObject; |
| | | import com.ks.push.utils.JPushUtil; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.yeshi.utils.push.entity.PushAppInfo; |
| | |
| | | params = message.getActivityParams(); |
| | | } |
| | | |
| | | PushPayload.Builder payloadBuilder = PushPayload.newBuilder(). |
| | | setPlatform(Platform.all()) |
| | | .setNotification(Notification.android(message.getContent(), message.getTitle(), params)); |
| | | AndroidNotification.Builder androidBuilder = AndroidNotification.newBuilder().setAlert(message.getContent()).setTitle(message.getTitle()); |
| | | |
| | | |
| | | if (regIds != null) { |
| | | payloadBuilder.setAudience(Audience.registrationId(regIds)); |
| | | } else if (aliasList != null) { |
| | | payloadBuilder.setAudience(Audience.alias(aliasList)); |
| | | //通过intent推送 |
| | | if (message.getActivity().startsWith("intent:#")) { |
| | | JsonObject intent = new JsonObject(); |
| | | intent.addProperty("url", message.getActivity()); |
| | | androidBuilder = androidBuilder.setIntent(intent).addExtras(message.getActivityParams()); |
| | | } else { |
| | | payloadBuilder.setAudience(Audience.all()); |
| | | androidBuilder = androidBuilder.addExtras(params); |
| | | } |
| | | |
| | | PushPayload.Builder payloadBuilder = PushPayload.newBuilder(). |
| | | setPlatform(Platform.all()) |
| | | .setNotification(Notification.newBuilder() |
| | | .addPlatformNotification(androidBuilder.build()) |
| | | .build()); |
| | | loadTarget(payloadBuilder, regIds, aliasList); |
| | | PushPayload payload = payloadBuilder.build(); |
| | | |
| | | |
| | | try { |
| | | PushResult result = jpushClient.sendPush(payload); |
| | |
| | | LOG.info("Error Message: " + e.getErrorMessage()); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | private static void loadTarget(PushPayload.Builder builder, List<String> regIds, List<String> aliasList) { |
| | | if (regIds != null) { |
| | | builder.setAudience(Audience.registrationId(regIds)); |
| | | } else if (aliasList != null) { |
| | | builder.setAudience(Audience.alias(aliasList)); |
| | | } else { |
| | | builder.setAudience(Audience.all()); |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | .setMessage(Message.newBuilder() |
| | | .setMsgContent(data.toJSONString()) |
| | | .build()); |
| | | if (regIds != null) { |
| | | payloadBuilder.setAudience(Audience.registrationId(regIds)); |
| | | } else if (aliasList != null) { |
| | | payloadBuilder.setAudience(Audience.alias(aliasList)); |
| | | } else { |
| | | payloadBuilder.setAudience(Audience.all()); |
| | | } |
| | | loadTarget(payloadBuilder, regIds, aliasList); |
| | | |
| | | PushPayload payload = payloadBuilder.build(); |
| | | |
| | |
| | | |
| | | /** |
| | |  * Manual smoke test: sends one data message via pushMessage and one notification via |
| | |  * pushNotification to hard-coded registration ids. |
| | |  * NOTE(review): this span comes from a diff view, so some statements below may be the |
| | |  * removed side of the change rather than live code - confirm against the real file. |
| | |  * |
| | |  * @param args unused |
| | |  */ |
| | | public static void main(String[] args) { |
| | | Map<String, String> activityParams = new HashMap<>(); |
| | | activityParams.put("activity", "test"); |
| | | // As shown, "type" is put twice; the second put replaces the first value. |
| | | activityParams.put("type", "requestLocation"); |
| | | activityParams.put("type", "web"); |
| | | // Jump parameters are serialized to a JSON string and stored under "params". |
| | | JSONObject jumpParams = new JSONObject(); |
| | | jumpParams.put("id", 123123); |
| | | jumpParams.put("url", "http://www.baidu.com"); |
| | | activityParams.put("params", jumpParams.toJSONString()); |
| | | |
| | | // NOTE(review): app key/secret are hard-coded and set twice; presumably the later |
| | | // pair is the one in effect - confirm, and consider loading them from configuration. |
| | | PushAppInfo appInfo = new PushAppInfo(); |
| | | appInfo.setAppKey("987d3d50f209994fa522d001"); |
| | | appInfo.setAppSecret("e4053447d0014576ea2c47ab"); |
| | | // PushMessage message = new PushMessage("你好啊", "下午好,hello world", null, null, null, activityParams); |
| | | appInfo.setAppKey("6f219d3bfa78428537090fef"); |
| | | appInfo.setAppSecret("c48f35cd1cef4b717f721e4b"); |
| | | |
| | | Map<String,Object> dataMap=new HashMap<>(); |
| | | dataMap.put("type","requestLocation"); |
| | | |
| | | // Data message addressed to a single registration id (alias list is null). |
| | | JSONObject data = new JSONObject(dataMap); |
| | | pushMessage(appInfo, data, Arrays.asList(new String[]{"100d8559098624d84e6"}), null); |
| | | // Notification whose activity is the intent string built by JPushUtil.createIntent. |
| | | PushMessage pushMessage = new PushMessage(); |
| | | pushMessage.setActivity(JPushUtil.createIntent("com.yeshi.makemoney.video.app.push.PushOpenClickActivity", "com.yeshi.makemoney.video")); |
| | | pushMessage.setActivityParams(activityParams); |
| | | pushMessage.setTitle("测试标题"); |
| | | pushMessage.setContent("测试内容"); |
| | | pushNotification(appInfo, pushMessage, Arrays.asList(new String[]{"1507bfd3f737761f2f2"}), null); |
| | | } |
| | | |
| | | |
| | |
| | | test_on_borrow: true |
| | | min-idle: 8 |
| | | |
| | | rabbitmq: |
| | | host: 134.175.68.214 |
| | | port: 5672 |
| | | username: root |
| | | password: yeshi2014 |
| | | virtual-host: /push |
| | | |
| | | datasource: |
| | | url: jdbc:mysql://gz-cdb-r13d0yi9.sql.tencentcdb.com:62929/bpush?serverTimezone=GMT%2B8 |
| | |
| | | driver-class-name: com.mysql.jdbc.Driver |
| | | type: com.alibaba.druid.pool.DruidDataSource |
| | | druid: |
| | | stat-view-servlet: |
| | | enabled: false |
| | | initial-size: 8 |
| | | min-idle: 1 |
| | | max-active: 20 |
| | |
| | | max-idle: 200 |
| | | test_on_borrow: true |
| | | min-idle: 8 |
| | | |
| | | rabbitmq: |
| | | host: 172.16.16.38 |
| | | port: 5672 |
| | | username: root |
| | | password: yeshi2014 |
| | | virtual-host: /push |
| | | |
| | | datasource: |
| | | #jdbc:mysql://gz-cdb-r13d0yi9.sql.tencentcdb.com:62929/ks_goldcorn?serverTimezone=GMT%2B8 |
| | |
| | | driver-class-name: com.mysql.jdbc.Driver |
| | | type: com.alibaba.druid.pool.DruidDataSource |
| | | druid: |
| | | stat-view-servlet: |
| | | enabled: false |
| | | initial-size: 8 |
| | | min-idle: 1 |
| | | max-active: 20 |
| | |
| | | spring: |
| | | profiles: |
| | | active: pro |
| | | active: dev |
| | |
| | | package com.ks.push; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.ks.push.exception.BPushTaskException; |
| | | import com.ks.push.manager.rabbitmq.RabbitmqSenderUtil; |
| | | import com.ks.push.pojo.DO.BPushFilter; |
| | | import com.ks.push.pojo.DO.BPushMessage; |
| | | import com.ks.push.pojo.DO.BPushTask; |
| | | import com.ks.push.service.BPushTaskService; |
| | | import org.junit.jupiter.api.Test; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.boot.test.context.SpringBootTest; |
| | | import org.springframework.test.context.ContextConfiguration; |
| | | |
| | |
| | | bPushTaskService.createTask(task); |
| | | } |
| | | |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | |
| | | /** |
| | |  * Sends a single text message to the "test" queue through RabbitmqSenderUtil. |
| | |  * The test makes no assertion; it only exercises the send path. |
| | |  */ |
| | | @Test |
| | | public void testRabbitMq() { |
| | |     final String queue = "test"; |
| | |     final String payload = "测试内容"; |
| | |     RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, queue, payload); |
| | | } |
| | | |
| | | } |