From bd885c7015446c6c0495d3299ef64068a4c9b30e Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期三, 16 十月 2024 15:00:34 +0800
Subject: [PATCH] CMQ改造为rabbitmq

---
 service-push/src/main/java/com/ks/push/manager/PushManager.java |   33 +++++++++++++++++++++------------
 1 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/service-push/src/main/java/com/ks/push/manager/PushManager.java b/service-push/src/main/java/com/ks/push/manager/PushManager.java
index 2c8b010..7215fec 100644
--- a/service-push/src/main/java/com/ks/push/manager/PushManager.java
+++ b/service-push/src/main/java/com/ks/push/manager/PushManager.java
@@ -4,6 +4,7 @@
 import com.ks.push.dao.BPushTaskDao;
 import com.ks.push.dto.BPushDeviceDataSet;
 import com.ks.push.exception.BPushTaskException;
+import com.ks.push.manager.rabbitmq.RabbitmqManager;
 import com.ks.push.pojo.DO.BPushDeviceToken;
 import com.ks.push.pojo.DO.BPushPlatformAppInfo;
 import com.ks.push.pojo.DO.BPushTask;
@@ -36,10 +37,13 @@
     private BPushPlatformAppInfoManager bPushPlatformAppInfoManager;
 
     @Resource
-    private RedisTemplate<String, Object> redisTemplate;
+    private PushExcuteResultManager pushExcuteResultManager;
 
     @Resource
-    private PushExcuteResultManager pushExcuteResultManager;
+    private RedisTemplate redisTemplate;
+
+    @Resource
+    private RabbitmqManager rabbitmqManager;
 
 
     /**
@@ -81,13 +85,19 @@
         }
 
         String key = "task-" + taskId;
-        if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS)) {
+
+        Boolean s = redisTemplate.opsForValue().setIfAbsent(key, "1", 5 * 60, TimeUnit.SECONDS);
+
+        if (s == null || !s) {
             throw new BPushTaskException(BPushTaskException.CODE_BUSY, "鏈嶅姟鍣ㄧ箒蹇欙紝璇风◢鍚庡啀璇�");
         }
-        logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId());
-        updateState(taskId, BPushTask.STATE_PUSHING, null);
-        boolean hasDevice = false;
+
+
         try {
+            logger.info("寮�濮嬪惎鍔ㄦ帹閫�#taskId:{}", task.getId());
+            updateState(taskId, BPushTask.STATE_PUSHING, null);
+            boolean hasDevice = false;
+
             // 鏌ヨ鍙帹閫佺殑骞冲彴
             List<BPushPlatformAppInfo> list = bPushPlatformAppInfoManager.listByAppCode(task.getAppCode());
 
@@ -132,19 +142,18 @@
                         //淇敼鎬绘暟
                         pushExcuteResultManager.setDeviceCount(result.getId(), totalValidCount);
                     }
-                    CMQManager.getInstance().addToPushQueue(result.getPushPlatform(), dataSet);
+                    rabbitmqManager.addToPushQueue(result.getPushPlatform(), dataSet);
                 }
                 logger.info("鍔犲叆鎺ㄩ�侀槦鍒�#浠诲姟Id:{}#骞冲彴:{}#鎺ㄩ�佹暟閲�:{}", task.getId(), result.getPushPlatform().name(), count);
+            }
+
+            if (!hasDevice) {
+                updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�");
             }
         } finally {
             redisTemplate.delete(key);
         }
-
         //娌℃湁鍙帹閫佺殑璁惧
-        if (!hasDevice) {
-            updateState(taskId, BPushTask.STATE_FINSIH, "娌℃湁婊¤冻鏉′欢鐨勫彲鎺ㄩ�佽澶�");
-        }
-
         logger.info("鍚姩鎺ㄩ�佺粨鏉�#taskId:{}", task.getId());
     }
 

--
Gitblit v1.8.0