package org.yeshi.utils;
|
|
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.qcloud.cmq.Account;
import com.qcloud.cmq.Message;
import com.qcloud.cmq.Queue;
import com.qcloud.cmq.QueueMeta;
|
|
//腾讯CMQ消息
|
public class CMQUtil {
|
|
private static CMQUtil cmqUtil;
|
|
public static CMQUtil getInstance(String secretId, String secretKey) {
|
if (cmqUtil == null) {
|
cmqUtil = new CMQUtil();
|
cmqUtil.init(secretId, secretKey);
|
}
|
return cmqUtil;
|
}
|
|
private String secretId = "";
|
private String secretKey = "";
|
// 内网 http://cmq-queue-gz.api.tencentyun.com
|
// 外网 http://cmq-queue-gz.api.qcloud.com
|
private static String endpoint = "http://cmq-queue-gz.api.qcloud.com";
|
// private static String endpoint =
|
// "http://cmq-queue-gz.api.tencentyun.com";
|
|
private Account account;
|
|
static {
|
// if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX)
|
// endpoint = "http://cmq-queue-gz.api.tencentyun.com";
|
// else
|
endpoint = "http://cmq-queue-gz.api.qcloud.com";
|
}
|
|
public void init(String secretId, String secretKey) {
|
this.secretId = secretId;
|
this.secretKey = secretKey;
|
account = new Account(endpoint, this.secretId, this.secretKey);
|
}
|
|
public boolean existQueue(String queueName) {
|
|
ArrayList<String> vtQueue = new ArrayList<String>();
|
try {
|
int totalCount = account.listQueue(queueName, 0, 1, vtQueue);
|
if (totalCount <= 0)
|
return false;
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
if (vtQueue.size() > 0 && vtQueue.get(0).equalsIgnoreCase(queueName))
|
return true;
|
return false;
|
}
|
|
// 创建队列
|
public boolean createQueue(String queueName) {
|
|
QueueMeta meta = new QueueMeta();
|
meta.pollingWaitSeconds = 10;
|
meta.visibilityTimeout = 5 * 60;// 消息可见性超时
|
meta.maxMsgSize = 65536;
|
meta.msgRetentionSeconds = 345600;
|
try {
|
account.createQueue(queueName, meta);
|
return true;
|
} catch (Exception e) {
|
return false;
|
}
|
}
|
|
// 创建队列
|
public boolean createQueue(String queueName, int maxMsgSize) {
|
|
QueueMeta meta = new QueueMeta();
|
meta.pollingWaitSeconds = 10;
|
meta.visibilityTimeout = 5 * 60;// 消息可见性超时
|
meta.maxMsgSize = maxMsgSize;
|
meta.msgRetentionSeconds = 345600;
|
try {
|
account.createQueue(queueName, meta);
|
return true;
|
} catch (Exception e) {
|
return false;
|
}
|
}
|
|
/**
|
* 指定参数创建队列
|
*
|
* @param queueName
|
* @param pollingWaitSeconds
|
* -长轮训等待时间
|
* @param visibilityTimeout
|
* -消息消费后再次可见的时间
|
* @return
|
*/
|
public boolean createQueue(String queueName, int pollingWaitSeconds, int visibilityTimeout) {
|
|
QueueMeta meta = new QueueMeta();
|
meta.pollingWaitSeconds = pollingWaitSeconds;
|
meta.visibilityTimeout = visibilityTimeout;// 消息可见性超时
|
meta.maxMsgSize = 65536;
|
meta.msgRetentionSeconds = 345600;
|
try {
|
account.createQueue(queueName, meta);
|
return true;
|
} catch (Exception e) {
|
return false;
|
}
|
}
|
|
// 删除队列
|
public boolean deleteQueue(String queueName) {
|
try {
|
account.deleteQueue(queueName);
|
return true;
|
} catch (Exception e) {
|
return false;
|
}
|
}
|
|
// 获取队列列表
|
public List<String> getQueueNameList(String key) {
|
account = new Account(endpoint, this.secretId, this.secretKey);
|
ArrayList<String> vtQueue = new ArrayList<String>();
|
try {
|
int totalCount = account.listQueue(key, 0, 100, vtQueue);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return vtQueue;
|
}
|
|
// 获取队列
|
public Queue getQueue(String queueName) {
|
account = new Account(endpoint, this.secretId, this.secretKey);
|
Queue queue = account.getQueue(queueName);
|
return queue;
|
}
|
|
// 获取队列属性
|
public QueueMeta getQueueAtrribute(String queueName) {
|
Queue queue = account.getQueue(queueName);
|
QueueMeta meta2 = null;
|
try {
|
meta2 = queue.getQueueAttributes();
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return meta2;
|
}
|
|
// 发送消息
|
public String sendMsg(String queueName, String msg) {
|
try {
|
Queue queue = getQueue(queueName);
|
String msgId = queue.sendMessage(msg);
|
return msgId;
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
// 消费消息
|
public Message recieveMsg(String queueName) {
|
try {
|
Queue queue = getQueue(queueName);
|
Message msg = queue.receiveMessage(10);
|
return msg;
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
/**
|
* 消费消息
|
*
|
* @param count
|
* 1-16
|
* @param queueName
|
* 队列名字
|
* @return
|
*/
|
public List<Message> recieveMsg(int count, String queueName) {
|
Queue queue = getQueue(queueName);
|
|
if (queue == null ) {
|
return null;
|
}
|
|
List<Message> msgList = null;
|
try {
|
msgList = queue.batchReceiveMessage(count, 20);
|
return msgList;
|
} catch (Exception e) {
|
if (e.getMessage()!=null&& !e.getMessage().contains("no message"))
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
public List<Message> recieveMsg(int count, String queueName, int waitSeconds) {
|
Queue queue = getQueue(queueName);
|
List<Message> msgList = null;
|
try {
|
msgList = queue.batchReceiveMessage(count, waitSeconds);
|
return msgList;
|
} catch (Exception e) {
|
}
|
return null;
|
}
|
|
// 删除消息
|
public boolean deleteMsg(String queueName, String receiptHandle) {
|
try {
|
Queue queue = getQueue(queueName);
|
queue.deleteMessage(receiptHandle);
|
return true;
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return false;
|
}
|
}
|