From b9b263043cbb2e028017b9a5626c664b54513749 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期一, 14 十月 2024 17:46:11 +0800 Subject: [PATCH] 将CMQ替换为rabbitmq --- fanli/src/main/java/com/yeshi/fanli/util/CMQManager.java | 199 ++++++++----------------------------------------- 1 files changed, 32 insertions(+), 167 deletions(-) diff --git a/fanli/src/main/java/com/yeshi/fanli/util/CMQManager.java b/fanli/src/main/java/com/yeshi/fanli/util/CMQManager.java index 1a79756..17a352c 100644 --- a/fanli/src/main/java/com/yeshi/fanli/util/CMQManager.java +++ b/fanli/src/main/java/com/yeshi/fanli/util/CMQManager.java @@ -13,7 +13,6 @@ import com.yeshi.fanli.entity.push.PushQueueRecord; import com.yeshi.fanli.entity.taobao.TaoBaoWeiQuanOrder; import com.yeshi.fanli.util.mq.cmq.TDMQUtil; -import org.yeshi.utils.CMQUtil; import java.util.ArrayList; import java.util.HashMap; @@ -24,7 +23,6 @@ private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25"; private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo"; private static CMQManager cmqManager; - private static CMQUtil cmqUtil; private static TDMQUtil tdmqUtil; // 璁㈠崟澶勭悊闃熷垪 public static String QUEUENAME_ORDER = "fanli-orders"; @@ -60,20 +58,6 @@ static { - if (Constant.IS_TEST) { - QUEUENAME_ORDER = "test-" + QUEUENAME_ORDER; - QUEUENAME_NEW_ORDER = "test-" + QUEUENAME_NEW_ORDER; - EXTRACT_RESULT = "test-" + EXTRACT_RESULT; - PUSH_IOS = "test-" + PUSH_IOS; - FANLI_ORDER = "test-" + FANLI_ORDER; - FANLI_ORDER_NEW = "test-" + FANLI_ORDER_NEW; - - ORDER_WEIQUAN = "test-" + ORDER_WEIQUAN; - IMPORTANT_GOODS_UPDATE = "test-" + IMPORTANT_GOODS_UPDATE; - TEJIA_VIP = "test-" + TEJIA_VIP; - FANLI_SHARE_ORDER = "test-" + FANLI_SHARE_ORDER; - } - QUEUENAME_ORDER += "-" + Constant.systemCommonConfig.getProjectName(); QUEUENAME_NEW_ORDER += "-" + Constant.systemCommonConfig.getProjectName(); EXTRACT_RESULT += "-" + Constant.systemCommonConfig.getProjectName(); @@ -85,7 +69,6 @@ FANLI_SHARE_ORDER += "-" + Constant.systemCommonConfig.getProjectName(); TEJIA_VIP += "-" + Constant.systemCommonConfig.getProjectName(); - cmqUtil = CMQUtil.getInstance(secretId, secretKey); TDMQUtil.getInstance().init(secretId, secretKey, Constant.TDMQ_PUBLIC); tdmqUtil = TDMQUtil.getInstance(); // 鏈�澶ф秷鎭负1M @@ -110,100 +93,6 @@ return cmqManager; } - /** - * 鍙戦�佽鍗曟秷鎭� - * - * @param orderId - */ - public void addTaoBaoOrderMsg(String orderId) { - if (!StringUtil.isNullOrEmpty(orderId)) { - tdmqUtil.sendMsg(QUEUENAME_ORDER, orderId); - } - } - - /** - * 娑堣垂娣樺疂璁㈠崟娑堟伅 - * - * @param count - */ - public List<String> consumeTaoBaoOrderMsg(int count) { - List<String> resultList = new ArrayList<>(); - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_ORDER) : cmqUtil.recieveMsg(count, QUEUENAME_ORDER); - - if (list != null) - for (Message msg : list) { - String result = msg.msgBody; - resultList.add(result); - // 鍒犻櫎娑堟伅 - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(QUEUENAME_ORDER, msg.receiptHandle); - } else { - cmqUtil.deleteMsg(QUEUENAME_ORDER, msg.receiptHandle); - } - } - return resultList; - } - - /** - * 鍒犻櫎娣樺疂璁㈠崟娑堟伅 - * - * @param receiptHandle - */ - public void deleteTaoBaoOrderMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(QUEUENAME_ORDER, receiptHandle); - } else { - cmqUtil.deleteMsg(QUEUENAME_ORDER, receiptHandle); - } - } - - /** - * 鍙戦�佽鍗曟秷鎭紙鍒氬垰浜х敓鐨勮鍗曪級 - * - * @param orderId - */ - public void addTaoBaoNewOrderMsg(String orderId) { - if (!StringUtil.isNullOrEmpty(orderId)) { - tdmqUtil.sendMsg(QUEUENAME_NEW_ORDER, orderId); - } - } - - /** - * 娑堣垂娣樺疂璁㈠崟娑堟伅锛堝垰鍒氫骇鐢熺殑璁㈠崟锛� - * - * @param count - */ - public List<String> consumeTaoBaoNewOrderMsg(int count) { - List<String> resultList = new ArrayList<>(); - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_NEW_ORDER) : cmqUtil.recieveMsg(count, QUEUENAME_NEW_ORDER); - - if (list != null) - for (Message msg : list) { - String result = msg.msgBody; - resultList.add(result); - // 鍒犻櫎娑堟伅 - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(QUEUENAME_NEW_ORDER, msg.receiptHandle); - } else { - cmqUtil.deleteMsg(QUEUENAME_NEW_ORDER, msg.receiptHandle); - } - } - return resultList; - } - - - /** - * 鍒犻櫎娣樺疂璁㈠崟娑堟伅锛堝垰鍒氫骇鐢熺殑璁㈠崟锛� - * - * @param receiptHandle - */ - public void deleteTaoBaoNewOrderMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(QUEUENAME_NEW_ORDER, receiptHandle); - } else { - cmqUtil.deleteMsg(QUEUENAME_NEW_ORDER, receiptHandle); - } - } /** * 娣诲姞鎻愮幇缁撴灉娑堟伅 @@ -224,7 +113,7 @@ * @return */ public Map<String, AlipayTransferResultInfo> consumeExtractResultMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, EXTRACT_RESULT) : cmqUtil.recieveMsg(count, EXTRACT_RESULT); + List<Message> list = tdmqUtil.recieveMsg(count, EXTRACT_RESULT); Map<String, AlipayTransferResultInfo> map = new HashMap<>(); if (list != null) @@ -245,11 +134,9 @@ * @param receiptHandle */ public void deleteExtractResultMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(EXTRACT_RESULT, receiptHandle); - } else { - cmqUtil.deleteMsg(EXTRACT_RESULT, receiptHandle); - } + + tdmqUtil.deleteMsg(EXTRACT_RESULT, receiptHandle); + } // 璁㈠崟杩斿埄 @@ -265,7 +152,7 @@ } public Map<String, HongBao> consumeFanLiMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, FANLI_ORDER) : cmqUtil.recieveMsg(count, FANLI_ORDER); + List<Message> list = tdmqUtil.recieveMsg(count, FANLI_ORDER); Map<String, HongBao> map = new HashMap<>(); if (list != null) for (Message msg : list) { @@ -279,7 +166,7 @@ } public Map<String, HongBaoV2> consumeFanLiMsgNew(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, FANLI_ORDER_NEW) : cmqUtil.recieveMsg(count, FANLI_ORDER_NEW); + List<Message> list = tdmqUtil.recieveMsg(count, FANLI_ORDER_NEW); Map<String, HongBaoV2> map = new HashMap<>(); if (list != null) for (Message msg : list) { @@ -293,19 +180,15 @@ } public void deleteFanLiMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(FANLI_ORDER, receiptHandle); - } else { - cmqUtil.deleteMsg(FANLI_ORDER, receiptHandle); - } + + tdmqUtil.deleteMsg(FANLI_ORDER, receiptHandle); + } public void deleteFanLiMsgNew(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(FANLI_ORDER_NEW, receiptHandle); - } else { - cmqUtil.deleteMsg(FANLI_ORDER_NEW, receiptHandle); - } + + tdmqUtil.deleteMsg(FANLI_ORDER_NEW, receiptHandle); + } @@ -315,7 +198,7 @@ } public Map<String, UidDateDTO> consumeFanLiShareMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, FANLI_SHARE_ORDER) : cmqUtil.recieveMsg(count, FANLI_SHARE_ORDER); + List<Message> list = tdmqUtil.recieveMsg(count, FANLI_SHARE_ORDER); Map<String, UidDateDTO> map = new HashMap<>(); if (list != null) for (Message msg : list) { @@ -329,11 +212,7 @@ } public void deleteFanLiShareMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(FANLI_SHARE_ORDER, receiptHandle); - } else { - cmqUtil.deleteMsg(FANLI_SHARE_ORDER, receiptHandle); - } + tdmqUtil.deleteMsg(FANLI_SHARE_ORDER, receiptHandle); } // 缁存潈璁㈠崟 @@ -342,7 +221,7 @@ } public Map<String, TaoBaoWeiQuanOrder> consumeWeiQuanOrderMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, ORDER_WEIQUAN) : cmqUtil.recieveMsg(count, ORDER_WEIQUAN); + List<Message> list = tdmqUtil.recieveMsg(count, ORDER_WEIQUAN); Map<String, TaoBaoWeiQuanOrder> map = new HashMap<>(); Gson gson = new Gson(); if (list != null) @@ -356,11 +235,7 @@ } public void deleteWeiQuanOrderMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(ORDER_WEIQUAN, receiptHandle); - } else { - cmqUtil.deleteMsg(ORDER_WEIQUAN, receiptHandle); - } + tdmqUtil.deleteMsg(ORDER_WEIQUAN, receiptHandle); } /** @@ -380,7 +255,7 @@ * @param count */ public Map<String, PushQueueRecord> consumeIOSPushMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, PUSH_IOS) : cmqUtil.recieveMsg(count, PUSH_IOS); + List<Message> list = tdmqUtil.recieveMsg(count, PUSH_IOS); if (list == null) { return null; } @@ -405,11 +280,7 @@ * @param receiptHandle */ public void deleteIOSPushMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(PUSH_IOS, receiptHandle); - } else { - cmqUtil.deleteMsg(PUSH_IOS, receiptHandle); - } + tdmqUtil.deleteMsg(PUSH_IOS, receiptHandle); } /** @@ -417,7 +288,7 @@ * * @param goodsId */ - public void addNeedUpdateTaoBaoGoodsId(Long goodsId) { + public void addNeedUpdateTaoBaoGoodsId(String goodsId) { tdmqUtil.sendMsg(GOODS_UPDATE, goodsId + ""); } @@ -426,17 +297,17 @@ * * @param count */ - public Map<String, Long> consumeNeedUpdateTaoBaoGoodsIdMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, GOODS_UPDATE) : cmqUtil.recieveMsg(count, GOODS_UPDATE); + public Map<String, String> consumeNeedUpdateTaoBaoGoodsIdMsg(int count) { + List<Message> list = tdmqUtil.recieveMsg(count, GOODS_UPDATE); if (list == null) { return null; } - Map<String, Long> map = new HashMap<>(); + Map<String, String> map = new HashMap<>(); for (Message msg : list) { String result = msg.msgBody; if (!StringUtil.isNullOrEmpty(result)) { - map.put(msg.receiptHandle, Long.parseLong(result)); + map.put(msg.receiptHandle, result); } } @@ -450,11 +321,9 @@ * @param receiptHandle */ public void deleteNeedUpdateTaoBaoGoodsIdMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(GOODS_UPDATE, receiptHandle); - } else { - cmqUtil.deleteMsg(GOODS_UPDATE, receiptHandle); - } + + tdmqUtil.deleteMsg(GOODS_UPDATE, receiptHandle); + } /** @@ -462,7 +331,7 @@ * * @param actionId */ - public void addTBImpGoodsUpdate(Long actionId) { + public void addTBImpGoodsUpdate(String actionId) { tdmqUtil.sendMsg(IMPORTANT_GOODS_UPDATE, actionId + ""); } @@ -472,17 +341,17 @@ * @param count * @return */ - public Map<String, Long> consumeTBImpGoodsUpdateMsg(int count) { - List<Message> list = Constant.TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, IMPORTANT_GOODS_UPDATE) : cmqUtil.recieveMsg(count, IMPORTANT_GOODS_UPDATE); + public Map<String, String> consumeTBImpGoodsUpdateMsg(int count) { + List<Message> list = tdmqUtil.recieveMsg(count, IMPORTANT_GOODS_UPDATE); if (list == null) { return null; } - Map<String, Long> map = new HashMap<>(); + Map<String, String> map = new HashMap<>(); for (Message msg : list) { String result = msg.msgBody; if (!StringUtil.isNullOrEmpty(result)) { - map.put(msg.receiptHandle, Long.parseLong(result)); + map.put(msg.receiptHandle, result); } } return map; @@ -494,11 +363,7 @@ * @param receiptHandle */ public void deleteTBImpGoodsUpdateMsg(String receiptHandle) { - if (Constant.TDMQ_CONSUMER) { - tdmqUtil.deleteMsg(IMPORTANT_GOODS_UPDATE, receiptHandle); - } else { - cmqUtil.deleteMsg(IMPORTANT_GOODS_UPDATE, receiptHandle); - } + tdmqUtil.deleteMsg(IMPORTANT_GOODS_UPDATE, receiptHandle); } } -- Gitblit v1.8.0