From bd885c7015446c6c0495d3299ef64068a4c9b30e Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期三, 16 十月 2024 15:00:34 +0800 Subject: [PATCH] CMQ改造为rabbitmq --- service-push/src/main/java/com/ks/push/manager/PushManager.java | 33 +++++++++++++++++++++------------ 1 files changed, 21 insertions(+), 12 deletions(-) diff --git a/service-push/src/main/java/com/ks/push/manager/PushManager.java b/service-push/src/main/java/com/ks/push/manager/PushManager.java index 2c8b010..7215fec 100644 --- a/service-push/src/main/java/com/ks/push/manager/PushManager.java +++ b/service-push/src/main/java/com/ks/push/manager/PushManager.java @@ -4,6 +4,7 @@ import com.ks.push.dao.BPushTaskDao; import com.ks.push.dto.BPushDeviceDataSet; import com.ks.push.exception.BPushTaskException; +import com.ks.push.manager.rabbitmq.RabbitmqManager; import com.ks.push.pojo.DO.BPushDeviceToken; import com.ks.push.pojo.DO.BPushPlatformAppInfo; import com.ks.push.pojo.DO.BPushTask; @@ -36,10 +37,13 @@ private BPushPlatformAppInfoManager bPushPlatformAppInfoManager; @Resource - private RedisTemplate<String, Object> redisTemplate; + private PushExcuteResultManager pushExcuteResultManager; @Resource - private PushExcuteResultManager pushExcuteResultManager; + private RedisTemplate redisTemplate; + + @Resource + private RabbitmqManager rabbitmqManager; /** @@ -81,13 +85,19 @@ } String key = "task-" + taskId; - if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS)) { + + Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS); + + if (s == null || !s) { throw new BPushTaskException(BPushTaskException.CODE_BUSY, "鏈嶅姟鍣ㄧ箒蹇欙紝璇风◢鍚庡啀璇�"); } - logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId()); - updateState(taskId, BPushTask.STATE_PUSHING, null); - boolean hasDevice = false; + + try { + logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId()); + updateState(taskId, BPushTask.STATE_PUSHING, null); + boolean hasDevice = false; + // 鏌ヨ鍙帹閫佺殑骞冲彴 List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode()); @@ -132,19 +142,18 @@ //淇敼鎬绘暟 pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount); } - CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet); + rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet); } logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), result.getPushPlatform().name(), count); + } + + if (!hasDevice) { + updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�"); } } finally { redisTemplate.delete(key); } - //娌℃湁鍙帹閫佺殑璁惧 - if (!hasDevice) { - updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�"); - } - logger.info("鍚姩鎺ㄩ�佺粨鏉�#taskId:{}", task.getId()); } -- Gitblit v1.8.0