package com.yeshi.buwan.util.mq;
|
|
import com.google.gson.Gson;
|
import com.qcloud.cmq.Message;
|
import com.yeshi.buwan.dto.mq.*;
|
import com.yeshi.buwan.util.StringUtil;
|
import net.sf.json.JSONObject;
|
import org.yeshi.utils.CMQUtil;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
public class CMQManager {
|
private static String secretId = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
|
private static String secretKey = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";
|
private static CMQManager cmqManager;
|
private static CMQUtil cmqUtil;
|
private static TDMQUtil tdmqUtil;
|
|
private final static boolean TDMQ_CONSUMER = true;
|
|
// 搜索引擎
|
public static String QUEUENAME_SOLR = "buwan-solr-new";
|
//视频更新-爱奇艺2
|
public static String QUEUENAME_VIDEO_UPDATE_IQIYI_2 = "buwan-video-update-iqiyi2";
|
//视频更新-风行2
|
public static String QUEUENAME_VIDEO_UPDATE_FUNTV_2 = "buwan-video-update-funtv2";
|
|
//视频更新-PPTV
|
public static String QUEUENAME_VIDEO_UPDATE_PPTV = "buwan-video-update-pptv";
|
|
//删除视频资源
|
public static String QUEUENAME_VIDEO_RESOURCE_DELETE = "buwan-video-resource-delete";
|
|
//更新视频附加信息
|
public static String QUEUENAME_UPDATE_VIDEO_EXTRAINFO = "buwan-video-video-update-extrainfo";
|
|
//全网搜
|
public static String QUEUENAME_UPDATE_INTERNET_SEARCH = "buwan-video-update-internet-search";
|
|
public static String QUEUENAME_UPDATE_RESOURCE_VIDEO = "buwan-video-update-resource-video";
|
|
|
//同步V2版本的视频信息
|
public static String QUEUENAME_VIDEO_SYNCDATA_V2 = "buwan-video-syncdata-v2";
|
|
|
private static String TOPIC_VIDEO_EXTRAINFO = "buwan_topic_video_extrainfo_change";
|
|
//视频信息修改
|
private static String TOPIC_VIDEO_INFO_CHANGE = "buwan_topic_video_info_change";
|
|
|
static {
|
cmqUtil = CMQUtil.getInstance(secretId, secretKey);
|
|
tdmqUtil = TDMQUtil.getInstance();
|
tdmqUtil.init(secretId, secretKey, true);
|
|
|
// 最大消息为1M
|
tdmqUtil.createQueue(QUEUENAME_SOLR, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_PPTV, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_VIDEO_RESOURCE_DELETE, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_UPDATE_INTERNET_SEARCH, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_UPDATE_RESOURCE_VIDEO, 1024 * 1024);
|
tdmqUtil.createQueue(QUEUENAME_VIDEO_SYNCDATA_V2, 1024 * 1024);
|
|
//创建主题
|
tdmqUtil.createTopic(TOPIC_VIDEO_EXTRAINFO);
|
tdmqUtil.createTopic(TOPIC_VIDEO_INFO_CHANGE);
|
//订阅主题
|
try {
|
List<String> filters = new ArrayList<>();
|
filters.add("resource");
|
filters.add("category");
|
tdmqUtil.subscribeTopic(TOPIC_VIDEO_EXTRAINFO, "subscribe_" + StringUtil.Md5(TOPIC_VIDEO_EXTRAINFO + "#" + QUEUENAME_UPDATE_VIDEO_EXTRAINFO), QUEUENAME_UPDATE_VIDEO_EXTRAINFO, filters);
|
tdmqUtil.subscribeTopic(TOPIC_VIDEO_INFO_CHANGE, "subscribe_" + StringUtil.Md5(TOPIC_VIDEO_INFO_CHANGE + "#" + QUEUENAME_VIDEO_SYNCDATA_V2), QUEUENAME_VIDEO_SYNCDATA_V2);
|
} catch (Exception e) {
|
}
|
}
|
|
public static CMQManager getInstance() {
|
if (cmqManager == null)
|
cmqManager = new CMQManager();
|
return cmqManager;
|
}
|
|
|
//添加专辑更新消息
|
public void addIqiyiAlbumUpdateMsg(Long id) {
|
tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + "");
|
}
|
|
//消费专辑更新消息
|
public List<IqiyiAlbum2MQMsg> consumeIqiyiAlbumUpdateMsg(int count) {
|
List<IqiyiAlbum2MQMsg> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
IqiyiAlbum2MQMsg mm = new IqiyiAlbum2MQMsg();
|
mm.setHandler(msg.receiptHandle);
|
mm.setId(Long.parseLong(msg.msgBody));
|
list.add(mm);
|
}
|
return list;
|
}
|
|
//删除专辑更新消息
|
public void deleteIqiyiAlbumUpdateMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler);
|
}
|
}
|
|
|
//添加专辑更新消息
|
public void addFunTVAlbumUpdateMsg(String id) {
|
tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, id);
|
}
|
|
//消费专辑更新消息
|
public List<FunTVAlbum2MQMsg> consumeFunTVAlbumUpdateMsg(int count) {
|
List<FunTVAlbum2MQMsg> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
FunTVAlbum2MQMsg mm = new FunTVAlbum2MQMsg();
|
mm.setHandler(msg.receiptHandle);
|
mm.setId(msg.msgBody);
|
list.add(mm);
|
}
|
return list;
|
}
|
|
|
//删除专辑更新消息
|
public void deleteFunTVAlbumUpdateMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler);
|
}
|
|
|
}
|
|
|
/**
|
* PPTV剧集更新
|
*/
|
|
//添加专辑更新消息
|
public void addPPTVSeriesUpdateMsg(PPTVMQMsg msg) {
|
tdmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_PPTV, new Gson().toJson(msg));
|
}
|
|
//消费专辑更新消息
|
public List<PPTVMQMsg> consumePPTVSeriesUpdateMsg(int count) {
|
List<PPTVMQMsg> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_PPTV);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
PPTVMQMsg mm = new Gson().fromJson(msg.msgBody, PPTVMQMsg.class);
|
mm.setHandler(msg.receiptHandle);
|
list.add(mm);
|
}
|
return list;
|
}
|
|
//删除专辑更新消息
|
public void deletePPTVSeriesUpdateMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_PPTV, handler);
|
}
|
}
|
|
|
/**
|
* 搜索引擎
|
* @param id
|
*/
|
|
/**
|
* 搜索引擎消息
|
*
|
* @param id
|
*/
|
public void addSolrMsg(String id) {
|
tdmqUtil.sendMsg(QUEUENAME_SOLR, id);
|
}
|
|
public List<SolrVideoMQMsg> consumeSolrMsg(int count) {
|
List<SolrVideoMQMsg> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_SOLR) : cmqUtil.recieveMsg(count, QUEUENAME_SOLR);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
SolrVideoMQMsg mm = new SolrVideoMQMsg();
|
mm.setHandler(msg.receiptHandle);
|
mm.setId(msg.msgBody);
|
list.add(mm);
|
}
|
return list;
|
}
|
|
|
public void deleteSolrMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_SOLR, handler);
|
}
|
}
|
|
|
/**
|
* 删除视频资源
|
*
|
* @param videoId
|
*/
|
|
public void addVideoResourceDeleteMsg(String videoId) {
|
tdmqUtil.sendMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, videoId);
|
}
|
|
public List<CMQResult> consumeVideoResourceDeleteMsg(int count) {
|
List<CMQResult> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE) : cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
CMQResult mm = new CMQResult();
|
mm.setHandler(msg.receiptHandle);
|
mm.setData(msg.msgBody);
|
list.add(mm);
|
}
|
return list;
|
}
|
|
|
public void deleteVideoResourceDeleteMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler);
|
}
|
}
|
|
|
/**
|
* 视频附加信息改变
|
*/
|
public void addVideoExtraInfoChanged(VideoExtraInfoChangeMQMsg msg) {
|
List<String> list = new ArrayList<>();
|
list.add(msg.getType());
|
tdmqUtil.publishTopicMessage(TOPIC_VIDEO_EXTRAINFO, list, new Gson().toJson(msg));
|
}
|
|
|
public List<CMQResult> consumeUpdateVideoExtraInfoMsg(int count) {
|
List<CMQResult> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
CMQResult mm = new CMQResult();
|
mm.setHandler(msg.receiptHandle);
|
mm.setData(new Gson().fromJson(msg.msgBody, VideoExtraInfoChangeMQMsg.class));
|
list.add(mm);
|
}
|
return list;
|
}
|
|
public void deleteUpdateVideoExtraInfoMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler);
|
}
|
}
|
|
|
/**
|
* 全网搜
|
*
|
* @param msg
|
*/
|
|
public void addInternetSearchVideoUpdateMsg(InternetSearchVideoMQMsg msg) {
|
tdmqUtil.sendMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, new Gson().toJson(msg));
|
}
|
|
public List<InternetSearchVideoMQMsg> consumeInternetSearchVideoUpdateMsg(int count) {
|
Gson gson = new Gson();
|
List<InternetSearchVideoMQMsg> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
InternetSearchVideoMQMsg mm = gson.fromJson(msg.msgBody, InternetSearchVideoMQMsg.class);
|
mm.setHandler(msg.receiptHandle);
|
list.add(mm);
|
}
|
return list;
|
}
|
|
|
public void deleteInternetSearchVideoUpdateMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_UPDATE_INTERNET_SEARCH, handler);
|
}
|
}
|
|
/**
|
* 视频来源更新
|
*
|
* @param msg
|
*/
|
public void addUpdateResourceVideoMsg(UpdateResourceVideoMQMsg msg) {
|
tdmqUtil.sendMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, new Gson().toJson(msg));
|
}
|
|
public void deleteUpdateResourceVideoMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_UPDATE_RESOURCE_VIDEO, handler);
|
}
|
}
|
|
|
public List<UpdateResourceVideoMQMsg> consumeUpdateResourceVideoMsg(int count) {
|
Gson gson = new Gson();
|
List<UpdateResourceVideoMQMsg> list = new ArrayList<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_RESOURCE_VIDEO);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
try {
|
UpdateResourceVideoMQMsg mm = gson.fromJson(msg.msgBody, UpdateResourceVideoMQMsg.class);
|
mm.setHandler(msg.receiptHandle);
|
list.add(mm);
|
} catch (Exception e) {
|
e.printStackTrace();
|
JSONObject jsonObject = JSONObject.fromObject(msg.msgBody);
|
jsonObject.remove("date");
|
UpdateResourceVideoMQMsg mm = gson.fromJson(jsonObject.toString(), UpdateResourceVideoMQMsg.class);
|
mm.setHandler(msg.receiptHandle);
|
list.add(mm);
|
}
|
}
|
return list;
|
}
|
|
//视频数据更改
|
public void addVideoDataChanged(VideoDataChangeMQMsg msg) {
|
tdmqUtil.publishTopicMessage(TOPIC_VIDEO_INFO_CHANGE, new Gson().toJson(msg));
|
}
|
|
|
public Map<String, VideoDataChangeMQMsg> consumeVideoSyncV2DataMsg(int count) {
|
Gson gson = new Gson();
|
Map<String, VideoDataChangeMQMsg> map = new HashMap<>();
|
List<Message> msgList = TDMQ_CONSUMER ? tdmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH) : cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_INTERNET_SEARCH);
|
if (msgList != null)
|
for (Message msg : msgList) {
|
map.put(msg.receiptHandle, gson.fromJson(msg.msgBody, VideoDataChangeMQMsg.class));
|
}
|
return map;
|
}
|
|
|
public void deleteVideoSyncV2DataMsg(String handler) {
|
if (TDMQ_CONSUMER) {
|
tdmqUtil.deleteMsg(QUEUENAME_VIDEO_SYNCDATA_V2, handler);
|
} else {
|
cmqUtil.deleteMsg(QUEUENAME_VIDEO_SYNCDATA_V2, handler);
|
}
|
}
|
|
|
}
|