package com.ks.push.manager; import com.ks.lib.common.exception.ParamsException; import com.ks.push.dao.BPushTaskDao; import com.ks.push.dto.BPushDeviceDataSet; import com.ks.push.exception.BPushTaskException; import com.ks.push.manager.rabbitmq.RabbitmqManager; import com.ks.push.pojo.DO.BPushDeviceToken; import com.ks.push.pojo.DO.BPushPlatformAppInfo; import com.ks.push.pojo.DO.BPushTask; import com.ks.push.pojo.DO.BPushTaskExcuteResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.Resource; import javax.validation.Valid; import java.util.*; import java.util.concurrent.TimeUnit; /** * 推送管理器 */ @Component public class PushManager { Logger logger = LoggerFactory.getLogger(PushManager.class); @Resource private BPushTaskDao bPushTaskDao; @Resource private PushDeviceTokenManager pushDeviceTokenManager; @Resource private BPushPlatformAppInfoManager bPushPlatformAppInfoManager; @Resource private PushExcuteResultManager pushExcuteResultManager; @Resource private RedisTemplate redisTemplate; @Resource private RabbitmqManager rabbitmqManager; /** * 创建推送任务 * * @param task * @throws BPushTaskException */ @Validated public String createTask(@Valid BPushTask task) throws ParamsException, BPushTaskException { if (task.getCreateTime() == null) { task.setCreateTime(new Date()); } task.setId(BPushTask.createId()); if (task.getState() == null) { task.setState(BPushTask.STATE_NOT_START); } bPushTaskDao.save(task); logger.info("创建推送成功#taskId:{}", task.getId()); return task.getId(); } /** * 开始推送 * * @param taskId * @throws BPushTaskException */ public void startPush(String taskId) throws BPushTaskException { //验证状态 BPushTask task = bPushTaskDao.get(taskId); if (task == null) { throw new BPushTaskException(1, "任务ID不存在"); } if (task.getState() != BPushTask.STATE_NOT_START) { throw new BPushTaskException(2, "任务未处于待执行状态"); } String key = "task-" + taskId; Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS); if (s == null || !s) { throw new BPushTaskException(BPushTaskException.CODE_BUSY, "服务器繁忙,请稍后再试"); } try { logger.info("开始启动推送#taskId:{}", task.getId()); updateState(taskId, BPushTask.STATE_PUSHING, null); boolean hasDevice = false; // 查询可推送的平台 List list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode()); //先初始化推送结果数据数据 List resultList = new ArrayList<>(); Map resultCountMap = new HashMap<>(); for (BPushPlatformAppInfo appInfo : list) { long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter()); if (count > 0) { hasDevice = true; //初始化推送结果数据 BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); resultCountMap.put(result.getId(), count); resultList.add(result); } } for (BPushTaskExcuteResult result : resultList) { long count = resultCountMap.get(result.getId()); int pageSize = 500; int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); long totalValidCount = 0L; //初始化批量推送的计数 for (int page = 0; page < totalPage; page++) { pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + ""); } for (int page = 0; page < totalPage; page++) { List deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize); List tokenList = new ArrayList<>(); for (BPushDeviceToken deviceToken : deviceTokenList) { //TODO 时间判断 tokenList.add(deviceToken.getToken()); } totalValidCount += tokenList.size(); BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); //最后一页 if (page == totalPage - 1 && totalValidCount != count) { //修改总数 pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount); } rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet); } logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), result.getPushPlatform().name(), count); } if (!hasDevice) { updateState(taskId, BPushTask.STATE_FINSIH, "没有满足条件的可推送设备"); } } finally { redisTemplate.delete(key); } //没有可推送的设备 logger.info("启动推送结束#taskId:{}", task.getId()); } /** * 暂停推送 * * @param taskId * @throws BPushTaskException */ public void pausePush(String taskId) throws BPushTaskException { //验证状态 BPushTask task = bPushTaskDao.get(taskId); if (task == null) { throw new BPushTaskException(1, "任务ID不存在"); } if (task.getState() != BPushTask.STATE_PUSHING) { throw new BPushTaskException(2, "处于推送中状态的推送才可暂停"); } updateState(taskId, BPushTask.STATE_PAUSED, null); logger.info("暂停推送#taskId:{}", task.getId()); } /** * 重新推送(处于暂停状态的数据才能重新推送) * * @param taskId * @throws BPushTaskException */ public void reStartPush(String taskId) throws BPushTaskException { //验证状态 BPushTask task = bPushTaskDao.get(taskId); if (task == null) { throw new BPushTaskException(1, "任务ID不存在"); } if (task.getState() != BPushTask.STATE_PAUSED) { throw new BPushTaskException(2, "处于暂停状态的推送才可重新开始推送"); } updateState(taskId, BPushTask.STATE_PUSHING, "重新推送"); logger.info("重新推送#taskId:{}", task.getId()); } /** * 取消推送(处于暂停/推送中状态的推送才可取消) * * @param taskId * @throws BPushTaskException */ public void cancelPush(String taskId) throws BPushTaskException { //验证状态 BPushTask task = bPushTaskDao.get(taskId); if (task == null) { throw new BPushTaskException(1, "任务ID不存在"); } if (task.getState() != BPushTask.STATE_PAUSED && task.getState() != BPushTask.STATE_PUSHING) { throw new BPushTaskException(2, "处于暂停/推送中状态的推送才可取消"); } updateState(taskId, BPushTask.STATE_CANCELED, null); logger.info("取消推送#taskId:{}", task.getId()); } private void updateState(String taskId, int state, String stateDesc) { BPushTask task = new BPushTask(); task.setId(taskId); task.setState(state); task.setStateDesc(stateDesc); bPushTaskDao.updateSelective(task); } }