| | |
| | | import com.ks.push.dao.BPushTaskDao; |
| | | import com.ks.push.dto.BPushDeviceDataSet; |
| | | import com.ks.push.exception.BPushTaskException; |
| | | import com.ks.push.manager.rabbitmq.RabbitmqManager; |
| | | import com.ks.push.pojo.DO.BPushDeviceToken; |
| | | import com.ks.push.pojo.DO.BPushPlatformAppInfo; |
| | | import com.ks.push.pojo.DO.BPushTask; |
| | | import com.ks.push.pojo.DO.BPushTaskExcuteResult; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | |
| | | |
| | | import javax.annotation.Resource; |
| | | import javax.validation.Valid; |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.*; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | |
| | | private BPushPlatformAppInfoManager bPushPlatformAppInfoManager; |
| | | |
| | | @Resource |
| | | private RedisTemplate<String, Object> redisTemplate; |
| | | |
| | | @Resource |
| | | private PushExcuteResultManager pushExcuteResultManager; |
| | | |
| | | @Resource |
| | | private CMQManager cmqManager; |
| | | private RedisTemplate redisTemplate; |
| | | |
| | | @Resource |
| | | private RabbitmqManager rabbitmqManager; |
| | | |
| | | |
| | | /** |
| | |
| | | task.setCreateTime(new Date()); |
| | | } |
| | | task.setId(BPushTask.createId()); |
| | | |
| | | if (task.getState() == null) { |
| | | task.setState(BPushTask.STATE_NOT_START); |
| | | } |
| | | |
| | | bPushTaskDao.save(task); |
| | | logger.info("创建推送成功#taskId:{}", task.getId()); |
| | | return task.getId(); |
| | |
| | | } |
| | | |
| | | String key = "task-" + taskId; |
| | | if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS)) { |
| | | |
| | | Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS); |
| | | |
| | | if (s == null || !s) { |
| | | throw new BPushTaskException(BPushTaskException.CODE_BUSY, "服务器繁忙,请稍后再试"); |
| | | } |
| | | logger.info("开始启动推送#taskId:{}", task.getId()); |
| | | updateState(taskId, BPushTask.STATE_PUSHING, null); |
| | | boolean hasDevice = false; |
| | | |
| | | |
| | | try { |
| | | logger.info("开始启动推送#taskId:{}", task.getId()); |
| | | updateState(taskId, BPushTask.STATE_PUSHING, null); |
| | | boolean hasDevice = false; |
| | | |
| | | // 查询可推送的平台 |
| | | List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode()); |
| | | |
| | | //先初始化推送结果数据数据 |
| | | List<BPushTaskExcuteResult> resultList = new ArrayList<>(); |
| | | Map<String, Long> resultCountMap = new HashMap<>(); |
| | | for (BPushPlatformAppInfo appInfo : list) { |
| | | long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter()); |
| | | if (count > 0) { |
| | | hasDevice = true; |
| | | //初始化推送结果数据 |
| | | pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); |
| | | //将数据加入到cmq |
| | | int pageSize = 500; |
| | | int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); |
| | | for (int page = 0; page < totalPage; page++) { |
| | | List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), appInfo.getPlatform(), task.getFilter(), page + 1, pageSize); |
| | | List<String> tokenList = new ArrayList<>(); |
| | | for (BPushDeviceToken deviceToken : deviceTokenList) { |
| | | tokenList.add(deviceToken.getToken()); |
| | | } |
| | | BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); |
| | | cmqManager.addToPushQueue(appInfo.getPlatform(), dataSet); |
| | | } |
| | | |
| | | logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), appInfo.getPlatform().name(), count); |
| | | BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); |
| | | resultCountMap.put(result.getId(), count); |
| | | resultList.add(result); |
| | | } |
| | | } |
| | | |
| | | |
| | | for (BPushTaskExcuteResult result : resultList) { |
| | | long count = resultCountMap.get(result.getId()); |
| | | int pageSize = 500; |
| | | int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); |
| | | long totalValidCount = 0L; |
| | | |
| | | //初始化批量推送的计数 |
| | | for (int page = 0; page < totalPage; page++) { |
| | | pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + ""); |
| | | } |
| | | |
| | | for (int page = 0; page < totalPage; page++) { |
| | | List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize); |
| | | |
| | | List<String> tokenList = new ArrayList<>(); |
| | | for (BPushDeviceToken deviceToken : deviceTokenList) { |
| | | //TODO 时间判断 |
| | | tokenList.add(deviceToken.getToken()); |
| | | } |
| | | totalValidCount += tokenList.size(); |
| | | BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); |
| | | //最后一页 |
| | | if (page == totalPage - 1 && totalValidCount != count) { |
| | | //修改总数 |
| | | pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount); |
| | | } |
| | | rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet); |
| | | } |
| | | logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), result.getPushPlatform().name(), count); |
| | | } |
| | | |
| | | if (!hasDevice) { |
| | | updateState(taskId, BPushTask.STATE_FINSIH, "没有满足条件的可推送设备"); |
| | | } |
| | | } finally { |
| | | redisTemplate.delete(key); |
| | | } |
| | | |
| | | //没有可推送的设备 |
| | | if (!hasDevice) { |
| | | updateState(taskId, BPushTask.STATE_FINSIH, "没有满足条件的可推送设备"); |
| | | } |
| | | |
| | | logger.info("启动推送结束#taskId:{}", task.getId()); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Pauses a push task that is currently being pushed. |
| | | * <p> |
| | | * Loads the task with {@code bPushTaskDao.get(taskId)}, rejects it unless its state is |
| | | * {@code BPushTask.STATE_PUSHING}, then calls {@code updateState} to switch it to |
| | | * {@code BPushTask.STATE_PAUSED} (with a {@code null} third argument) and logs the task id. |
| | | * <p> |
| | | * NOTE(review): this excerpt appears to omit lines between the lookup and the state check. |
| | | * If the full file has no null check there, an unknown {@code taskId} would fail with a |
| | | * NullPointerException on {@code task.getState()} - confirm against the complete source. |
| | | * |
| | | * @param taskId id of the push task to pause |
| | | * @throws BPushTaskException (constructed with 2 as its first argument) when the task is |
| | | *         not in the pushing state |
| | | */ |
| | | public void pausePush(String taskId) throws BPushTaskException { |
| | | // Validate the task state: only a task that is currently pushing may be paused |
| | | BPushTask task = bPushTaskDao.get(taskId); |
| | |
| | | if (task.getState() != BPushTask.STATE_PUSHING) { |
| | | throw new BPushTaskException(2, "处于推送中状态的推送才可暂停"); |
| | | } |
| | | |
| | | updateState(taskId, BPushTask.STATE_PAUSED, null); |
| | | |
| | | logger.info("暂停推送#taskId:{}", task.getId()); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Restarts a push task (only a task in the paused state can be restarted). |
| | | * <p> |
| | | * Loads the task with {@code bPushTaskDao.get(taskId)}, rejects it unless its state is |
| | | * {@code BPushTask.STATE_PAUSED}, then calls {@code updateState} to switch it back to |
| | | * {@code BPushTask.STATE_PUSHING}, passing "重新推送" as the third argument, and logs the |
| | | * task id. The visible body only changes the state; it does not enqueue any device data. |
| | | * <p> |
| | | * NOTE(review): this excerpt appears to omit lines between the lookup and the state check. |
| | | * If the full file has no null check there, an unknown {@code taskId} would fail with a |
| | | * NullPointerException on {@code task.getState()} - confirm against the complete source. |
| | | * |
| | | * @param taskId id of the push task to restart |
| | | * @throws BPushTaskException (constructed with 2 as its first argument) when the task is |
| | | *         not in the paused state |
| | | */ |
| | | public void reStartPush(String taskId) throws BPushTaskException { |
| | | // Validate the task state: only a paused task may be restarted |
| | | BPushTask task = bPushTaskDao.get(taskId); |
| | |
| | | if (task.getState() != BPushTask.STATE_PAUSED) { |
| | | throw new BPushTaskException(2, "处于暂停状态的推送才可重新开始推送"); |
| | | } |
| | | |
| | | updateState(taskId, BPushTask.STATE_PUSHING, "重新推送"); |
| | | |
| | | logger.info("重新推送#taskId:{}", task.getId()); |
| | | } |
| | | |
| | | /** |
| | | * 取消推送(处于暂停/推送中状态的推送才可取消) |
| | | * |
| | | * @param taskId |
| | | * @throws BPushTaskException |
| | | */ |
| | | public void cancelPush(String taskId) throws BPushTaskException { |
| | | //验证状态 |
| | | BPushTask task = bPushTaskDao.get(taskId); |