From 98b1a0affd69bbe63223c21fdd2c404e8bedfccb Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期三, 20 五月 2020 17:25:08 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/div' into 2.1.2 --- utils/src/main/java/com/qcloud/cmq/Queue.java | 596 +++++++++++++++++++++++++++++----------------------------- 1 files changed, 298 insertions(+), 298 deletions(-) diff --git a/utils/src/main/java/com/qcloud/cmq/Queue.java b/utils/src/main/java/com/qcloud/cmq/Queue.java index 32bc129..81ebd81 100644 --- a/utils/src/main/java/com/qcloud/cmq/Queue.java +++ b/utils/src/main/java/com/qcloud/cmq/Queue.java @@ -1,298 +1,298 @@ -package com.qcloud.cmq; - -import java.util.List; -import java.util.ArrayList; -import java.util.TreeMap; -import java.lang.Integer; - -import com.qcloud.cmq.*; -import com.qcloud.cmq.Json.*; - -/** - * Queue class. - * - * @author York. - * Created 2016骞�9鏈�26鏃�. - */ -public class Queue{ - protected String queueName; - protected CMQClient client; - - - Queue(String queueName, CMQClient client){ - this.queueName = queueName; - this.client = client; - } - - /** - * 璁剧疆闃熷垪灞炴�� - * - * @param meta 闃熷垪灞炴�у弬鏁� - * @throws CMQClientException - * @throws CMQServerException - */ - public void setQueueAttributes(QueueMeta meta) throws Exception { - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - - if(meta.maxMsgHeapNum > 0) - param.put("maxMsgHeapNum",Integer.toString(meta.maxMsgHeapNum)); - if(meta.pollingWaitSeconds > 0) - param.put("pollingWaitSeconds",Integer.toString(meta.pollingWaitSeconds)); - if(meta.visibilityTimeout > 0) - param.put("visibilityTimeout",Integer.toString(meta.visibilityTimeout)); - if(meta.maxMsgSize > 0) - param.put("maxMsgSize",Integer.toString(meta.maxMsgSize)); - if(meta.msgRetentionSeconds > 0) - param.put("msgRetentionSeconds",Integer.toString(meta.msgRetentionSeconds)); - - String result = this.client.call("SetQueueAttributes", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - } - - /** - * 鑾峰彇闃熷垪灞炴�� - * - * @return 杩斿洖鐨勯槦鍒楀睘鎬у弬鏁� - * @throws CMQClientException - * @throws CMQServerException - */ - public QueueMeta getQueueAttributes() throws Exception { - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - String result = this.client.call("GetQueueAttributes", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - - QueueMeta meta = new QueueMeta(); - meta.maxMsgHeapNum = jsonObj.getInt("maxMsgHeapNum"); - meta.pollingWaitSeconds = jsonObj.getInt("pollingWaitSeconds"); - meta.visibilityTimeout = jsonObj.getInt("visibilityTimeout"); - meta.maxMsgSize = jsonObj.getInt("maxMsgSize"); - meta.msgRetentionSeconds = jsonObj.getInt("msgRetentionSeconds"); - meta.createTime = jsonObj.getInt("createTime"); - meta.lastModifyTime = jsonObj.getInt("lastModifyTime"); - meta.activeMsgNum = jsonObj.getInt("activeMsgNum"); - meta.inactiveMsgNum = jsonObj.getInt("inactiveMsgNum"); - meta.rewindmsgNum = jsonObj.getInt("rewindMsgNum"); - meta.minMsgTime = jsonObj.getInt("minMsgTime"); - meta.delayMsgNum = jsonObj.getInt("delayMsgNum"); - - - return meta; - } - - /** - * 鍙戦�佹秷鎭� - * - * @param msgBody 娑堟伅鍐呭 - * @return 鏈嶅姟鍣ㄨ繑鍥炵殑娑堟伅鍞竴鏍囪瘑 - * @throws CMQClientException - * @throws CMQServerException - */ - public String sendMessage(String msgBody) throws Exception { - return sendMessage(msgBody, 0); - } - public String sendMessage(String msgBody ,int delayTime) throws Exception { - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - param.put("msgBody",msgBody); - param.put("delaySeconds",Integer.toString(delayTime)); - - String result = this.client.call("SendMessage", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - - return jsonObj.getString("msgId"); - } - - /** - * 鎵归噺鍙戦�佹秷鎭� - * - * @param vtMsgBody 娑堟伅鍒楄〃 - * @return 鏈嶅姟鍣ㄨ繑鍥炵殑娑堟伅鍞竴鏍囪瘑鍒楄〃 - * @throws CMQClientException - * @throws CMQServerException - */ - public List<String> batchSendMessage(List<String> vtMsgBody) throws Exception { - return batchSendMessage(vtMsgBody, 0); - } - public List<String> batchSendMessage(List<String> vtMsgBody,int delayTime) throws Exception { - - if(vtMsgBody.isEmpty() || vtMsgBody.size() > 16) - throw new CMQClientException("Error: message size is empty or more than 16"); - - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - for(int i=0;i<vtMsgBody.size();i++) - { - String k = "msgBody." + Integer.toString(i+1); - param.put(k,vtMsgBody.get(i)); - } - param.put("delaySeconds", Integer.toString(delayTime)); - - String result = this.client.call("BatchSendMessage", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - - ArrayList<String> vtMsgId = new ArrayList<String>(); - JSONArray jsonArray = jsonObj.getJSONArray("msgList"); - for(int i=0;i<jsonArray.length();i++) - { - JSONObject obj = (JSONObject)jsonArray.get(i); - vtMsgId.add(obj.getString("msgId")); - } - - return vtMsgId; - } - - /** - * 鑾峰彇娑堟伅 - * - * @param pollingWaitSeconds 璇锋眰鏈�闀跨殑Polling绛夊緟鏃堕棿 - * @return 鏈嶅姟鍣ㄨ繑鍥炴秷鎭� - * @throws CMQClientException - * @throws CMQServerException - */ - public Message receiveMessage(int pollingWaitSeconds) throws Exception { - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - if(pollingWaitSeconds > 0) - { - param.put("UserpollingWaitSeconds",Integer.toString(pollingWaitSeconds *1000)); - param.put("pollingWaitSeconds", Integer.toString(pollingWaitSeconds) ); - } - else - { - param.put("UserpollingWaitSeconds",Integer.toString(30000)); - } - - String result = this.client.call("ReceiveMessage", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - - Message msg = new Message(); - msg.msgId = jsonObj.getString("msgId"); - msg.receiptHandle = jsonObj.getString("receiptHandle"); - msg.msgBody = jsonObj.getString("msgBody"); - msg.enqueueTime = jsonObj.getLong("enqueueTime"); - msg.nextVisibleTime = jsonObj.getLong("nextVisibleTime"); - msg.firstDequeueTime = jsonObj.getLong("firstDequeueTime"); - msg.dequeueCount = jsonObj.getInt("dequeueCount"); - - return msg; - } - - /** - * 鎵归噺鑾峰彇娑堟伅 - * - * @param numOfMsg 鍑嗗鑾峰彇娑堟伅鏁� - * @param pollingWaitSeconds 璇锋眰鏈�闀跨殑Polling绛夊緟鏃堕棿 - * @return 鏈嶅姟鍣ㄨ繑鍥炴秷鎭垪琛� - * @throws CMQClientException - * @throws CMQServerException - */ - public List<Message> batchReceiveMessage(int numOfMsg, int pollingWaitSeconds) throws Exception { - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - param.put("numOfMsg",Integer.toString(numOfMsg)); - if(pollingWaitSeconds > 0) - { - param.put("UserpollingWaitSeconds",Integer.toString(pollingWaitSeconds*1000)); - param.put("pollingWaitSeconds", Integer.toString(pollingWaitSeconds) ); - } - else - { - param.put("UserpollingWaitSeconds",Integer.toString(30000)); - } - String result = this.client.call("BatchReceiveMessage", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - - ArrayList<Message> vtMessage = new ArrayList<Message>(); - - JSONArray jsonArray = jsonObj.getJSONArray("msgInfoList"); - for(int i=0;i<jsonArray.length();i++) - { - JSONObject obj = (JSONObject)jsonArray.get(i); - Message msg = new Message(); - msg.msgId = obj.getString("msgId"); - msg.receiptHandle = obj.getString("receiptHandle"); - msg.msgBody = obj.getString("msgBody"); - msg.enqueueTime = obj.getLong("enqueueTime"); - msg.nextVisibleTime = obj.getLong("nextVisibleTime"); - msg.firstDequeueTime = obj.getLong("firstDequeueTime"); - msg.dequeueCount = obj.getInt("dequeueCount"); - - vtMessage.add(msg); - } - - return vtMessage; - } - - /** - * 鍒犻櫎娑堟伅 - * - * @param receiptHandle 娑堟伅鍙ユ焺,鑾峰彇娑堟伅鏃剁敱鏈嶅姟鍣ㄨ繑鍥� - * @throws CMQClientException - * @throws CMQServerException - */ - public void deleteMessage(String receiptHandle) throws Exception { - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - param.put("receiptHandle",receiptHandle); - - String result = this.client.call("DeleteMessage", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - } - - /** - * 鎵归噺鍒犻櫎娑堟伅 - * - * @param receiptHandle 娑堟伅鍙ユ焺鍒楄〃锛岃幏鍙栨秷鎭椂鐢辨湇鍔″櫒杩斿洖 - * @throws CMQClientException - * @throws CMQServerException - */ - public void batchDeleteMessage(List<String> vtReceiptHandle) throws Exception { - if(vtReceiptHandle.isEmpty()) - return; - - TreeMap<String, String> param = new TreeMap<String, String>(); - - param.put("queueName",this.queueName); - for(int i=0;i<vtReceiptHandle.size();i++) - { - String k = "receiptHandle." + Integer.toString(i+1); - param.put(k,vtReceiptHandle.get(i)); - } - - String result = this.client.call("BatchDeleteMessage", param); - JSONObject jsonObj = new JSONObject(result); - int code = jsonObj.getInt("code"); - if(code != 0) - throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); - } -} +package com.qcloud.cmq; + +import java.util.List; +import java.util.ArrayList; +import java.util.TreeMap; +import java.lang.Integer; + +import com.qcloud.cmq.*; +import com.qcloud.cmq.Json.*; + +/** + * Queue class. + * + * @author York. + * Created 2016骞�9鏈�26鏃�. + */ +public class Queue{ + protected String queueName; + protected CMQClient client; + + + Queue(String queueName, CMQClient client){ + this.queueName = queueName; + this.client = client; + } + + /** + * 璁剧疆闃熷垪灞炴�� + * + * @param meta 闃熷垪灞炴�у弬鏁� + * @throws CMQClientException + * @throws CMQServerException + */ + public void setQueueAttributes(QueueMeta meta) throws Exception { + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + + if(meta.maxMsgHeapNum > 0) + param.put("maxMsgHeapNum",Integer.toString(meta.maxMsgHeapNum)); + if(meta.pollingWaitSeconds > 0) + param.put("pollingWaitSeconds",Integer.toString(meta.pollingWaitSeconds)); + if(meta.visibilityTimeout > 0) + param.put("visibilityTimeout",Integer.toString(meta.visibilityTimeout)); + if(meta.maxMsgSize > 0) + param.put("maxMsgSize",Integer.toString(meta.maxMsgSize)); + if(meta.msgRetentionSeconds > 0) + param.put("msgRetentionSeconds",Integer.toString(meta.msgRetentionSeconds)); + + String result = this.client.call("SetQueueAttributes", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + } + + /** + * 鑾峰彇闃熷垪灞炴�� + * + * @return 杩斿洖鐨勯槦鍒楀睘鎬у弬鏁� + * @throws CMQClientException + * @throws CMQServerException + */ + public QueueMeta getQueueAttributes() throws Exception { + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + String result = this.client.call("GetQueueAttributes", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + + QueueMeta meta = new QueueMeta(); + meta.maxMsgHeapNum = jsonObj.getInt("maxMsgHeapNum"); + meta.pollingWaitSeconds = jsonObj.getInt("pollingWaitSeconds"); + meta.visibilityTimeout = jsonObj.getInt("visibilityTimeout"); + meta.maxMsgSize = jsonObj.getInt("maxMsgSize"); + meta.msgRetentionSeconds = jsonObj.getInt("msgRetentionSeconds"); + meta.createTime = jsonObj.getInt("createTime"); + meta.lastModifyTime = jsonObj.getInt("lastModifyTime"); + meta.activeMsgNum = jsonObj.getInt("activeMsgNum"); + meta.inactiveMsgNum = jsonObj.getInt("inactiveMsgNum"); + meta.rewindmsgNum = jsonObj.getInt("rewindMsgNum"); + meta.minMsgTime = jsonObj.getInt("minMsgTime"); + meta.delayMsgNum = jsonObj.getInt("delayMsgNum"); + + + return meta; + } + + /** + * 鍙戦�佹秷鎭� + * + * @param msgBody 娑堟伅鍐呭 + * @return 鏈嶅姟鍣ㄨ繑鍥炵殑娑堟伅鍞竴鏍囪瘑 + * @throws CMQClientException + * @throws CMQServerException + */ + public String sendMessage(String msgBody) throws Exception { + return sendMessage(msgBody, 0); + } + public String sendMessage(String msgBody ,int delayTime) throws Exception { + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + param.put("msgBody",msgBody); + param.put("delaySeconds",Integer.toString(delayTime)); + + String result = this.client.call("SendMessage", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + + return jsonObj.getString("msgId"); + } + + /** + * 鎵归噺鍙戦�佹秷鎭� + * + * @param vtMsgBody 娑堟伅鍒楄〃 + * @return 鏈嶅姟鍣ㄨ繑鍥炵殑娑堟伅鍞竴鏍囪瘑鍒楄〃 + * @throws CMQClientException + * @throws CMQServerException + */ + public List<String> batchSendMessage(List<String> vtMsgBody) throws Exception { + return batchSendMessage(vtMsgBody, 0); + } + public List<String> batchSendMessage(List<String> vtMsgBody,int delayTime) throws Exception { + + if(vtMsgBody.isEmpty() || vtMsgBody.size() > 16) + throw new CMQClientException("Error: message size is empty or more than 16"); + + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + for(int i=0;i<vtMsgBody.size();i++) + { + String k = "msgBody." + Integer.toString(i+1); + param.put(k,vtMsgBody.get(i)); + } + param.put("delaySeconds", Integer.toString(delayTime)); + + String result = this.client.call("BatchSendMessage", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + + ArrayList<String> vtMsgId = new ArrayList<String>(); + JSONArray jsonArray = jsonObj.getJSONArray("msgList"); + for(int i=0;i<jsonArray.length();i++) + { + JSONObject obj = (JSONObject)jsonArray.get(i); + vtMsgId.add(obj.getString("msgId")); + } + + return vtMsgId; + } + + /** + * 鑾峰彇娑堟伅 + * + * @param pollingWaitSeconds 璇锋眰鏈�闀跨殑Polling绛夊緟鏃堕棿 + * @return 鏈嶅姟鍣ㄨ繑鍥炴秷鎭� + * @throws CMQClientException + * @throws CMQServerException + */ + public Message receiveMessage(int pollingWaitSeconds) throws Exception { + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + if(pollingWaitSeconds > 0) + { + param.put("UserpollingWaitSeconds",Integer.toString(pollingWaitSeconds *1000)); + param.put("pollingWaitSeconds", Integer.toString(pollingWaitSeconds) ); + } + else + { + param.put("UserpollingWaitSeconds",Integer.toString(30000)); + } + + String result = this.client.call("ReceiveMessage", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + + Message msg = new Message(); + msg.msgId = jsonObj.getString("msgId"); + msg.receiptHandle = jsonObj.getString("receiptHandle"); + msg.msgBody = jsonObj.getString("msgBody"); + msg.enqueueTime = jsonObj.getLong("enqueueTime"); + msg.nextVisibleTime = jsonObj.getLong("nextVisibleTime"); + msg.firstDequeueTime = jsonObj.getLong("firstDequeueTime"); + msg.dequeueCount = jsonObj.getInt("dequeueCount"); + + return msg; + } + + /** + * 鎵归噺鑾峰彇娑堟伅 + * + * @param numOfMsg 鍑嗗鑾峰彇娑堟伅鏁� + * @param pollingWaitSeconds 璇锋眰鏈�闀跨殑Polling绛夊緟鏃堕棿 + * @return 鏈嶅姟鍣ㄨ繑鍥炴秷鎭垪琛� + * @throws CMQClientException + * @throws CMQServerException + */ + public List<Message> batchReceiveMessage(int numOfMsg, int pollingWaitSeconds) throws Exception { + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + param.put("numOfMsg",Integer.toString(numOfMsg)); + if(pollingWaitSeconds > 0) + { + param.put("UserpollingWaitSeconds",Integer.toString(pollingWaitSeconds*1000)); + param.put("pollingWaitSeconds", Integer.toString(pollingWaitSeconds) ); + } + else + { + param.put("UserpollingWaitSeconds",Integer.toString(30000)); + } + String result = this.client.call("BatchReceiveMessage", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + + ArrayList<Message> vtMessage = new ArrayList<Message>(); + + JSONArray jsonArray = jsonObj.getJSONArray("msgInfoList"); + for(int i=0;i<jsonArray.length();i++) + { + JSONObject obj = (JSONObject)jsonArray.get(i); + Message msg = new Message(); + msg.msgId = obj.getString("msgId"); + msg.receiptHandle = obj.getString("receiptHandle"); + msg.msgBody = obj.getString("msgBody"); + msg.enqueueTime = obj.getLong("enqueueTime"); + msg.nextVisibleTime = obj.getLong("nextVisibleTime"); + msg.firstDequeueTime = obj.getLong("firstDequeueTime"); + msg.dequeueCount = obj.getInt("dequeueCount"); + + vtMessage.add(msg); + } + + return vtMessage; + } + + /** + * 鍒犻櫎娑堟伅 + * + * @param receiptHandle 娑堟伅鍙ユ焺,鑾峰彇娑堟伅鏃剁敱鏈嶅姟鍣ㄨ繑鍥� + * @throws CMQClientException + * @throws CMQServerException + */ + public void deleteMessage(String receiptHandle) throws Exception { + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + param.put("receiptHandle",receiptHandle); + + String result = this.client.call("DeleteMessage", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + } + + /** + * 鎵归噺鍒犻櫎娑堟伅 + * + * @param receiptHandle 娑堟伅鍙ユ焺鍒楄〃锛岃幏鍙栨秷鎭椂鐢辨湇鍔″櫒杩斿洖 + * @throws CMQClientException + * @throws CMQServerException + */ + public void batchDeleteMessage(List<String> vtReceiptHandle) throws Exception { + if(vtReceiptHandle.isEmpty()) + return; + + TreeMap<String, String> param = new TreeMap<String, String>(); + + param.put("queueName",this.queueName); + for(int i=0;i<vtReceiptHandle.size();i++) + { + String k = "receiptHandle." + Integer.toString(i+1); + param.put(k,vtReceiptHandle.get(i)); + } + + String result = this.client.call("BatchDeleteMessage", param); + JSONObject jsonObj = new JSONObject(result); + int code = jsonObj.getInt("code"); + if(code != 0) + throw new CMQServerException(code,jsonObj.getString("message"),jsonObj.getString("requestId")); + } +} -- Gitblit v1.8.0