package com.yeshi.fanli.service.manger.msg;
|
|
import javax.annotation.Resource;
|
|
import org.springframework.stereotype.Component;
|
|
import com.aliyun.openservices.ons.api.Message;
|
import com.aliyun.openservices.ons.api.Producer;
|
import com.aliyun.openservices.ons.api.SendResult;
|
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
|
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
|
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
|
|
/**
|
* RocketMQ管理器
|
* @author Administrator
|
*
|
*/
|
@Component
|
public class RocketMQManager {
|
@Resource(name = "producer")
|
private Producer producer;
|
@Resource
|
private TransactionProducer orderTransactionProducer;
|
|
/**
|
* 发送普通消息
|
* @Title: sendNormalMsg
|
* @Description:
|
* @param message
|
* void 返回类型
|
* @throws
|
*/
|
public SendResult sendNormalMsg(Message message, String key) {
|
return sendNormalMsg(message, null, key);
|
}
|
|
/**
|
* 发送普通定时消息
|
* @Title: sendNormalMsg
|
* @Description:
|
* @param message
|
* @param delayTimeMS 延时ms
|
* void 返回类型
|
* @throws
|
*/
|
public SendResult sendNormalMsg(Message message, Long delayTimeMS, String key) {
|
if (message == null)
|
return null;
|
if (key != null)
|
message.setKey(key);
|
if (delayTimeMS != null)
|
message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息
|
return producer.send(message);
|
}
|
|
/**
|
* 发送事务消息
|
* @Title: sendTransactionalMsg
|
* @Description:
|
* @param message
|
* @param delayTimeMS 延时ms
|
* @param key
|
* @param mqEvent
|
* @throws Exception
|
* void 返回类型
|
* @throws
|
*/
|
public void sendTransactionalMsg(Message message, Long delayTimeMS, String key, ITransactionalMQEvent mqEvent)
|
throws Exception {
|
if (key != null)
|
message.setKey(key);
|
if (delayTimeMS != null)
|
message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);
|
|
orderTransactionProducer.send(message, new LocalTransactionExecuter() {
|
@Override
|
public TransactionStatus execute(Message arg0, Object arg1) {
|
if (mqEvent != null)
|
return mqEvent.excute( arg0, arg1);
|
return TransactionStatus.CommitTransaction;
|
}
|
}, null);
|
}
|
|
/**
|
* 发送事务消息
|
* @Title: sendTransactionalMsg
|
* @Description:
|
* @param message
|
* @param key
|
* @param mqEvent
|
* @throws Exception
|
* void 返回类型
|
* @throws
|
*/
|
public void sendTransactionalMsg(Message message, String key, ITransactionalMQEvent mqEvent) throws Exception {
|
sendTransactionalMsg(message, null, key, mqEvent);
|
}
|
|
public interface ITransactionalMQEvent {
|
public TransactionStatus excute(Message arg0, Object arg1);
|
}
|
}
|