admin
2020-12-26 24bed5e6b6e31090cb61600a0bdea898eac65da1
utils/src/main/java/org/yeshi/utils/CMQUtil.java
@@ -3,11 +3,7 @@
import java.util.ArrayList;
import java.util.List;
import com.qcloud.cmq.Account;
import com.qcloud.cmq.Message;
import com.qcloud.cmq.Queue;
import com.qcloud.cmq.QueueMeta;
import com.qcloud.cmq.Topic;
import com.qcloud.cmq.*;
import org.springframework.data.annotation.Transient;
//腾讯CMQ消息
@@ -64,8 +60,27 @@
      return false;
   }
    private boolean queueNameExist(String queueName) {
        List<String> resultList = new ArrayList<>();
        try {
            account.listQueue(queueName, 0, 100, resultList);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //已经创建了
        if (resultList.contains(queueName)) {
            return true;
        }
        return false;
    }
   // 创建队列
   public boolean createQueue(String queueName) {
        if (queueNameExist(queueName)) {
            return true;
        }
      QueueMeta meta = new QueueMeta();
      meta.pollingWaitSeconds = 10;
@@ -82,7 +97,9 @@
   // 创建队列
   public boolean createQueue(String queueName, int maxMsgSize) {
        if (queueNameExist(queueName)) {
            return true;
        }
      QueueMeta meta = new QueueMeta();
      meta.pollingWaitSeconds = 10;
      meta.visibilityTimeout = 5 * 60;// 消息可见性超时
@@ -100,13 +117,15 @@
    * 指定参数创建队列
    * 
    * @param queueName
    * @param pollingWaitSeconds
    *            -长轮训等待时间
    * @param visibilityTimeout
    *            -消息消费后再次可见的时间
     * @param pollingWaitSeconds -长轮训等待时间
     * @param visibilityTimeout  -消息消费后再次可见的时间
    * @return
    */
   public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
        if (queueNameExist(queueName)) {
            return true;
        }
      QueueMeta meta = new QueueMeta();
      meta.pollingWaitSeconds = pollingWaitSeconds;
@@ -189,10 +208,8 @@
   /**
    * 消费消息
    * 
    * @param count
    *            1-16
    * @param queueName
    *            队列名字
     * @param count     1-16
     * @param queueName 队列名字
    * @return
    */
   public List<Message> recieveMsg(int count, String queueName) {
@@ -240,6 +257,28 @@
    * 订阅消息相关
    */
    /**
     * 主题名称是否已经存在
     *
     * @param topicName
     * @return
     */
    private boolean topicNameExist(String topicName) {
        List<String> resultList = new ArrayList<>();
        try {
            account.listTopic(topicName, resultList, 0, 100);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //已经创建了
        if (resultList.contains(topicName)) {
            return true;
        }
        return false;
    }
   /**
    * 创建订阅主题
    * 
@@ -249,6 +288,9 @@
    * @return
    */
   public boolean createTopic(String topicName, int maxMsgSize, int filterType) {
        if (topicNameExist(topicName)) {
            return true;
        }
      try {
         topicAccount.createTopic(topicName, maxMsgSize, filterType);
         return true;
@@ -265,11 +307,32 @@
    * @return
    */
   public boolean createTopic(String topicName) {
        if (topicNameExist(topicName)) {
            return true;
        }
      try {
         topicAccount.createTopic(topicName, 65536);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
        }
        return false;
    }
    /**
     * 是否已经订阅
     *
     * @param topicName
     * @param subscriptionName
     * @return
     */
    private boolean isAlreadySubscribe(String topicName, String subscriptionName) throws Exception {
        Topic topic = topicAccount.getTopic(topicName);
        List<String> resultList = new ArrayList<>();
        topic.ListSubscription(0, 100, subscriptionName, resultList);
        if (resultList.contains(subscriptionName)) {
            return true;
      }
      return false;
   }
@@ -283,8 +346,31 @@
    * @return
    */
   public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
        try {
            if (isAlreadySubscribe(topicName, subscriptionName)) {
                return true;
            }
        } catch (Exception e) {
        }
      try {
         topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName, List<String> filterTags) {
        try {
            if (isAlreadySubscribe(topicName, subscriptionName)) {
                return true;
            }
        } catch (Exception e) {
        }
        try {
            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue", filterTags, null, "BACKOFF_RETRY", "JSON");
      } catch (Exception e) {
         e.printStackTrace();
      }
@@ -330,6 +416,28 @@
   }
   /**
     * 发布订阅消息
     *
     * @param topicName
     * @param tagList   -标签
     * @param message
     * @return
     */
    public boolean publishTopicMessage(String topicName, List<String> tagList, String message) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null)
                return false;
            topic.publishMessage(message, tagList, null);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
    * 批量发布消息
    * 
    * @param topicName