package org.yeshi.utils.mq; import com.qcloud.cmq.Account; import com.qcloud.cmq.Message; import com.qcloud.cmq.Queue; import com.qcloud.cmq.entity.CmqResponse; import com.tencentcloudapi.common.Credential; import com.tencentcloudapi.common.exception.TencentCloudSDKException; import com.tencentcloudapi.common.profile.ClientProfile; import com.tencentcloudapi.common.profile.HttpProfile; import com.tencentcloudapi.tdmq.v20200217.TdmqClient; import com.tencentcloudapi.tdmq.v20200217.models.*; import java.util.Arrays; import java.util.Date; import java.util.List; //腾讯CMQ消息 public class TDMQUtil { private static TDMQUtil cmqUtil; public static TDMQUtil getInstance() { if (cmqUtil == null) { cmqUtil = new TDMQUtil(); } return cmqUtil; } private String secretId = ""; private String secretKey = ""; private String region = "ap-guangzhou"; // 内网 http://gz.mqadapter.cmq.tencentyun.com // 公网 https://cmq-gz.public.tencenttdmq.com private static String endpoint = "http://gz.mqadapter.cmq.tencentyun.com"; // private static String endpoint = // "http://cmq-queue-gz.api.tencentyun.com"; private Account account; /** * @return void * @author hxh * @description 初始化 * @date 17:00 2022/3/21 * @param: secretId * @param: secretKey * @param: region 区域,如: ap-guangzhou * @param: apiRegion 接口区域,如: gz * @param: isPub 是否为公网 **/ public void init(String secretId, String secretKey, String region, String apiRegion, boolean isPub) { this.secretId = secretId; this.secretKey = secretKey; this.region = region; String ep; if (isPub) { ep = String.format("https://cmq-%s.public.tencenttdmq.com", apiRegion); } else { ep = String.format("http://%s.mqadapter.cmq.tencentyun.com", apiRegion); } endpoint = ep; account = new Account(endpoint, this.secretId, this.secretKey); } public void init(String secretId, String secretKey, boolean isPub) { this.secretId = secretId; this.secretKey = secretKey; String ep; if (isPub) { ep = String.format("https://cmq-%s.public.tencenttdmq.com", "gz"); } else { ep = String.format("http://%s.mqadapter.cmq.tencentyun.com", "gz"); } endpoint = ep; account = new Account(endpoint, this.secretId, this.secretKey); } private TdmqClient getClient() { return getClient(this.region); } private TdmqClient getClient(String region) { Credential cred = new Credential(secretId, secretKey); // 实例化一个 http 选项,可选的,没有特殊需求可以跳过 HttpProfile httpProfile = new HttpProfile(); httpProfile.setEndpoint("tdmq.tencentcloudapi.com"); // 实例化一个client选项,可选的,没有特殊需求可以跳过 ClientProfile clientProfile = new ClientProfile(); clientProfile.setHttpProfile(httpProfile); // 实例化要请求产品的 client 对象,clientProfile 是可选的 TdmqClient client = new TdmqClient(cred, region, clientProfile); return client; } private TdmqClient getMsgClient() { return getClient(this.region); } public boolean existQueue(String queueName) { if (getQueue(queueName) != null) { return true; } return false; } // 创建队列 public boolean createQueue(String queueName) { return this.createQueue(queueName, 10L, 5 * 60L, 65536L); } // 创建队列 public boolean createQueue(String queueName, int maxMsgSize) { return this.createQueue(queueName, 10L, 5 * 60L, maxMsgSize); } /** * 指定参数创建队列 * * @param queueName * @param pollingWaitSeconds -长轮训等待时间 * @param visibilityTimeout -消息消费后再次可见的时间 * @return */ public boolean createQueue(String queueName, long pollingWaitSeconds, long visibilityTimeout, long maxMsgSize) { if (existQueue(queueName)) { return true; } CreateCmqQueueRequest request = new CreateCmqQueueRequest(); request.setQueueName(queueName); request.setPollingWaitSeconds(pollingWaitSeconds); request.setVisibilityTimeout(visibilityTimeout); request.setMaxMsgSize(maxMsgSize); request.setMsgRetentionSeconds(345600L); try { getClient().CreateCmqQueue(request); return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); } return false; } // 删除队列 public boolean deleteQueue(String queueName) { DeleteCmqQueueRequest request = new DeleteCmqQueueRequest(); request.setQueueName(queueName); try { getClient().DeleteCmqQueue(request); return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); } return false; } // 获取队列列表 public List getQueueNameList(String key) { DescribeCmqQueuesRequest request = new DescribeCmqQueuesRequest(); request.setOffset(0L); request.setLimit(1000L); try { DescribeCmqQueuesResponse response = getClient().DescribeCmqQueues(request); CmqQueue[] queues = response.getQueueList(); return Arrays.asList(queues); } catch (TencentCloudSDKException e) { e.printStackTrace(); } return null; } // 获取队列 public CmqQueue getQueue(String queueName) { DescribeCmqQueueDetailRequest req = new DescribeCmqQueueDetailRequest(); req.setQueueName(queueName); // 返回的 resp 是一个 CreateCmqQueueResponse 的实例,与请求对象对应 try { DescribeCmqQueueDetailResponse resp = getClient().DescribeCmqQueueDetail(req); return resp.getQueueDescribe(); } catch (TencentCloudSDKException e) { } return null; } // 发送消息 public String sendMsg(String queueName, String msg, long delaySeconds) { SendCmqMsgRequest request = new SendCmqMsgRequest(); request.setQueueName(queueName); request.setMsgContent(msg); request.setDelaySeconds(delaySeconds); try { SendCmqMsgResponse response = getMsgClient().SendCmqMsg(request); return response.getMsgId(); } catch (TencentCloudSDKException e) { e.printStackTrace(); } return null; } public String sendMsg(String queueName, String msg) { return this.sendMsg(queueName, msg, 0L); } // 发送消息 public List batchSendMsg(String queueName, List msgList, long delaySeconds) throws Exception { Queue queue = account.getQueue(queueName); if (delaySeconds > 0) { return queue.batchSend(msgList, (int) delaySeconds); } else { return queue.batchSend(msgList); } } // 发送消息 public List batchSendMsg(String queueName, List msgList) throws Exception { return batchSendMsg(queueName, msgList, 0); } // 消费消息 public Message recieveMsg(String queueName) throws Exception { com.qcloud.cmq.Queue queue = account.getQueue(queueName); return queue.receiveMessage(); } /** * 消费消息 * * @param count 1-16 * @param queueName 队列名字 * @return */ public List recieveMsg(int count, String queueName) { com.qcloud.cmq.Queue queue = account.getQueue(queueName); if (queue == null) { return null; } List msgList = null; try { msgList = queue.batchReceiveMessage(count); return msgList; } catch (Exception e) { if (e.getMessage() != null && !e.getMessage().contains("no message")) e.printStackTrace(); } return null; } // 删除消息 public boolean deleteMsg(String queueName, String receiptHandle) { try { com.qcloud.cmq.Queue queue = account.getQueue(queueName); queue.deleteMessage(receiptHandle); return true; } catch (Exception e) { e.printStackTrace(); } return false; } /** * 订阅消息相关 */ /** * 主题名称是否已经存在 * * @param topicName * @return */ private boolean topicNameExist(String topicName) { DescribeCmqTopicDetailRequest req = new DescribeCmqTopicDetailRequest(); req.setTopicName(topicName); try { getClient().DescribeCmqTopicDetail(req); return true; } catch (TencentCloudSDKException e) { return false; } } /** * 创建订阅主题 * * @param topicName-主题名称 * @param maxMsgSize-消息最大长度 * @param filterType-过滤类型 * @return */ public boolean createTopic(String topicName, Long maxMsgSize, Long filterType) { if (topicNameExist(topicName)) { return true; } CreateCmqTopicRequest req = new CreateCmqTopicRequest(); if (filterType != null) { req.setFilterType(filterType); } if (maxMsgSize != null) { req.setMaxMsgSize(maxMsgSize); } req.setTopicName(topicName); try { getClient().CreateCmqTopic(req); return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); } return false; } //刪除主題 public boolean deleteTopic(String topicName) { DeleteCmqTopicRequest req = new DeleteCmqTopicRequest(); req.setTopicName(topicName); try { getClient().DeleteCmqTopic(req); return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); } return false; } /** * 创建默认参数的主题 * * @param topicName * @return */ public boolean createTopic(String topicName) { return createTopic(topicName, null, null); } /** * 是否已经订阅 * * @param topicName * @param subscriptionName * @return */ private boolean isAlreadySubscribe(String topicName, String subscriptionName) { DescribeCmqSubscriptionDetailRequest req = new DescribeCmqSubscriptionDetailRequest(); req.setSubscriptionName(subscriptionName); req.setTopicName(topicName); req.setLimit(1L); req.setOffset(0L); try { DescribeCmqSubscriptionDetailResponse response = getClient().DescribeCmqSubscriptionDetail(req); if (response.getTotalCount() > 0) return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); } return false; } /** * 订阅主题 * * @param topicName-主题名称 * @param subscriptionName-订阅名称 * @param queueName-接受消息的队列名称 * @return */ public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) { return subscribeTopic(topicName, subscriptionName, queueName, null); } public boolean subscribeTopic(String topicName, String subscriptionName, String queueName, List filterTags) { try { if (isAlreadySubscribe(topicName, subscriptionName)) { return true; } } catch (Exception e) { } CreateCmqSubscribeRequest req = new CreateCmqSubscribeRequest(); req.setTopicName(topicName); req.setEndpoint(queueName); req.setSubscriptionName(subscriptionName); req.setProtocol("queue"); req.setNotifyContentFormat("SIMPLIFIED"); if (filterTags != null && filterTags.size() > 0) { String[] tags = new String[filterTags.size()]; filterTags.toArray(tags); req.setFilterTag(tags); } try { getClient().CreateCmqSubscribe(req); return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); return false; } } /** * 删除订阅 * * @param topicName * @param subscriptionName * @return */ public boolean deleteSubscribeTopic(String topicName, String subscriptionName) { DeleteCmqSubscribeRequest req = new DeleteCmqSubscribeRequest(); req.setSubscriptionName(subscriptionName); req.setTopicName(topicName); try { getClient().DeleteCmqSubscribe(req); return true; } catch (TencentCloudSDKException e) { e.printStackTrace(); } return false; } /** * 发布订阅消息 * * @param topicName * @param message * @return */ public String publishTopicMessage(String topicName, String message) { return publishTopicMessage(topicName, null, message); } /** * 发布订阅消息 * * @param topicName * @param tagList -标签 * @param message * @return */ public String publishTopicMessage(String topicName, List tagList, String message) { PublishCmqMsgRequest req = new PublishCmqMsgRequest(); req.setMsgContent(message); req.setTopicName(topicName); if (tagList != null && tagList.size() > 0) { String[] tags = new String[tagList.size()]; tagList.toArray(tags); req.setMsgTag(tags); } try { PublishCmqMsgResponse response = getMsgClient().PublishCmqMsg(req); return response.getMsgId(); } catch (TencentCloudSDKException e) { e.printStackTrace(); } return null; } public static void main(String[] args) { TDMQUtil.getInstance().init("AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25", "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo", true); String queueName = "makemoney-goldcorn-settle"; // String topicName = "topic_test"; // // //创建 // TDMQUtil.getInstance().createQueue(queueName); // // //发送消息 // TDMQUtil.getInstance().sendMsg(queueName, "测试消息"); // // //创建主题 // TDMQUtil.getInstance().createTopic(topicName); // // //创建订阅 // TDMQUtil.getInstance().subscribeTopic(topicName, topicName + queueName, queueName); // // //发送订阅消息 // TDMQUtil.getInstance().publishTopicMessage(topicName, "主题消息"); //消费消息 // try { // Message message = TDMQUtil.getInstance().recieveMsg(queueName); // System.out.println("接受到的消息:" + message.msgBody); // TDMQUtil.getInstance().deleteMsg(queueName, message.receiptHandle); // } catch (Exception e) { // e.printStackTrace(); // } try { List messages = TDMQUtil.getInstance().recieveMsg(10, queueName); for (Message message : messages) { System.out.println("批量接受到的消息:" + message.msgBody); TDMQUtil.getInstance().deleteMsg(queueName, message.receiptHandle); } } catch (Exception e) { e.printStackTrace(); } // //删除订阅 // TDMQUtil.getInstance().deleteSubscribeTopic(topicName, topicName + queueName); // // //删除主题 // TDMQUtil.getInstance().deleteTopic(topicName); // // //删除队列 // TDMQUtil.getInstance().deleteQueue(queueName); } }