From 24bed5e6b6e31090cb61600a0bdea898eac65da1 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期六, 26 十二月 2020 17:20:23 +0800
Subject: [PATCH] cmq调整

---
 utils/src/main/java/org/yeshi/utils/CMQUtil.java |  724 ++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 416 insertions(+), 308 deletions(-)

diff --git a/utils/src/main/java/org/yeshi/utils/CMQUtil.java b/utils/src/main/java/org/yeshi/utils/CMQUtil.java
index 1bca5db..0a4b529 100644
--- a/utils/src/main/java/org/yeshi/utils/CMQUtil.java
+++ b/utils/src/main/java/org/yeshi/utils/CMQUtil.java
@@ -3,350 +3,458 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import com.qcloud.cmq.Account;
-import com.qcloud.cmq.Message;
-import com.qcloud.cmq.Queue;
-import com.qcloud.cmq.QueueMeta;
-import com.qcloud.cmq.Topic;
+import com.qcloud.cmq.*;
 import org.springframework.data.annotation.Transient;
 
 //鑵捐CMQ娑堟伅
 public class CMQUtil {
 
-	private static CMQUtil cmqUtil;
+    private static CMQUtil cmqUtil;
 
-	public static CMQUtil getInstance(String secretId, String secretKey) {
-		if (cmqUtil == null) {
-			cmqUtil = new CMQUtil();
-			cmqUtil.init(secretId, secretKey);
-		}
-		return cmqUtil;
-	}
+    public static CMQUtil getInstance(String secretId, String secretKey) {
+        if (cmqUtil == null) {
+            cmqUtil = new CMQUtil();
+            cmqUtil.init(secretId, secretKey);
+        }
+        return cmqUtil;
+    }
 
-	private String secretId = "";
-	private String secretKey = "";
-	// 鍐呯綉 http://cmq-queue-gz.api.tencentyun.com
-	// 澶栫綉 http://cmq-queue-gz.api.qcloud.com
-	private static String endpoint = "http://cmq-queue-gz.api.qcloud.com";
-	private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com";
-	// private static String endpoint =
-	// "http://cmq-queue-gz.api.tencentyun.com";
+    private String secretId = "";
+    private String secretKey = "";
+    // 鍐呯綉 http://cmq-queue-gz.api.tencentyun.com
+    // 澶栫綉 http://cmq-queue-gz.api.qcloud.com
+    private static String endpoint = "http://cmq-queue-gz.api.qcloud.com";
+    private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com";
+    // private static String endpoint =
+    // "http://cmq-queue-gz.api.tencentyun.com";
 
-	private Account account;
-	private Account topicAccount;
+    private Account account;
+    private Account topicAccount;
 
-	static {
-		// if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX)
-		// endpoint = "http://cmq-queue-gz.api.tencentyun.com";
-		// else
-		endpoint = "http://cmq-queue-gz.api.qcloud.com";
-	}
+    static {
+        // if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX)
+        // endpoint = "http://cmq-queue-gz.api.tencentyun.com";
+        // else
+        endpoint = "http://cmq-queue-gz.api.qcloud.com";
+    }
 
-	public void init(String secretId, String secretKey) {
-		this.secretId = secretId;
-		this.secretKey = secretKey;
-		account = new Account(endpoint, this.secretId, this.secretKey);
-		topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey);
-	}
+    public void init(String secretId, String secretKey) {
+        this.secretId = secretId;
+        this.secretKey = secretKey;
+        account = new Account(endpoint, this.secretId, this.secretKey);
+        topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey);
+    }
 
-	public boolean existQueue(String queueName) {
+    public boolean existQueue(String queueName) {
 
-		ArrayList<String> vtQueue = new ArrayList<String>();
-		try {
-			int totalCount = account.listQueue(queueName, 0, 1, vtQueue);
-			if (totalCount <= 0)
-				return false;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName))
-			return true;
-		return false;
-	}
+        ArrayList<String> vtQueue = new ArrayList<String>();
+        try {
+            int totalCount = account.listQueue(queueName, 0, 1, vtQueue);
+            if (totalCount <= 0)
+                return false;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName))
+            return true;
+        return false;
+    }
 
-	// 鍒涘缓闃熷垪
-	public boolean createQueue(String queueName) {
+    private boolean queueNameExist(String queueName) {
+        List<String> resultList = new ArrayList<>();
+        try {
+            account.listQueue(queueName, 0, 100, resultList);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
-		QueueMeta meta = new QueueMeta();
-		meta.pollingWaitSeconds = 10;
-		meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃�
-		meta.maxMsgSize = 65536;
-		meta.msgRetentionSeconds = 345600;
-		try {
-			account.createQueue(queueName, meta);
-			return true;
-		} catch (Exception e) {
-			return false;
-		}
-	}
+        //宸茬粡鍒涘缓浜�
+        if (resultList.contains(queueName)) {
+            return true;
+        }
+        return false;
+    }
 
-	// 鍒涘缓闃熷垪
-	public boolean createQueue(String queueName, int maxMsgSize) {
+    // 鍒涘缓闃熷垪
+    public boolean createQueue(String queueName) {
 
-		QueueMeta meta = new QueueMeta();
-		meta.pollingWaitSeconds = 10;
-		meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃�
-		meta.maxMsgSize = maxMsgSize;
-		meta.msgRetentionSeconds = 345600;
-		try {
-			account.createQueue(queueName, meta);
-			return true;
-		} catch (Exception e) {
-			return false;
-		}
-	}
+        if (queueNameExist(queueName)) {
+            return true;
+        }
 
-	/**
-	 * 鎸囧畾鍙傛暟鍒涘缓闃熷垪
-	 * 
-	 * @param queueName
-	 * @param pollingWaitSeconds
-	 *            -闀胯疆璁瓑寰呮椂闂�
-	 * @param visibilityTimeout
-	 *            -娑堟伅娑堣垂鍚庡啀娆″彲瑙佺殑鏃堕棿
-	 * @return
-	 */
-	public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
+        QueueMeta meta = new QueueMeta();
+        meta.pollingWaitSeconds = 10;
+        meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃�
+        meta.maxMsgSize = 65536;
+        meta.msgRetentionSeconds = 345600;
+        try {
+            account.createQueue(queueName, meta);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
 
-		QueueMeta meta = new QueueMeta();
-		meta.pollingWaitSeconds = pollingWaitSeconds;
-		meta.visibilityTimeout = visibilityTimeout;// 娑堟伅鍙鎬ц秴鏃�
-		meta.maxMsgSize = 65536;
-		meta.msgRetentionSeconds = 345600;
-		try {
-			account.createQueue(queueName, meta);
-			return true;
-		} catch (Exception e) {
-			return false;
-		}
-	}
+    // 鍒涘缓闃熷垪
+    public boolean createQueue(String queueName, int maxMsgSize) {
+        if (queueNameExist(queueName)) {
+            return true;
+        }
+        QueueMeta meta = new QueueMeta();
+        meta.pollingWaitSeconds = 10;
+        meta.visibilityTimeout = 5 * 60;// 娑堟伅鍙鎬ц秴鏃�
+        meta.maxMsgSize = maxMsgSize;
+        meta.msgRetentionSeconds = 345600;
+        try {
+            account.createQueue(queueName, meta);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
 
-	// 鍒犻櫎闃熷垪
-	public boolean deleteQueue(String queueName) {
-		try {
-			account.deleteQueue(queueName);
-			return true;
-		} catch (Exception e) {
-			return false;
-		}
-	}
+    /**
+     * 鎸囧畾鍙傛暟鍒涘缓闃熷垪
+     *
+     * @param queueName
+     * @param pollingWaitSeconds -闀胯疆璁瓑寰呮椂闂�
+     * @param visibilityTimeout  -娑堟伅娑堣垂鍚庡啀娆″彲瑙佺殑鏃堕棿
+     * @return
+     */
+    public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
 
-	// 鑾峰彇闃熷垪鍒楄〃
-	public List<String> getQueueNameList(String key) {
-		account = new Account(endpoint, this.secretId, this.secretKey);
-		ArrayList<String> vtQueue = new ArrayList<String>();
-		try {
-			int totalCount = account.listQueue(key, 0, 100, vtQueue);
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return vtQueue;
-	}
+        if (queueNameExist(queueName)) {
+            return true;
+        }
 
-	// 鑾峰彇闃熷垪
-	public Queue getQueue(String queueName) {
-		account = new Account(endpoint, this.secretId, this.secretKey);
-		Queue queue = account.getQueue(queueName);
-		return queue;
-	}
+        QueueMeta meta = new QueueMeta();
+        meta.pollingWaitSeconds = pollingWaitSeconds;
+        meta.visibilityTimeout = visibilityTimeout;// 娑堟伅鍙鎬ц秴鏃�
+        meta.maxMsgSize = 65536;
+        meta.msgRetentionSeconds = 345600;
+        try {
+            account.createQueue(queueName, meta);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
 
-	// 鑾峰彇闃熷垪灞炴��
-	public QueueMeta getQueueAtrribute(String queueName) {
-		Queue queue = account.getQueue(queueName);
-		QueueMeta meta2 = null;
-		try {
-			meta2 = queue.getQueueAttributes();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return meta2;
-	}
+    // 鍒犻櫎闃熷垪
+    public boolean deleteQueue(String queueName) {
+        try {
+            account.deleteQueue(queueName);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
 
-	// 鍙戦�佹秷鎭�
-	public String sendMsg(String queueName, String msg) {
-		try {
-			Queue queue = getQueue(queueName);
-			String msgId = queue.sendMessage(msg);
-			return msgId;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return null;
-	}
+    // 鑾峰彇闃熷垪鍒楄〃
+    public List<String> getQueueNameList(String key) {
+        account = new Account(endpoint, this.secretId, this.secretKey);
+        ArrayList<String> vtQueue = new ArrayList<String>();
+        try {
+            int totalCount = account.listQueue(key, 0, 100, vtQueue);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return vtQueue;
+    }
 
-	// 娑堣垂娑堟伅
-	public Message recieveMsg(String queueName) {
-		try {
-			Queue queue = getQueue(queueName);
-			Message msg = queue.receiveMessage(10);
-			return msg;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return null;
-	}
+    // 鑾峰彇闃熷垪
+    public Queue getQueue(String queueName) {
+        account = new Account(endpoint, this.secretId, this.secretKey);
+        Queue queue = account.getQueue(queueName);
+        return queue;
+    }
 
-	/**
-	 * 娑堣垂娑堟伅
-	 * 
-	 * @param count
-	 *            1-16
-	 * @param queueName
-	 *            闃熷垪鍚嶅瓧
-	 * @return
-	 */
-	public List<Message> recieveMsg(int count, String queueName) {
-		Queue queue = getQueue(queueName);
+    // 鑾峰彇闃熷垪灞炴��
+    public QueueMeta getQueueAtrribute(String queueName) {
+        Queue queue = account.getQueue(queueName);
+        QueueMeta meta2 = null;
+        try {
+            meta2 = queue.getQueueAttributes();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return meta2;
+    }
 
-		if (queue == null) {
-			return null;
-		}
+    // 鍙戦�佹秷鎭�
+    public String sendMsg(String queueName, String msg) {
+        try {
+            Queue queue = getQueue(queueName);
+            String msgId = queue.sendMessage(msg);
+            return msgId;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
 
-		List<Message> msgList = null;
-		try {
-			msgList = queue.batchReceiveMessage(count, 20);
-			return msgList;
-		} catch (Exception e) {
-			if (e.getMessage() != null && !e.getMessage().contains("no message"))
-				e.printStackTrace();
-		}
-		return null;
-	}
+    // 娑堣垂娑堟伅
+    public Message recieveMsg(String queueName) {
+        try {
+            Queue queue = getQueue(queueName);
+            Message msg = queue.receiveMessage(10);
+            return msg;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
 
-	public List<Message> recieveMsg(int count, String queueName, int waitSeconds) {
-		Queue queue = getQueue(queueName);
-		List<Message> msgList = null;
-		try {
-			msgList = queue.batchReceiveMessage(count, waitSeconds);
-			return msgList;
-		} catch (Exception e) {
-		}
-		return null;
-	}
+    /**
+     * 娑堣垂娑堟伅
+     *
+     * @param count     1-16
+     * @param queueName 闃熷垪鍚嶅瓧
+     * @return
+     */
+    public List<Message> recieveMsg(int count, String queueName) {
+        Queue queue = getQueue(queueName);
 
-	// 鍒犻櫎娑堟伅
-	public boolean deleteMsg(String queueName, String receiptHandle) {
-		try {
-			Queue queue = getQueue(queueName);
-			queue.deleteMessage(receiptHandle);
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return false;
-	}
+        if (queue == null) {
+            return null;
+        }
 
-	/**
-	 * 璁㈤槄娑堟伅鐩稿叧
-	 */
+        List<Message> msgList = null;
+        try {
+            msgList = queue.batchReceiveMessage(count, 20);
+            return msgList;
+        } catch (Exception e) {
+            if (e.getMessage() != null && !e.getMessage().contains("no message"))
+                e.printStackTrace();
+        }
+        return null;
+    }
 
-	/**
-	 * 鍒涘缓璁㈤槄涓婚
-	 * 
-	 * @param topicName-涓婚鍚嶇О
-	 * @param maxMsgSize-娑堟伅鏈�澶ч暱搴�
-	 * @param filterType-杩囨护绫诲瀷
-	 * @return
-	 */
-	public boolean createTopic(String topicName, int maxMsgSize, int filterType) {
-		try {
-			topicAccount.createTopic(topicName, maxMsgSize, filterType);
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return false;
-	}
+    public List<Message> recieveMsg(int count, String queueName, int waitSeconds) {
+        Queue queue = getQueue(queueName);
+        List<Message> msgList = null;
+        try {
+            msgList = queue.batchReceiveMessage(count, waitSeconds);
+            return msgList;
+        } catch (Exception e) {
+        }
+        return null;
+    }
 
-	/**
-	 * 鍒涘缓榛樿鍙傛暟鐨勪富棰�
-	 * 
-	 * @param topicName
-	 * @return
-	 */
-	public boolean createTopic(String topicName) {
-		try {
-			topicAccount.createTopic(topicName, 65536);
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return false;
-	}
+    // 鍒犻櫎娑堟伅
+    public boolean deleteMsg(String queueName, String receiptHandle) {
+        try {
+            Queue queue = getQueue(queueName);
+            queue.deleteMessage(receiptHandle);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
 
-	/**
-	 * 璁㈤槄涓婚
-	 * 
-	 * @param topicName-涓婚鍚嶇О
-	 * @param subscriptionName-璁㈤槄鍚嶇О
-	 * @param queueName-鎺ュ彈娑堟伅鐨勯槦鍒楀悕绉�
-	 * @return
-	 */
-	public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
-		try {
-			topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue");
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return false;
-	}
+    /**
+     * 璁㈤槄娑堟伅鐩稿叧
+     */
 
-	/**
-	 * 鍒犻櫎璁㈤槄
-	 * 
-	 * @param topicName
-	 * @param subscriptionName
-	 * @return
-	 */
-	public boolean deleteSubscribeTopic(String topicName, String subscriptionName) {
-		try {
-			topicAccount.deleteSubscribe(topicName, subscriptionName);
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
 
-		return false;
-	}
+    /**
+     * 涓婚鍚嶇О鏄惁宸茬粡瀛樺湪
+     *
+     * @param topicName
+     * @return
+     */
+    private boolean topicNameExist(String topicName) {
+        List<String> resultList = new ArrayList<>();
+        try {
+            account.listTopic(topicName, resultList, 0, 100);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
-	/**
-	 * 鍙戝竷璁㈤槄娑堟伅
-	 * 
-	 * @param topicName
-	 * @param message
-	 * @return
-	 */
-	public boolean publishTopicMessage(String topicName, String message) {
-		try {
-			Topic topic = topicAccount.getTopic(topicName);
-			if (topic == null)
-				return false;
-			topic.publishMessage(message);
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return false;
-	}
+        //宸茬粡鍒涘缓浜�
+        if (resultList.contains(topicName)) {
+            return true;
+        }
+        return false;
+    }
 
-	/**
-	 * 鎵归噺鍙戝竷娑堟伅
-	 * 
-	 * @param topicName
-	 * @param msgList
-	 * @return
-	 */
-	public boolean batchPublishTopicMessage(String topicName, List<String> msgList) {
-		try {
-			Topic topic = topicAccount.getTopic(topicName);
-			if (topic == null)
-				return false;
-			topic.batchPublishMessage(msgList);
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		return false;
-	}
+    /**
+     * 鍒涘缓璁㈤槄涓婚
+     *
+     * @param topicName-涓婚鍚嶇О
+     * @param maxMsgSize-娑堟伅鏈�澶ч暱搴�
+     * @param filterType-杩囨护绫诲瀷
+     * @return
+     */
+    public boolean createTopic(String topicName, int maxMsgSize, int filterType) {
+        if (topicNameExist(topicName)) {
+            return true;
+        }
+        try {
+            topicAccount.createTopic(topicName, maxMsgSize, filterType);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    /**
+     * 鍒涘缓榛樿鍙傛暟鐨勪富棰�
+     *
+     * @param topicName
+     * @return
+     */
+    public boolean createTopic(String topicName) {
+        if (topicNameExist(topicName)) {
+            return true;
+        }
+        try {
+            topicAccount.createTopic(topicName, 65536);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+
+    /**
+     * 鏄惁宸茬粡璁㈤槄
+     *
+     * @param topicName
+     * @param subscriptionName
+     * @return
+     */
+    private boolean isAlreadySubscribe(String topicName, String subscriptionName) throws Exception {
+        Topic topic = topicAccount.getTopic(topicName);
+        List<String> resultList = new ArrayList<>();
+        topic.ListSubscription(0, 100, subscriptionName, resultList);
+        if (resultList.contains(subscriptionName)) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * 璁㈤槄涓婚
+     *
+     * @param topicName-涓婚鍚嶇О
+     * @param subscriptionName-璁㈤槄鍚嶇О
+     * @param queueName-鎺ュ彈娑堟伅鐨勯槦鍒楀悕绉�
+     * @return
+     */
+    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
+
+        try {
+            if (isAlreadySubscribe(topicName, subscriptionName)) {
+                return true;
+            }
+        } catch (Exception e) {
+        }
+        try {
+            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName, List<String> filterTags) {
+
+        try {
+            if (isAlreadySubscribe(topicName, subscriptionName)) {
+                return true;
+            }
+        } catch (Exception e) {
+        }
+        try {
+            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue", filterTags, null, "BACKOFF_RETRY", "JSON");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    /**
+     * 鍒犻櫎璁㈤槄
+     *
+     * @param topicName
+     * @param subscriptionName
+     * @return
+     */
+    public boolean deleteSubscribeTopic(String topicName, String subscriptionName) {
+        try {
+            topicAccount.deleteSubscribe(topicName, subscriptionName);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return false;
+    }
+
+    /**
+     * 鍙戝竷璁㈤槄娑堟伅
+     *
+     * @param topicName
+     * @param message
+     * @return
+     */
+    public boolean publishTopicMessage(String topicName, String message) {
+        try {
+            Topic topic = topicAccount.getTopic(topicName);
+            if (topic == null)
+                return false;
+            topic.publishMessage(message);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    /**
+     * 鍙戝竷璁㈤槄娑堟伅
+     *
+     * @param topicName
+     * @param tagList   -鏍囩
+     * @param message
+     * @return
+     */
+    public boolean publishTopicMessage(String topicName, List<String> tagList, String message) {
+        try {
+            Topic topic = topicAccount.getTopic(topicName);
+            if (topic == null)
+                return false;
+            topic.publishMessage(message, tagList, null);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+
+    /**
+     * 鎵归噺鍙戝竷娑堟伅
+     *
+     * @param topicName
+     * @param msgList
+     * @return
+     */
+    public boolean batchPublishTopicMessage(String topicName, List<String> msgList) {
+        try {
+            Topic topic = topicAccount.getTopic(topicName);
+            if (topic == null)
+                return false;
+            topic.batchPublishMessage(msgList);
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
 
 }

--
Gitblit v1.8.0