package org.yeshi.utils;

import java.util.ArrayList;
import java.util.List;

import com.qcloud.cmq.*;
import org.springframework.data.annotation.Transient;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * Convenience wrapper around the Tencent Cloud CMQ SDK for queue and topic operations.
 *
 * <p>Queue operations go through an {@link Account} bound to the queue endpoint; topic and
 * subscription operations go through a second {@link Account} bound to the topic endpoint.
 * Most methods catch SDK exceptions, print the stack trace and report failure through their
 * return value ({@code false} or {@code null}) instead of propagating the exception.
 */
public class CMQUtil {

    // Public queue endpoint. The intranet equivalent is http://cmq-queue-gz.api.tencentyun.com
    private static final String QUEUE_ENDPOINT = "http://cmq-queue-gz.api.qcloud.com";
    private static final String TOPIC_ENDPOINT = "https://cmq-topic-gz.api.qcloud.com";

    /** Page size used when listing queues, topics and subscriptions by name. */
    private static final int LIST_PAGE_SIZE = 100;
    private static final int DEFAULT_POLLING_WAIT_SECONDS = 10;
    /** Default visibility timeout of a queue, in seconds. */
    private static final int DEFAULT_VISIBILITY_TIMEOUT = 5 * 60;
    private static final int DEFAULT_MAX_MSG_SIZE = 65536;
    private static final int DEFAULT_MSG_RETENTION_SECONDS = 345600;
    /** Polling wait, in seconds, used by {@link #recieveMsg(int, String)}. */
    private static final int DEFAULT_BATCH_RECEIVE_WAIT_SECONDS = 20;
    /** Fragment of the SDK exception message that marks an empty poll rather than a failure. */
    private static final String NO_MESSAGE_MARKER = "no message";

    private static CMQUtil cmqUtil;

    private String secretId = "";
    private String secretKey = "";

    private Account account;
    private Account topicAccount;

    /**
     * Returns the shared instance, creating and initialising it on the first call.
     *
     * <p>Later calls return the already-created instance and ignore the supplied credentials.
     * The method is synchronized so that concurrent first calls cannot create two instances.
     *
     * @param secretId  Tencent Cloud secret id
     * @param secretKey Tencent Cloud secret key
     * @return the shared {@code CMQUtil}
     */
    public static synchronized CMQUtil getInstance(String secretId, String secretKey) {
        if (cmqUtil == null) {
            CMQUtil created = new CMQUtil();
            created.init(secretId, secretKey);
            cmqUtil = created;
        }
        return cmqUtil;
    }

    /**
     * Stores the credentials and (re)creates the queue and topic accounts from them.
     *
     * @param secretId  Tencent Cloud secret id
     * @param secretKey Tencent Cloud secret key
     */
    public void init(String secretId, String secretKey) {
        this.secretId = secretId;
        this.secretKey = secretKey;
        account = new Account(QUEUE_ENDPOINT, this.secretId, this.secretKey);
        topicAccount = new Account(TOPIC_ENDPOINT, this.secretId, this.secretKey);
    }

    /**
     * Checks whether a queue with the given name exists, ignoring case.
     *
     * <p>NOTE(review): the name is passed to the SDK as a search word; if the service treats it
     * as a fuzzy/prefix filter, several queues can match, so every returned name is compared
     * rather than only the first one.
     *
     * @param queueName queue name to look for
     * @return {@code true} if one of the listed queues matches; {@code false} otherwise or when
     *         the lookup fails
     */
    public boolean existQueue(String queueName) {
        List<String> names = new ArrayList<>();
        try {
            account.listQueue(queueName, 0, LIST_PAGE_SIZE, names);
        } catch (Exception e) {
            e.printStackTrace();
        }
        for (String name : names) {
            if (name != null && name.equalsIgnoreCase(queueName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether a queue with exactly this name is among the first page of search results.
     *
     * @param queueName queue name to look for
     * @return {@code true} if the name is listed; {@code false} otherwise or when the lookup fails
     */
    private boolean queueNameExist(String queueName) {
        List<String> names = new ArrayList<>();
        try {
            account.listQueue(queueName, 0, LIST_PAGE_SIZE, names);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return names.contains(queueName);
    }

    /**
     * Creates a queue with the default settings unless it already exists.
     *
     * @param queueName queue name
     * @return {@code true} if the queue already existed or was created; {@code false} on failure
     */
    public boolean createQueue(String queueName) {
        return createQueueIfAbsent(queueName, DEFAULT_POLLING_WAIT_SECONDS,
                DEFAULT_VISIBILITY_TIMEOUT, DEFAULT_MAX_MSG_SIZE);
    }

    /**
     * Creates a queue with a custom maximum message size unless it already exists.
     *
     * @param queueName  queue name
     * @param maxMsgSize maximum message size assigned to the queue metadata
     * @return {@code true} if the queue already existed or was created; {@code false} on failure
     */
    public boolean createQueue(String queueName, int maxMsgSize) {
        return createQueueIfAbsent(queueName, DEFAULT_POLLING_WAIT_SECONDS,
                DEFAULT_VISIBILITY_TIMEOUT, maxMsgSize);
    }

    /**
     * Creates a queue with custom polling and visibility settings unless it already exists.
     *
     * @param queueName          queue name
     * @param pollingWaitSeconds long-polling wait time
     * @param visibilityTimeout  time after which a consumed message becomes visible again
     * @return {@code true} if the queue already existed or was created; {@code false} on failure
     */
    public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
        return createQueueIfAbsent(queueName, pollingWaitSeconds, visibilityTimeout,
                DEFAULT_MAX_MSG_SIZE);
    }

    /** Shared implementation of the {@code createQueue} overloads. */
    private boolean createQueueIfAbsent(String queueName, int pollingWaitSeconds,
                                        int visibilityTimeout, int maxMsgSize) {
        if (queueNameExist(queueName)) {
            return true;
        }
        QueueMeta meta = new QueueMeta();
        meta.pollingWaitSeconds = pollingWaitSeconds;
        meta.visibilityTimeout = visibilityTimeout;
        meta.maxMsgSize = maxMsgSize;
        meta.msgRetentionSeconds = DEFAULT_MSG_RETENTION_SECONDS;
        try {
            account.createQueue(queueName, meta);
            return true;
        } catch (Exception e) {
            // Report the cause instead of failing silently; callers still get false.
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes a queue.
     *
     * @param queueName queue name
     * @return {@code true} if the SDK call completed; {@code false} if it threw
     */
    public boolean deleteQueue(String queueName) {
        try {
            account.deleteQueue(queueName);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Lists queue names matching a search word (first page only).
     *
     * <p>The queue account is re-created from the stored credentials before the lookup.
     *
     * @param key search word handed to the SDK
     * @return the names returned by the SDK; empty when nothing matched or the lookup failed
     */
    public List<String> getQueueNameList(String key) {
        account = new Account(QUEUE_ENDPOINT, this.secretId, this.secretKey);
        List<String> names = new ArrayList<>();
        try {
            account.listQueue(key, 0, LIST_PAGE_SIZE, names);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return names;
    }

    /**
     * Returns the SDK handle for a queue.
     *
     * <p>The queue account is re-created from the stored credentials before the lookup.
     *
     * @param queueName queue name
     * @return the queue handle returned by the SDK
     */
    public Queue getQueue(String queueName) {
        account = new Account(QUEUE_ENDPOINT, this.secretId, this.secretKey);
        return account.getQueue(queueName);
    }

    /**
     * Fetches the attributes of a queue.
     *
     * @param queueName queue name
     * @return the queue metadata, or {@code null} if the lookup failed
     */
    public QueueMeta getQueueAtrribute(String queueName) {
        Queue queue = account.getQueue(queueName);
        try {
            return queue.getQueueAttributes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Sends a message to a queue.
     *
     * @param queueName queue name
     * @param msg       message body
     * @return the message id returned by the SDK, or {@code null} if sending failed
     */
    public String sendMsg(String queueName, String msg) {
        try {
            return getQueue(queueName).sendMessage(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Receives a single message, polling for up to 10 seconds.
     *
     * @param queueName queue name
     * @return the received message, or {@code null} if the SDK call threw
     */
    public Message recieveMsg(String queueName) {
        try {
            return getQueue(queueName).receiveMessage(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Receives a batch of messages, polling for up to 20 seconds.
     *
     * @param count     number of messages to request (1-16)
     * @param queueName queue name
     * @return the received messages, or {@code null} if the SDK call threw
     */
    public List<Message> recieveMsg(int count, String queueName) {
        return recieveMsg(count, queueName, DEFAULT_BATCH_RECEIVE_WAIT_SECONDS);
    }

    /**
     * Receives a batch of messages with a caller-supplied polling wait.
     *
     * <p>Exceptions whose message contains "no message" are treated as an ordinary empty poll
     * and are not logged; any other exception is printed. In both cases {@code null} is returned.
     *
     * @param count       number of messages to request
     * @param queueName   queue name
     * @param waitSeconds polling wait passed to the SDK
     * @return the received messages, or {@code null} if the SDK call threw
     */
    public List<Message> recieveMsg(int count, String queueName, int waitSeconds) {
        Queue queue = getQueue(queueName);
        if (queue == null) {
            return null;
        }
        try {
            return queue.batchReceiveMessage(count, waitSeconds);
        } catch (Exception e) {
            String detail = e.getMessage();
            if (detail == null || !detail.contains(NO_MESSAGE_MARKER)) {
                e.printStackTrace();
            }
        }
        return null;
    }

    /**
     * Deletes a message from a queue.
     *
     * @param queueName     queue name
     * @param receiptHandle receipt handle of the message to delete
     * @return {@code true} if the SDK call completed; {@code false} if it threw
     */
    public boolean deleteMsg(String queueName, String receiptHandle) {
        try {
            getQueue(queueName).deleteMessage(receiptHandle);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // ---------------------------------------------------------------------
    // Topic / subscription operations
    // ---------------------------------------------------------------------

    /**
     * Checks whether a topic with exactly this name is among the first page of search results.
     *
     * <p>The lookup uses the topic account, the same one the topic is created with.
     *
     * @param topicName topic name to look for
     * @return {@code true} if the name is listed; {@code false} otherwise or when the lookup fails
     */
    private boolean topicNameExist(String topicName) {
        List<String> names = new ArrayList<>();
        try {
            topicAccount.listTopic(topicName, names, 0, LIST_PAGE_SIZE);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return names.contains(topicName);
    }

    /**
     * Creates a topic unless it already exists.
     *
     * @param topicName  topic name
     * @param maxMsgSize maximum message length
     * @param filterType filter type passed to the SDK
     * @return {@code true} if the topic already existed or was created; {@code false} on failure
     */
    public boolean createTopic(String topicName, int maxMsgSize, int filterType) {
        if (topicNameExist(topicName)) {
            return true;
        }
        try {
            topicAccount.createTopic(topicName, maxMsgSize, filterType);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Creates a topic with the default maximum message size (65536) unless it already exists.
     *
     * @param topicName topic name
     * @return {@code true} if the topic already existed or was created; {@code false} on failure
     */
    public boolean createTopic(String topicName) {
        if (topicNameExist(topicName)) {
            return true;
        }
        try {
            topicAccount.createTopic(topicName, DEFAULT_MAX_MSG_SIZE);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Checks whether a subscription with exactly this name is listed for the topic.
     *
     * @param topicName        topic name
     * @param subscriptionName subscription name to look for
     * @return {@code true} if the subscription is listed
     * @throws Exception if the SDK lookup fails
     */
    private boolean isAlreadySubscribe(String topicName, String subscriptionName) throws Exception {
        Topic topic = topicAccount.getTopic(topicName);
        List<String> names = new ArrayList<>();
        topic.ListSubscription(0, LIST_PAGE_SIZE, subscriptionName, names);
        return names.contains(subscriptionName);
    }

    /** Variant of {@link #isAlreadySubscribe} that treats a failed lookup as "not subscribed". */
    private boolean isSubscribedQuietly(String topicName, String subscriptionName) {
        try {
            return isAlreadySubscribe(topicName, subscriptionName);
        } catch (Exception ignored) {
            // Best effort: the caller goes on to create the subscription and that call
            // logs its own failure.
            return false;
        }
    }

    /**
     * Subscribes a queue to a topic unless the subscription already exists.
     *
     * @param topicName        topic name
     * @param subscriptionName subscription name
     * @param queueName        name of the queue that receives the messages
     * @return {@code true} if the subscription already existed or was created; {@code false} on
     *         failure
     */
    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
        if (isSubscribedQuietly(topicName, subscriptionName)) {
            return true;
        }
        try {
            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue");
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Subscribes a queue to a topic with tag filtering unless the subscription already exists.
     *
     * @param topicName        topic name
     * @param subscriptionName subscription name
     * @param queueName        name of the queue that receives the messages
     * @param filterTags       tags passed to the SDK as the subscription filter
     * @return {@code true} if the subscription already existed or was created; {@code false} on
     *         failure
     */
    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName,
                                  List<String> filterTags) {
        if (isSubscribedQuietly(topicName, subscriptionName)) {
            return true;
        }
        try {
            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue",
                    filterTags, null, "BACKOFF_RETRY", "JSON");
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Deletes a subscription.
     *
     * @param topicName        topic name
     * @param subscriptionName subscription name
     * @return {@code true} if the SDK call completed; {@code false} if it threw
     */
    public boolean deleteSubscribeTopic(String topicName, String subscriptionName) {
        try {
            topicAccount.deleteSubscribe(topicName, subscriptionName);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Publishes a message to a topic.
     *
     * @param topicName topic name
     * @param message   message body
     * @return the value returned by the SDK publish call, or {@code null} on failure
     */
    public String publishTopicMessage(String topicName, String message) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null) {
                return null;
            }
            return topic.publishMessage(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Publishes a tagged message to a topic.
     *
     * @param topicName topic name
     * @param tagList   tags attached to the message
     * @param message   message body
     * @return {@code true} if the SDK call completed; {@code false} on failure
     */
    public boolean publishTopicMessage(String topicName, List<String> tagList, String message) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null) {
                return false;
            }
            topic.publishMessage(message, tagList, null);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Publishes several messages to a topic in one call.
     *
     * @param topicName topic name
     * @param msgList   message bodies
     * @return {@code true} if the SDK call completed; {@code false} on failure
     */
    public boolean batchPublishTopicMessage(String topicName, List<String> msgList) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null) {
                return false;
            }
            topic.batchPublishMessage(msgList);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}