From ea68e8f8c00c0178037a9f1cc6dc1663bde913b8 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期六, 03 七月 2021 17:49:35 +0800
Subject: [PATCH] 推送服务完善

---
 service-push/src/main/java/com/ks/push/manager/CMQManager.java |  149 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/service-push/src/main/java/com/ks/push/manager/CMQManager.java b/service-push/src/main/java/com/ks/push/manager/CMQManager.java
index 587f8a5..582c968 100644
--- a/service-push/src/main/java/com/ks/push/manager/CMQManager.java
+++ b/service-push/src/main/java/com/ks/push/manager/CMQManager.java
@@ -1,12 +1,80 @@
 package com.ks.push.manager;
 
+import com.google.gson.Gson;
 import com.ks.push.dto.BPushDeviceDataSet;
 import com.ks.push.pojo.DO.PushPlatform;
+import com.qcloud.cmq.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.yeshi.utils.CMQUtil;
 
-@Component
+import java.util.ArrayList;
+import java.util.List;
+
 public class CMQManager {
+    static Logger logger = LoggerFactory.getLogger(CMQManager.class);
 
+    private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
+    private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
+
+    private static CMQManager cmqManager;
+    private static CMQUtil cmqUtil;
+    /**
+     * 灏忕背鎺ㄩ�侀槦鍒�
+     */
+    public static String PUSH_XM = "bpush-xm";
+    /**
+     * 鍗庝负鎺ㄩ�侀槦鍒�
+     */
+    public static String PUSH_HUAWEI = "bpush-huawei";
+    /**
+     * oppo鎺ㄩ�侀槦鍒�
+     */
+    public static String PUSH_OPPO = "bpush-oppo";
+    /**
+     * vivo鎺ㄩ�侀槦鍒�
+     */
+    public static String PUSH_VIVO = "bpush-vivo";
+    /**
+     * 榄呮棌鎺ㄩ�侀槦鍒�
+     */
+    public static String PUSH_MZ = "bpush-mz";
+
+    static {
+        cmqUtil = CMQUtil.getInstance(secretId, secretKey);
+        // 鏈�澶ф秷鎭负1M
+        cmqUtil.createQueue(PUSH_XM);
+        cmqUtil.createQueue(PUSH_HUAWEI);
+        cmqUtil.createQueue(PUSH_OPPO);
+        cmqUtil.createQueue(PUSH_VIVO);
+        cmqUtil.createQueue(PUSH_MZ);
+        logger.info("鍒涘缓闃熷垪瀹屾瘯");
+    }
+
+
+    public static CMQManager getInstance() {
+        if (cmqManager == null) {
+            cmqManager = new CMQManager();
+        }
+        return cmqManager;
+    }
+
+    private String getQueueName(PushPlatform platform) {
+        String queueName = null;
+        if (platform == PushPlatform.xm) {
+            queueName = PUSH_XM;
+        } else if (platform == PushPlatform.hw) {
+            queueName = PUSH_HUAWEI;
+        } else if (platform == PushPlatform.oppo) {
+            queueName = PUSH_OPPO;
+        } else if (platform == PushPlatform.vivo) {
+            queueName = PUSH_VIVO;
+        } else if (platform == PushPlatform.mz) {
+            queueName = PUSH_MZ;
+        }
+        return queueName;
+    }
 
     /**
      * 娣诲姞鍒版帹閫侀槦鍒�
@@ -15,8 +83,85 @@
      * @param dataSet
      */
     public void addToPushQueue(PushPlatform platform, BPushDeviceDataSet dataSet) {
-//TODO 娣诲姞鍒版帹閫侀槦鍒�
+        String queueName = getQueueName(platform);
+        if (queueName == null) {
+            return;
+        }
+        cmqUtil.sendMsg(queueName, new Gson().toJson(dataSet));
+    }
 
+    /**
+     * 娑堣垂闃熷垪
+     *
+     * @param platform
+     * @param count
+     * @return
+     */
+    public List<MQMsgConsumeResult> consumePushQueue(PushPlatform platform, int count) throws Exception {
+        String queueName = getQueueName(platform);
+        if (queueName == null) {
+            return null;
+        }
+
+        List<Message> list = cmqUtil.recieveMsg(count, queueName);
+        if (list != null) {
+            List<MQMsgConsumeResult> resultList = new ArrayList<>();
+            for (Message msg : list) {
+                String result = msg.msgBody;
+                logger.info("闃熷垪鍚嶇О锛歿} 娑堟伅鍐呭锛歿}", queueName, result);
+                BPushDeviceDataSet dataSet = new Gson().fromJson(result, BPushDeviceDataSet.class);
+                resultList.add(new MQMsgConsumeResult(dataSet, queueName, msg.receiptHandle));
+            }
+            return resultList;
+        }
+        return null;
+    }
+
+    /**
+     * 鍒犻櫎娑堟伅
+     *
+     * @param queueName
+     * @param receiptHandle
+     */
+    public void deleteMsg(String queueName, String receiptHandle) {
+        cmqUtil.deleteMsg(queueName, receiptHandle);
+    }
+
+
+    public static class MQMsgConsumeResult {
+        private String queueName;
+        private Object data;
+        private String receiptHandle;
+
+        public MQMsgConsumeResult(Object data, String queueName, String receiptHandle) {
+            this.data = data;
+            this.queueName = queueName;
+            this.receiptHandle = receiptHandle;
+        }
+
+        public Object getData() {
+            return data;
+        }
+
+        public void setData(Object data) {
+            this.data = data;
+        }
+
+        public String getReceiptHandle() {
+            return receiptHandle;
+        }
+
+        public void setReceiptHandle(String receiptHandle) {
+            this.receiptHandle = receiptHandle;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        public void setQueueName(String queueName) {
+            this.queueName = queueName;
+        }
     }
 
 

--
Gitblit v1.8.0