From 7fa83e5dd03f7896bd1d1e8c47f5e926ff3d4ba0 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期三, 16 十月 2024 14:12:24 +0800
Subject: [PATCH] CMQ改造为rabbitmq

---
 service-push/src/main/java/com/ks/push/manager/PushManager.java |  117 ++++++++++++++++++++++++++++++++++++++++------------------
 1 files changed, 81 insertions(+), 36 deletions(-)

diff --git a/service-push/src/main/java/com/ks/push/manager/PushManager.java b/service-push/src/main/java/com/ks/push/manager/PushManager.java
index 80e88ad..7215fec 100644
--- a/service-push/src/main/java/com/ks/push/manager/PushManager.java
+++ b/service-push/src/main/java/com/ks/push/manager/PushManager.java
@@ -4,9 +4,11 @@
 import com.ks.push.dao.BPushTaskDao;
 import com.ks.push.dto.BPushDeviceDataSet;
 import com.ks.push.exception.BPushTaskException;
+import com.ks.push.manager.rabbitmq.RabbitmqManager;
 import com.ks.push.pojo.DO.BPushDeviceToken;
 import com.ks.push.pojo.DO.BPushPlatformAppInfo;
 import com.ks.push.pojo.DO.BPushTask;
+import com.ks.push.pojo.DO.BPushTaskExcuteResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -15,9 +17,7 @@
 
 import javax.annotation.Resource;
 import javax.validation.Valid;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -37,13 +37,13 @@
     private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
 
     @Resource
-    private RedisTemplate<String, Object> redisTemplate;
-
-    @Resource
     private PushExcuteResultManager pushExcuteResultManager;
 
     @Resource
-    private CMQManager cmqManager;
+    private RedisTemplate redisTemplate;
+
+    @Resource
+    private RabbitmqManager rabbitmqManager;
 
 
     /**
@@ -58,6 +58,11 @@
             task.setCreateTime(new Date());
         }
         task.setId(BPushTask.createId());
+
+        if (task.getState() == null) {
+            task.setState(BPushTask.STATE_NOT_START);
+        }
+
         bPushTaskDao.save(task);
         logger.info("鍒涘缓鎺ㄩ�佹垚鍔�#taskId:{}", task.getId());
         return task.getId();
@@ -80,52 +85,85 @@
         }
 
         String key = "task-" + taskId;
-        if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS)) {
+
+        Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS);
+
+        if (s == null || !s) {
             throw new BPushTaskException(BPushTaskException.CODE_BUSY, "鏈嶅姟鍣ㄧ箒蹇欙紝璇风◢鍚庡啀璇�");
         }
-        logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId());
-        updateState(taskId, BPushTask.STATE_PUSHING, null);
-        boolean hasDevice = false;
+
+
         try {
+            logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId());
+            updateState(taskId, BPushTask.STATE_PUSHING, null);
+            boolean hasDevice = false;
+
             // 鏌ヨ鍙帹閫佺殑骞冲彴
             List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode());
 
+            //鍏堝垵濮嬪寲鎺ㄩ�佺粨鏋滄暟鎹暟鎹�
+            List<BPushTaskExcuteResult> resultList = new ArrayList<>();
+            Map<String, Long> resultCountMap = new HashMap<>();
             for (BPushPlatformAppInfo appInfo : list) {
                 long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter());
                 if (count > 0) {
                     hasDevice = true;
                     //鍒濆鍖栨帹閫佺粨鏋滄暟鎹�
-                    pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
-                    //灏嗘暟鎹姞鍏ュ埌cmq
-                    int pageSize = 500;
-                    int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
-                    for (int page = 0; page < totalPage; page++) {
-                        List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), appInfo.getPlatform(), task.getFilter(), page + 1, pageSize);
-                        List<String> tokenList = new ArrayList<>();
-                        for (BPushDeviceToken deviceToken : deviceTokenList) {
-                            tokenList.add(deviceToken.getToken());
-                        }
-                        BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
-                        cmqManager.addToPushQueue(appInfo.getPlatform(), dataSet);
-                    }
-
-                    logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), appInfo.getPlatform().name(), count);
+                    BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
+                    resultCountMap.put(result.getId(), count);
+                    resultList.add(result);
                 }
             }
 
+
+            for (BPushTaskExcuteResult result : resultList) {
+                long count = resultCountMap.get(result.getId());
+                int pageSize = 500;
+                int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
+                long totalValidCount = 0L;
+
+                //鍒濆鍖栨壒閲忔帹閫佺殑璁℃暟
+                for (int page = 0; page < totalPage; page++) {
+                    pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + "");
+                }
+
+                for (int page = 0; page < totalPage; page++) {
+                    List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize);
+
+                    List<String> tokenList = new ArrayList<>();
+                    for (BPushDeviceToken deviceToken : deviceTokenList) {
+                        //TODO 鏃堕棿鍒ゆ柇
+                        tokenList.add(deviceToken.getToken());
+                    }
+                    totalValidCount += tokenList.size();
+                    BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
+                    //鏈�鍚庝竴椤�
+                    if (page == totalPage - 1 && totalValidCount != count) {
+                        //淇敼鎬绘暟
+                        pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount);
+                    }
+                    rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet);
+                }
+                logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), result.getPushPlatform().name(), count);
+            }
+
+            if (!hasDevice) {
+                updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�");
+            }
         } finally {
             redisTemplate.delete(key);
         }
-
         //娌℃湁鍙帹閫佺殑璁惧
-        if (!hasDevice) {
-            updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�");
-        }
-
         logger.info("鍚姩鎺ㄩ�佺粨鏉�#taskId:{}", task.getId());
     }
 
 
+    /**
+     * 鏆傚仠鎺ㄩ��
+     *
+     * @param taskId
+     * @throws BPushTaskException
+     */
     public void pausePush(String taskId) throws BPushTaskException {
         //楠岃瘉鐘舵��
         BPushTask task = bPushTaskDao.get(taskId);
@@ -135,13 +173,16 @@
         if (task.getState() != BPushTask.STATE_PUSHING) {
             throw new BPushTaskException(2, "澶勪簬鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙殏鍋�");
         }
-
         updateState(taskId, BPushTask.STATE_PAUSED, null);
-
         logger.info("鏆傚仠鎺ㄩ��#taskId:{}", task.getId());
-
     }
 
+    /**
+     * 閲嶆柊鎺ㄩ�侊紙澶勪簬鏆傚仠鐘舵�佺殑鏁版嵁鎵嶈兘閲嶆柊鎺ㄩ�侊級
+     *
+     * @param taskId
+     * @throws BPushTaskException
+     */
     public void reStartPush(String taskId) throws BPushTaskException {
         //楠岃瘉鐘舵��
         BPushTask task = bPushTaskDao.get(taskId);
@@ -151,12 +192,16 @@
         if (task.getState() != BPushTask.STATE_PAUSED) {
             throw new BPushTaskException(2, "澶勪簬鏆傚仠鐘舵�佺殑鎺ㄩ�佹墠鍙噸鏂板紑濮嬫帹閫�");
         }
-
         updateState(taskId, BPushTask.STATE_PUSHING, "閲嶆柊鎺ㄩ��");
-
         logger.info("閲嶆柊鎺ㄩ��#taskId:{}", task.getId());
     }
 
+    /**
+     * 鍙栨秷鎺ㄩ�侊紙澶勪簬鏆傚仠/鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙彇娑堬級
+     *
+     * @param taskId
+     * @throws BPushTaskException
+     */
     public void cancelPush(String taskId) throws BPushTaskException {
         //楠岃瘉鐘舵��
         BPushTask task = bPushTaskDao.get(taskId);

--
Gitblit v1.8.0