admin
2025-02-25 30d8e227e8d823b6c38c3b9c90ac2df03b63befe
fanli/src/main/java/com/yeshi/fanli/job/MQJob.java
@@ -1,61 +1,63 @@
package com.yeshi.fanli.job;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.yeshi.fanli.entity.mq.MQUnSendInfo;
import com.yeshi.fanli.log.LogHelper;
import com.yeshi.fanli.service.inter.mq.MQUnSendInfoService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
/**
 * 消息任务
 *
 * @author Administrator
 *
 */
@Component
public class MQJob {
   @Resource
   private MQUnSendInfoService mqUnSendInfoService;
   @Resource(name = "producer")
   private Producer producer;
   public MQJob() {
   }
   /**
    * 重新发送消息(2分钟重发一次)
    */
   @Scheduled(cron = "0 0/2 * * * ? ")
   public void retrySendMsg() {
      if (!Constant.IS_TASK)
         return;
      Long maxSendTime = System.currentTimeMillis() - 1000 * 60 * 10L;// 获取10分钟以前发送的消息
      List<MQUnSendInfo> list = mqUnSendInfoService.listByMaxSendTime(new Date(maxSendTime), 1, 50);
      if (list != null)
         for (MQUnSendInfo sendInfo : list) {
            Message msg = new Message(sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody().getBytes());
            if (!StringUtil.isNullOrEmpty(sendInfo.getKey()))
               msg.setKey(sendInfo.getKey());
            if (sendInfo.getDeliverTime() != null)
               msg.setStartDeliverTime(sendInfo.getDeliverTime().getTime());
            SendResult sendResult = producer.send(msg);
            if (sendResult != null) {
               mqUnSendInfoService.deleteByPrimaryKey(sendInfo.getId());
               LogHelper.mqInfo("消息重发成功",sendResult.getMessageId(), sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody());
            }
         }
   }
}
package com.yeshi.fanli.job;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.yeshi.fanli.entity.mq.MQUnSendInfo;
import com.yeshi.fanli.log.LogHelper;
import com.yeshi.fanli.service.inter.mq.MQUnSendInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
/**
 * 消息任务
 *
 * @author Administrator
 *
 */
@Component
public class MQJob {
   @Resource
   private MQUnSendInfoService mqUnSendInfoService;
   @Resource
   private RocketMQManager rocketMQManager;
   public MQJob() {
   }
   /**
    * 重新发送消息(2分钟重发一次)
    */
   @Scheduled(cron = "0 0/2 * * * ? ")
   public void retrySendMsg() {
      if (!Constant.IS_TASK)
         return;
      Long maxSendTime = System.currentTimeMillis() - 1000 * 60 * 10L;// 获取10分钟以前发送的消息
      List<MQUnSendInfo> list = mqUnSendInfoService.listByMaxSendTime(new Date(maxSendTime), 1, 50);
      if (list != null)
         for (MQUnSendInfo sendInfo : list) {
            Message msg = new Message(sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody().getBytes());
            if (!StringUtil.isNullOrEmpty(sendInfo.getKey()))
               msg.setKey(sendInfo.getKey());
            if (sendInfo.getDeliverTime() != null)
               msg.setStartDeliverTime(sendInfo.getDeliverTime().getTime());
            SendResult sendResult =rocketMQManager.sendNormalMsg(msg, null,null);
            if (sendResult != null) {
               mqUnSendInfoService.deleteByPrimaryKey(sendInfo.getId());
               LogHelper.mqInfo("消息重发成功",sendResult.getMessageId(), sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody());
            }
         }
   }
}