From 2f5935ed11672046c37f733d855214f6147b4b58 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期一, 28 三月 2022 11:33:19 +0800
Subject: [PATCH] TDMQ兼容

---
 fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/order/PlaceOrderCMQManager.java |  148 +++++++++++++++++++++++++-----------------------
 1 files changed, 77 insertions(+), 71 deletions(-)

diff --git a/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/order/PlaceOrderCMQManager.java b/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/order/PlaceOrderCMQManager.java
index 5e72363..72b9754 100644
--- a/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/order/PlaceOrderCMQManager.java
+++ b/fanli/src/main/java/com/yeshi/fanli/util/mq/cmq/order/PlaceOrderCMQManager.java
@@ -1,92 +1,98 @@
 package com.yeshi.fanli.util.mq.cmq.order;
 
+import com.google.gson.Gson;
+import com.qcloud.cmq.Message;
+import com.yeshi.fanli.entity.bus.user.Order;
+import com.yeshi.fanli.util.Constant;
+import com.yeshi.fanli.util.mq.cmq.TDMQUtil;
+import org.yeshi.utils.CMQUtil;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.yeshi.utils.CMQUtil;
-
-import com.google.gson.Gson;
-import com.qcloud.cmq.Message;
-import com.yeshi.fanli.entity.bus.user.Order;
-import com.yeshi.fanli.entity.order.CommonOrder;
-
 public class PlaceOrderCMQManager {
 
-	private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
-	private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
-	private static PlaceOrderCMQManager placeOrderCMQManager;
-	private static CMQUtil cmqUtil;
+    private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
+    private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
+    private static PlaceOrderCMQManager placeOrderCMQManager;
+    private static CMQUtil cmqUtil;
 
-	private final static String TOPIC_NAME = "topic_place_order";
+    private final static String TOPIC_NAME = "topic_place_order";
 
-	public static String QUEUE_AD = TOPIC_NAME + "_" + "ad";
-	public static String QUEUE_INTEGRAL = TOPIC_NAME + "_" + "integral";
+    public static String QUEUE_AD = TOPIC_NAME + "_" + "ad";
+    public static String QUEUE_INTEGRAL = TOPIC_NAME + "_" + "integral";
 
-	public static String SUBSCRIBE_AD = "ad";
-	public static String SUBSCRIBE_INTEGRAL = "integral";
+    public static String SUBSCRIBE_AD = "ad";
+    public static String SUBSCRIBE_INTEGRAL = "integral";
 
-	static {
-		cmqUtil = CMQUtil.getInstance(secretId, secretKey);
-		// 鍒涘缓涓婚锛屾坊鍔犺闃�
-		cmqUtil.createTopic(TOPIC_NAME);
-		// 鐢ㄦ埛鍒歌闃�
-		String[] subscripts = new String[] { SUBSCRIBE_AD, SUBSCRIBE_INTEGRAL };
-		String[] queues = new String[] { QUEUE_AD, QUEUE_INTEGRAL };
+    static {
+        cmqUtil = CMQUtil.getInstance(secretId, secretKey);
+        TDMQUtil.getInstance().init(secretId, secretKey, Constant.TDMQ_PUBLIC);
 
-		for (int i = 0; i < subscripts.length; i++) {
-			String queueName = queues[i];
-			try {
-				cmqUtil.createQueue(queueName);
-			} catch (Exception e) {
-			}
-			try {
-				cmqUtil.subscribeTopic(TOPIC_NAME, subscripts[i], queueName);
-			} catch (Exception e) {
+        // 鍒涘缓涓婚锛屾坊鍔犺闃�
+        TDMQUtil.getInstance().createTopic(TOPIC_NAME);
+        // 鐢ㄦ埛鍒歌闃�
+        String[] subscripts = new String[]{SUBSCRIBE_AD, SUBSCRIBE_INTEGRAL};
+        String[] queues = new String[]{QUEUE_AD, QUEUE_INTEGRAL};
 
-			}
-		}
-	}
+        for (int i = 0; i < subscripts.length; i++) {
+            String queueName = queues[i];
+            try {
+                TDMQUtil.getInstance().createQueue(queueName);
+            } catch (Exception e) {
+            }
+            try {
+                TDMQUtil.getInstance().subscribeTopic(TOPIC_NAME, subscripts[i], queueName);
+            } catch (Exception e) {
 
-	public static PlaceOrderCMQManager getInstance() {
-		if (placeOrderCMQManager == null)
-			placeOrderCMQManager = new PlaceOrderCMQManager();
-		return placeOrderCMQManager;
-	}
+            }
+        }
+    }
 
-	/**
-	 * 涓嬪崟
-	 * 
-	 * @param order锛堝彧闇�瑕佽鍗曞彿涓巗ourceType锛�
-	 */
-	public void addPlaceOrderMsg(Order order) {
-		if (order == null)
-			return;
-		cmqUtil.publishTopicMessage(TOPIC_NAME, new Gson().toJson(order));
-	}
+    public static PlaceOrderCMQManager getInstance() {
+        if (placeOrderCMQManager == null)
+            placeOrderCMQManager = new PlaceOrderCMQManager();
+        return placeOrderCMQManager;
+    }
 
-	/**
-	 * 娑堣垂闃熷垪娑堟伅
-	 * 
-	 * @param queueName
-	 * @param count
-	 * @return
-	 */
-	public Map<String, Order> consumeQueueMsg(String queueName, int count) {
-		List<Message> list = cmqUtil.recieveMsg(count, queueName);
-		Map<String, Order> map = new HashMap<>();
+    /**
+     * 涓嬪崟
+     *
+     * @param order锛堝彧闇�瑕佽鍗曞彿涓巗ourceType锛�
+     */
+    public void addPlaceOrderMsg(Order order) {
+        if (order == null)
+            return;
+        TDMQUtil.getInstance().publishTopicMessage(TOPIC_NAME, new Gson().toJson(order));
+    }
 
-		if (list != null)
-			for (Message msg : list) {
-				String result = msg.msgBody;
-				Order dto = new Gson().fromJson(result, Order.class);
-				map.put(msg.receiptHandle, dto);
-			}
-		return map;
-	}
+    /**
+     * 娑堣垂闃熷垪娑堟伅
+     *
+     * @param queueName
+     * @param count
+     * @return
+     */
+    public Map<String, Order> consumeQueueMsg(String queueName, int count) {
+        List<Message> list = Constant.TDMQ_CONSUMER ? TDMQUtil.getInstance().recieveMsg(count, queueName) : cmqUtil.recieveMsg(count, queueName);
+        Map<String, Order> map = new HashMap<>();
 
-	public void deleteQueueMsg(String queueName, String receiptHandle) {
-		cmqUtil.deleteMsg(queueName, receiptHandle);
-	}
+        if (list != null)
+            for (Message msg : list) {
+                String result = msg.msgBody;
+                Order dto = new Gson().fromJson(result, Order.class);
+                map.put(msg.receiptHandle, dto);
+            }
+        return map;
+    }
+
+    public void deleteQueueMsg(String queueName, String receiptHandle) {
+        if (Constant.TDMQ_CONSUMER) {
+            TDMQUtil.getInstance().deleteMsg(queueName, receiptHandle);
+        } else {
+            cmqUtil.deleteMsg(queueName, receiptHandle);
+        }
+    }
 
 }

--
Gitblit v1.8.0