package com.ks.push.consumer.mq;
|
|
import com.ks.push.dto.BPushDeviceDataSet;
|
import com.ks.push.manager.BPushPlatformAppInfoManager;
|
import com.ks.push.manager.CMQManager;
|
import com.ks.push.manager.PushExcuteResultManager;
|
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
|
import com.ks.push.pojo.DO.BPushTask;
|
import com.ks.push.pojo.DO.PushPlatform;
|
import com.ks.push.service.BPushTaskService;
|
import com.ks.push.utils.PushUtil;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.List;
|
|
@Component
|
public class PushTaskConsumer {
|
|
Logger logger = LoggerFactory.getLogger(PushTaskConsumer.class);
|
|
|
@Resource
|
private BPushTaskService bPushTaskService;
|
|
@Resource
|
private PushExcuteResultManager pushExcuteResultManager;
|
|
@Resource
|
private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
|
|
|
/**
|
* 消费任务消息
|
*
|
* @param platform
|
*/
|
public void consumeMsg(PushPlatform platform) {
|
|
if (platform == null) {
|
return;
|
}
|
logger.info("消费消息开始:{} Thread:{}", platform.name(),Thread.currentThread().getId());
|
List<CMQManager.MQMsgConsumeResult> resultList = null;
|
try {
|
resultList = CMQManager.getInstance().consumePushQueue(platform, 1);
|
} catch (Exception e) {
|
}
|
logger.info("消费消息结束:{} Thread:{}", platform.name(),Thread.currentThread().getId());
|
if (resultList != null) {
|
for (CMQManager.MQMsgConsumeResult result : resultList) {
|
try {
|
BPushDeviceDataSet dataSet = (BPushDeviceDataSet) result.getData();
|
BPushTask task = bPushTaskService.getTask(dataSet.getTaskId());
|
if (task != null) {
|
if (task.getState() == BPushTask.STATE_PUSHING) {
|
BPushPlatformAppInfo platformAppInfo = bPushPlatformAppInfoManager.selectByAppCodeAndPlatform(task.getAppCode(), platform);
|
if (platformAppInfo != null) {
|
try {
|
PushUtil.pushNotifyCation(platform, platformAppInfo.getPushAppInfo(), task.getMessage(), dataSet.getDeviceTokenList());
|
logger.info("推送任务执行成功,taskId-{},batchId-{}", dataSet.getTaskId(), dataSet.getBatchId());
|
} catch (Exception e) {
|
logger.error("推送出错:ttaskId-{},batchId-{},错误原因:{}", task.getId(), dataSet.getBatchId(), e.getMessage());
|
}
|
}
|
} else {
|
if (task.getState() == BPushTask.STATE_PAUSED) {
|
//任务暂停,不删除消息等待下次消费
|
continue;
|
} else if (task.getState() == BPushTask.STATE_CANCELED) {
|
//任务取消,删除消息
|
}
|
}
|
} else {
|
logger.error("任务为空,taskId-{},batchId-{}", dataSet.getTaskId(), dataSet.getBatchId());
|
}
|
//删除消息
|
logger.info("推送任务执行完成,taskId-{},batchId-{}", dataSet.getTaskId(), dataSet.getBatchId());
|
CMQManager.getInstance().deleteMsg(result.getQueueName(), result.getReceiptHandle());
|
pushExcuteResultManager.batchPushFinish(dataSet, platform);
|
} catch (Exception e) {
|
// e.printStackTrace();
|
}
|
}
|
|
}
|
|
}
|
|
}
|