From 6b2670dfa68af9ce2e36a5f9580125f4fc6da570 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期六, 26 六月 2021 18:06:10 +0800 Subject: [PATCH] 推送服务完善 --- service-push/src/main/java/com/ks/push/manager/PushManager.java | 82 ++++++++++++++++++++++++++++------------ 1 files changed, 57 insertions(+), 25 deletions(-) diff --git a/service-push/src/main/java/com/ks/push/manager/PushManager.java b/service-push/src/main/java/com/ks/push/manager/PushManager.java index 80e88ad..a830a6d 100644 --- a/service-push/src/main/java/com/ks/push/manager/PushManager.java +++ b/service-push/src/main/java/com/ks/push/manager/PushManager.java @@ -7,6 +7,7 @@ import com.ks.push.pojo.DO.BPushDeviceToken; import com.ks.push.pojo.DO.BPushPlatformAppInfo; import com.ks.push.pojo.DO.BPushTask; +import com.ks.push.pojo.DO.BPushTaskExcuteResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; @@ -15,9 +16,7 @@ import javax.annotation.Resource; import javax.validation.Valid; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -42,8 +41,6 @@ @Resource private PushExcuteResultManager pushExcuteResultManager; - @Resource - private CMQManager cmqManager; /** @@ -90,29 +87,51 @@ // 鏌ヨ鍙帹閫佺殑骞冲彴 List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode()); + //鍏堝垵濮嬪寲鎺ㄩ�佺粨鏋滄暟鎹暟鎹� + List<BPushTaskExcuteResult> resultList = new ArrayList<>(); + Map<String, Long> resultCountMap = new HashMap<>(); for (BPushPlatformAppInfo appInfo : list) { long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter()); if (count > 0) { hasDevice = true; //鍒濆鍖栨帹閫佺粨鏋滄暟鎹� - pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); - //灏嗘暟鎹姞鍏ュ埌cmq - int pageSize = 500; - int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); - for (int page = 0; page < totalPage; page++) { - List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), appInfo.getPlatform(), task.getFilter(), page + 1, pageSize); - List<String> tokenList = new ArrayList<>(); - for (BPushDeviceToken deviceToken : deviceTokenList) { - tokenList.add(deviceToken.getToken()); - } - BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); - cmqManager.addToPushQueue(appInfo.getPlatform(), dataSet); - } - - logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), appInfo.getPlatform().name(), count); + BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); + resultCountMap.put(result.getId(), count); + resultList.add(result); } } + + for (BPushTaskExcuteResult result : resultList) { + long count = resultCountMap.get(result.getId()); + int pageSize = 500; + int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); + long totalValidCount = 0L; + + //鍒濆鍖栨壒閲忔帹閫佺殑璁℃暟 + for (int page = 0; page < totalPage; page++) { + pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + ""); + } + + for (int page = 0; page < totalPage; page++) { + List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize); + + List<String> tokenList = new ArrayList<>(); + for (BPushDeviceToken deviceToken : deviceTokenList) { + //TODO 鏃堕棿鍒ゆ柇 + tokenList.add(deviceToken.getToken()); + } + totalValidCount += tokenList.size(); + BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); + //鏈�鍚庝竴椤� + if (page == totalPage - 1 && totalValidCount != count) { + //淇敼鎬绘暟 + pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount); + } + CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet); + } + logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), result.getPushPlatform().name(), count); + } } finally { redisTemplate.delete(key); } @@ -126,6 +145,12 @@ } + /** + * 鏆傚仠鎺ㄩ�� + * + * @param taskId + * @throws BPushTaskException + */ public void pausePush(String taskId) throws BPushTaskException { //楠岃瘉鐘舵�� BPushTask task = bPushTaskDao.get(taskId); @@ -135,13 +160,16 @@ if (task.getState() != BPushTask.STATE_PUSHING) { throw new BPushTaskException(2, "澶勪簬鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙殏鍋�"); } - updateState(taskId, BPushTask.STATE_PAUSED, null); - logger.info("鏆傚仠鎺ㄩ��#taskId:{}", task.getId()); - } + /** + * 閲嶆柊鎺ㄩ�侊紙澶勪簬鏆傚仠鐘舵�佺殑鏁版嵁鎵嶈兘閲嶆柊鎺ㄩ�侊級 + * + * @param taskId + * @throws BPushTaskException + */ public void reStartPush(String taskId) throws BPushTaskException { //楠岃瘉鐘舵�� BPushTask task = bPushTaskDao.get(taskId); @@ -151,12 +179,16 @@ if (task.getState() != BPushTask.STATE_PAUSED) { throw new BPushTaskException(2, "澶勪簬鏆傚仠鐘舵�佺殑鎺ㄩ�佹墠鍙噸鏂板紑濮嬫帹閫�"); } - updateState(taskId, BPushTask.STATE_PUSHING, "閲嶆柊鎺ㄩ��"); - logger.info("閲嶆柊鎺ㄩ��#taskId:{}", task.getId()); } + /** + * 鍙栨秷鎺ㄩ�侊紙澶勪簬鏆傚仠/鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙彇娑堬級 + * + * @param taskId + * @throws BPushTaskException + */ public void cancelPush(String taskId) throws BPushTaskException { //楠岃瘉鐘舵�� BPushTask task = bPushTaskDao.get(taskId); -- Gitblit v1.8.0