admin
2022-08-25 264b5dea5b74c4b5ba54a90caba7e709858a037e
src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
@@ -5,7 +5,6 @@
import com.yeshi.buwan.dto.mq.*;
import com.yeshi.buwan.util.StringUtil;
import net.sf.json.JSONObject;
import org.yeshi.utils.CMQUtil;
import java.util.ArrayList;
import java.util.HashMap;
@@ -16,11 +15,7 @@
    private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
    private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
    private static CMQManager cmqManager;
    private static CMQUtil cmqUtil;
    private static TDMQUtil tdmqUtil;
    private final static boolean TDMQ_CONSUMER = true;
    // 搜索引擎
    public static String QUEUENAME_SOLR = "buwan-solr-new";
    //视频更新-爱奇艺2
@@ -54,8 +49,6 @@
    static {
        cmqUtil = CMQUtil.getInstance(secretId, secretKey);
        tdmqUtil = TDMQUtil.getInstance();
        tdmqUtil.init(secretId, secretKey, true);
@@ -100,7 +93,7 @@
    //消费专辑更新消息
    public List<IqiyiAlbum2MQMsg> consumeIqiyiAlbumUpdateMsg(int count) {
        List<IqiyiAlbum2MQMsg> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
        if (msgList != null)
            for (Message msg : msgList) {
                IqiyiAlbum2MQMsg mm = new IqiyiAlbum2MQMsg();
@@ -113,11 +106,8 @@
    //删除专辑更新消息
    public void deleteIqiyiAlbumUpdateMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
        }
    }
@@ -129,7 +119,7 @@
    //消费专辑更新消息
    public List<FunTVAlbum2MQMsg> consumeFunTVAlbumUpdateMsg(int count) {
        List<FunTVAlbum2MQMsg> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
        if (msgList != null)
            for (Message msg : msgList) {
                FunTVAlbum2MQMsg mm = new FunTVAlbum2MQMsg();
@@ -143,11 +133,8 @@
    //删除专辑更新消息
    public void deleteFunTVAlbumUpdateMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
        }
    }
@@ -165,7 +152,7 @@
    //消费专辑更新消息
    public List<PPTVMQMsg> consumePPTVSeriesUpdateMsg(int count) {
        List<PPTVMQMsg> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV);
        if (msgList != null)
            for (Message msg : msgList) {
                PPTVMQMsg mm = new Gson().fromJson(msg.msgBody, PPTVMQMsg.class);
@@ -177,11 +164,9 @@
    //删除专辑更新消息
    public void deletePPTVSeriesUpdateMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
        }
    }
@@ -201,7 +186,7 @@
    public List<SolrVideoMQMsg> consumeSolrMsg(int count) {
        List<SolrVideoMQMsg> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_SOLR) : cmqUtil.recieveMsg(count, QUEUENAME_SOLR);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_SOLR);
        if (msgList != null)
            for (Message msg : msgList) {
                SolrVideoMQMsg mm = new SolrVideoMQMsg();
@@ -214,11 +199,9 @@
    public void deleteSolrMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
        }
    }
@@ -234,7 +217,7 @@
    public List<CMQResult> consumeVideoResourceDeleteMsg(int count) {
        List<CMQResult> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
        if (msgList != null)
            for (Message msg : msgList) {
                CMQResult mm = new CMQResult();
@@ -247,11 +230,9 @@
    public void deleteVideoResourceDeleteMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
        }
    }
@@ -267,7 +248,7 @@
    public List<CMQResult> consumeUpdateVideoExtraInfoMsg(int count) {
        List<CMQResult> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
        if (msgList != null)
            for (Message msg : msgList) {
                CMQResult mm = new CMQResult();
@@ -279,11 +260,9 @@
    }
    public void deleteUpdateVideoExtraInfoMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
        }
    }
@@ -300,7 +279,7 @@
    public List<InternetSearchVideoMQMsg> consumeInternetSearchVideoUpdateMsg(int count) {
        Gson gson = new Gson();
        List<InternetSearchVideoMQMsg> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
        if (msgList != null)
            for (Message msg : msgList) {
                InternetSearchVideoMQMsg mm = gson.fromJson(msg.msgBody, InternetSearchVideoMQMsg.class);
@@ -312,11 +291,9 @@
    public void deleteInternetSearchVideoUpdateMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
        }
    }
    /**
@@ -329,18 +306,16 @@
    }
    public void deleteUpdateResourceVideoMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
        }
    }
    public List<UpdateResourceVideoMQMsg> consumeUpdateResourceVideoMsg(int count) {
        Gson gson = new Gson();
        List<UpdateResourceVideoMQMsg> list = new ArrayList<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO);
        if (msgList != null)
            for (Message msg : msgList) {
                try {
@@ -368,7 +343,7 @@
    public Map<String, VideoDataChangeMQMsg> consumeVideoSyncV2DataMsg(int count) {
        Gson gson = new Gson();
        Map<String, VideoDataChangeMQMsg> map = new HashMap<>();
        List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
        if (msgList != null)
            for (Message msg : msgList) {
                map.put(msg.receiptHandle, gson.fromJson(msg.msgBody, VideoDataChangeMQMsg.class));
@@ -378,12 +353,7 @@
    public void deleteVideoSyncV2DataMsg(String handler) {
        if (TDMQ_CONSUMER) {
            tdmqUtil.deleteMsg(QUEUENAME_VIDEO_SYNCDATA_V2, handler);
        } else {
            cmqUtil.deleteMsg(QUEUENAME_VIDEO_SYNCDATA_V2, handler);
        }
    }
}