admin
2021-01-25 2ba431be9c12a79783e0f9ef249292b7fa95f2a1
utils/src/main/java/org/yeshi/utils/CMQUtil.java
@@ -3,349 +3,457 @@
import java.util.ArrayList;
import java.util.List;
import com.qcloud.cmq.Account;
import com.qcloud.cmq.Message;
import com.qcloud.cmq.Queue;
import com.qcloud.cmq.QueueMeta;
import com.qcloud.cmq.Topic;
import com.qcloud.cmq.*;
import org.springframework.data.annotation.Transient;
//腾讯CMQ消息
public class CMQUtil {
   private static CMQUtil cmqUtil;
    private static CMQUtil cmqUtil;
   public static CMQUtil getInstance(String secretId, String secretKey) {
      if (cmqUtil == null) {
         cmqUtil = new CMQUtil();
         cmqUtil.init(secretId, secretKey);
      }
      return cmqUtil;
   }
    public static CMQUtil getInstance(String secretId, String secretKey) {
        if (cmqUtil == null) {
            cmqUtil = new CMQUtil();
            cmqUtil.init(secretId, secretKey);
        }
        return cmqUtil;
    }
   private String secretId = "";
   private String secretKey = "";
   // 内网 http://cmq-queue-gz.api.tencentyun.com
   // 外网 http://cmq-queue-gz.api.qcloud.com
   private static String endpoint = "http://cmq-queue-gz.api.qcloud.com";
   private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com";
   // private static String endpoint =
   // "http://cmq-queue-gz.api.tencentyun.com";
    private String secretId = "";
    private String secretKey = "";
    // 内网 http://cmq-queue-gz.api.tencentyun.com
    // 外网 http://cmq-queue-gz.api.qcloud.com
    private static String endpoint = "http://cmq-queue-gz.api.qcloud.com";
    private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com";
    // private static String endpoint =
    // "http://cmq-queue-gz.api.tencentyun.com";
   private Account account;
   private Account topicAccount;
    private Account account;
    private Account topicAccount;
   static {
      // if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX)
      // endpoint = "http://cmq-queue-gz.api.tencentyun.com";
      // else
      endpoint = "http://cmq-queue-gz.api.qcloud.com";
   }
    static {
        // if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX)
        // endpoint = "http://cmq-queue-gz.api.tencentyun.com";
        // else
        endpoint = "http://cmq-queue-gz.api.qcloud.com";
    }
   public void init(String secretId, String secretKey) {
      this.secretId = secretId;
      this.secretKey = secretKey;
      account = new Account(endpoint, this.secretId, this.secretKey);
      topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey);
   }
    public void init(String secretId, String secretKey) {
        this.secretId = secretId;
        this.secretKey = secretKey;
        account = new Account(endpoint, this.secretId, this.secretKey);
        topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey);
    }
   public boolean existQueue(String queueName) {
    public boolean existQueue(String queueName) {
      ArrayList<String> vtQueue = new ArrayList<String>();
      try {
         int totalCount = account.listQueue(queueName, 0, 1, vtQueue);
         if (totalCount <= 0)
            return false;
      } catch (Exception e) {
         e.printStackTrace();
      }
      if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName))
         return true;
      return false;
   }
        ArrayList<String> vtQueue = new ArrayList<String>();
        try {
            int totalCount = account.listQueue(queueName, 0, 1, vtQueue);
            if (totalCount <= 0)
                return false;
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName))
            return true;
        return false;
    }
   // 创建队列
   public boolean createQueue(String queueName) {
    private boolean queueNameExist(String queueName) {
        List<String> resultList = new ArrayList<>();
        try {
            account.listQueue(queueName, 0, 100, resultList);
        } catch (Exception e) {
            e.printStackTrace();
        }
      QueueMeta meta = new QueueMeta();
      meta.pollingWaitSeconds = 10;
      meta.visibilityTimeout = 5 * 60;// 消息可见性超时
      meta.maxMsgSize = 65536;
      meta.msgRetentionSeconds = 345600;
      try {
         account.createQueue(queueName, meta);
         return true;
      } catch (Exception e) {
         return false;
      }
   }
        //已经创建了
        if (resultList.contains(queueName)) {
            return true;
        }
        return false;
    }
   // 创建队列
   public boolean createQueue(String queueName, int maxMsgSize) {
    // 创建队列
    public boolean createQueue(String queueName) {
      QueueMeta meta = new QueueMeta();
      meta.pollingWaitSeconds = 10;
      meta.visibilityTimeout = 5 * 60;// 消息可见性超时
      meta.maxMsgSize = maxMsgSize;
      meta.msgRetentionSeconds = 345600;
      try {
         account.createQueue(queueName, meta);
         return true;
      } catch (Exception e) {
         return false;
      }
   }
        if (queueNameExist(queueName)) {
            return true;
        }
   /**
    * 指定参数创建队列
    *
    * @param queueName
    * @param pollingWaitSeconds
    *            -长轮训等待时间
    * @param visibilityTimeout
    *            -消息消费后再次可见的时间
    * @return
    */
   public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
        QueueMeta meta = new QueueMeta();
        meta.pollingWaitSeconds = 10;
        meta.visibilityTimeout = 5 * 60;// 消息可见性超时
        meta.maxMsgSize = 65536;
        meta.msgRetentionSeconds = 345600;
        try {
            account.createQueue(queueName, meta);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
      QueueMeta meta = new QueueMeta();
      meta.pollingWaitSeconds = pollingWaitSeconds;
      meta.visibilityTimeout = visibilityTimeout;// 消息可见性超时
      meta.maxMsgSize = 65536;
      meta.msgRetentionSeconds = 345600;
      try {
         account.createQueue(queueName, meta);
         return true;
      } catch (Exception e) {
         return false;
      }
   }
    // 创建队列
    public boolean createQueue(String queueName, int maxMsgSize) {
        if (queueNameExist(queueName)) {
            return true;
        }
        QueueMeta meta = new QueueMeta();
        meta.pollingWaitSeconds = 10;
        meta.visibilityTimeout = 5 * 60;// 消息可见性超时
        meta.maxMsgSize = maxMsgSize;
        meta.msgRetentionSeconds = 345600;
        try {
            account.createQueue(queueName, meta);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
   // 删除队列
   public boolean deleteQueue(String queueName) {
      try {
         account.deleteQueue(queueName);
         return true;
      } catch (Exception e) {
         return false;
      }
   }
    /**
     * 指定参数创建队列
     *
     * @param queueName
     * @param pollingWaitSeconds -长轮训等待时间
     * @param visibilityTimeout  -消息消费后再次可见的时间
     * @return
     */
    public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
   // 获取队列列表
   public List<String> getQueueNameList(String key) {
      account = new Account(endpoint, this.secretId, this.secretKey);
      ArrayList<String> vtQueue = new ArrayList<String>();
      try {
         int totalCount = account.listQueue(key, 0, 100, vtQueue);
      } catch (Exception e) {
         e.printStackTrace();
      }
      return vtQueue;
   }
        if (queueNameExist(queueName)) {
            return true;
        }
   // 获取队列
   public Queue getQueue(String queueName) {
      account = new Account(endpoint, this.secretId, this.secretKey);
      Queue queue = account.getQueue(queueName);
      return queue;
   }
        QueueMeta meta = new QueueMeta();
        meta.pollingWaitSeconds = pollingWaitSeconds;
        meta.visibilityTimeout = visibilityTimeout;// 消息可见性超时
        meta.maxMsgSize = 65536;
        meta.msgRetentionSeconds = 345600;
        try {
            account.createQueue(queueName, meta);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
   // 获取队列属性
   public QueueMeta getQueueAtrribute(String queueName) {
      Queue queue = account.getQueue(queueName);
      QueueMeta meta2 = null;
      try {
         meta2 = queue.getQueueAttributes();
      } catch (Exception e) {
         e.printStackTrace();
      }
      return meta2;
   }
    // 删除队列
    public boolean deleteQueue(String queueName) {
        try {
            account.deleteQueue(queueName);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
   // 发送消息
   public String sendMsg(String queueName, String msg) {
      try {
         Queue queue = getQueue(queueName);
         String msgId = queue.sendMessage(msg);
         return msgId;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return null;
   }
    // 获取队列列表
    public List<String> getQueueNameList(String key) {
        account = new Account(endpoint, this.secretId, this.secretKey);
        ArrayList<String> vtQueue = new ArrayList<String>();
        try {
            int totalCount = account.listQueue(key, 0, 100, vtQueue);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return vtQueue;
    }
   // 消费消息
   public Message recieveMsg(String queueName) {
      try {
         Queue queue = getQueue(queueName);
         Message msg = queue.receiveMessage(10);
         return msg;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return null;
   }
    // 获取队列
    public Queue getQueue(String queueName) {
        account = new Account(endpoint, this.secretId, this.secretKey);
        Queue queue = account.getQueue(queueName);
        return queue;
    }
   /**
    * 消费消息
    *
    * @param count
    *            1-16
    * @param queueName
    *            队列名字
    * @return
    */
   public List<Message> recieveMsg(int count, String queueName) {
      Queue queue = getQueue(queueName);
    // 获取队列属性
    public QueueMeta getQueueAtrribute(String queueName) {
        Queue queue = account.getQueue(queueName);
        QueueMeta meta2 = null;
        try {
            meta2 = queue.getQueueAttributes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return meta2;
    }
      if (queue == null) {
         return null;
      }
    // 发送消息
    public String sendMsg(String queueName, String msg) {
        try {
            Queue queue = getQueue(queueName);
            String msgId = queue.sendMessage(msg);
            return msgId;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
      List<Message> msgList = null;
      try {
         msgList = queue.batchReceiveMessage(count, 20);
         return msgList;
      } catch (Exception e) {
         if (e.getMessage() != null && !e.getMessage().contains("no message"))
            e.printStackTrace();
      }
      return null;
   }
    // 消费消息
    public Message recieveMsg(String queueName) {
        try {
            Queue queue = getQueue(queueName);
            Message msg = queue.receiveMessage(10);
            return msg;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
   public List<Message> recieveMsg(int count, String queueName, int waitSeconds) {
      Queue queue = getQueue(queueName);
      List<Message> msgList = null;
      try {
         msgList = queue.batchReceiveMessage(count, waitSeconds);
         return msgList;
      } catch (Exception e) {
      }
      return null;
   }
    /**
     * 消费消息
     *
     * @param count     1-16
     * @param queueName 队列名字
     * @return
     */
    public List<Message> recieveMsg(int count, String queueName) {
        Queue queue = getQueue(queueName);
   // 删除消息
   public boolean deleteMsg(String queueName, String receiptHandle) {
      try {
         Queue queue = getQueue(queueName);
         queue.deleteMessage(receiptHandle);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
        if (queue == null) {
            return null;
        }
   /**
    * 订阅消息相关
    */
        List<Message> msgList = null;
        try {
            msgList = queue.batchReceiveMessage(count, 20);
            return msgList;
        } catch (Exception e) {
            if (e.getMessage() != null && !e.getMessage().contains("no message"))
                e.printStackTrace();
        }
        return null;
    }
   /**
    * 创建订阅主题
    *
    * @param topicName-主题名称
    * @param maxMsgSize-消息最大长度
    * @param filterType-过滤类型
    * @return
    */
   public boolean createTopic(String topicName, int maxMsgSize, int filterType) {
      try {
         topicAccount.createTopic(topicName, maxMsgSize, filterType);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
    public List<Message> recieveMsg(int count, String queueName, int waitSeconds) {
        Queue queue = getQueue(queueName);
        List<Message> msgList = null;
        try {
            msgList = queue.batchReceiveMessage(count, waitSeconds);
            return msgList;
        } catch (Exception e) {
        }
        return null;
    }
   /**
    * 创建默认参数的主题
    *
    * @param topicName
    * @return
    */
   public boolean createTopic(String topicName) {
      try {
         topicAccount.createTopic(topicName, 65536);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
    // 删除消息
    public boolean deleteMsg(String queueName, String receiptHandle) {
        try {
            Queue queue = getQueue(queueName);
            queue.deleteMessage(receiptHandle);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
   /**
    * 订阅主题
    *
    * @param topicName-主题名称
    * @param subscriptionName-订阅名称
    * @param queueName-接受消息的队列名称
    * @return
    */
   public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
      try {
         topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue");
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
    /**
     * 订阅消息相关
     */
   /**
    * 删除订阅
    *
    * @param topicName
    * @param subscriptionName
    * @return
    */
   public boolean deleteSubscribeTopic(String topicName, String subscriptionName) {
      try {
         topicAccount.deleteSubscribe(topicName, subscriptionName);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
    /**
     * 主题名称是否已经存在
     *
     * @param topicName
     * @return
     */
    private boolean topicNameExist(String topicName) {
        List<String> resultList = new ArrayList<>();
        try {
            account.listTopic(topicName, resultList, 0, 100);
        } catch (Exception e) {
            e.printStackTrace();
        }
   /**
    * 发布订阅消息
    *
    * @param topicName
    * @param message
    * @return
    */
   public boolean publishTopicMessage(String topicName, String message) {
      try {
         Topic topic = topicAccount.getTopic(topicName);
         if (topic == null)
            return false;
         topic.publishMessage(message);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
        //已经创建了
        if (resultList.contains(topicName)) {
            return true;
        }
        return false;
    }
   /**
    * 批量发布消息
    *
    * @param topicName
    * @param msgList
    * @return
    */
   public boolean batchPublishTopicMessage(String topicName, List<String> msgList) {
      try {
         Topic topic = topicAccount.getTopic(topicName);
         if (topic == null)
            return false;
         topic.batchPublishMessage(msgList);
         return true;
      } catch (Exception e) {
         e.printStackTrace();
      }
      return false;
   }
    /**
     * 创建订阅主题
     *
     * @param topicName-主题名称
     * @param maxMsgSize-消息最大长度
     * @param filterType-过滤类型
     * @return
     */
    public boolean createTopic(String topicName, int maxMsgSize, int filterType) {
        if (topicNameExist(topicName)) {
            return true;
        }
        try {
            topicAccount.createTopic(topicName, maxMsgSize, filterType);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 创建默认参数的主题
     *
     * @param topicName
     * @return
     */
    public boolean createTopic(String topicName) {
        if (topicNameExist(topicName)) {
            return true;
        }
        try {
            topicAccount.createTopic(topicName, 65536);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 是否已经订阅
     *
     * @param topicName
     * @param subscriptionName
     * @return
     */
    private boolean isAlreadySubscribe(String topicName, String subscriptionName) throws Exception {
        Topic topic = topicAccount.getTopic(topicName);
        List<String> resultList = new ArrayList<>();
        topic.ListSubscription(0, 100, subscriptionName, resultList);
        if (resultList.contains(subscriptionName)) {
            return true;
        }
        return false;
    }
    /**
     * 订阅主题
     *
     * @param topicName-主题名称
     * @param subscriptionName-订阅名称
     * @param queueName-接受消息的队列名称
     * @return
     */
    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) {
        try {
            if (isAlreadySubscribe(topicName, subscriptionName)) {
                return true;
            }
        } catch (Exception e) {
        }
        try {
            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    public boolean subscribeTopic(String topicName, String subscriptionName, String queueName, List<String> filterTags) {
        try {
            if (isAlreadySubscribe(topicName, subscriptionName)) {
                return true;
            }
        } catch (Exception e) {
        }
        try {
            topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue", filterTags, null, "BACKOFF_RETRY", "JSON");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 删除订阅
     *
     * @param topicName
     * @param subscriptionName
     * @return
     */
    public boolean deleteSubscribeTopic(String topicName, String subscriptionName) {
        try {
            topicAccount.deleteSubscribe(topicName, subscriptionName);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 发布订阅消息
     *
     * @param topicName
     * @param message
     * @return
     */
    public String publishTopicMessage(String topicName, String message) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null)
                return null;
            return topic.publishMessage(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 发布订阅消息
     *
     * @param topicName
     * @param tagList   -标签
     * @param message
     * @return
     */
    public boolean publishTopicMessage(String topicName, List<String> tagList, String message) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null)
                return false;
            topic.publishMessage(message, tagList, null);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 批量发布消息
     *
     * @param topicName
     * @param msgList
     * @return
     */
    public boolean batchPublishTopicMessage(String topicName, List<String> msgList) {
        try {
            Topic topic = topicAccount.getTopic(topicName);
            if (topic == null)
                return false;
            topic.batchPublishMessage(msgList);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}