admin
2024-10-16 bd885c7015446c6c0495d3299ef64068a4c9b30e
service-push/src/main/java/com/ks/push/manager/PushExcuteResultManager.java
@@ -1,21 +1,38 @@
package com.ks.push.manager;
import com.ks.push.dao.BPushTaskDao;
import com.ks.push.dto.BPushDeviceDataSet;
import com.ks.push.mapper.BPushTaskExcuteResultMapper;
import com.ks.push.pojo.DO.BPushTask;
import com.ks.push.pojo.DO.BPushTaskExcuteResult;
import com.ks.push.pojo.DO.PushPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@Component
public class PushExcuteResultManager {
    private Logger logger = LoggerFactory.getLogger(PushExcuteResultManager.class);
    @Resource
    private BPushTaskExcuteResultMapper bPushTaskExcuteResultMapper;
    @Resource
    private RedisTemplate<String, String> redisTemplate;
    @Resource
    private BPushTaskDao bPushTaskDao;
    /**
     * Creates and stores the initial execution-result record for one task/platform pair.
     *
     * <p>NOTE(review): this region contains two signature lines and a diff hunk marker inside the
     * body, so part of the method is not visible here — confirm against the full file. The
     * description below covers only the statements that are visible.
     *
     * <p>The visible code builds a {@link BPushTaskExcuteResult}, stamps its creation time, marks it
     * as not finished, assigns the id produced by {@code BPushTaskExcuteResult.createId}, inserts it
     * through the mapper, deletes the Redis key of the task/platform, and returns the object.
     *
     * @param taskId       id of the push task
     * @param pushPlatform platform the result record belongs to
     * @param deviceCount  device count passed by the caller; how it is used is not visible here
     * @return the result object that was inserted (per the second signature line)
     */
    @Transactional(rollbackFor = Exception.class)
    public void initPushExcuteResult(String taskId, PushPlatform pushPlatform, long deviceCount) {
    public BPushTaskExcuteResult initPushExcuteResult(String taskId, PushPlatform pushPlatform, long deviceCount) {
        BPushTaskExcuteResult result = new BPushTaskExcuteResult();
        result.setCreateTime(new Date());
        result.setFinish(false);
@@ -26,6 +43,135 @@
        result.setId(BPushTaskExcuteResult.createId(taskId, pushPlatform));
        // Initialize: store the freshly built record.
        bPushTaskExcuteResultMapper.insertSelective(result);
        String key = getRedisKey(taskId, pushPlatform);
        // Drop whatever is currently stored under this task/platform key.
        redisTemplate.delete(key);
        return result;
    }
    /**
     * 批量推送成功
     *
     * @param dataSet
     * @param pushPlatform
     */
    @Transactional(rollbackFor = Exception.class)
    public void batchPushSuccess(BPushDeviceDataSet dataSet, PushPlatform pushPlatform) {
        String id = BPushTaskExcuteResult.createId(dataSet.getTaskId(), pushPlatform);
        BPushTaskExcuteResult result = bPushTaskExcuteResultMapper.selectByPrimaryKeyForUpdate(id);
        if (result != null) {
            BPushTaskExcuteResult update = new BPushTaskExcuteResult();
            update.setId(result.getId());
            update.setPushSuccessDeviceCount(dataSet.getDeviceTokenList().size() + (result.getPushSuccessDeviceCount() == null ? 0 : result.getPushSuccessDeviceCount()));
            update.setUpdateTime(new Date());
            bPushTaskExcuteResultMapper.updateByPrimaryKeySelective(update);
        }
    }
    /**
     * 批量推送完成
     *
     * @param dataSet
     * @param pushPlatform
     */
    public void batchPushFinish(BPushDeviceDataSet dataSet, PushPlatform pushPlatform) {
        String key = getRedisKey(dataSet.getTaskId(), pushPlatform);
        Long size = redisTemplate.opsForSet().size(key);
        redisTemplate.opsForSet().remove(key, dataSet.getBatchId());
        Long afterSize = redisTemplate.opsForSet().size(key);
        logger.info("{}推送剩余数量 taskId-{} size-{} afterSize-{}", pushPlatform.name(), dataSet.getTaskId(), size + "", afterSize + "");
        //判断是否推送完成
        if (afterSize == null || afterSize == 0L) {
            //已经推送完了
            String id = BPushTaskExcuteResult.createId(dataSet.getTaskId(), pushPlatform);
            BPushTaskExcuteResult update = new BPushTaskExcuteResult();
            update.setId(id);
            update.setFinish(true);
            update.setFinishTime(new Date());
            update.setUpdateTime(new Date());
            bPushTaskExcuteResultMapper.updateByPrimaryKeySelective(update);
            redisTemplate.delete(key);
            //判断整个推送任务是否已经完成了
            if (pushTaskFinish(dataSet.getTaskId())) {
                //完成推送了
                BPushTask updateTask = new BPushTask();
                updateTask.setId(dataSet.getTaskId());
                updateTask.setState(BPushTask.STATE_FINSIH);
                updateTask.setStateDesc("推送完成");
                updateTask.setUpdateTime(new Date());
                bPushTaskDao.updateSelective(updateTask);
                logger.info("{}推送完成 taskId-{}", pushPlatform.name(), dataSet.getTaskId());
            }
        }
    }
    /**
     * 整个推送任务是否已经完成
     *
     * @param taskId
     * @return
     */
    public boolean pushTaskFinish(String taskId) {
        BPushTaskExcuteResultMapper.DaoQuery daoQuery = new BPushTaskExcuteResultMapper.DaoQuery();
        daoQuery.taskId = taskId;
        long count = bPushTaskExcuteResultMapper.count(daoQuery);
        daoQuery.count = (int) count;
        List<BPushTaskExcuteResult> list = bPushTaskExcuteResultMapper.list(daoQuery);
        int finishCount = 0;
        for (BPushTaskExcuteResult result : list) {
            if (result.getFinish() != null && result.getFinish()) {
                finishCount++;
            }
        }
        if (finishCount == list.size()) {
            return true;
        }
        return false;
    }
    /**
     * 开始批量推送
     *
     * @param taskId
     * @param pushPlatform
     * @param batchId
     */
    public void startBatchPush(String taskId, PushPlatform pushPlatform, String batchId) {
        String key = getRedisKey(taskId, pushPlatform);
        redisTemplate.opsForSet().add(key, batchId);
    }
    private String getRedisKey(String taskId, PushPlatform pushPlatform) {
        return String.format("push-task-batch-%s-%s", taskId, pushPlatform.name());
    }
    /**
     * 设置设备数量
     *
     * @param id
     * @param count
     */
    public void setDeviceCount(String id, Long count) {
        BPushTaskExcuteResult update = new BPushTaskExcuteResult();
        update.setId(id);
        update.setTotalDeviceCount(count);
        update.setUpdateTime(new Date());
        bPushTaskExcuteResultMapper.updateByPrimaryKeySelective(update);
    }
    /**
     * 根据任务ID查询推送结果
     *
     * @param taskId
     * @return
     */
    public List<BPushTaskExcuteResult> listByTaskId(String taskId) {
        BPushTaskExcuteResultMapper.DaoQuery daoQuery = new BPushTaskExcuteResultMapper.DaoQuery();
        daoQuery.taskId = taskId;
        daoQuery.count = 100;
        return bPushTaskExcuteResultMapper.list(daoQuery);
    }