admin
2022-01-12 4a7367a869ef12375ea6678ca44e102b8919c624
service-push/src/main/java/com/ks/push/manager/PushManager.java
@@ -7,6 +7,7 @@
import com.ks.push.pojo.DO.BPushDeviceToken;
import com.ks.push.pojo.DO.BPushPlatformAppInfo;
import com.ks.push.pojo.DO.BPushTask;
import com.ks.push.pojo.DO.BPushTaskExcuteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
@@ -15,9 +16,7 @@
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@@ -37,13 +36,10 @@
    private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private PushExcuteResultManager pushExcuteResultManager;
    @Resource
    private CMQManager cmqManager;
    private RedisTemplate redisTemplate;
    /**
@@ -58,6 +54,11 @@
            task.setCreateTime(new Date());
        }
        task.setId(BPushTask.createId());
        if (task.getState() == null) {
            task.setState(BPushTask.STATE_NOT_START);
        }
        bPushTaskDao.save(task);
        logger.info("创建推送成功#taskId:{}", task.getId());
        return task.getId();
@@ -80,52 +81,85 @@
        }
        String key = "task-" + taskId;
        if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS)) {
        Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS);
        if (s == null || !s) {
            throw new BPushTaskException(BPushTaskException.CODE_BUSY, "服务器繁忙,请稍后再试");
        }
        logger.info("开始启动推送#taskId:{}", task.getId());
        updateState(taskId, BPushTask.STATE_PUSHING, null);
        boolean hasDevice = false;
        try {
            logger.info("开始启动推送#taskId:{}", task.getId());
            updateState(taskId, BPushTask.STATE_PUSHING, null);
            boolean hasDevice = false;
            // 查询可推送的平台
            List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode());
            //先初始化推送结果数据数据
            List<BPushTaskExcuteResult> resultList = new ArrayList<>();
            Map<String, Long> resultCountMap = new HashMap<>();
            for (BPushPlatformAppInfo appInfo : list) {
                long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter());
                if (count > 0) {
                    hasDevice = true;
                    //初始化推送结果数据
                    pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
                    //将数据加入到cmq
                    int pageSize = 500;
                    int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
                    for (int page = 0; page < totalPage; page++) {
                        List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), appInfo.getPlatform(), task.getFilter(), page + 1, pageSize);
                        List<String> tokenList = new ArrayList<>();
                        for (BPushDeviceToken deviceToken : deviceTokenList) {
                            tokenList.add(deviceToken.getToken());
                        }
                        BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
                        cmqManager.addToPushQueue(appInfo.getPlatform(), dataSet);
                    }
                    logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), appInfo.getPlatform().name(), count);
                    BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
                    resultCountMap.put(result.getId(), count);
                    resultList.add(result);
                }
            }
            for (BPushTaskExcuteResult result : resultList) {
                long count = resultCountMap.get(result.getId());
                int pageSize = 500;
                int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
                long totalValidCount = 0L;
                //初始化批量推送的计数
                for (int page = 0; page < totalPage; page++) {
                    pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + "");
                }
                for (int page = 0; page < totalPage; page++) {
                    List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize);
                    List<String> tokenList = new ArrayList<>();
                    for (BPushDeviceToken deviceToken : deviceTokenList) {
                        //TODO 时间判断
                        tokenList.add(deviceToken.getToken());
                    }
                    totalValidCount += tokenList.size();
                    BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
                    //最后一页
                    if (page == totalPage - 1 && totalValidCount != count) {
                        //修改总数
                        pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount);
                    }
                    CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet);
                }
                logger.info("加入推送队列#任务Id:{}#平台:{}#推送数量:{}", task.getId(), result.getPushPlatform().name(), count);
            }
            if (!hasDevice) {
                updateState(taskId, BPushTask.STATE_FINSIH, "没有满足条件的可推送设备");
            }
        } finally {
            redisTemplate.delete(key);
        }
        //没有可推送的设备
        if (!hasDevice) {
            updateState(taskId, BPushTask.STATE_FINSIH, "没有满足条件的可推送设备");
        }
        logger.info("启动推送结束#taskId:{}", task.getId());
    }
    /**
     * 暂停推送
     *
     * @param taskId
     * @throws BPushTaskException
     */
    public void pausePush(String taskId) throws BPushTaskException {
        //验证状态
        BPushTask task = bPushTaskDao.get(taskId);
@@ -135,13 +169,16 @@
        if (task.getState() != BPushTask.STATE_PUSHING) {
            throw new BPushTaskException(2, "处于推送中状态的推送才可暂停");
        }
        updateState(taskId, BPushTask.STATE_PAUSED, null);
        logger.info("暂停推送#taskId:{}", task.getId());
    }
    /**
     * 重新推送(处于暂停状态的数据才能重新推送)
     *
     * @param taskId
     * @throws BPushTaskException
     */
    public void reStartPush(String taskId) throws BPushTaskException {
        //验证状态
        BPushTask task = bPushTaskDao.get(taskId);
@@ -151,12 +188,16 @@
        if (task.getState() != BPushTask.STATE_PAUSED) {
            throw new BPushTaskException(2, "处于暂停状态的推送才可重新开始推送");
        }
        updateState(taskId, BPushTask.STATE_PUSHING, "重新推送");
        logger.info("重新推送#taskId:{}", task.getId());
    }
    /**
     * 取消推送(处于暂停/推送中状态的推送才可取消)
     *
     * @param taskId
     * @throws BPushTaskException
     */
    public void cancelPush(String taskId) throws BPushTaskException {
        //验证状态
        BPushTask task = bPushTaskDao.get(taskId);