admin
2023-04-12 f06a592dd1a7e995bf313ccb5efe7dff73ccfc4e
src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
@@ -1,32 +1,81 @@
package com.yeshi.buwan.util.mq;
import com.google.gson.Gson;
import com.qcloud.cmq.Message;
import com.yeshi.buwan.dto.mq.FunTVAlbum2MQMsg;
import com.yeshi.buwan.dto.mq.IqiyiAlbum2MQMsg;
import com.yeshi.buwan.dto.mq.SolrVideoMQMsg;
import com.yeshi.buwan.dto.mq.*;
import com.yeshi.buwan.util.StringUtil;
import net.sf.json.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CMQManager {
    private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
    private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
    private static CMQManager cmqManager;
    private static CMQUtil cmqUtil;
    private static TDMQUtil tdmqUtil;
    // 搜索引擎
    public static String QUEUENAME_SOLR = "buwan-solr";
    public static String QUEUENAME_SOLR = "buwan-solr-new";
    //视频更新-爱奇艺2
    public static String QUEUENAME_VIDEO_UPDATE_IQIYI_2 = "buwan-video-update-iqiyi2";
    //视频更新-风行2
    public static String QUEUENAME_VIDEO_UPDATE_FUNTV_2 = "buwan-video-update-funtv2";
    //视频更新-PPTV
    public static String QUEUENAME_VIDEO_UPDATE_PPTV = "buwan-video-update-pptv";
    //删除视频资源
    public static String QUEUENAME_VIDEO_RESOURCE_DELETE = "buwan-video-resource-delete";
    //更新视频附加信息
    public static String QUEUENAME_UPDATE_VIDEO_EXTRAINFO = "buwan-video-video-update-extrainfo";
    //全网搜
    public static String QUEUENAME_UPDATE_INTERNET_SEARCH = "buwan-video-update-internet-search";
    public static String QUEUENAME_UPDATE_RESOURCE_VIDEO = "buwan-video-update-resource-video";
    //同步V2版本的视频信息
    public static String QUEUENAME_VIDEO_SYNCDATA_V2 = "buwan-video-syncdata-v2";
    private static String TOPIC_VIDEO_EXTRAINFO = "buwan_topic_video_extrainfo_change";
    //视频信息修改
    private static String TOPIC_VIDEO_INFO_CHANGE = "buwan_topic_video_info_change";
    static {
        cmqUtil = CMQUtil.getInstance(secretId, secretKey);
        tdmqUtil = TDMQUtil.getInstance();
        tdmqUtil.init(secretId, secretKey, true);
        // 最大消息为1M
        cmqUtil.createQueue(QUEUENAME_SOLR, 1024 * 1024);
        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024);
        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_SOLR, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_PPTV, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_VIDEO_RESOURCE_DELETE, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_UPDATE_INTERNET_SEARCH, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_UPDATE_RESOURCE_VIDEO, 1024 * 1024);
        tdmqUtil.createQueue(QUEUENAME_VIDEO_SYNCDATA_V2, 1024 * 1024);
        //创建主题
        tdmqUtil.createTopic(TOPIC_VIDEO_EXTRAINFO);
        tdmqUtil.createTopic(TOPIC_VIDEO_INFO_CHANGE);
        //订阅主题
        try {
            List<String> filters = new ArrayList<>();
            filters.add("resource");
            filters.add("category");
            tdmqUtil.subscribeTopic(TOPIC_VIDEO_EXTRAINFO, "subscribe_" + StringUtil.Md5(TOPIC_VIDEO_EXTRAINFO + "#" + QUEUENAME_UPDATE_VIDEO_EXTRAINFO), QUEUENAME_UPDATE_VIDEO_EXTRAINFO, filters);
            tdmqUtil.subscribeTopic(TOPIC_VIDEO_INFO_CHANGE, "subscribe_" + StringUtil.Md5(TOPIC_VIDEO_INFO_CHANGE + "#" + QUEUENAME_VIDEO_SYNCDATA_V2), QUEUENAME_VIDEO_SYNCDATA_V2);
        } catch (Exception e) {
        }
    }
    public static CMQManager getInstance() {
@@ -38,13 +87,13 @@
    //添加专辑更新消息
    public void addIqiyiAlbumUpdateMsg(Long id) {
        cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + "");
        tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + "");
    }
    //消费专辑更新消息
    public List<IqiyiAlbum2MQMsg> consumeIqiyiAlbumUpdateMsg(int count) {
        List<IqiyiAlbum2MQMsg> list = new ArrayList<>();
        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
        if (msgList != null)
            for (Message msg : msgList) {
                IqiyiAlbum2MQMsg mm = new IqiyiAlbum2MQMsg();
@@ -57,19 +106,20 @@
    //删除专辑更新消息
    public void deleteIqiyiAlbumUpdateMsg(String handler) {
        cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
    }
    //添加专辑更新消息
    public void addFunTVAlbumUpdateMsg(String id) {
        cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, id );
        tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, id);
    }
    //消费专辑更新消息
    public List<FunTVAlbum2MQMsg> consumeFunTVAlbumUpdateMsg(int count) {
        List<FunTVAlbum2MQMsg> list = new ArrayList<>();
        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
        if (msgList != null)
            for (Message msg : msgList) {
                FunTVAlbum2MQMsg mm = new FunTVAlbum2MQMsg();
@@ -80,9 +130,43 @@
        return list;
    }
    //删除专辑更新消息
    public void deleteFunTVAlbumUpdateMsg(String handler) {
        cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
    }
    /**
     * PPTV剧集更新
     */
    //添加专辑更新消息
    public void addPPTVSeriesUpdateMsg(PPTVMQMsg msg) {
        tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_PPTV, new Gson().toJson(msg));
    }
    //消费专辑更新消息
    public List<PPTVMQMsg> consumePPTVSeriesUpdateMsg(int count) {
        List<PPTVMQMsg> list = new ArrayList<>();
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV);
        if (msgList != null)
            for (Message msg : msgList) {
                PPTVMQMsg mm = new Gson().fromJson(msg.msgBody, PPTVMQMsg.class);
                mm.setHandler(msg.receiptHandle);
                list.add(mm);
            }
        return list;
    }
    //删除专辑更新消息
    public void deletePPTVSeriesUpdateMsg(String handler) {
        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
    }
@@ -97,12 +181,12 @@
     * @param id
     */
    public void addSolrMsg(String id) {
        cmqUtil.sendMsg(QUEUENAME_SOLR, id);
        tdmqUtil.sendMsg(QUEUENAME_SOLR, id);
    }
    public List<SolrVideoMQMsg> consumeSolrMsg(int count) {
        List<SolrVideoMQMsg> list = new ArrayList<>();
        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_SOLR);
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_SOLR);
        if (msgList != null)
            for (Message msg : msgList) {
                SolrVideoMQMsg mm = new SolrVideoMQMsg();
@@ -115,7 +199,161 @@
    public void deleteSolrMsg(String handler) {
        cmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
        tdmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
    }
}
    /**
     * 删除视频资源
     *
     * @param videoId
     */
    public void addVideoResourceDeleteMsg(String videoId) {
        tdmqUtil.sendMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, videoId);
    }
    public List<CMQResult> consumeVideoResourceDeleteMsg(int count) {
        List<CMQResult> list = new ArrayList<>();
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
        if (msgList != null)
            for (Message msg : msgList) {
                CMQResult mm = new CMQResult();
                mm.setHandler(msg.receiptHandle);
                mm.setData(msg.msgBody);
                list.add(mm);
            }
        return list;
    }
    public void deleteVideoResourceDeleteMsg(String handler) {
        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
    }
    /**
     * 视频附加信息改变
     */
    public void addVideoExtraInfoChanged(VideoExtraInfoChangeMQMsg msg) {
        List<String> list = new ArrayList<>();
        list.add(msg.getType());
        tdmqUtil.publishTopicMessage(TOPIC_VIDEO_EXTRAINFO, list, new Gson().toJson(msg));
    }
    public List<CMQResult> consumeUpdateVideoExtraInfoMsg(int count) {
        List<CMQResult> list = new ArrayList<>();
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
        if (msgList != null)
            for (Message msg : msgList) {
                CMQResult mm = new CMQResult();
                mm.setHandler(msg.receiptHandle);
                mm.setData(new Gson().fromJson(msg.msgBody, VideoExtraInfoChangeMQMsg.class));
                list.add(mm);
            }
        return list;
    }
    public void deleteUpdateVideoExtraInfoMsg(String handler) {
        tdmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
    }
    /**
     * 全网搜
     *
     * @param msg
     */
    public void addInternetSearchVideoUpdateMsg(InternetSearchVideoMQMsg msg) {
        tdmqUtil.sendMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, new Gson().toJson(msg));
    }
    public List<InternetSearchVideoMQMsg> consumeInternetSearchVideoUpdateMsg(int count) {
        Gson gson = new Gson();
        List<InternetSearchVideoMQMsg> list = new ArrayList<>();
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
        if (msgList != null)
            for (Message msg : msgList) {
                InternetSearchVideoMQMsg mm = gson.fromJson(msg.msgBody, InternetSearchVideoMQMsg.class);
                mm.setHandler(msg.receiptHandle);
                list.add(mm);
            }
        return list;
    }
    public void deleteInternetSearchVideoUpdateMsg(String handler) {
        tdmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
    }
    /**
     * 视频来源更新
     *
     * @param msg
     */
    public void addUpdateResourceVideoMsg(UpdateResourceVideoMQMsg msg) {
        tdmqUtil.sendMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, new Gson().toJson(msg));
    }
    public void deleteUpdateResourceVideoMsg(String handler) {
        tdmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
    }
    public List<UpdateResourceVideoMQMsg> consumeUpdateResourceVideoMsg(int count) {
        Gson gson = new Gson();
        List<UpdateResourceVideoMQMsg> list = new ArrayList<>();
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO);
        if (msgList != null)
            for (Message msg : msgList) {
                try {
                    UpdateResourceVideoMQMsg mm = gson.fromJson(msg.msgBody, UpdateResourceVideoMQMsg.class);
                    mm.setHandler(msg.receiptHandle);
                    list.add(mm);
                } catch (Exception e) {
                    e.printStackTrace();
                    JSONObject jsonObject = JSONObject.fromObject(msg.msgBody);
                    jsonObject.remove("date");
                    UpdateResourceVideoMQMsg mm = gson.fromJson(jsonObject.toString(), UpdateResourceVideoMQMsg.class);
                    mm.setHandler(msg.receiptHandle);
                    list.add(mm);
                }
            }
        return list;
    }
    //视频数据更改
    public void addVideoDataChanged(VideoDataChangeMQMsg msg) {
        tdmqUtil.publishTopicMessage(TOPIC_VIDEO_INFO_CHANGE, new Gson().toJson(msg));
    }
    public Map<String, VideoDataChangeMQMsg> consumeVideoSyncV2DataMsg(int count) {
        Gson gson = new Gson();
        Map<String, VideoDataChangeMQMsg> map = new HashMap<>();
        List<Message> msgList = tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
        if (msgList != null)
            for (Message msg : msgList) {
                map.put(msg.receiptHandle, gson.fromJson(msg.msgBody, VideoDataChangeMQMsg.class));
            }
        return map;
    }
    public void deleteVideoSyncV2DataMsg(String handler) {
        tdmqUtil.deleteMsg(QUEUENAME_VIDEO_SYNCDATA_V2, handler);
    }
}