admin
2020-10-26 89e370bfdda29ac8a8f7080a18dc09a6ddc75c09
src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
@@ -1,10 +1,8 @@
package com.yeshi.buwan.util.mq;
import com.google.gson.Gson;
import com.qcloud.cmq.Message;
import com.yeshi.buwan.dto.mq.CMQConsumeMsg;
import com.yeshi.buwan.dto.mq.FunTVAlbum2MQMsg;
import com.yeshi.buwan.dto.mq.IqiyiAlbum2MQMsg;
import com.yeshi.buwan.dto.mq.SolrVideoMQMsg;
import com.yeshi.buwan.dto.mq.*;
import java.util.ArrayList;
import java.util.List;
@@ -24,6 +22,12 @@
    // Queue name for "delete video resource" messages
    public static String QUEUENAME_VIDEO_RESOURCE_DELETE = "buwan-video-resource-delete";
    // Queue name for "update video extra info" messages
    public static String QUEUENAME_UPDATE_VIDEO_EXTRAINFO = "buwan-video-video-update-extrainfo";
    // Topic name for video extra-info change notifications; the static initializer
    // subscribes QUEUENAME_UPDATE_VIDEO_EXTRAINFO to this topic
    private static String TOPIC_VIDEO_EXTRAINFO = "buwan_topic_video_extrainfo_change";
    static {
        cmqUtil = CMQUtil.getInstance(secretId, secretKey);
@@ -32,6 +36,18 @@
        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024);
        cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024);
        cmqUtil.createQueue(QUEUENAME_VIDEO_RESOURCE_DELETE, 1024 * 1024);
        cmqUtil.createQueue(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, 1024 * 1024);
        //创建主题
        cmqUtil.createTopic(TOPIC_VIDEO_EXTRAINFO);
        //订阅主题
        try {
            List<String> filters = new ArrayList<>();
            filters.add("resource");
            filters.add("category");
            cmqUtil.subscribeTopic(TOPIC_VIDEO_EXTRAINFO, TOPIC_VIDEO_EXTRAINFO + "#" + QUEUENAME_UPDATE_VIDEO_EXTRAINFO, QUEUENAME_UPDATE_VIDEO_EXTRAINFO, filters);
        } catch (Exception e) {
        }
    }
    public static CMQManager getInstance() {
@@ -134,12 +150,12 @@
        cmqUtil.sendMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, videoId);
    }
    public List<CMQConsumeMsg> consumeVideoResourceDeleteMsg(int count) {
        List<CMQConsumeMsg> list = new ArrayList<>();
    public List<CMQResult> consumeVideoResourceDeleteMsg(int count) {
        List<CMQResult> list = new ArrayList<>();
        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
        if (msgList != null)
            for (Message msg : msgList) {
                CMQConsumeMsg mm = new CMQConsumeMsg();
                CMQResult mm = new CMQResult();
                mm.setHandler(msg.receiptHandle);
                mm.setData(msg.msgBody);
                list.add(mm);
@@ -153,4 +169,32 @@
    }
    /**
     * 视频附加信息改变
     */
    public void addVideoExtraInfoChanged(VideoExtraInfoChangeMQMsg msg) {
        List<String> list = new ArrayList<>();
        list.add(msg.getType());
        cmqUtil.publishTopicMessage(TOPIC_VIDEO_EXTRAINFO, new Gson().toJson(msg), list);
    }
    public List<CMQResult> consumeUpdateVideoExtraInfoMsg(int count) {
        List<CMQResult> list = new ArrayList<>();
        List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
        if (msgList != null)
            for (Message msg : msgList) {
                CMQResult mm = new CMQResult();
                mm.setHandler(msg.receiptHandle);
                mm.setData(new Gson().fromJson(msg.msgBody, VideoExtraInfoChangeMQMsg.class));
                list.add(mm);
            }
        return list;
    }
    public void deleteUpdateVideoExtraInfoMsg(String handler) {
        cmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
    }
}