package com.ks.push.manager.rabbitmq.consumer;
|
|
import com.google.gson.Gson;
|
import com.ks.push.dto.BPushDeviceDataSet;
|
import com.ks.push.manager.BPushPlatformAppInfoManager;
|
import com.ks.push.manager.PushExcuteResultManager;
|
import com.ks.push.manager.rabbitmq.RabbitmqMsgConsumeUtil;
|
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
|
import com.ks.push.pojo.DO.BPushTask;
|
import com.ks.push.pojo.DO.PushPlatform;
|
import com.ks.push.service.BPushTaskService;
|
import com.ks.push.utils.PushUtil;
|
import com.rabbitmq.client.Channel;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.context.annotation.Scope;
|
import org.springframework.stereotype.Component;
|
import org.yeshi.utils.StringUtil;
|
|
import javax.annotation.Resource;
|
import java.nio.charset.StandardCharsets;
|
|
/**
|
* @author hxh
|
* @title: QueueHelloWorldListener
|
* @description:
|
* @date 2024/9/26 13:47
|
*/
|
@Component
|
@Scope("prototype")
|
public class PushMessageListener {
|
|
static Logger logger = LoggerFactory.getLogger(PushMessageListener.class);
|
|
@Resource
|
private RabbitTemplate rabbitTemplate;
|
|
@Resource
|
private BPushTaskService bPushTaskService;
|
|
@Resource
|
private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
|
|
@Resource
|
private PushExcuteResultManager pushExcuteResultManager;
|
|
|
@RabbitListener(queues = {"bpush-xm","bpush-vivo","bpush-oppo","bpush-mz","bpush-jpush","bpush-huawei"}, ackMode = "MANUAL")
|
public void onMessage(Message message, Channel channel) throws Exception {
|
RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
|
String result = new String(message.getBody(), StandardCharsets.UTF_8);
|
if (!StringUtil.isNullOrEmpty(result)) {
|
String queueName = message.getMessageProperties().getConsumerQueue();
|
logger.info("队列名称:{} 消息内容:{}", queueName, result);
|
BPushDeviceDataSet dataSet = new Gson().fromJson(result, BPushDeviceDataSet.class);
|
try {
|
PushPlatform platform=null;
|
switch(queueName){
|
case "bpush-xm":
|
platform = PushPlatform.xm;
|
break;
|
case "bpush-vivo":
|
platform = PushPlatform.vivo;
|
break;
|
case "bpush-oppo":
|
platform = PushPlatform.oppo;
|
break;
|
case "bpush-mz":
|
platform = PushPlatform.mz;
|
break;
|
case "bpush-jpush":
|
platform = PushPlatform.jpush;
|
break;
|
case "bpush-huawei":
|
platform = PushPlatform.hw;
|
break;
|
}
|
BPushTask task = bPushTaskService.getTask(dataSet.getTaskId());
|
if (task != null) {
|
if (task.getState() == BPushTask.STATE_PUSHING) {
|
BPushPlatformAppInfo platformAppInfo = bPushPlatformAppInfoManager.selectByAppCodeAndPlatform(task.getAppCode(), platform);
|
if (platformAppInfo != null) {
|
try {
|
PushUtil.pushNotifyCation(platform, platformAppInfo.getPushAppInfo(), task.getMessage(), task.getId() + "#" + dataSet.getBatchId(), dataSet.getDeviceTokenList());
|
logger.info("{}推送任务执行成功,taskId-{},batchId-{}", platform.name(), dataSet.getTaskId(), dataSet.getBatchId());
|
} catch (Exception e) {
|
e.printStackTrace();
|
logger.error("推送出错:ttaskId-{},batchId-{},错误原因:{}", task.getId(), dataSet.getBatchId(), e.getMessage());
|
logger.error("推送出错", e);
|
}
|
}
|
} else {
|
if (task.getState() == BPushTask.STATE_PAUSED) {
|
//任务暂停,不删除消息等待下次消费
|
return;
|
} else if (task.getState() == BPushTask.STATE_CANCELED) {
|
//任务取消,删除消息
|
}
|
}
|
} else {
|
logger.error("任务为空,taskId-{},batchId-{}", dataSet.getTaskId(), dataSet.getBatchId());
|
}
|
pushExcuteResultManager.batchPushFinish(dataSet, platform);
|
logger.info("{}推送任务执行结束,taskId-{},batchId-{}", platform.name(), dataSet.getTaskId(), dataSet.getBatchId());
|
} catch (Exception e) {
|
logger.error("推送出错:", e);
|
throw e;
|
}
|
}
|
});
|
}
|
}
|