admin
2024-04-26 5e7b0ed4a154ad067cbcf4aa1a1c7cce32f9864c
fanli/src/main/java/com/yeshi/fanli/service/manger/msg/RocketMQManager.java
@@ -1,103 +1,104 @@
package com.yeshi.fanli.service.manger.msg;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
/**
 * RocketMQ管理器
 * @author Administrator
 *
 */
@Component
public class RocketMQManager {
   @Resource(name = "producer")
   private Producer producer;
   @Resource
   private TransactionProducer orderTransactionProducer;
   /**
    * 发送普通消息
    * @Title: sendNormalMsg
    * @Description:
    * @param message
    * void 返回类型
    * @throws
    */
   public void sendNormalMsg(Message message, String key) {
      sendNormalMsg(message, null, key);
   }
   /**
    * 发送普通定时消息
    * @Title: sendNormalMsg
    * @Description:
    * @param message
    * @param delayTimeMS 延时ms
    * void 返回类型
    * @throws
    */
   public void sendNormalMsg(Message message, Long delayTimeMS, String key) {
      if (message == null)
         return;
      if (key != null)
         message.setKey(key);
      if (delayTimeMS != null)
         message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息
      producer.send(message);
   }
   /**
    * 发送事务消息
    * @Title: sendTransactionalMsg
    * @Description:
    * @param message
    * @param delayTimeMS  延时ms
    * @param key
    * @param mqEvent
    * @throws Exception
    * void 返回类型
    * @throws
    */
   public void sendTransactionalMsg(Message message, Long delayTimeMS, String key,ITransactionalMQEvent mqEvent) throws Exception{
      if (key != null)
         message.setKey(key);
      if (delayTimeMS != null)
         message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);
      orderTransactionProducer.send(message, new LocalTransactionExecuter() {
         @Override
         public TransactionStatus execute(Message arg0, Object arg1) {
            if(mqEvent!=null)
               mqEvent.excute();
            return TransactionStatus.CommitTransaction;
         }
      }, null);
   }
   /**
    * 发送事务消息
    * @Title: sendTransactionalMsg
    * @Description:
    * @param message
    * @param key
    * @param mqEvent
    * @throws Exception
    * void 返回类型
    * @throws
    */
   public void sendTransactionalMsg(Message message, String key,ITransactionalMQEvent mqEvent) throws Exception{
      sendTransactionalMsg(message,null,key,mqEvent);
   }
   interface ITransactionalMQEvent{
      public void excute();
   }
}
package com.yeshi.fanli.service.manger.msg;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
/**
 * RocketMQ管理器
 * @author Administrator
 *
 */
@Component
public class RocketMQManager {
   @Resource(name = "producer")
   private Producer producer;
   @Resource
   private TransactionProducer orderTransactionProducer;
   /**
    * 发送普通消息
    * @Title: sendNormalMsg
    * @Description:
    * @param message
    * void 返回类型
    * @throws
    */
   public SendResult sendNormalMsg(Message message, String key) {
      return sendNormalMsg(message, null, key);
   }
   /**
    * 发送普通定时消息
    * @Title: sendNormalMsg
    * @Description:
    * @param message
    * @param delayTimeMS 延时ms
    * void 返回类型
    * @throws
    */
   public SendResult sendNormalMsg(Message message, Long delayTimeMS, String key) {
      if (message == null)
         return null;
      if (key != null)
         message.setKey(key);
      if (delayTimeMS != null)
         message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息
      return producer.send(message);
   }
   /**
    * 发送事务消息
    * @Title: sendTransactionalMsg
    * @Description:
    * @param message
    * @param delayTimeMS  延时ms
    * @param key
    * @param mqEvent
    * @throws Exception
    * void 返回类型
    * @throws
    */
   public void sendTransactionalMsg(Message message, Long delayTimeMS, String key, ITransactionalMQEvent mqEvent)
         throws Exception {
      if (key != null)
         message.setKey(key);
      if (delayTimeMS != null)
         message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);
      orderTransactionProducer.send(message, new LocalTransactionExecuter() {
         @Override
         public TransactionStatus execute(Message arg0, Object arg1) {
            if (mqEvent != null)
               return mqEvent.excute( arg0,  arg1);
            return TransactionStatus.CommitTransaction;
         }
      }, null);
   }
   /**
    * 发送事务消息
    * @Title: sendTransactionalMsg
    * @Description:
    * @param message
    * @param key
    * @param mqEvent
    * @throws Exception
    * void 返回类型
    * @throws
    */
   public void sendTransactionalMsg(Message message, String key, ITransactionalMQEvent mqEvent) throws Exception {
      sendTransactionalMsg(message, null, key, mqEvent);
   }
   public interface ITransactionalMQEvent {
      public TransactionStatus excute(Message arg0, Object arg1);
   }
}