From f06a592dd1a7e995bf313ccb5efe7dff73ccfc4e Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期三, 12 四月 2023 18:25:52 +0800
Subject: [PATCH] 增加本地获取IP归属地/广告优化

---
 src/main/java/com/yeshi/buwan/util/mq/CMQManager.java |  143 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 103 insertions(+), 40 deletions(-)

diff --git a/src/main/java/com/yeshi/buwan/util/mq/CMQManager.java b/src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
index 146daa3..13a20aa 100644
--- a/src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
+++ b/src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
@@ -4,15 +4,18 @@
 import com.qcloud.cmq.Message;
 import com.yeshi.buwan.dto.mq.*;
 import com.yeshi.buwan.util.StringUtil;
+import net.sf.json.JSONObject;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class CMQManager {
     private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
     private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
     private static CMQManager cmqManager;
-    private static CMQUtil cmqUtil;
+    private static TDMQUtil tdmqUtil;
     // 鎼滅储寮曟搸
     public static String QUEUENAME_SOLR = "buwan-solr-new";
     //瑙嗛鏇存柊-鐖卞鑹�2
@@ -34,29 +37,43 @@
 
     public static String QUEUENAME_UPDATE_RESOURCE_VIDEO = "buwan-video-update-resource-video";
 
+
+    //鍚屾V2鐗堟湰鐨勮棰戜俊鎭�
+    public static String QUEUENAME_VIDEO_SYNCDATA_V2 = "buwan-video-syncdata-v2";
+
+
     private static String TOPIC_VIDEO_EXTRAINFO = "buwan_topic_video_extrainfo_change";
+
+    //瑙嗛淇℃伅淇敼
+    private static String TOPIC_VIDEO_INFO_CHANGE = "buwan_topic_video_info_change";
 
 
     static {
-        cmqUtil = CMQUtil.getInstance(secretId, secretKey);
+        tdmqUtil = TDMQUtil.getInstance();
+        tdmqUtil.init(secretId, secretKey, true);
+
+
         // 鏈�澶ф秷鎭负1M
-        cmqUtil.createQueue(QUEUENAME_SOLR, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_PPTV, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_VIDEO_RESOURCE_DELETE, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_UPDATE_INTERNET_SEARCH, 1024 * 1024);
-        cmqUtil.createQueue(QUEUENAME_UPDATE_RESOURCE_VIDEO, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_SOLR, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_PPTV, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_VIDEO_RESOURCE_DELETE, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_UPDATE_INTERNET_SEARCH, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_UPDATE_RESOURCE_VIDEO, 1024 * 1024);
+        tdmqUtil.createQueue(QUEUENAME_VIDEO_SYNCDATA_V2, 1024 * 1024);
 
         //鍒涘缓涓婚
-        cmqUtil.createTopic(TOPIC_VIDEO_EXTRAINFO);
+        tdmqUtil.createTopic(TOPIC_VIDEO_EXTRAINFO);
+        tdmqUtil.createTopic(TOPIC_VIDEO_INFO_CHANGE);
         //璁㈤槄涓婚
         try {
             List<String> filters = new ArrayList<>();
             filters.add("resource");
             filters.add("category");
-            cmqUtil.subscribeTopic(TOPIC_VIDEO_EXTRAINFO, StringUtil.Md5(TOPIC_VIDEO_EXTRAINFO + "#" + QUEUENAME_UPDATE_VIDEO_EXTRAINFO), QUEUENAME_UPDATE_VIDEO_EXTRAINFO, filters);
+            tdmqUtil.subscribeTopic(TOPIC_VIDEO_EXTRAINFO, "subscribe_" + StringUtil.Md5(TOPIC_VIDEO_EXTRAINFO + "#" + QUEUENAME_UPDATE_VIDEO_EXTRAINFO), QUEUENAME_UPDATE_VIDEO_EXTRAINFO, filters);
+            tdmqUtil.subscribeTopic(TOPIC_VIDEO_INFO_CHANGE, "subscribe_" + StringUtil.Md5(TOPIC_VIDEO_INFO_CHANGE + "#" + QUEUENAME_VIDEO_SYNCDATA_V2), QUEUENAME_VIDEO_SYNCDATA_V2);
         } catch (Exception e) {
         }
     }
@@ -70,13 +87,13 @@
 
     //娣诲姞涓撹緫鏇存柊娑堟伅
     public void addIqiyiAlbumUpdateMsg(Long id) {
-        cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + "");
+        tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + "");
     }
 
     //娑堣垂涓撹緫鏇存柊娑堟伅
     public List<IqiyiAlbum2MQMsg> consumeIqiyiAlbumUpdateMsg(int count) {
         List<IqiyiAlbum2MQMsg> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
         if (msgList != null)
             for (Message msg : msgList) {
                 IqiyiAlbum2MQMsg mm = new IqiyiAlbum2MQMsg();
@@ -89,19 +106,20 @@
 
     //鍒犻櫎涓撹緫鏇存柊娑堟伅
     public void deleteIqiyiAlbumUpdateMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
+        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
+
     }
 
 
     //娣诲姞涓撹緫鏇存柊娑堟伅
     public void addFunTVAlbumUpdateMsg(String id) {
-        cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, id);
+        tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, id);
     }
 
     //娑堣垂涓撹緫鏇存柊娑堟伅
     public List<FunTVAlbum2MQMsg> consumeFunTVAlbumUpdateMsg(int count) {
         List<FunTVAlbum2MQMsg> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
         if (msgList != null)
             for (Message msg : msgList) {
                 FunTVAlbum2MQMsg mm = new FunTVAlbum2MQMsg();
@@ -115,7 +133,10 @@
 
     //鍒犻櫎涓撹緫鏇存柊娑堟伅
     public void deleteFunTVAlbumUpdateMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
+
+
     }
 
 
@@ -125,13 +146,13 @@
 
     //娣诲姞涓撹緫鏇存柊娑堟伅
     public void addPPTVSeriesUpdateMsg(PPTVMQMsg msg) {
-        cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_PPTV, new Gson().toJson(msg));
+        tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_PPTV, new Gson().toJson(msg));
     }
 
     //娑堣垂涓撹緫鏇存柊娑堟伅
     public List<PPTVMQMsg> consumePPTVSeriesUpdateMsg(int count) {
         List<PPTVMQMsg> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV);
         if (msgList != null)
             for (Message msg : msgList) {
                 PPTVMQMsg mm = new Gson().fromJson(msg.msgBody, PPTVMQMsg.class);
@@ -143,7 +164,9 @@
 
     //鍒犻櫎涓撹緫鏇存柊娑堟伅
     public void deletePPTVSeriesUpdateMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
+
     }
 
 
@@ -158,12 +181,12 @@
      * @param id
      */
     public void addSolrMsg(String id) {
-        cmqUtil.sendMsg(QUEUENAME_SOLR, id);
+        tdmqUtil.sendMsg(QUEUENAME_SOLR, id);
     }
 
     public List<SolrVideoMQMsg> consumeSolrMsg(int count) {
         List<SolrVideoMQMsg> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_SOLR);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_SOLR);
         if (msgList != null)
             for (Message msg : msgList) {
                 SolrVideoMQMsg mm = new SolrVideoMQMsg();
@@ -176,7 +199,9 @@
 
 
     public void deleteSolrMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
+
     }
 
 
@@ -187,12 +212,12 @@
      */
 
     public void addVideoResourceDeleteMsg(String videoId) {
-        cmqUtil.sendMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, videoId);
+        tdmqUtil.sendMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, videoId);
     }
 
     public List<CMQResult> consumeVideoResourceDeleteMsg(int count) {
         List<CMQResult> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
         if (msgList != null)
             for (Message msg : msgList) {
                 CMQResult mm = new CMQResult();
@@ -205,7 +230,9 @@
 
 
     public void deleteVideoResourceDeleteMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
+
     }
 
 
@@ -215,13 +242,13 @@
     public void addVideoExtraInfoChanged(VideoExtraInfoChangeMQMsg msg) {
         List<String> list = new ArrayList<>();
         list.add(msg.getType());
-        cmqUtil.publishTopicMessage(TOPIC_VIDEO_EXTRAINFO, new Gson().toJson(msg), list);
+        tdmqUtil.publishTopicMessage(TOPIC_VIDEO_EXTRAINFO, list, new Gson().toJson(msg));
     }
 
 
     public List<CMQResult> consumeUpdateVideoExtraInfoMsg(int count) {
         List<CMQResult> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
         if (msgList != null)
             for (Message msg : msgList) {
                 CMQResult mm = new CMQResult();
@@ -233,7 +260,9 @@
     }
 
     public void deleteUpdateVideoExtraInfoMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
+
     }
 
 
@@ -244,13 +273,13 @@
      */
 
     public void addInternetSearchVideoUpdateMsg(InternetSearchVideoMQMsg msg) {
-        cmqUtil.sendMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, new Gson().toJson(msg));
+        tdmqUtil.sendMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, new Gson().toJson(msg));
     }
 
     public List<InternetSearchVideoMQMsg> consumeInternetSearchVideoUpdateMsg(int count) {
         Gson gson = new Gson();
         List<InternetSearchVideoMQMsg> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
         if (msgList != null)
             for (Message msg : msgList) {
                 InternetSearchVideoMQMsg mm = gson.fromJson(msg.msgBody, InternetSearchVideoMQMsg.class);
@@ -262,7 +291,9 @@
 
 
     public void deleteInternetSearchVideoUpdateMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
+
     }
 
     /**
@@ -271,26 +302,58 @@
      * @param msg
      */
     public void addUpdateResourceVideoMsg(UpdateResourceVideoMQMsg msg) {
-        cmqUtil.sendMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, new Gson().toJson(msg));
+        tdmqUtil.sendMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, new Gson().toJson(msg));
     }
 
     public void deleteUpdateResourceVideoMsg(String handler) {
-        cmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
+
+        tdmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
+
     }
 
 
     public List<UpdateResourceVideoMQMsg> consumeUpdateResourceVideoMsg(int count) {
         Gson gson = new Gson();
         List<UpdateResourceVideoMQMsg> list = new ArrayList<>();
-        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO);
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO);
         if (msgList != null)
             for (Message msg : msgList) {
-                UpdateResourceVideoMQMsg mm = gson.fromJson(msg.msgBody, UpdateResourceVideoMQMsg.class);
-                mm.setHandler(msg.receiptHandle);
-                list.add(mm);
+                try {
+                    UpdateResourceVideoMQMsg mm = gson.fromJson(msg.msgBody, UpdateResourceVideoMQMsg.class);
+                    mm.setHandler(msg.receiptHandle);
+                    list.add(mm);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    JSONObject jsonObject = JSONObject.fromObject(msg.msgBody);
+                    jsonObject.remove("date");
+                    UpdateResourceVideoMQMsg mm = gson.fromJson(jsonObject.toString(), UpdateResourceVideoMQMsg.class);
+                    mm.setHandler(msg.receiptHandle);
+                    list.add(mm);
+                }
             }
         return list;
     }
 
+    //瑙嗛鏁版嵁鏇存敼
+    public void addVideoDataChanged(VideoDataChangeMQMsg msg) {
+        tdmqUtil.publishTopicMessage(TOPIC_VIDEO_INFO_CHANGE, new Gson().toJson(msg));
+    }
 
-}
+
+    public Map<String, VideoDataChangeMQMsg> consumeVideoSyncV2DataMsg(int count) {
+        Gson gson = new Gson();
+        Map<String, VideoDataChangeMQMsg> map = new HashMap<>();
+        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
+        if (msgList != null)
+            for (Message msg : msgList) {
+                map.put(msg.receiptHandle, gson.fromJson(msg.msgBody, VideoDataChangeMQMsg.class));
+            }
+        return map;
+    }
+
+
+    public void deleteVideoSyncV2DataMsg(String handler) {
+        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_SYNCDATA_V2, handler);
+    }
+
+}
\ No newline at end of file

--
Gitblit v1.8.0