New file |
| | |
| | | package com.yeshi.fanli.service.manger.msg;
|
| | |
|
| | | import javax.annotation.Resource;
|
| | |
|
| | | import org.springframework.stereotype.Component;
|
| | |
|
| | | import com.aliyun.openservices.ons.api.Message;
|
| | | import com.aliyun.openservices.ons.api.Producer;
|
| | | import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
|
| | | import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
|
| | | import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
|
| | |
|
| | | /**
|
| | | * RocketMQ管理器
|
| | | * @author Administrator
|
| | | *
|
| | | */
|
| | | @Component
|
| | | public class RocketMQManager {
|
| | | @Resource(name = "producer")
|
| | | private Producer producer;
|
| | | @Resource
|
| | | private TransactionProducer orderTransactionProducer;
|
| | |
|
| | | /**
|
| | | * 发送普通消息
|
| | | * @Title: sendNormalMsg
|
| | | * @Description: |
| | | * @param message |
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendNormalMsg(Message message, String key) {
|
| | | sendNormalMsg(message, null, key);
|
| | | }
|
| | |
|
| | | /**
|
| | | * 发送普通定时消息
|
| | | * @Title: sendNormalMsg
|
| | | * @Description: |
| | | * @param message
|
| | | * @param delayTimeMS 延时ms
|
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendNormalMsg(Message message, Long delayTimeMS, String key) {
|
| | | if (message == null)
|
| | | return;
|
| | | if (key != null)
|
| | | message.setKey(key);
|
| | | if (delayTimeMS != null)
|
| | | message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息
|
| | | producer.send(message);
|
| | | }
|
| | |
|
| | | /**
|
| | | * 发送事务消息
|
| | | * @Title: sendTransactionalMsg
|
| | | * @Description: |
| | | * @param message
|
| | | * @param delayTimeMS 延时ms
|
| | | * @param key
|
| | | * @param mqEvent
|
| | | * @throws Exception |
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendTransactionalMsg(Message message, Long delayTimeMS, String key,ITransactionalMQEvent mqEvent) throws Exception{
|
| | | if (key != null)
|
| | | message.setKey(key);
|
| | | if (delayTimeMS != null)
|
| | | message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);
|
| | |
|
| | | orderTransactionProducer.send(message, new LocalTransactionExecuter() {
|
| | | @Override
|
| | | public TransactionStatus execute(Message arg0, Object arg1) {
|
| | | if(mqEvent!=null)
|
| | | mqEvent.excute();
|
| | | return TransactionStatus.CommitTransaction;
|
| | | }
|
| | | }, null);
|
| | | }
|
| | | |
| | | /**
|
| | | * 发送事务消息
|
| | | * @Title: sendTransactionalMsg
|
| | | * @Description: |
| | | * @param message
|
| | | * @param key
|
| | | * @param mqEvent
|
| | | * @throws Exception |
| | | * void 返回类型
|
| | | * @throws
|
| | | */
|
| | | public void sendTransactionalMsg(Message message, String key,ITransactionalMQEvent mqEvent) throws Exception{
|
| | | sendTransactionalMsg(message,null,key,mqEvent);
|
| | | }
|
| | | |
| | | |
| | | interface ITransactionalMQEvent{
|
| | | public void excute();
|
| | | }
|
| | | }
|