package com.ks.lucky.service.impl;
|
|
import com.ks.lucky.exception.LuckyMQException;
|
import com.ks.lucky.mapper.LuckyMQMapper;
|
import com.ks.lucky.pojo.DO.LuckyMQ;
|
import com.ks.lucky.query.MQQuery;
|
import com.ks.lucky.service.LuckyMQService;
|
import com.ks.lucky.util.mq.CMQManager;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import javax.annotation.Resource;
|
import java.util.Date;
|
import java.util.List;
|
|
@Service
|
public class LuckyMQServiceImpl implements LuckyMQService {
|
@Resource
|
private LuckyMQMapper luckyMQMapper;
|
|
@Override
|
public void addMQ(LuckyMQ mq) throws LuckyMQException {
|
LuckyMQ old = luckyMQMapper.selectByTaskId(mq.getTaskId());
|
if (old != null) {
|
throw new LuckyMQException(LuckyMQException.CODE_EXIST, "任务ID已经存在");
|
}
|
|
if (mq.getCreateTime() == null) {
|
mq.setCreateTime(new Date());
|
}
|
|
luckyMQMapper.insertSelective(mq);
|
}
|
|
@Override
|
public List<LuckyMQ> listUnSendMsg(Date now, int page, int pageCount) {
|
MQQuery query = new MQQuery();
|
query.maxPreSendTime = new Date(now.getTime() + 1L);
|
query.minPreSendTime = new Date(now.getTime() - 1000 * 60 * 60L * 24);
|
query.state = LuckyMQ.STATE_NOT_SEND;
|
query.start = (page - 1) * pageCount;
|
query.count = pageCount;
|
return luckyMQMapper.list(query);
|
}
|
|
@Override
|
public long countUnSendMsg(Date now) {
|
MQQuery query = new MQQuery();
|
query.maxPreSendTime = new Date(now.getTime() + 1L);
|
query.minPreSendTime = new Date(now.getTime() - 1000 * 60 * 60L * 24);
|
query.state = LuckyMQ.STATE_NOT_SEND;
|
return luckyMQMapper.count(query);
|
}
|
|
@Override
|
public void removeUnSendMsg(String taskId) throws LuckyMQException {
|
LuckyMQ old = luckyMQMapper.selectByTaskIdForUpdate(taskId);
|
if (old == null) {
|
return;
|
}
|
if (old.getState() != LuckyMQ.STATE_NOT_SEND) {
|
return;
|
}
|
luckyMQMapper.deleteByPrimaryKey(old.getId());
|
}
|
|
@Transactional(rollbackFor = Exception.class)
|
@Override
|
public void sendMsg(String taskId) throws LuckyMQException {
|
LuckyMQ mq = luckyMQMapper.selectByTaskIdForUpdate(taskId);
|
if (mq == null) {
|
throw new LuckyMQException(LuckyMQException.CODE_EXIST, "任务ID已经存在");
|
}
|
|
CMQManager.getInstance().addMsg(mq.getQueueName(), mq.getQueueContent());
|
LuckyMQ update = new LuckyMQ();
|
update.setId(mq.getId());
|
update.setState(LuckyMQ.STATE_SENDED);
|
update.setActualSendTime(new Date());
|
update.setUpdateTime(new Date());
|
luckyMQMapper.updateByPrimaryKeySelective(update);
|
}
|
|
|
|
|
|
}
|