From 7fa83e5dd03f7896bd1d1e8c47f5e926ff3d4ba0 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期三, 16 十月 2024 14:12:24 +0800 Subject: [PATCH] CMQ改造为rabbitmq --- service-push/src/main/java/com/ks/push/manager/PushManager.java | 117 ++++++++++++++++++++++++++++++++++++++++------------------ 1 files changed, 81 insertions(+), 36 deletions(-) diff --git a/service-push/src/main/java/com/ks/push/manager/PushManager.java b/service-push/src/main/java/com/ks/push/manager/PushManager.java index 80e88ad..7215fec 100644 --- a/service-push/src/main/java/com/ks/push/manager/PushManager.java +++ b/service-push/src/main/java/com/ks/push/manager/PushManager.java @@ -4,9 +4,11 @@ import com.ks.push.dao.BPushTaskDao; import com.ks.push.dto.BPushDeviceDataSet; import com.ks.push.exception.BPushTaskException; +import com.ks.push.manager.rabbitmq.RabbitmqManager; import com.ks.push.pojo.DO.BPushDeviceToken; import com.ks.push.pojo.DO.BPushPlatformAppInfo; import com.ks.push.pojo.DO.BPushTask; +import com.ks.push.pojo.DO.BPushTaskExcuteResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; @@ -15,9 +17,7 @@ import javax.annotation.Resource; import javax.validation.Valid; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -37,13 +37,13 @@ private BPushPlatformAppInfoManager bPushPlatformAppInfoManager; @Resource - private RedisTemplate<String, Object> redisTemplate; - - @Resource private PushExcuteResultManager pushExcuteResultManager; @Resource - private CMQManager cmqManager; + private RedisTemplate redisTemplate; + + @Resource + private RabbitmqManager rabbitmqManager; /** @@ -58,6 +58,11 @@ task.setCreateTime(new Date()); } task.setId(BPushTask.createId()); + + if (task.getState() == null) { + task.setState(BPushTask.STATE_NOT_START); + } + bPushTaskDao.save(task); logger.info("鍒涘缓鎺ㄩ�佹垚鍔�#taskId:{}", task.getId()); return task.getId(); @@ -80,52 +85,85 @@ } String key = "task-" + taskId; - if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS)) { + + Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS); + + if (s == null || !s) { throw new BPushTaskException(BPushTaskException.CODE_BUSY, "鏈嶅姟鍣ㄧ箒蹇欙紝璇风◢鍚庡啀璇�"); } - logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId()); - updateState(taskId, BPushTask.STATE_PUSHING, null); - boolean hasDevice = false; + + try { + logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId()); + updateState(taskId, BPushTask.STATE_PUSHING, null); + boolean hasDevice = false; + // 鏌ヨ鍙帹閫佺殑骞冲彴 List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode()); + //鍏堝垵濮嬪寲鎺ㄩ�佺粨鏋滄暟鎹暟鎹� + List<BPushTaskExcuteResult> resultList = new ArrayList<>(); + Map<String, Long> resultCountMap = new HashMap<>(); for (BPushPlatformAppInfo appInfo : list) { long count = pushDeviceTokenManager.count(task.getAppCode(), appInfo.getPlatform(), task.getFilter()); if (count > 0) { hasDevice = true; //鍒濆鍖栨帹閫佺粨鏋滄暟鎹� - pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); - //灏嗘暟鎹姞鍏ュ埌cmq - int pageSize = 500; - int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); - for (int page = 0; page < totalPage; page++) { - List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), appInfo.getPlatform(), task.getFilter(), page + 1, pageSize); - List<String> tokenList = new ArrayList<>(); - for (BPushDeviceToken deviceToken : deviceTokenList) { - tokenList.add(deviceToken.getToken()); - } - BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); - cmqManager.addToPushQueue(appInfo.getPlatform(), dataSet); - } - - logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), appInfo.getPlatform().name(), count); + BPushTaskExcuteResult result = pushExcuteResultManager.initPushExcuteResult(taskId, appInfo.getPlatform(), count); + resultCountMap.put(result.getId(), count); + resultList.add(result); } } + + for (BPushTaskExcuteResult result : resultList) { + long count = resultCountMap.get(result.getId()); + int pageSize = 500; + int totalPage = (int) (count % pageSize == 0 ? count / pageSize : count / pageSize + 1); + long totalValidCount = 0L; + + //鍒濆鍖栨壒閲忔帹閫佺殑璁℃暟 + for (int page = 0; page < totalPage; page++) { + pushExcuteResultManager.startBatchPush(taskId, result.getPushPlatform(), page + ""); + } + + for (int page = 0; page < totalPage; page++) { + List<BPushDeviceToken> deviceTokenList = pushDeviceTokenManager.list(task.getAppCode(), result.getPushPlatform(), task.getFilter(), page + 1, pageSize); + + List<String> tokenList = new ArrayList<>(); + for (BPushDeviceToken deviceToken : deviceTokenList) { + //TODO 鏃堕棿鍒ゆ柇 + tokenList.add(deviceToken.getToken()); + } + totalValidCount += tokenList.size(); + BPushDeviceDataSet dataSet = new BPushDeviceDataSet(tokenList, page + "", taskId); + //鏈�鍚庝竴椤� + if (page == totalPage - 1 && totalValidCount != count) { + //淇敼鎬绘暟 + pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount); + } + rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet); + } + logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), result.getPushPlatform().name(), count); + } + + if (!hasDevice) { + updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�"); + } } finally { redisTemplate.delete(key); } - //娌℃湁鍙帹閫佺殑璁惧 - if (!hasDevice) { - updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�"); - } - logger.info("鍚姩鎺ㄩ�佺粨鏉�#taskId:{}", task.getId()); } + /** + * 鏆傚仠鎺ㄩ�� + * + * @param taskId + * @throws BPushTaskException + */ public void pausePush(String taskId) throws BPushTaskException { //楠岃瘉鐘舵�� BPushTask task = bPushTaskDao.get(taskId); @@ -135,13 +173,16 @@ if (task.getState() != BPushTask.STATE_PUSHING) { throw new BPushTaskException(2, "澶勪簬鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙殏鍋�"); } - updateState(taskId, BPushTask.STATE_PAUSED, null); - logger.info("鏆傚仠鎺ㄩ��#taskId:{}", task.getId()); - } + /** + * 閲嶆柊鎺ㄩ�侊紙澶勪簬鏆傚仠鐘舵�佺殑鏁版嵁鎵嶈兘閲嶆柊鎺ㄩ�侊級 + * + * @param taskId + * @throws BPushTaskException + */ public void reStartPush(String taskId) throws BPushTaskException { //楠岃瘉鐘舵�� BPushTask task = bPushTaskDao.get(taskId); @@ -151,12 +192,16 @@ if (task.getState() != BPushTask.STATE_PAUSED) { throw new BPushTaskException(2, "澶勪簬鏆傚仠鐘舵�佺殑鎺ㄩ�佹墠鍙噸鏂板紑濮嬫帹閫�"); } - updateState(taskId, BPushTask.STATE_PUSHING, "閲嶆柊鎺ㄩ��"); - logger.info("閲嶆柊鎺ㄩ��#taskId:{}", task.getId()); } + /** + * 鍙栨秷鎺ㄩ�侊紙澶勪簬鏆傚仠/鎺ㄩ�佷腑鐘舵�佺殑鎺ㄩ�佹墠鍙彇娑堬級 + * + * @param taskId + * @throws BPushTaskException + */ public void cancelPush(String taskId) throws BPushTaskException { //楠岃瘉鐘舵�� BPushTask task = bPushTaskDao.get(taskId); -- Gitblit v1.8.0