From 6b2670dfa68af9ce2e36a5f9580125f4fc6da570 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期六, 26 六月 2021 18:06:10 +0800
Subject: [PATCH] 推送服务完善

---
 service-push/src/main/java/com/ks/push/manager/PushManager.java |   82 ++++++++++++++++++++++++++++------------
 1 files changed, 57 insertions(+), 25 deletions(-)

diff --git a/service-push/src/main/java/com/ks/push/manager/PushManager.java b/service-push/src/main/java/com/ks/push/manager/PushManager.java
index 80e88ad..a830a6d 100644
--- a/service-push/src/main/java/com/ks/push/manager/PushManager.java
+++ b/service-push/src/main/java/com/ks/push/manager/PushManager.java
@@ -7,6 +7,7 @@
 import com.ks.push.pojo.DO.BPushDeviceToken;
 import com.ks.push.pojo.DO.BPushPlatformAppInfo;
 import com.ks.push.pojo.DO.BPushTask;
+import com.ks.push.pojo.DO.BPushTaskExcuteResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -15,9 +16,7 @@
 
 import javax.annotation.Resource;
 import javax.validation.Valid;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -42,8 +41,6 @@
     @Resource
     private PushExcuteResultManager pushExcuteResultManager;
 
-    @Resource
-    private CMQManager cmqManager;
 
 
     /**
@@ -90,29 +87,51 @@
             // 鏌ヨ鍙帹閫佺殑骞冲彴
             List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode());
 
+            //鍏堝垵濮嬪寲鎺ㄩ�佺粨鏋滄暟鎹暟鎹�
+            List<BPushTaskExcuteResult> resultList = new ArrayList<>();
+            Map<String, Long> resultCountMap = new HashMap<>();
             for (BPushPlatformAppInfo appInfo : list) {
                 long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter());
                 if (count > 0) {
                     hasDevice = true;
                     //鍒濆鍖栨帹閫佺粨鏋滄暟鎹�
-                    pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
-                    //灏嗘暟鎹姞鍏ュ埌cmq
-                    int pageSize = 500;
-                    int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
-                    for (int page = 0; page < totalPage; page++) {
-                        List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), appInfo.getPlatform(), task.getFilter(), page + 1, pageSize);
-                        List<String> tokenList = new ArrayList<>();
-                        for (BPushDeviceToken deviceToken : deviceTokenList) {
-                            tokenList.add(deviceToken.getToken());
-                        }
-                        BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
-                        cmqManager.addToPushQueue(appInfo.getPlatform(), dataSet);
-                    }
-
-                    logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), appInfo.getPlatform().name(), count);
+                    BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count);
+                    resultCountMap.put(result.getId(), count);
+                    resultList.add(result);
                 }
             }
 
+
+            for (BPushTaskExcuteResult result : resultList) {
+                long count = resultCountMap.get(result.getId());
+                int pageSize = 500;
+                int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1);
+                long totalValidCount = 0L;
+
+                //鍒濆鍖栨壒閲忔帹閫佺殑璁℃暟
+                for (int page = 0; page < totalPage; page++) {
+                    pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + "");
+                }
+
+                for (int page = 0; page < totalPage; page++) {
+                    List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize);
+
+                    List<String> tokenList = new ArrayList<>();
+                    for (BPushDeviceToken deviceToken : deviceTokenList) {
+                        //TODO 鏃堕棿鍒ゆ柇
+                        tokenList.add(deviceToken.getToken());
+                    }
+                    totalValidCount += tokenList.size();
+                    BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId);
+                    //鏈�鍚庝竴椤�
+                    if (page == totalPage - 1 && totalValidCount != count) {
+                        //淇敼鎬绘暟
+                        pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount);
+                    }
+                    CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet);
+                }
+                logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), result.getPushPlatform().name(), count);
+            }
         } finally {
             redisTemplate.delete(key);
         }
@@ -126,6 +145,12 @@
     }
 
 
+    /**
+     * 鏆傚仠鎺ㄩ��
+     *
+     * @param taskId
+     * @throws BPushTaskException
+     */
     public void pausePush(String taskId) throws BPushTaskException {
         //楠岃瘉鐘舵��
         BPushTask task = bPushTaskDao.get(taskId);
@@ -135,13 +160,16 @@
         if (task.getState() != BPushTask.STATE_PUSHING) {
             throw new BPushTaskException(2, "澶勪簬鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙殏鍋�");
         }
-
         updateState(taskId, BPushTask.STATE_PAUSED, null);
-
         logger.info("鏆傚仠鎺ㄩ��#taskId:{}", task.getId());
-
     }
 
+    /**
+     * 閲嶆柊鎺ㄩ�侊紙澶勪簬鏆傚仠鐘舵�佺殑鏁版嵁鎵嶈兘閲嶆柊鎺ㄩ�侊級
+     *
+     * @param taskId
+     * @throws BPushTaskException
+     */
     public void reStartPush(String taskId) throws BPushTaskException {
         //楠岃瘉鐘舵��
         BPushTask task = bPushTaskDao.get(taskId);
@@ -151,12 +179,16 @@
         if (task.getState() != BPushTask.STATE_PAUSED) {
             throw new BPushTaskException(2, "澶勪簬鏆傚仠鐘舵�佺殑鎺ㄩ�佹墠鍙噸鏂板紑濮嬫帹閫�");
         }
-
         updateState(taskId, BPushTask.STATE_PUSHING, "閲嶆柊鎺ㄩ��");
-
         logger.info("閲嶆柊鎺ㄩ��#taskId:{}", task.getId());
     }
 
+    /**
+     * 鍙栨秷鎺ㄩ�侊紙澶勪簬鏆傚仠/鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙彇娑堬級
+     *
+     * @param taskId
+     * @throws BPushTaskException
+     */
     public void cancelPush(String taskId) throws BPushTaskException {
         //楠岃瘉鐘舵��
         BPushTask task = bPushTaskDao.get(taskId);

--
Gitblit v1.8.0