package com.ks.push.manager;
|
|
import com.ks.lib.common.exception.ParamsException;
|
import com.ks.push.dao.BPushTaskDao;
|
import com.ks.push.dto.BPushDeviceDataSet;
|
import com.ks.push.exception.BPushTaskException;
|
import com.ks.push.pojo.DO.BPushDeviceToken;
|
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
|
import com.ks.push.pojo.DO.BPushTask;
|
import com.ks.push.pojo.DO.BPushTaskExcuteResult;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.stereotype.Component;
|
import org.springframework.validation.annotation.Validated;
|
|
import javax.annotation.Resource;
|
import javax.validation.Valid;
|
import java.util.*;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* 推送管理器
|
*/
|
@Component
|
public class PushManager {
|
|
Logger logger = LoggerFactory.getLogger(PushManager.class);
|
|
@Resource
|
private BPushTaskDao bPushTaskDao;
|
@Resource
|
private PushDeviceTokenManager pushDeviceTokenManager;
|
|
@Resource
|
private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
|
|
@Resource
|
private PushExcuteResultManager pushExcuteResultManager;
|
|
@Resource
|
private RedisTemplate redisTemplate;
|
|
|
/**
|
* 创建推送任务
|
*
|
* @param task
|
* @throws BPushTaskException
|
*/
|
@Validated
|
public String createTask(@Valid BPushTask task) throws ParamsException, BPushTaskException {
|
if (task.getCreateTime() == null) {
|
task.setCreateTime(new Date());
|
}
|
task.setId(BPushTask.createId());
|
|
if (task.getState() == null) {
|
task.setState(BPushTask.STATE_NOT_START);
|
}
|
|
bPushTaskDao.save(task);
|
logger.info("创建推送成功#taskId:{}", task.getId());
|
return task.getId();
|
}
|
|
/**
|
* 开始推送
|
*
|
* @param taskId
|
* @throws BPushTaskException
|
*/
|
public void startPush(String taskId) throws BPushTaskException {
|
//验证状态
|
BPushTask task = bPushTaskDao.get(taskId);
|
if (task == null) {
|
throw new BPushTaskException(1, "任务ID不存在");
|
}
|
if (task.getState() != BPushTask.STATE_NOT_START) {
|
throw new BPushTaskException(2, "任务未处于待执行状态");
|
}
|
|
String key = "task-" + taskId;
|
|
Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS);
|
|
if (s == null || !s) {
|
throw new BPushTaskException(BPushTaskException.CODE_BUSY, "服务器繁忙,请稍后再试");
|
}
|
|
|
logger.info("开始启动推送#taskId:{}", task.getId());
|
updateState(taskId, BPushTask.STATE_PUSHING, null);
|
boolean hasDevice = false;
|
|
// 查询可推送的平台
|
List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode());
|
|
//先初始化推送结果数据数据
|
List<BPushTaskExcuteResult> resultList = new ArrayList<>();
|
Map<String, Long> resultCountMap = new HashMap<>();
|
for (BPushPlatformAppInfo appInfo : list) {
|
long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter());
|
if (count > 0) {
|
hasDevice = true;
|
//初始化推送结果数据
|
BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
|
resultCountMap.put(result.getId(), count);
|
resultList.add(result);
|
}
|
}
|
|
|
for (BPushTaskExcuteResult result : resultList) {
|
long count = resultCountMap.get(result.getId());
|
int pageSize = 500;
|
int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
|
long totalValidCount = 0L;
|
|
//初始化批量推送的计数
|
for (int page = 0; page < totalPage; page++) {
|
pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + "");
|
}
|
|
for (int page = 0; page < totalPage; page++) {
|
List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize);
|
|
List<String> tokenList = new ArrayList<>();
|
for (BPushDeviceToken deviceToken : deviceTokenList) {
|
//TODO 时间判断
|
tokenList.add(deviceToken.getToken());
|
}
|
totalValidCount += tokenList.size();
|
BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
|
//最后一页
|
if (page == totalPage - 1 && totalValidCount != count) {
|
//修改总数
|
pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount);
|
}
|
CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet);
|
}
|
logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), result.getPushPlatform().name(), count);
|
}
|
|
|
if (!hasDevice) {
|
updateState(taskId, BPushTask.STATE_FINSIH, "没有满足条件的可推送设备");
|
}
|
//没有可推送的设备
|
logger.info("启动推送结束#taskId:{}", task.getId());
|
}
|
|
|
/**
|
* 暂停推送
|
*
|
* @param taskId
|
* @throws BPushTaskException
|
*/
|
public void pausePush(String taskId) throws BPushTaskException {
|
//验证状态
|
BPushTask task = bPushTaskDao.get(taskId);
|
if (task == null) {
|
throw new BPushTaskException(1, "任务ID不存在");
|
}
|
if (task.getState() != BPushTask.STATE_PUSHING) {
|
throw new BPushTaskException(2, "处于推送中状态的推送才可暂停");
|
}
|
updateState(taskId, BPushTask.STATE_PAUSED, null);
|
logger.info("暂停推送#taskId:{}", task.getId());
|
}
|
|
/**
|
* 重新推送(处于暂停状态的数据才能重新推送)
|
*
|
* @param taskId
|
* @throws BPushTaskException
|
*/
|
public void reStartPush(String taskId) throws BPushTaskException {
|
//验证状态
|
BPushTask task = bPushTaskDao.get(taskId);
|
if (task == null) {
|
throw new BPushTaskException(1, "任务ID不存在");
|
}
|
if (task.getState() != BPushTask.STATE_PAUSED) {
|
throw new BPushTaskException(2, "处于暂停状态的推送才可重新开始推送");
|
}
|
updateState(taskId, BPushTask.STATE_PUSHING, "重新推送");
|
logger.info("重新推送#taskId:{}", task.getId());
|
}
|
|
/**
|
* 取消推送(处于暂停/推送中状态的推送才可取消)
|
*
|
* @param taskId
|
* @throws BPushTaskException
|
*/
|
public void cancelPush(String taskId) throws BPushTaskException {
|
//验证状态
|
BPushTask task = bPushTaskDao.get(taskId);
|
if (task == null) {
|
throw new BPushTaskException(1, "任务ID不存在");
|
}
|
if (task.getState() != BPushTask.STATE_PAUSED && task.getState() != BPushTask.STATE_PUSHING) {
|
throw new BPushTaskException(2, "处于暂停/推送中状态的推送才可取消");
|
}
|
|
updateState(taskId, BPushTask.STATE_CANCELED, null);
|
|
logger.info("取消推送#taskId:{}", task.getId());
|
}
|
|
|
private void updateState(String taskId, int state, String stateDesc) {
|
BPushTask task = new BPushTask();
|
task.setId(taskId);
|
task.setState(state);
|
task.setStateDesc(stateDesc);
|
bPushTaskDao.updateSelective(task);
|
}
|
|
|
}
|