| | |
| | | package com.yeshi.fanli.service.manger.msg;
|
| | |
|
| | | import javax.annotation.Resource;
|
| | |
|
| | | import org.springframework.stereotype.Component;
|
| | |
|
| | | import com.aliyun.openservices.ons.api.Message;
|
| | | import com.aliyun.openservices.ons.api.Producer;
|
| | | import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
|
| | | import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
|
| | | import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
|
| | |
|
| | | /**
|
| | | * RocketMQ管理器
|
| | | * @author Administrator
|
| | | *
|
| | | */
|
| | | @Component
|
| | | public class RocketMQManager {
|
| | | @Resource(name = "producer")
|
| | | private Producer producer;
|
| | | @Resource
|
| | | private TransactionProducer orderTransactionProducer;
|
| | |
|
| | | /**
|
| | | * 发送普通消息
|
| | | * @Title: sendNormalMsg
|
| | | * @Description: |
| | | * @param message |
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendNormalMsg(Message message, String key) {
|
| | | sendNormalMsg(message, null, key);
|
| | | }
|
| | |
|
| | | /**
|
| | | * 发送普通定时消息
|
| | | * @Title: sendNormalMsg
|
| | | * @Description: |
| | | * @param message
|
| | | * @param delayTimeMS 延时ms
|
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendNormalMsg(Message message, Long delayTimeMS, String key) {
|
| | | if (message == null)
|
| | | return;
|
| | | if (key != null)
|
| | | message.setKey(key);
|
| | | if (delayTimeMS != null)
|
| | | message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息
|
| | | producer.send(message);
|
| | | }
|
| | |
|
| | | /**
|
| | | * 发送事务消息
|
| | | * @Title: sendTransactionalMsg
|
| | | * @Description: |
| | | * @param message
|
| | | * @param delayTimeMS 延时ms
|
| | | * @param key
|
| | | * @param mqEvent
|
| | | * @throws Exception |
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendTransactionalMsg(Message message, Long delayTimeMS, String key,ITransactionalMQEvent mqEvent) throws Exception{
|
| | | if (key != null)
|
| | | message.setKey(key);
|
| | | if (delayTimeMS != null)
|
| | | message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);
|
| | |
|
| | | orderTransactionProducer.send(message, new LocalTransactionExecuter() {
|
| | | @Override
|
| | | public TransactionStatus execute(Message arg0, Object arg1) {
|
| | | if(mqEvent!=null)
|
| | | mqEvent.excute();
|
| | | return TransactionStatus.CommitTransaction;
|
| | | }
|
| | | }, null);
|
| | | }
|
| | | |
| | | /**
|
| | | * 发送事务消息
|
| | | * @Title: sendTransactionalMsg
|
| | | * @Description: |
| | | * @param message
|
| | | * @param key
|
| | | * @param mqEvent
|
| | | * @throws Exception |
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendTransactionalMsg(Message message, String key,ITransactionalMQEvent mqEvent) throws Exception{
|
| | | sendTransactionalMsg(message,null,key,mqEvent);
|
| | | }
|
| | | |
| | | |
| | | interface ITransactionalMQEvent{
|
| | | public void excute();
|
| | | }
|
| | | }
|
| | | package com.yeshi.fanli.service.manger.msg; |
| | | |
| | | import javax.annotation.Resource; |
| | | |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import com.aliyun.openservices.ons.api.Message; |
| | | import com.aliyun.openservices.ons.api.Producer; |
| | | import com.aliyun.openservices.ons.api.SendResult; |
| | | import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter; |
| | | import com.aliyun.openservices.ons.api.transaction.TransactionProducer; |
| | | import com.aliyun.openservices.ons.api.transaction.TransactionStatus; |
| | | |
| | | /** |
| | | * RocketMQ管理器 |
| | | * @author Administrator |
| | | * |
| | | */ |
| | | @Component |
| | | public class RocketMQManager { |
| | | @Resource(name = "producer") |
| | | private Producer producer; |
| | | @Resource |
| | | private TransactionProducer orderTransactionProducer; |
| | | |
| | | /** |
| | | * 发送普通消息 |
| | | * @Title: sendNormalMsg |
| | | * @Description: |
| | | * @param message |
| | | * void 返回类型 |
| | | * @throws |
| | | */ |
| | | public SendResult sendNormalMsg(Message message, String key) { |
| | | return sendNormalMsg(message, null, key); |
| | | } |
| | | |
| | | /** |
| | | * 发送普通定时消息 |
| | | * @Title: sendNormalMsg |
| | | * @Description: |
| | | * @param message |
| | | * @param delayTimeMS 延时ms |
| | | * void 返回类型 |
| | | * @throws |
| | | */ |
| | | public SendResult sendNormalMsg(Message message, Long delayTimeMS, String key) { |
| | | if (message == null) |
| | | return null; |
| | | if (key != null) |
| | | message.setKey(key); |
| | | if (delayTimeMS != null) |
| | | message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息 |
| | | return producer.send(message); |
| | | } |
| | | |
| | | /** |
| | | * 发送事务消息 |
| | | * @Title: sendTransactionalMsg |
| | | * @Description: |
| | | * @param message |
| | | * @param delayTimeMS 延时ms |
| | | * @param key |
| | | * @param mqEvent |
| | | * @throws Exception |
| | | * void 返回类型 |
| | | * @throws |
| | | */ |
| | | public void sendTransactionalMsg(Message message, Long delayTimeMS, String key, ITransactionalMQEvent mqEvent) |
| | | throws Exception { |
| | | if (key != null) |
| | | message.setKey(key); |
| | | if (delayTimeMS != null) |
| | | message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS); |
| | | |
| | | orderTransactionProducer.send(message, new LocalTransactionExecuter() { |
| | | @Override |
| | | public TransactionStatus execute(Message arg0, Object arg1) { |
| | | if (mqEvent != null) |
| | | return mqEvent.excute( arg0, arg1); |
| | | return TransactionStatus.CommitTransaction; |
| | | } |
| | | }, null); |
| | | } |
| | | |
| | | /** |
| | | * 发送事务消息 |
| | | * @Title: sendTransactionalMsg |
| | | * @Description: |
| | | * @param message |
| | | * @param key |
| | | * @param mqEvent |
| | | * @throws Exception |
| | | * void 返回类型 |
| | | * @throws |
| | | */ |
| | | public void sendTransactionalMsg(Message message, String key, ITransactionalMQEvent mqEvent) throws Exception { |
| | | sendTransactionalMsg(message, null, key, mqEvent); |
| | | } |
| | | |
| | | public interface ITransactionalMQEvent { |
| | | public TransactionStatus excute(Message arg0, Object arg1); |
| | | } |
| | | } |