package com.yeshi.fanli.job;
|
|
import java.util.Date;
|
import java.util.List;
|
|
import javax.annotation.Resource;
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import com.aliyun.openservices.ons.api.Message;
|
import com.aliyun.openservices.ons.api.Producer;
|
import com.aliyun.openservices.ons.api.SendResult;
|
import com.yeshi.fanli.entity.mq.MQUnSendInfo;
|
import com.yeshi.fanli.log.LogHelper;
|
import com.yeshi.fanli.service.inter.mq.MQUnSendInfoService;
|
import com.yeshi.fanli.util.Constant;
|
import com.yeshi.fanli.util.StringUtil;
|
|
/**
|
* 消息任务
|
*
|
* @author Administrator
|
*
|
*/
|
@Component
|
public class MQJob {
|
|
@Resource
|
private MQUnSendInfoService mqUnSendInfoService;
|
@Resource(name = "producer")
|
private Producer producer;
|
|
public MQJob() {
|
}
|
|
/**
|
* 重新发送消息(2分钟重发一次)
|
*/
|
@Scheduled(cron = "0 0/2 * * * ? ")
|
public void retrySendMsg() {
|
if (!Constant.IS_TASK)
|
return;
|
Long maxSendTime = System.currentTimeMillis() - 1000 * 60 * 10L;// 获取10分钟以前发送的消息
|
List<MQUnSendInfo> list = mqUnSendInfoService.listByMaxSendTime(new Date(maxSendTime), 1, 50);
|
if (list != null)
|
for (MQUnSendInfo sendInfo : list) {
|
Message msg = new Message(sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody().getBytes());
|
if (!StringUtil.isNullOrEmpty(sendInfo.getKey()))
|
msg.setKey(sendInfo.getKey());
|
if (sendInfo.getDeliverTime() != null)
|
msg.setStartDeliverTime(sendInfo.getDeliverTime().getTime());
|
SendResult sendResult = producer.send(msg);
|
if (sendResult != null) {
|
mqUnSendInfoService.deleteByPrimaryKey(sendInfo.getId());
|
LogHelper.mqInfo("消息重发成功",sendResult.getMessageId(), sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody());
|
}
|
}
|
}
|
|
}
|