From 24bed5e6b6e31090cb61600a0bdea898eac65da1 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期六, 26 十二月 2020 17:20:23 +0800 Subject: [PATCH] cmq调整 --- utils/src/main/java/org/yeshi/utils/CMQUtil.java | 724 ++++++++++++++++++++++++++++++++----------------------- 1 files changed, 416 insertions(+), 308 deletions(-) diff --git a/utils/src/main/java/org/yeshi/utils/CMQUtil.java b/utils/src/main/java/org/yeshi/utils/CMQUtil.java index 1bca5db..0a4b529 100644 --- a/utils/src/main/java/org/yeshi/utils/CMQUtil.java +++ b/utils/src/main/java/org/yeshi/utils/CMQUtil.java @@ -3,350 +3,458 @@ import java.util.ArrayList; import java.util.List; -import com.qcloud.cmq.Account; -import com.qcloud.cmq.Message; -import com.qcloud.cmq.Queue; -import com.qcloud.cmq.QueueMeta; -import com.qcloud.cmq.Topic; +import com.qcloud.cmq.*; import org.springframework.data.annotation.Transient; //鑵捐CMQ娑堟伅 public class CMQUtil { - private static CMQUtil cmqUtil; + private static CMQUtil cmqUtil; - public static CMQUtil getInstance(String secretId, String secretKey) { - if (cmqUtil == null) { - cmqUtil = new CMQUtil(); - cmqUtil.init(secretId, secretKey); - } - return cmqUtil; - } + public static CMQUtil getInstance(String secretId, String secretKey) { + if (cmqUtil == null) { + cmqUtil = new CMQUtil(); + cmqUtil.init(secretId, secretKey); + } + return cmqUtil; + } - private String secretId = ""; - private String secretKey = ""; - // 鍐呯綉 http://cmq-queue-gz.api.tencentyun.com - // 澶栫綉 http://cmq-queue-gz.api.qcloud.com - private static String endpoint = "http://cmq-queue-gz.api.qcloud.com"; - private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com"; - // private static String endpoint = - // "http://cmq-queue-gz.api.tencentyun.com"; + private String secretId = ""; + private String secretKey = ""; + // 鍐呯綉 http://cmq-queue-gz.api.tencentyun.com + // 澶栫綉 http://cmq-queue-gz.api.qcloud.com + private static String endpoint = "http://cmq-queue-gz.api.qcloud.com"; + private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com"; + // private static String endpoint = + // "http://cmq-queue-gz.api.tencentyun.com"; - private Account account; - private Account topicAccount; + private Account account; + private Account topicAccount; - static { - // if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX) - // endpoint = "http://cmq-queue-gz.api.tencentyun.com"; - // else - endpoint = "http://cmq-queue-gz.api.qcloud.com"; - } + static { + // if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX) + // endpoint = "http://cmq-queue-gz.api.tencentyun.com"; + // else + endpoint = "http://cmq-queue-gz.api.qcloud.com"; + } - public void init(String secretId, String secretKey) { - this.secretId = secretId; - this.secretKey = secretKey; - account = new Account(endpoint, this.secretId, this.secretKey); - topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey); - } + public void init(String secretId, String secretKey) { + this.secretId = secretId; + this.secretKey = secretKey; + account = new Account(endpoint, this.secretId, this.secretKey); + topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey); + } - public boolean existQueue(String queueName) { + public boolean existQueue(String queueName) { - ArrayList<String> vtQueue = new ArrayList<String>(); - try { - int totalCount = account.listQueue(queueName, 0, 1, vtQueue); - if (totalCount <= 0) - return false; - } catch (Exception e) { - e.printStackTrace(); - } - if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName)) - return true; - return false; - } + ArrayList<String> vtQueue = new ArrayList<String>(); + try { + int totalCount = account.listQueue(queueName, 0, 1, vtQueue); + if (totalCount <= 0) + return false; + } catch (Exception e) { + e.printStackTrace(); + } + if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName)) + return true; + return false; + } - // 鍒涘缓闃熷垪 - public boolean createQueue(String queueName) { + private boolean queueNameExist(String queueName) { + List<String> resultList = new ArrayList<>(); + try { + account.listQueue(queueName, 0, 100, resultList); + } catch (Exception e) { + e.printStackTrace(); + } - QueueMeta meta = new QueueMeta(); - meta.pollingWaitSeconds = 10; - meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃� - meta.maxMsgSize = 65536; - meta.msgRetentionSeconds = 345600; - try { - account.createQueue(queueName, meta); - return true; - } catch (Exception e) { - return false; - } - } + //宸茬粡鍒涘缓浜� + if (resultList.contains(queueName)) { + return true; + } + return false; + } - // 鍒涘缓闃熷垪 - public boolean createQueue(String queueName, int maxMsgSize) { + // 鍒涘缓闃熷垪 + public boolean createQueue(String queueName) { - QueueMeta meta = new QueueMeta(); - meta.pollingWaitSeconds = 10; - meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃� - meta.maxMsgSize = maxMsgSize; - meta.msgRetentionSeconds = 345600; - try { - account.createQueue(queueName, meta); - return true; - } catch (Exception e) { - return false; - } - } + if (queueNameExist(queueName)) { + return true; + } - /** - * 鎸囧畾鍙傛暟鍒涘缓闃熷垪 - * - * @param queueName - * @param pollingWaitSeconds - * -闀胯疆璁瓑寰呮椂闂� - * @param visibilityTimeout - * -娑堟伅娑堣垂鍚庡啀娆″彲瑙佺殑鏃堕棿 - * @return - */ - public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) { + QueueMeta meta = new QueueMeta(); + meta.pollingWaitSeconds = 10; + meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃� + meta.maxMsgSize = 65536; + meta.msgRetentionSeconds = 345600; + try { + account.createQueue(queueName, meta); + return true; + } catch (Exception e) { + return false; + } + } - QueueMeta meta = new QueueMeta(); - meta.pollingWaitSeconds = pollingWaitSeconds; - meta.visibilityTimeout = visibilityTimeout;// 娑堟伅鍙鎬ц秴鏃� - meta.maxMsgSize = 65536; - meta.msgRetentionSeconds = 345600; - try { - account.createQueue(queueName, meta); - return true; - } catch (Exception e) { - return false; - } - } + // 鍒涘缓闃熷垪 + public boolean createQueue(String queueName, int maxMsgSize) { + if (queueNameExist(queueName)) { + return true; + } + QueueMeta meta = new QueueMeta(); + meta.pollingWaitSeconds = 10; + meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃� + meta.maxMsgSize = maxMsgSize; + meta.msgRetentionSeconds = 345600; + try { + account.createQueue(queueName, meta); + return true; + } catch (Exception e) { + return false; + } + } - // 鍒犻櫎闃熷垪 - public boolean deleteQueue(String queueName) { - try { - account.deleteQueue(queueName); - return true; - } catch (Exception e) { - return false; - } - } + /** + * 鎸囧畾鍙傛暟鍒涘缓闃熷垪 + * + * @param queueName + * @param pollingWaitSeconds -闀胯疆璁瓑寰呮椂闂� + * @param visibilityTimeout -娑堟伅娑堣垂鍚庡啀娆″彲瑙佺殑鏃堕棿 + * @return + */ + public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) { - // 鑾峰彇闃熷垪鍒楄〃 - public List<String> getQueueNameList(String key) { - account = new Account(endpoint, this.secretId, this.secretKey); - ArrayList<String> vtQueue = new ArrayList<String>(); - try { - int totalCount = account.listQueue(key, 0, 100, vtQueue); - } catch (Exception e) { - e.printStackTrace(); - } - return vtQueue; - } + if (queueNameExist(queueName)) { + return true; + } - // 鑾峰彇闃熷垪 - public Queue getQueue(String queueName) { - account = new Account(endpoint, this.secretId, this.secretKey); - Queue queue = account.getQueue(queueName); - return queue; - } + QueueMeta meta = new QueueMeta(); + meta.pollingWaitSeconds = pollingWaitSeconds; + meta.visibilityTimeout = visibilityTimeout;// 娑堟伅鍙鎬ц秴鏃� + meta.maxMsgSize = 65536; + meta.msgRetentionSeconds = 345600; + try { + account.createQueue(queueName, meta); + return true; + } catch (Exception e) { + return false; + } + } - // 鑾峰彇闃熷垪灞炴�� - public QueueMeta getQueueAtrribute(String queueName) { - Queue queue = account.getQueue(queueName); - QueueMeta meta2 = null; - try { - meta2 = queue.getQueueAttributes(); - } catch (Exception e) { - e.printStackTrace(); - } - return meta2; - } + // 鍒犻櫎闃熷垪 + public boolean deleteQueue(String queueName) { + try { + account.deleteQueue(queueName); + return true; + } catch (Exception e) { + return false; + } + } - // 鍙戦�佹秷鎭� - public String sendMsg(String queueName, String msg) { - try { - Queue queue = getQueue(queueName); - String msgId = queue.sendMessage(msg); - return msgId; - } catch (Exception e) { - e.printStackTrace(); - } - return null; - } + // 鑾峰彇闃熷垪鍒楄〃 + public List<String> getQueueNameList(String key) { + account = new Account(endpoint, this.secretId, this.secretKey); + ArrayList<String> vtQueue = new ArrayList<String>(); + try { + int totalCount = account.listQueue(key, 0, 100, vtQueue); + } catch (Exception e) { + e.printStackTrace(); + } + return vtQueue; + } - // 娑堣垂娑堟伅 - public Message recieveMsg(String queueName) { - try { - Queue queue = getQueue(queueName); - Message msg = queue.receiveMessage(10); - return msg; - } catch (Exception e) { - e.printStackTrace(); - } - return null; - } + // 鑾峰彇闃熷垪 + public Queue getQueue(String queueName) { + account = new Account(endpoint, this.secretId, this.secretKey); + Queue queue = account.getQueue(queueName); + return queue; + } - /** - * 娑堣垂娑堟伅 - * - * @param count - * 1-16 - * @param queueName - * 闃熷垪鍚嶅瓧 - * @return - */ - public List<Message> recieveMsg(int count, String queueName) { - Queue queue = getQueue(queueName); + // 鑾峰彇闃熷垪灞炴�� + public QueueMeta getQueueAtrribute(String queueName) { + Queue queue = account.getQueue(queueName); + QueueMeta meta2 = null; + try { + meta2 = queue.getQueueAttributes(); + } catch (Exception e) { + e.printStackTrace(); + } + return meta2; + } - if (queue == null) { - return null; - } + // 鍙戦�佹秷鎭� + public String sendMsg(String queueName, String msg) { + try { + Queue queue = getQueue(queueName); + String msgId = queue.sendMessage(msg); + return msgId; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } - List<Message> msgList = null; - try { - msgList = queue.batchReceiveMessage(count, 20); - return msgList; - } catch (Exception e) { - if (e.getMessage() != null && !e.getMessage().contains("no message")) - e.printStackTrace(); - } - return null; - } + // 娑堣垂娑堟伅 + public Message recieveMsg(String queueName) { + try { + Queue queue = getQueue(queueName); + Message msg = queue.receiveMessage(10); + return msg; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } - public List<Message> recieveMsg(int count, String queueName, int waitSeconds) { - Queue queue = getQueue(queueName); - List<Message> msgList = null; - try { - msgList = queue.batchReceiveMessage(count, waitSeconds); - return msgList; - } catch (Exception e) { - } - return null; - } + /** + * 娑堣垂娑堟伅 + * + * @param count 1-16 + * @param queueName 闃熷垪鍚嶅瓧 + * @return + */ + public List<Message> recieveMsg(int count, String queueName) { + Queue queue = getQueue(queueName); - // 鍒犻櫎娑堟伅 - public boolean deleteMsg(String queueName, String receiptHandle) { - try { - Queue queue = getQueue(queueName); - queue.deleteMessage(receiptHandle); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + if (queue == null) { + return null; + } - /** - * 璁㈤槄娑堟伅鐩稿叧 - */ + List<Message> msgList = null; + try { + msgList = queue.batchReceiveMessage(count, 20); + return msgList; + } catch (Exception e) { + if (e.getMessage() != null && !e.getMessage().contains("no message")) + e.printStackTrace(); + } + return null; + } - /** - * 鍒涘缓璁㈤槄涓婚 - * - * @param topicName-涓婚鍚嶇О - * @param maxMsgSize-娑堟伅鏈�澶ч暱搴� - * @param filterType-杩囨护绫诲瀷 - * @return - */ - public boolean createTopic(String topicName, int maxMsgSize, int filterType) { - try { - topicAccount.createTopic(topicName, maxMsgSize, filterType); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + public List<Message> recieveMsg(int count, String queueName, int waitSeconds) { + Queue queue = getQueue(queueName); + List<Message> msgList = null; + try { + msgList = queue.batchReceiveMessage(count, waitSeconds); + return msgList; + } catch (Exception e) { + } + return null; + } - /** - * 鍒涘缓榛樿鍙傛暟鐨勪富棰� - * - * @param topicName - * @return - */ - public boolean createTopic(String topicName) { - try { - topicAccount.createTopic(topicName, 65536); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + // 鍒犻櫎娑堟伅 + public boolean deleteMsg(String queueName, String receiptHandle) { + try { + Queue queue = getQueue(queueName); + queue.deleteMessage(receiptHandle); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } - /** - * 璁㈤槄涓婚 - * - * @param topicName-涓婚鍚嶇О - * @param subscriptionName-璁㈤槄鍚嶇О - * @param queueName-鎺ュ彈娑堟伅鐨勯槦鍒楀悕绉� - * @return - */ - public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) { - try { - topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue"); - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + /** + * 璁㈤槄娑堟伅鐩稿叧 + */ - /** - * 鍒犻櫎璁㈤槄 - * - * @param topicName - * @param subscriptionName - * @return - */ - public boolean deleteSubscribeTopic(String topicName, String subscriptionName) { - try { - topicAccount.deleteSubscribe(topicName, subscriptionName); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + /** + * 涓婚鍚嶇О鏄惁宸茬粡瀛樺湪 + * + * @param topicName + * @return + */ + private boolean topicNameExist(String topicName) { + List<String> resultList = new ArrayList<>(); + try { + account.listTopic(topicName, resultList, 0, 100); + } catch (Exception e) { + e.printStackTrace(); + } - /** - * 鍙戝竷璁㈤槄娑堟伅 - * - * @param topicName - * @param message - * @return - */ - public boolean publishTopicMessage(String topicName, String message) { - try { - Topic topic = topicAccount.getTopic(topicName); - if (topic == null) - return false; - topic.publishMessage(message); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + //宸茬粡鍒涘缓浜� + if (resultList.contains(topicName)) { + return true; + } + return false; + } - /** - * 鎵归噺鍙戝竷娑堟伅 - * - * @param topicName - * @param msgList - * @return - */ - public boolean batchPublishTopicMessage(String topicName, List<String> msgList) { - try { - Topic topic = topicAccount.getTopic(topicName); - if (topic == null) - return false; - topic.batchPublishMessage(msgList); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - return false; - } + /** + * 鍒涘缓璁㈤槄涓婚 + * + * @param topicName-涓婚鍚嶇О + * @param maxMsgSize-娑堟伅鏈�澶ч暱搴� + * @param filterType-杩囨护绫诲瀷 + * @return + */ + public boolean createTopic(String topicName, int maxMsgSize, int filterType) { + if (topicNameExist(topicName)) { + return true; + } + try { + topicAccount.createTopic(topicName, maxMsgSize, filterType); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + /** + * 鍒涘缓榛樿鍙傛暟鐨勪富棰� + * + * @param topicName + * @return + */ + public boolean createTopic(String topicName) { + if (topicNameExist(topicName)) { + return true; + } + try { + topicAccount.createTopic(topicName, 65536); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + + /** + * 鏄惁宸茬粡璁㈤槄 + * + * @param topicName + * @param subscriptionName + * @return + */ + private boolean isAlreadySubscribe(String topicName, String subscriptionName) throws Exception { + Topic topic = topicAccount.getTopic(topicName); + List<String> resultList = new ArrayList<>(); + topic.ListSubscription(0, 100, subscriptionName, resultList); + if (resultList.contains(subscriptionName)) { + return true; + } + return false; + } + + /** + * 璁㈤槄涓婚 + * + * @param topicName-涓婚鍚嶇О + * @param subscriptionName-璁㈤槄鍚嶇О + * @param queueName-鎺ュ彈娑堟伅鐨勯槦鍒楀悕绉� + * @return + */ + public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) { + + try { + if (isAlreadySubscribe(topicName, subscriptionName)) { + return true; + } + } catch (Exception e) { + } + try { + topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue"); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + public boolean subscribeTopic(String topicName, String subscriptionName, String queueName, List<String> filterTags) { + + try { + if (isAlreadySubscribe(topicName, subscriptionName)) { + return true; + } + } catch (Exception e) { + } + try { + topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue", filterTags, null, "BACKOFF_RETRY", "JSON"); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + /** + * 鍒犻櫎璁㈤槄 + * + * @param topicName + * @param subscriptionName + * @return + */ + public boolean deleteSubscribeTopic(String topicName, String subscriptionName) { + try { + topicAccount.deleteSubscribe(topicName, subscriptionName); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + /** + * 鍙戝竷璁㈤槄娑堟伅 + * + * @param topicName + * @param message + * @return + */ + public boolean publishTopicMessage(String topicName, String message) { + try { + Topic topic = topicAccount.getTopic(topicName); + if (topic == null) + return false; + topic.publishMessage(message); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + /** + * 鍙戝竷璁㈤槄娑堟伅 + * + * @param topicName + * @param tagList -鏍囩 + * @param message + * @return + */ + public boolean publishTopicMessage(String topicName, List<String> tagList, String message) { + try { + Topic topic = topicAccount.getTopic(topicName); + if (topic == null) + return false; + topic.publishMessage(message, tagList, null); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + + /** + * 鎵归噺鍙戝竷娑堟伅 + * + * @param topicName + * @param msgList + * @return + */ + public boolean batchPublishTopicMessage(String topicName, List<String> msgList) { + try { + Topic topic = topicAccount.getTopic(topicName); + if (topic == null) + return false; + topic.batchPublishMessage(msgList); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } } -- Gitblit v1.8.0