package com.ks.push.manager;
|
|
import com.ks.push.dao.BPushTaskDao;
|
import com.ks.push.dto.BPushDeviceDataSet;
|
import com.ks.push.mapper.BPushTaskExcuteResultMapper;
|
import com.ks.push.pojo.DO.BPushTask;
|
import com.ks.push.pojo.DO.BPushTaskExcuteResult;
|
import com.ks.push.pojo.DO.PushPlatform;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.stereotype.Component;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import javax.annotation.Resource;
|
import java.util.Date;
|
import java.util.List;
|
|
@Component
|
public class PushExcuteResultManager {
|
|
private Logger logger = LoggerFactory.getLogger(PushExcuteResultManager.class);
|
|
@Resource
|
private BPushTaskExcuteResultMapper bPushTaskExcuteResultMapper;
|
|
|
@Resource
|
private RedisTemplate<String, String> redisTemplate;
|
|
@Resource
|
private BPushTaskDao bPushTaskDao;
|
|
@Transactional(rollbackFor = Exception.class)
|
public BPushTaskExcuteResult initPushExcuteResult(String taskId, PushPlatform pushPlatform, long deviceCount) {
|
BPushTaskExcuteResult result = new BPushTaskExcuteResult();
|
result.setCreateTime(new Date());
|
result.setFinish(false);
|
result.setPushPlatform(pushPlatform);
|
result.setPushSuccessDeviceCount(0L);
|
result.setTaskId(taskId);
|
result.setTotalDeviceCount(deviceCount);
|
result.setId(BPushTaskExcuteResult.createId(taskId, pushPlatform));
|
//初始化
|
bPushTaskExcuteResultMapper.insertSelective(result);
|
String key = getRedisKey(taskId, pushPlatform);
|
redisTemplate.delete(key);
|
return result;
|
}
|
|
/**
|
* 批量推送成功
|
*
|
* @param dataSet
|
* @param pushPlatform
|
*/
|
@Transactional(rollbackFor = Exception.class)
|
public void batchPushSuccess(BPushDeviceDataSet dataSet, PushPlatform pushPlatform) {
|
String id = BPushTaskExcuteResult.createId(dataSet.getTaskId(), pushPlatform);
|
BPushTaskExcuteResult result = bPushTaskExcuteResultMapper.selectByPrimaryKeyForUpdate(id);
|
if (result != null) {
|
BPushTaskExcuteResult update = new BPushTaskExcuteResult();
|
update.setId(result.getId());
|
update.setPushSuccessDeviceCount(dataSet.getDeviceTokenList().size() + (result.getPushSuccessDeviceCount() == null ? 0 : result.getPushSuccessDeviceCount()));
|
update.setUpdateTime(new Date());
|
bPushTaskExcuteResultMapper.updateByPrimaryKeySelective(update);
|
}
|
}
|
|
/**
|
* 批量推送完成
|
*
|
* @param dataSet
|
* @param pushPlatform
|
*/
|
public void batchPushFinish(BPushDeviceDataSet dataSet, PushPlatform pushPlatform) {
|
String key = getRedisKey(dataSet.getTaskId(), pushPlatform);
|
Long size = redisTemplate.opsForSet().size(key);
|
redisTemplate.opsForSet().remove(key, dataSet.getBatchId());
|
Long afterSize = redisTemplate.opsForSet().size(key);
|
logger.info("{}推送剩余数量 taskId-{} size-{} afterSize-{}", pushPlatform.name(), dataSet.getTaskId(), size + "", afterSize + "");
|
//判断是否推送完成
|
if (afterSize == null || afterSize == 0L) {
|
//已经推送完了
|
String id = BPushTaskExcuteResult.createId(dataSet.getTaskId(), pushPlatform);
|
BPushTaskExcuteResult update = new BPushTaskExcuteResult();
|
update.setId(id);
|
update.setFinish(true);
|
update.setFinishTime(new Date());
|
update.setUpdateTime(new Date());
|
bPushTaskExcuteResultMapper.updateByPrimaryKeySelective(update);
|
redisTemplate.delete(key);
|
//判断整个推送任务是否已经完成了
|
if (pushTaskFinish(dataSet.getTaskId())) {
|
//完成推送了
|
BPushTask updateTask = new BPushTask();
|
updateTask.setId(dataSet.getTaskId());
|
updateTask.setState(BPushTask.STATE_FINSIH);
|
updateTask.setStateDesc("推送完成");
|
updateTask.setUpdateTime(new Date());
|
bPushTaskDao.updateSelective(updateTask);
|
logger.info("{}推送完成 taskId-{}", pushPlatform.name(), dataSet.getTaskId());
|
}
|
}
|
}
|
|
|
/**
|
* 整个推送任务是否已经完成
|
*
|
* @param taskId
|
* @return
|
*/
|
public boolean pushTaskFinish(String taskId) {
|
BPushTaskExcuteResultMapper.DaoQuery daoQuery = new BPushTaskExcuteResultMapper.DaoQuery();
|
daoQuery.taskId = taskId;
|
long count = bPushTaskExcuteResultMapper.count(daoQuery);
|
daoQuery.count = (int) count;
|
List<BPushTaskExcuteResult> list = bPushTaskExcuteResultMapper.list(daoQuery);
|
int finishCount = 0;
|
for (BPushTaskExcuteResult result : list) {
|
if (result.getFinish() != null && result.getFinish()) {
|
finishCount++;
|
}
|
}
|
if (finishCount == list.size()) {
|
return true;
|
}
|
return false;
|
}
|
|
|
/**
|
* 开始批量推送
|
*
|
* @param taskId
|
* @param pushPlatform
|
* @param batchId
|
*/
|
public void startBatchPush(String taskId, PushPlatform pushPlatform, String batchId) {
|
String key = getRedisKey(taskId, pushPlatform);
|
redisTemplate.opsForSet().add(key, batchId);
|
}
|
|
private String getRedisKey(String taskId, PushPlatform pushPlatform) {
|
return String.format("push-task-batch-%s-%s", taskId, pushPlatform.name());
|
}
|
|
|
/**
|
* 设置设备数量
|
*
|
* @param id
|
* @param count
|
*/
|
public void setDeviceCount(String id, Long count) {
|
BPushTaskExcuteResult update = new BPushTaskExcuteResult();
|
update.setId(id);
|
update.setTotalDeviceCount(count);
|
update.setUpdateTime(new Date());
|
bPushTaskExcuteResultMapper.updateByPrimaryKeySelective(update);
|
}
|
|
/**
|
* 根据任务ID查询推送结果
|
*
|
* @param taskId
|
* @return
|
*/
|
public List<BPushTaskExcuteResult> listByTaskId(String taskId) {
|
BPushTaskExcuteResultMapper.DaoQuery daoQuery = new BPushTaskExcuteResultMapper.DaoQuery();
|
daoQuery.taskId = taskId;
|
daoQuery.count = 100;
|
return bPushTaskExcuteResultMapper.list(daoQuery);
|
}
|
|
|
}
|