From 2f5935ed11672046c37f733d855214f6147b4b58 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期一, 28 三月 2022 11:33:19 +0800 Subject: [PATCH] TDMQ兼容 --- fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/HongBaoRecieveCMQManager.java | 132 +++++++++++++++++++++++-------------------- 1 files changed, 70 insertions(+), 62 deletions(-) diff --git a/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/HongBaoRecieveCMQManager.java b/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/HongBaoRecieveCMQManager.java index f5de356..e27ab09 100644 --- a/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/HongBaoRecieveCMQManager.java +++ b/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/HongBaoRecieveCMQManager.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Map; +import com.yeshi.fanli.util.Constant; import org.yeshi.utils.CMQUtil; import com.qcloud.cmq.Message; @@ -11,78 +12,85 @@ public class HongBaoRecieveCMQManager { - private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25"; - private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo"; - private static HongBaoRecieveCMQManager userMoneyChangeCMQManager; - private static CMQUtil cmqUtil; + private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25"; + private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo"; + private static HongBaoRecieveCMQManager userMoneyChangeCMQManager; + private static CMQUtil cmqUtil; - private final static String TOPIC_NAME = "topic_hongbao"; + private final static String TOPIC_NAME = "topic_hongbao"; - public static String QUEUE_INTEGRAL = TOPIC_NAME + "_" + "integral"; + public static String QUEUE_INTEGRAL = TOPIC_NAME + "_" + "integral"; - public static String SUBSCRIBE_INTEGRAL = "integral"; + public static String SUBSCRIBE_INTEGRAL = "integral"; - static { - cmqUtil = CMQUtil.getInstance(secretId, secretKey); - // 鍒涘缓涓婚锛屾坊鍔犺闃� - cmqUtil.createTopic(TOPIC_NAME); - // 鐢ㄦ埛鍒歌闃� - String[] subscripts = new String[] { SUBSCRIBE_INTEGRAL }; - String[] queues = new String[] { QUEUE_INTEGRAL }; + static { + cmqUtil = CMQUtil.getInstance(secretId, secretKey); - for (int i = 0; i < subscripts.length; i++) { - String queueName = queues[i]; - try { - cmqUtil.createQueue(queueName); - } catch (Exception e) { - } - try { - cmqUtil.subscribeTopic(TOPIC_NAME, subscripts[i], queueName); - } catch (Exception e) { + TDMQUtil.getInstance().init(secretId, secretKey, Constant.TDMQ_PUBLIC); - } - } - } + // 鍒涘缓涓婚锛屾坊鍔犺闃� + TDMQUtil.getInstance().createTopic(TOPIC_NAME); + // 鐢ㄦ埛鍒歌闃� + String[] subscripts = new String[]{SUBSCRIBE_INTEGRAL}; + String[] queues = new String[]{QUEUE_INTEGRAL}; - public static HongBaoRecieveCMQManager getInstance() { - if (userMoneyChangeCMQManager == null) - userMoneyChangeCMQManager = new HongBaoRecieveCMQManager(); - return userMoneyChangeCMQManager; - } + for (int i = 0; i < subscripts.length; i++) { + String queueName = queues[i]; + try { + TDMQUtil.getInstance().createQueue(queueName); + } catch (Exception e) { + } + try { + TDMQUtil.getInstance().subscribeTopic(TOPIC_NAME, subscripts[i], queueName); + } catch (Exception e) { - /** - * 鍙戝竷 - * - * @param hongBaoId - */ - public void addHongBaoRecieveMsg(Long hongBaoId) { - if (hongBaoId == null) - return; - cmqUtil.publishTopicMessage(TOPIC_NAME, hongBaoId + ""); - LogHelper.test("绾㈠寘娑堟伅鎶曢�掓垚鍔�"); - } + } + } + } - /** - * 娑堣垂闃熷垪娑堟伅 - * - * @param queueName - * @param count - * @return - */ - public Map<String, Long> consumeQueueMsg(String queueName, int count) { - List<Message> list = cmqUtil.recieveMsg(count, queueName); - Map<String, Long> map = new HashMap<>(); + public static HongBaoRecieveCMQManager getInstance() { + if (userMoneyChangeCMQManager == null) + userMoneyChangeCMQManager = new HongBaoRecieveCMQManager(); + return userMoneyChangeCMQManager; + } - if (list != null) - for (Message msg : list) { - String result = msg.msgBody; - map.put(msg.receiptHandle, Long.parseLong(result)); - } - return map; - } + /** + * 鍙戝竷 + * + * @param hongBaoId + */ + public void addHongBaoRecieveMsg(Long hongBaoId) { + if (hongBaoId == null) + return; + TDMQUtil.getInstance().publishTopicMessage(TOPIC_NAME, hongBaoId + ""); + LogHelper.test("绾㈠寘娑堟伅鎶曢�掓垚鍔�"); + } - public void deleteQueueMsg(String queueName, String receiptHandle) { - cmqUtil.deleteMsg(queueName, receiptHandle); - } + /** + * 娑堣垂闃熷垪娑堟伅 + * + * @param queueName + * @param count + * @return + */ + public Map<String, Long> consumeQueueMsg(String queueName, int count) { + List<Message> list = Constant.TDMQ_CONSUMER ? TDMQUtil.getInstance().recieveMsg(count, queueName) : cmqUtil.recieveMsg(count, queueName); + Map<String, Long> map = new HashMap<>(); + + if (list != null) + for (Message msg : list) { + String result = msg.msgBody; + map.put(msg.receiptHandle, Long.parseLong(result)); + } + return map; + } + + public void deleteQueueMsg(String queueName, String receiptHandle) { + if (Constant.TDMQ_CONSUMER) { + TDMQUtil.getInstance().deleteMsg(queueName, receiptHandle); + } else { + cmqUtil.deleteMsg(queueName, receiptHandle); + } + } } -- Gitblit v1.8.0