package org.yeshi.utils.mq;
|
|
import com.qcloud.cmq.Account;
|
import com.qcloud.cmq.Message;
|
import com.tencentcloudapi.common.Credential;
|
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
|
import com.tencentcloudapi.common.profile.ClientProfile;
|
import com.tencentcloudapi.common.profile.HttpProfile;
|
import com.tencentcloudapi.tdmq.v20200217.TdmqClient;
|
import com.tencentcloudapi.tdmq.v20200217.models.*;
|
|
import java.util.Arrays;
|
import java.util.List;
|
|
//腾讯CMQ消息
|
public class TDMQUtil {
|
|
private static TDMQUtil cmqUtil;
|
|
public static TDMQUtil getInstance() {
|
if (cmqUtil == null) {
|
cmqUtil = new TDMQUtil();
|
}
|
return cmqUtil;
|
}
|
|
private String secretId = "";
|
private String secretKey = "";
|
private String region = "ap-guangzhou";
|
// 内网 http://gz.mqadapter.cmq.tencentyun.com
|
// 公网 https://cmq-gz.public.tencenttdmq.com
|
private static String endpoint = "http://gz.mqadapter.cmq.tencentyun.com";
|
// private static String endpoint =
|
// "http://cmq-queue-gz.api.tencentyun.com";
|
|
private Account account;
|
|
/**
|
* @return void
|
* @author hxh
|
* @description 初始化
|
* @date 17:00 2022/3/21
|
* @param: secretId
|
* @param: secretKey
|
* @param: region 区域,如: ap-guangzhou
|
* @param: apiRegion 接口区域,如: gz
|
* @param: isPub 是否为公网
|
**/
|
public void init(String secretId, String secretKey, String region, String apiRegion, boolean isPub) {
|
this.secretId = secretId;
|
this.secretKey = secretKey;
|
this.region = region;
|
String ep;
|
if (isPub) {
|
ep = String.format("https://cmq-%s.public.tencenttdmq.com", apiRegion);
|
} else {
|
ep = String.format("http://%s.mqadapter.cmq.tencentyun.com", apiRegion);
|
}
|
endpoint = ep;
|
account = new Account(endpoint, this.secretId, this.secretKey);
|
}
|
|
|
public void init(String secretId, String secretKey, boolean isPub) {
|
this.secretId = secretId;
|
this.secretKey = secretKey;
|
String ep;
|
if (isPub) {
|
ep = String.format("https://cmq-%s.public.tencenttdmq.com", "gz");
|
} else {
|
ep = String.format("http://%s.mqadapter.cmq.tencentyun.com", "gz");
|
}
|
endpoint = ep;
|
account = new Account(endpoint, this.secretId, this.secretKey);
|
}
|
|
private TdmqClient getClient() {
|
return getClient(this.region);
|
}
|
|
private TdmqClient getClient(String region) {
|
Credential cred = new Credential(secretId, secretKey);
|
// 实例化一个 http 选项,可选的,没有特殊需求可以跳过
|
HttpProfile httpProfile = new HttpProfile();
|
httpProfile.setEndpoint("tdmq.tencentcloudapi.com");
|
// 实例化一个client选项,可选的,没有特殊需求可以跳过
|
ClientProfile clientProfile = new ClientProfile();
|
clientProfile.setHttpProfile(httpProfile);
|
// 实例化要请求产品的 client 对象,clientProfile 是可选的
|
TdmqClient client = new TdmqClient(cred, region, clientProfile);
|
return client;
|
}
|
|
private TdmqClient getMsgClient() {
|
return getClient(this.region);
|
}
|
|
|
public boolean existQueue(String queueName) {
|
if (getQueue(queueName) != null) {
|
return true;
|
}
|
return false;
|
}
|
|
|
// 创建队列
|
public boolean createQueue(String queueName) {
|
return this.createQueue(queueName, 10L, 5 * 60L, 65536L);
|
}
|
|
// 创建队列
|
public boolean createQueue(String queueName, int maxMsgSize) {
|
return this.createQueue(queueName, 10L, 5 * 60L, maxMsgSize);
|
}
|
|
/**
|
* 指定参数创建队列
|
*
|
* @param queueName
|
* @param pollingWaitSeconds -长轮训等待时间
|
* @param visibilityTimeout -消息消费后再次可见的时间
|
* @return
|
*/
|
public boolean createQueue(String queueName, long pollingWaitSeconds, long visibilityTimeout, long maxMsgSize) {
|
|
if (existQueue(queueName)) {
|
return true;
|
}
|
CreateCmqQueueRequest request = new CreateCmqQueueRequest();
|
request.setQueueName(queueName);
|
request.setPollingWaitSeconds(pollingWaitSeconds);
|
request.setVisibilityTimeout(visibilityTimeout);
|
request.setMaxMsgSize(maxMsgSize);
|
request.setMsgRetentionSeconds(345600L);
|
try {
|
getClient().CreateCmqQueue(request);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
|
return false;
|
|
|
}
|
|
// 删除队列
|
public boolean deleteQueue(String queueName) {
|
DeleteCmqQueueRequest request = new DeleteCmqQueueRequest();
|
request.setQueueName(queueName);
|
try {
|
getClient().DeleteCmqQueue(request);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
|
// 获取队列列表
|
public List<CmqQueue> getQueueNameList(String key) {
|
DescribeCmqQueuesRequest request = new DescribeCmqQueuesRequest();
|
request.setOffset(0L);
|
request.setLimit(1000L);
|
try {
|
DescribeCmqQueuesResponse response = getClient().DescribeCmqQueues(request);
|
CmqQueue[] queues = response.getQueueList();
|
return Arrays.asList(queues);
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
// 获取队列
|
public CmqQueue getQueue(String queueName) {
|
DescribeCmqQueueDetailRequest req = new DescribeCmqQueueDetailRequest();
|
req.setQueueName(queueName);
|
// 返回的 resp 是一个 CreateCmqQueueResponse 的实例,与请求对象对应
|
try {
|
DescribeCmqQueueDetailResponse resp = getClient().DescribeCmqQueueDetail(req);
|
return resp.getQueueDescribe();
|
} catch (TencentCloudSDKException e) {
|
}
|
return null;
|
}
|
|
|
// 发送消息
|
public String sendMsg(String queueName, String msg, long delaySeconds) {
|
SendCmqMsgRequest request = new SendCmqMsgRequest();
|
request.setQueueName(queueName);
|
request.setMsgContent(msg);
|
request.setDelaySeconds(delaySeconds);
|
try {
|
SendCmqMsgResponse response = getMsgClient().SendCmqMsg(request);
|
return response.getMsgId();
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
public String sendMsg(String queueName, String msg) {
|
return this.sendMsg(queueName, msg, 0L);
|
}
|
|
|
// 消费消息
|
public Message recieveMsg(String queueName) throws Exception {
|
com.qcloud.cmq.Queue queue = account.getQueue(queueName);
|
return queue.receiveMessage();
|
}
|
|
/**
|
* 消费消息
|
*
|
* @param count 1-16
|
* @param queueName 队列名字
|
* @return
|
*/
|
public List<Message> recieveMsg(int count, String queueName) {
|
com.qcloud.cmq.Queue queue = account.getQueue(queueName);
|
|
if (queue == null) {
|
return null;
|
}
|
|
List<Message> msgList = null;
|
try {
|
msgList = queue.batchReceiveMessage(count);
|
return msgList;
|
} catch (Exception e) {
|
if (e.getMessage() != null && !e.getMessage().contains("no message"))
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
// 删除消息
|
public boolean deleteMsg(String queueName, String receiptHandle) {
|
try {
|
com.qcloud.cmq.Queue queue = account.getQueue(queueName);
|
queue.deleteMessage(receiptHandle);
|
return true;
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
|
/**
|
* 订阅消息相关
|
*/
|
|
|
/**
|
* 主题名称是否已经存在
|
*
|
* @param topicName
|
* @return
|
*/
|
private boolean topicNameExist(String topicName) {
|
DescribeCmqTopicDetailRequest req = new DescribeCmqTopicDetailRequest();
|
req.setTopicName(topicName);
|
try {
|
getClient().DescribeCmqTopicDetail(req);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
return false;
|
}
|
}
|
|
/**
|
* 创建订阅主题
|
*
|
* @param topicName-主题名称
|
* @param maxMsgSize-消息最大长度
|
* @param filterType-过滤类型
|
* @return
|
*/
|
public boolean createTopic(String topicName, Long maxMsgSize, Long filterType) {
|
if (topicNameExist(topicName)) {
|
return true;
|
}
|
CreateCmqTopicRequest req = new CreateCmqTopicRequest();
|
if (filterType != null) {
|
req.setFilterType(filterType);
|
}
|
|
if (maxMsgSize != null) {
|
req.setMaxMsgSize(maxMsgSize);
|
}
|
req.setTopicName(topicName);
|
try {
|
getClient().CreateCmqTopic(req);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
|
//刪除主題
|
public boolean deleteTopic(String topicName) {
|
DeleteCmqTopicRequest req = new DeleteCmqTopicRequest();
|
req.setTopicName(topicName);
|
try {
|
getClient().DeleteCmqTopic(req);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
|
/**
|
* 创建默认参数的主题
|
*
|
* @param topicName
|
* @return
|
*/
|
public boolean createTopic(String topicName) {
|
return createTopic(topicName, null, null);
|
}
|
|
|
/**
|
* 是否已经订阅
|
*
|
* @param topicName
|
* @param subscriptionName
|
* @return
|
*/
|
private boolean isAlreadySubscribe(String topicName, String subscriptionName) {
|
DescribeCmqSubscriptionDetailRequest req = new DescribeCmqSubscriptionDetailRequest();
|
req.setSubscriptionName(subscriptionName);
|
req.setTopicName(topicName);
|
req.setLimit(1L);
|
req.setOffset(0L);
|
try {
|
DescribeCmqSubscriptionDetailResponse response = getClient().DescribeCmqSubscriptionDetail(req);
|
if (response.getTotalCount() > 0)
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
|
/**
|
* 订阅主题
|
*
|
* @param topicName-主题名称
|
* @param subscriptionName-订阅名称
|
* @param queueName-接受消息的队列名称
|
* @return
|
*/
|
public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
|
return subscribeTopic(topicName, subscriptionName, queueName, null);
|
}
|
|
public boolean subscribeTopic(String topicName, String subscriptionName, String queueName, List<String> filterTags) {
|
|
try {
|
if (isAlreadySubscribe(topicName, subscriptionName)) {
|
return true;
|
}
|
} catch (Exception e) {
|
}
|
CreateCmqSubscribeRequest req = new CreateCmqSubscribeRequest();
|
req.setTopicName(topicName);
|
req.setEndpoint(queueName);
|
req.setSubscriptionName(subscriptionName);
|
req.setProtocol("queue");
|
req.setNotifyContentFormat("SIMPLIFIED");
|
if (filterTags != null && filterTags.size() > 0) {
|
String[] tags = new String[filterTags.size()];
|
filterTags.toArray(tags);
|
req.setFilterTag(tags);
|
}
|
try {
|
getClient().CreateCmqSubscribe(req);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
return false;
|
}
|
}
|
|
/**
|
* 删除订阅
|
*
|
* @param topicName
|
* @param subscriptionName
|
* @return
|
*/
|
public boolean deleteSubscribeTopic(String topicName, String subscriptionName) {
|
DeleteCmqSubscribeRequest req = new DeleteCmqSubscribeRequest();
|
req.setSubscriptionName(subscriptionName);
|
req.setTopicName(topicName);
|
try {
|
getClient().DeleteCmqSubscribe(req);
|
return true;
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
|
/**
|
* 发布订阅消息
|
*
|
* @param topicName
|
* @param message
|
* @return
|
*/
|
public String publishTopicMessage(String topicName, String message) {
|
|
return publishTopicMessage(topicName, null, message);
|
}
|
|
/**
|
* 发布订阅消息
|
*
|
* @param topicName
|
* @param tagList -标签
|
* @param message
|
* @return
|
*/
|
public String publishTopicMessage(String topicName, List<String> tagList, String message) {
|
PublishCmqMsgRequest req = new PublishCmqMsgRequest();
|
req.setMsgContent(message);
|
req.setTopicName(topicName);
|
if (tagList != null && tagList.size() > 0) {
|
String[] tags = new String[tagList.size()];
|
tagList.toArray(tags);
|
req.setMsgTag(tags);
|
}
|
try {
|
PublishCmqMsgResponse response = getMsgClient().PublishCmqMsg(req);
|
return response.getMsgId();
|
} catch (TencentCloudSDKException e) {
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
|
public static void main(String[] args) {
|
TDMQUtil.getInstance().init("AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25", "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo", true);
|
|
String queueName = "test1";
|
String topicName = "topic_test";
|
|
//创建
|
TDMQUtil.getInstance().createQueue(queueName);
|
|
//发送消息
|
TDMQUtil.getInstance().sendMsg(queueName, "测试消息");
|
|
//创建主题
|
TDMQUtil.getInstance().createTopic(topicName);
|
|
//创建订阅
|
TDMQUtil.getInstance().subscribeTopic(topicName, topicName + queueName, queueName);
|
|
//发送订阅消息
|
TDMQUtil.getInstance().publishTopicMessage(topicName, "主题消息");
|
|
//消费消息
|
try {
|
Message message = TDMQUtil.getInstance().recieveMsg(queueName);
|
System.out.println("接受到的消息:" + message.msgBody);
|
TDMQUtil.getInstance().deleteMsg(queueName, message.receiptHandle);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
try {
|
List<Message> messages = TDMQUtil.getInstance().recieveMsg(10, queueName);
|
for (Message message : messages) {
|
System.out.println("批量接受到的消息:" + message.msgBody);
|
TDMQUtil.getInstance().deleteMsg(queueName, message.receiptHandle);
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
|
//删除订阅
|
TDMQUtil.getInstance().deleteSubscribeTopic(topicName, topicName + queueName);
|
|
//删除主题
|
TDMQUtil.getInstance().deleteTopic(topicName);
|
|
//删除队列
|
TDMQUtil.getInstance().deleteQueue(queueName);
|
}
|
|
|
}
|