| | |
| | | package com.yeshi.buwan.util.mq; |
| | | |
| | | import com.google.gson.Gson; |
| | | import com.qcloud.cmq.Message; |
| | | import com.yeshi.buwan.dto.mq.IqiyiAlbum2MQMsg; |
| | | import com.yeshi.buwan.dto.mq.SolrVideoMQMsg; |
| | | import com.yeshi.buwan.dto.mq.*; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | |
| | | public static String QUEUENAME_SOLR = "buwan-solr"; |
| | | //视频更新-爱奇艺2 |
| | | public static String QUEUENAME_VIDEO_UPDATE_IQIYI_2 = "buwan-video-update-iqiyi2"; |
| | | //视频更新-风行2 |
| | | public static String QUEUENAME_VIDEO_UPDATE_FUNTV_2 = "buwan-video-update-funtv2"; |
| | | |
| | | //删除视频资源 |
| | | public static String QUEUENAME_VIDEO_RESOURCE_DELETE = "buwan-video-resource-delete"; |
| | | |
| | | //更新视频附加信息 |
| | | public static String QUEUENAME_UPDATE_VIDEO_EXTRAINFO = "buwan-video-video-update-extrainfo"; |
| | | |
| | | |
| | | private static String TOPIC_VIDEO_EXTRAINFO = "buwan_topic_video_extrainfo_change"; |
| | | |
| | | |
| | | static { |
| | |
| | | // 最大消息为1M |
| | | cmqUtil.createQueue(QUEUENAME_SOLR, 1024 * 1024); |
| | | cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_IQIYI_2, 1024 * 1024); |
| | | cmqUtil.createQueue(QUEUENAME_VIDEO_UPDATE_FUNTV_2, 1024 * 1024); |
| | | cmqUtil.createQueue(QUEUENAME_VIDEO_RESOURCE_DELETE, 1024 * 1024); |
| | | cmqUtil.createQueue(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, 1024 * 1024); |
| | | |
| | | //创建主题 |
| | | cmqUtil.createTopic(TOPIC_VIDEO_EXTRAINFO); |
| | | //订阅主题 |
| | | try { |
| | | List<String> filters = new ArrayList<>(); |
| | | filters.add("resource"); |
| | | filters.add("category"); |
| | | cmqUtil.subscribeTopic(TOPIC_VIDEO_EXTRAINFO, TOPIC_VIDEO_EXTRAINFO + "#" + QUEUENAME_UPDATE_VIDEO_EXTRAINFO, QUEUENAME_UPDATE_VIDEO_EXTRAINFO, filters); |
| | | } catch (Exception e) { |
| | | } |
| | | } |
| | | |
| | | public static CMQManager getInstance() { |
| | |
| | | |
| | | |
| | | //添加专辑更新消息 |
| | | public void addAlbumUpdateMsg(Long id) { |
| | | public void addIqiyiAlbumUpdateMsg(Long id) { |
| | | cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + ""); |
| | | } |
| | | |
| | | //消费专辑更新消息 |
| | | public List<IqiyiAlbum2MQMsg> consumeAlbumUpdateMsg(int count) { |
| | | public List<IqiyiAlbum2MQMsg> consumeIqiyiAlbumUpdateMsg(int count) { |
| | | List<IqiyiAlbum2MQMsg> list = new ArrayList<>(); |
| | | List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_IQIYI_2); |
| | | if (msgList != null) |
| | |
| | | } |
| | | |
| | | //删除专辑更新消息 |
| | | public void deleteAlbumUpdateMsg(String handler) { |
| | | public void deleteIqiyiAlbumUpdateMsg(String handler) { |
| | | cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_IQIYI_2, handler); |
| | | } |
| | | |
| | | |
| | | //添加专辑更新消息 |
| | | public void addFunTVAlbumUpdateMsg(String id) { |
| | | cmqUtil.sendMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, id); |
| | | } |
| | | |
| | | //消费专辑更新消息 |
| | | public List<FunTVAlbum2MQMsg> consumeFunTVAlbumUpdateMsg(int count) { |
| | | List<FunTVAlbum2MQMsg> list = new ArrayList<>(); |
| | | List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_UPDATE_FUNTV_2); |
| | | if (msgList != null) |
| | | for (Message msg : msgList) { |
| | | FunTVAlbum2MQMsg mm = new FunTVAlbum2MQMsg(); |
| | | mm.setHandler(msg.receiptHandle); |
| | | mm.setId(msg.msgBody); |
| | | list.add(mm); |
| | | } |
| | | return list; |
| | | } |
| | | |
| | | //删除专辑更新消息 |
| | | public void deleteFunTVAlbumUpdateMsg(String handler) { |
| | | cmqUtil.deleteMsg(QUEUENAME_VIDEO_UPDATE_FUNTV_2, handler); |
| | | } |
| | | |
| | | |
| | |
| | | cmqUtil.deleteMsg(QUEUENAME_SOLR, handler); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 删除视频资源 |
| | | * |
| | | * @param videoId |
| | | */ |
| | | |
| | | public void addVideoResourceDeleteMsg(String videoId) { |
| | | cmqUtil.sendMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, videoId); |
| | | } |
| | | |
| | | public List<CMQResult> consumeVideoResourceDeleteMsg(int count) { |
| | | List<CMQResult> list = new ArrayList<>(); |
| | | List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_VIDEO_RESOURCE_DELETE); |
| | | if (msgList != null) |
| | | for (Message msg : msgList) { |
| | | CMQResult mm = new CMQResult(); |
| | | mm.setHandler(msg.receiptHandle); |
| | | mm.setData(msg.msgBody); |
| | | list.add(mm); |
| | | } |
| | | return list; |
| | | } |
| | | |
| | | |
| | | public void deleteVideoResourceDeleteMsg(String handler) { |
| | | cmqUtil.deleteMsg(QUEUENAME_VIDEO_RESOURCE_DELETE, handler); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 视频附加信息改变 |
| | | */ |
| | | public void addVideoExtraInfoChanged(VideoExtraInfoChangeMQMsg msg) { |
| | | List<String> list = new ArrayList<>(); |
| | | list.add(msg.getType()); |
| | | cmqUtil.publishTopicMessage(TOPIC_VIDEO_EXTRAINFO, new Gson().toJson(msg), list); |
| | | } |
| | | |
| | | |
| | | public List<CMQResult> consumeUpdateVideoExtraInfoMsg(int count) { |
| | | List<CMQResult> list = new ArrayList<>(); |
| | | List<Message> msgList = cmqUtil.recieveMsg(count, QUEUENAME_UPDATE_VIDEO_EXTRAINFO); |
| | | if (msgList != null) |
| | | for (Message msg : msgList) { |
| | | CMQResult mm = new CMQResult(); |
| | | mm.setHandler(msg.receiptHandle); |
| | | mm.setData(new Gson().fromJson(msg.msgBody, VideoExtraInfoChangeMQMsg.class)); |
| | | list.add(mm); |
| | | } |
| | | return list; |
| | | } |
| | | |
| | | public void deleteUpdateVideoExtraInfoMsg(String handler) { |
| | | cmqUtil.deleteMsg(QUEUENAME_UPDATE_VIDEO_EXTRAINFO, handler); |
| | | } |
| | | |
| | | |
| | | } |