admin
2024-10-17 b30fb8afd3cd6228bda9b182dc412bb3c8daf69c
CMQ转为Rabbitmq
2个文件已删除
25个文件已修改
17个文件已添加
2863 ■■■■ 已修改文件
pom.xml 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/controller/parser/HomeParser.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/job/video/FunTV2VideoUpdate.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/CategoryVideoService.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/ClearService.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/ResourceVideoService.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/VideoInfoService.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/VideoManager.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/juhe/FunTV2ServiceImpl.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/juhe/InternetSearchVideoServiceImpl.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/juhe/Iqiyi2ServiceImpl.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/juhe/IqiyiService.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/service/imp/juhe/PPTVServiceImpl.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/SpringContext.java 327 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/CMQManager.java 359 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/TDMQUtil.java 503 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/DelayMsgInfo.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/QueueHelloWorldListener.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitDelayConsumeFailConsumer.java 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitDelayConsumer.java 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqConfig.java 132 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqManager.java 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqMsgConsumeUtil.java 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqSenderUtil.java 161 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/SolrNewListener.java 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/UpdateIntenetSearchListener.java 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/UpdateResourceVideoListener.java 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoResourceDeleteListener.java 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoSyncDataV2Listener.java 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdateExtrainfoListener.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdateFuntv2Listener.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdateIqiyi2Listener.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdatePPTVListener.java 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/yeshi/buwan/videos/iqiyi/util/IqiyiUtil2.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/consumer.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/env-dev/logback.xml 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/env-dev/rabbitmq.properties 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/env-pro/rabbitmq.properties 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/spring-rabbitmq-consumer.xml 53 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/spring-rabbitmq-producer.xml 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/spring.xml 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/hxh/spring/test/DES.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/hxh/spring/test/Iqiyi2.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/hxh/spring/test/mq/MQTest.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pom.xml
@@ -416,14 +416,6 @@
        </dependency>
        <!-- 消息 -->
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
@@ -959,6 +951,33 @@
            <version>2.6.5</version>
        </dependency>
        <!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.3.10</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-tx</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-beans</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
src/main/java/com/yeshi/buwan/controller/parser/HomeParser.java
@@ -13,15 +13,6 @@
import com.yeshi.buwan.dto.log.BaseLog;
import com.yeshi.buwan.dto.statistic.video.VideoDetailStatisticData;
import com.yeshi.buwan.exception.video.VideoPlayException;
import com.yeshi.buwan.service.inter.video.VideoResourceInfoMapService;
import com.yeshi.buwan.service.manager.APPManager;
import com.yeshi.buwan.videos.hanmi.HanmiUtil;
import com.yeshi.buwan.videos.mogotv.MogoTVUtil;
import com.yeshi.buwan.videos.pptv.PPTVApiUtil;
import com.yeshi.buwan.videos.pptv.PPTVUtil;
import com.yeshi.buwan.videos.pptv.entity.PPTVProgram;
import com.yeshi.buwan.videos.pptv.entity.PPTVSeries;
import com.yeshi.buwan.videos.pptv.entity.VideoPPTVMap;
import com.yeshi.buwan.service.imp.*;
import com.yeshi.buwan.service.inter.ad.DeviceAdStrategyService;
import com.yeshi.buwan.service.inter.juhe.InternetSearchVideoService;
@@ -29,10 +20,11 @@
import com.yeshi.buwan.service.inter.recommend.HomeRecommendSpecialService;
import com.yeshi.buwan.service.inter.recommend.HomeVideoService;
import com.yeshi.buwan.service.inter.search.SearchSpecialPositionMapService;
import com.yeshi.buwan.service.inter.video.VideoResourceInfoMapService;
import com.yeshi.buwan.service.inter.video.VideoWatchHistoryService;
import com.yeshi.buwan.service.manager.APPManager;
import com.yeshi.buwan.service.manager.VideoPlayStatisticManager;
import com.yeshi.buwan.service.manager.search.SolrShortVideoDataManager;
import com.yeshi.buwan.videos.tencent.TencentVideoUtil;
import com.yeshi.buwan.util.*;
import com.yeshi.buwan.util.JuHe.VideoResourceUtil;
import com.yeshi.buwan.util.annotation.RequireUid;
@@ -40,14 +32,22 @@
import com.yeshi.buwan.util.log.LoggerUtil;
import com.yeshi.buwan.util.log.UserActiveLogFactory;
import com.yeshi.buwan.util.log.VideoLogFactory;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.util.video.VideoDetailUtil;
import com.yeshi.buwan.util.video.VideoUtil;
import com.yeshi.buwan.util.zhibo.MeiNvZhiBoUtil;
import com.yeshi.buwan.videos.bilibili.BilibiliUtil;
import com.yeshi.buwan.videos.hanmi.HanmiUtil;
import com.yeshi.buwan.videos.mogotv.MogoTVUtil;
import com.yeshi.buwan.videos.pptv.PPTVApiUtil;
import com.yeshi.buwan.videos.pptv.PPTVUtil;
import com.yeshi.buwan.videos.pptv.entity.PPTVProgram;
import com.yeshi.buwan.videos.pptv.entity.PPTVSeries;
import com.yeshi.buwan.videos.pptv.entity.VideoPPTVMap;
import com.yeshi.buwan.videos.tencent.TencentVideoUtil;
import com.yeshi.buwan.videos.youku.YouKuUtil;
import com.yeshi.buwan.vo.AcceptData;
import com.yeshi.buwan.vo.video.VideoDetailVO;
import com.yeshi.buwan.videos.youku.YouKuUtil;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.commons.httpclient.HttpClient;
@@ -128,6 +128,9 @@
    @Resource
    private HanmiUtil hanmiUtil;
    @Resource
    private RabbitmqManager rabbitmqManager;
    private final Logger userPlayLogger = LoggerFactory.getLogger("videoPlayUser");
@@ -787,7 +790,7 @@
            }
            //视频没有来源
            if (StringUtil.isNullOrEmpty(resourceId)) {
                CMQManager.getInstance().addVideoResourceDeleteMsg(videoId);
               rabbitmqManager.addVideoResourceDeleteMsg(videoId);
                out.print(JsonUtil.loadFalseJson("视频已下线"));
                return;
            }
@@ -1013,7 +1016,7 @@
            }
            //视频没有来源
            if (StringUtil.isNullOrEmpty(resourceId)) {
                CMQManager.getInstance().addVideoResourceDeleteMsg(videoId);
               rabbitmqManager.addVideoResourceDeleteMsg(videoId);
                out.print(JsonUtil.loadFalseJson("视频已下线"));
                return;
            }
src/main/java/com/yeshi/buwan/job/video/FunTV2VideoUpdate.java
@@ -2,19 +2,13 @@
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.yeshi.buwan.videos.funtv.entity.FunTVAlbum2;
import com.yeshi.buwan.videos.funtv.entity.FunTVShortVideo2;
import com.yeshi.buwan.videos.funtv.entity.FunTVVideo2;
import com.yeshi.buwan.service.inter.juhe.FunTV2Service;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.vo.video.funtv.Funtv2ResultVO;
import com.yeshi.buwan.videos.funtv.entity.FunTVShortVideo2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
src/main/java/com/yeshi/buwan/service/imp/CategoryVideoService.java
@@ -6,11 +6,12 @@
import com.yeshi.buwan.domain.VideoType;
import com.yeshi.buwan.dto.mq.VideoDataChangeMQMsg;
import com.yeshi.buwan.dto.mq.VideoExtraInfoChangeMQMsg;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@@ -20,6 +21,9 @@
    @Autowired
    private CategoryVideoDao categoryVideoDao;
    @Resource
    private RabbitmqManager rabbitmqManager;
    public void addCategoryVideo(String videoId, long categoryId) {
        List<CategoryVideo> list = categoryVideoDao.listByVideoIdAndCategoryId(videoId, categoryId);
        if (list == null || list.size() == 0) {
@@ -27,8 +31,8 @@
            cv.setVideo(new VideoInfo(videoId));
            cv.setVideoType(new VideoType(categoryId));
            categoryVideoDao.save(cv);
            CMQManager.getInstance().addVideoExtraInfoChanged(new VideoExtraInfoChangeMQMsg(VideoExtraInfoChangeMQMsg.TYPE_CATEGORY, videoId, VideoExtraInfoChangeMQMsg.ACTION_ADD));
            CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_CATEGORY, videoId, VideoDataChangeMQMsg.ACTION_ADD));
            rabbitmqManager.addVideoExtraInfoChanged(new VideoExtraInfoChangeMQMsg(VideoExtraInfoChangeMQMsg.TYPE_CATEGORY, videoId, VideoExtraInfoChangeMQMsg.ACTION_ADD));
            rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_CATEGORY, videoId, VideoDataChangeMQMsg.ACTION_ADD));
        }
    }
src/main/java/com/yeshi/buwan/service/imp/ClearService.java
@@ -2,13 +2,12 @@
import com.yeshi.buwan.dao.CategoryVideoDao;
import com.yeshi.buwan.dao.VideoInfoDao;
import com.yeshi.buwan.domain.CategoryVideo;
import com.yeshi.buwan.domain.VideoDetailInfo;
import com.yeshi.buwan.dto.mq.VideoDataChangeMQMsg;
import com.yeshi.buwan.service.imp.juhe.IqiyiService;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.TimeUtil;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import net.sf.json.JSONObject;
import org.hibernate.HibernateException;
import org.hibernate.Session;
@@ -37,6 +36,9 @@
    @Resource
    private CategoryVideoDao categoryVideoDao;
    @Resource
    private RabbitmqManager rabbitmqManager;
    /**
@@ -122,7 +124,7 @@
                                session.createSQLQuery("delete from wk_video_video where " + ors).executeUpdate();
                                session.createSQLQuery("delete from wk_category_video where " + cvors).executeUpdate();
                                if (videoIds.size() > 0) {
                                    CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, org.yeshi.utils.StringUtil.concat(videoIds, ","), VideoDataChangeMQMsg.ACTION_DELETE));
                                    rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, org.yeshi.utils.StringUtil.concat(videoIds, ","), VideoDataChangeMQMsg.ACTION_DELETE));
                                }
                            }
src/main/java/com/yeshi/buwan/service/imp/ResourceVideoService.java
@@ -8,7 +8,7 @@
import com.yeshi.buwan.domain.VideoResourceMapExtraInfo;
import com.yeshi.buwan.dto.mq.VideoDataChangeMQMsg;
import com.yeshi.buwan.dto.mq.VideoExtraInfoChangeMQMsg;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import org.hibernate.HibernateException;
import org.hibernate.Query;
import org.hibernate.Session;
@@ -31,6 +31,9 @@
    @Resource
    private VideoResourceMapExtraInfoDao videoResourceMapExtraInfoDao;
    @Resource
    private RabbitmqManager rabbitmqManager;
    @SuppressWarnings("unchecked")
@@ -114,16 +117,16 @@
    }
    /**
     * @author hxh
     * @return java.util.List<com.yeshi.buwan.domain.ResourceVideo>
     * @author hxh
     * @description 根据ResourceId查询
     * @date 10:58 2024/8/16
     * @param: resourceId
     * @param: page
     * @param: pageSize
     * @return java.util.List<com.yeshi.buwan.domain.ResourceVideo>
     **/
    public List<ResourceVideo> listResourceVideo(Long resourceId,int page, int pageSize){
        return    resourceVideoDao.list("from ResourceVideo rv where rv.resource.id=? ", (page-1)*pageSize, pageSize,new Serializable[]{resourceId+""});
    public List<ResourceVideo> listResourceVideo(Long resourceId, int page, int pageSize) {
        return resourceVideoDao.list("from ResourceVideo rv where rv.resource.id=? ", (page - 1) * pageSize, pageSize, new Serializable[]{resourceId + ""});
    }
@@ -151,8 +154,8 @@
            rv.setResource(new VideoResource(resourceId + ""));
            rv.setVideo(new VideoInfo(videoId + ""));
            resourceVideoDao.save(rv);
            CMQManager.getInstance().addVideoExtraInfoChanged(new VideoExtraInfoChangeMQMsg(VideoExtraInfoChangeMQMsg.TYPE_RESOURCE, videoId, VideoExtraInfoChangeMQMsg.ACTION_ADD));
            CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE,videoId, VideoDataChangeMQMsg.ACTION_ADD));
            rabbitmqManager.addVideoExtraInfoChanged(new VideoExtraInfoChangeMQMsg(VideoExtraInfoChangeMQMsg.TYPE_RESOURCE, videoId, VideoExtraInfoChangeMQMsg.ACTION_ADD));
            rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, videoId, VideoDataChangeMQMsg.ACTION_ADD));
        }
    }
@@ -161,8 +164,8 @@
        ResourceVideo resourceVideo = resourceVideoDao.selectByVideoIdAndResourceId(videoId, resourceId + "");
        if (resourceVideo != null) {
            resourceVideoDao.delete(resourceVideo);
            CMQManager.getInstance().addVideoExtraInfoChanged(new VideoExtraInfoChangeMQMsg(VideoExtraInfoChangeMQMsg.TYPE_RESOURCE, videoId, VideoExtraInfoChangeMQMsg.ACTION_DELETE));
            CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE,videoId, VideoDataChangeMQMsg.ACTION_DELETE));
            rabbitmqManager.addVideoExtraInfoChanged(new VideoExtraInfoChangeMQMsg(VideoExtraInfoChangeMQMsg.TYPE_RESOURCE, videoId, VideoExtraInfoChangeMQMsg.ACTION_DELETE));
            rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, videoId, VideoDataChangeMQMsg.ACTION_DELETE));
        }
        VideoResourceMapExtraInfo extraInfo = new VideoResourceMapExtraInfo(videoId, Long.parseLong(resourceId));
src/main/java/com/yeshi/buwan/service/imp/VideoInfoService.java
@@ -6,7 +6,7 @@
import com.yeshi.buwan.domain.entity.PlayUrl;
import com.yeshi.buwan.dto.mq.VideoDataChangeMQMsg;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.util.video.VideoConstant;
import org.hibernate.HibernateException;
import org.hibernate.SQLQuery;
@@ -40,6 +40,11 @@
    @Resource
    private ClearService clearService;
    @Resource
    private RabbitmqManager rabbitmqManager;
    @Transactional
    public VideoInfo getVideoInfo(String vid) {
@@ -80,12 +85,12 @@
    @Transactional
    public void saveWithCategoryAndResource(VideoInfo videoInfo) {
        videoInfoDao.save(videoInfo);
        CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
        rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
        CategoryVideo ca = new CategoryVideo();
        ca.setVideo(videoInfo);
        ca.setVideoType(videoInfo.getVideoType());
        categoryVideoDao.create(ca);
        CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_CATEGORY, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
       rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_CATEGORY, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
        List<VideoResource> resourceList = videoInfo.getResourceList();
        ResourceVideo rs = null;
        for (VideoResource videoResource : resourceList) {
@@ -94,21 +99,21 @@
            rs.setVideo(videoInfo);
            resourceVideoService.save(rs);
        }
        CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
        rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
    }
    @Transactional
    public Serializable save(VideoInfo videoInfo) {
        Serializable id = videoInfoDao.save(videoInfo);
        CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
        rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_ADD));
        return id;
    }
    @Transactional
    public void update(VideoInfo videoInfo) {
        videoInfoDao.update(videoInfo);
        CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_UPDATE));
        rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoInfo.getId(), VideoDataChangeMQMsg.ACTION_UPDATE));
    }
    public List<VideoDetailInfo> getVideoDetailList(String videoid, VideoResource vr, int page, int pageSize) {
@@ -332,7 +337,7 @@
        }
        if (needUpdateSolr) {
            CMQManager.getInstance().addSolrMsg(videoInfo.getId());
            rabbitmqManager.addSolrMsg(videoInfo.getId());
        }
src/main/java/com/yeshi/buwan/service/imp/VideoManager.java
@@ -7,7 +7,7 @@
import com.yeshi.buwan.service.inter.juhe.AlbumVideoMapService;
import com.yeshi.buwan.util.LogUtil;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.springframework.orm.hibernate4.HibernateCallback;
@@ -411,6 +411,9 @@
    @Resource
    private AlbumVideoMapService albumVideoMapService;
    @Resource
    private RabbitmqManager rabbitmqManager;
    /**
     * 删除视频
     *
@@ -422,7 +425,7 @@
        //清除MongoDB相关依赖
        albumVideoMapService.deleteByVideoId(videoId);
        //更新搜索引擎数据
        CMQManager.getInstance().addSolrMsg(videoId);
        rabbitmqManager.addSolrMsg(videoId);
        videoInfoDao.delete(new VideoInfo(videoId));
    }
src/main/java/com/yeshi/buwan/service/imp/juhe/FunTV2ServiceImpl.java
@@ -12,16 +12,16 @@
import com.yeshi.buwan.domain.entity.PlayUrl;
import com.yeshi.buwan.domain.system.DetailSystemConfig;
import com.yeshi.buwan.domain.video.AlbumVideoMap;
import com.yeshi.buwan.service.imp.*;
import com.yeshi.buwan.service.inter.juhe.FunTV2Service;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.util.video.VideoConstant;
import com.yeshi.buwan.videos.funtv.FunTVUtil2;
import com.yeshi.buwan.videos.funtv.entity.FunTVAlbum2;
import com.yeshi.buwan.videos.funtv.entity.FunTVShortVideo2;
import com.yeshi.buwan.videos.funtv.entity.FunTVVideo2;
import com.yeshi.buwan.videos.funtv.entity.VideoFunTV2;
import com.yeshi.buwan.service.imp.*;
import com.yeshi.buwan.service.inter.juhe.FunTV2Service;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.video.VideoConstant;
import com.yeshi.buwan.vo.AcceptData;
import net.sf.json.JSONObject;
import org.springframework.stereotype.Service;
@@ -29,7 +29,9 @@
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Service
public class FunTV2ServiceImpl implements FunTV2Service {
@@ -66,6 +68,9 @@
    @Resource
    private DetailSystemConfigService detailSystemConfigService;
    @Resource
    private RabbitmqManager rabbitmqManager;
    @Override
@@ -147,7 +152,7 @@
        resourceVideoService.addVideoResource(newVideoInfo.getId(), FunTVUtil2.RESOURCE_ID + "");
        //添加视频分类映射
        categoryVideoService.addCategoryVideo(newVideoInfo.getId(), newVideoInfo.getVideoType().getId());
        CMQManager.getInstance().addSolrMsg(newVideoInfo.getId());
        rabbitmqManager.addSolrMsg(newVideoInfo.getId());
    }
    @Override
src/main/java/com/yeshi/buwan/service/imp/juhe/InternetSearchVideoServiceImpl.java
@@ -7,7 +7,7 @@
import com.yeshi.buwan.exception.ParamsException;
import com.yeshi.buwan.service.inter.juhe.InternetSearchVideoService;
import com.yeshi.buwan.service.inter.system.SystemConfigService;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
@@ -27,6 +27,9 @@
    @Resource
    private SystemConfigService systemConfigService;
    @Resource
    private RabbitmqManager rabbitmqManager;
    private boolean isCanSave(InternetSearchVideo video) {
        SystemConfig systemConfig = systemConfigService.getConfigByKeyCache("internetSearchInvalidKeyForSave");
@@ -51,7 +54,7 @@
        InternetSearchVideo oldVideo = internetSearchVideoDao.get(video.getId());
        Set<Integer> resourceIds = new HashSet<>();
        if(oldVideo!=null) {
        if (oldVideo != null) {
            resourceIds.addAll(listResourceId(oldVideo));
        }
        int resourceId = Integer.parseInt(video.getResourceIds());
@@ -69,7 +72,7 @@
        InternetSearchVideoMQMsg msg = new InternetSearchVideoMQMsg();
        msg.setId(video.getId());
        msg.setResourceId(resourceId);
        CMQManager.getInstance().addInternetSearchVideoUpdateMsg(msg);
        rabbitmqManager.addInternetSearchVideoUpdateMsg(msg);
        return video;
    }
@@ -137,19 +140,19 @@
    @Override
    public void removeResourceId(String id, Integer resourceId) {
        InternetSearchVideo video = internetSearchVideoDao.get(id);
        if(video==null){
            return ;
        if (video == null) {
            return;
        }
        Set<Integer> resourceIds =  listResourceId(video);
        if(resourceIds.contains(resourceId)){
        Set<Integer> resourceIds = listResourceId(video);
        if (resourceIds.contains(resourceId)) {
            //移除
            resourceIds.remove(resourceId);
        }
        if(resourceIds.size()<=0){
        if (resourceIds.size() <= 0) {
            // 移除数据
            internetSearchVideoDao.deleteByPrimaryKey(id);
        }else{
            InternetSearchVideo update=new InternetSearchVideo();
        } else {
            InternetSearchVideo update = new InternetSearchVideo();
            update.setId(id);
            Integer[] rids = new Integer[resourceIds.size()];
            resourceIds.toArray(rids);
@@ -159,6 +162,6 @@
        InternetSearchVideoMQMsg msg = new InternetSearchVideoMQMsg();
        msg.setId(video.getId());
        msg.setResourceId(resourceId);
        CMQManager.getInstance().addInternetSearchVideoUpdateMsg(msg);
        rabbitmqManager.addInternetSearchVideoUpdateMsg(msg);
    }
}
src/main/java/com/yeshi/buwan/service/imp/juhe/Iqiyi2ServiceImpl.java
@@ -11,11 +11,6 @@
import com.yeshi.buwan.domain.video.AlbumVideoMap;
import com.yeshi.buwan.dto.mq.UpdateResourceVideoMQMsg;
import com.yeshi.buwan.exception.video.IqiyiVideoSolrException;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi2;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil2;
import com.yeshi.buwan.query.Iqiyi2AlbumQuery;
import com.yeshi.buwan.service.imp.CategoryVideoService;
import com.yeshi.buwan.service.imp.ResourceVideoService;
@@ -29,10 +24,14 @@
import com.yeshi.buwan.util.ThreadUtil;
import com.yeshi.buwan.util.TimeUtil;
import com.yeshi.buwan.util.log.VideoLogFactory;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi2;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.annotation.Transient;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
@@ -84,6 +83,11 @@
    @Resource
    private SolrAlbumVideoDataManager solrAlbumVideoDataManager;
    @Resource
    private RabbitmqManager rabbitmqManager;
    public List<VideoDetailInfo> getVideoDetailList(String videoId, int page, int pageSize) {
        //查询专辑
        VideoIqiyi2 videoIqiyi2 = videoIqiyi2Dao.selectByVideoId(Long.parseLong(videoId));
@@ -95,7 +99,7 @@
        ThreadUtil.run(new Runnable() {
            @Override
            public void run() {
                CMQManager.getInstance().addUpdateResourceVideoMsg(new UpdateResourceVideoMQMsg(videoIqiyi2.getIqiyiId() + "", IqiyiUtil2.RESOURCE_ID, new Date()));
                rabbitmqManager.addUpdateResourceVideoMsg(new UpdateResourceVideoMQMsg(videoIqiyi2.getIqiyiId() + "", IqiyiUtil2.RESOURCE_ID, new Date()));
            }
        });
//        }
@@ -277,7 +281,7 @@
        //添加视频分类映射
        categoryVideoService.addCategoryVideo(newVideoInfo.getId(), newVideoInfo.getVideoType().getId());
        CMQManager.getInstance().addSolrMsg(newVideoInfo.getId());
        rabbitmqManager.addSolrMsg(newVideoInfo.getId());
    }
    @Override
src/main/java/com/yeshi/buwan/service/imp/juhe/IqiyiService.java
@@ -1,44 +1,31 @@
package com.yeshi.buwan.service.imp.juhe;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Resource;
import com.yeshi.buwan.dao.ResourceVideoDao;
import com.yeshi.buwan.dao.juhe.iqiyi.IqiyiVideoInfoDao;
import com.yeshi.buwan.dao.juhe.iqiyi.VideoIqiyiDao;
import com.yeshi.buwan.domain.*;
import com.yeshi.buwan.domain.push.VideoPushHistory;
import com.yeshi.buwan.dto.mq.VideoDataChangeMQMsg;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.log.LogHelper;
import com.yeshi.buwan.service.imp.StatisticsService;
import com.yeshi.buwan.service.imp.push.PushService;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.TimeUtil;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiVideoInfo;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil;
import net.sf.json.JSONArray;
import org.apache.log4j.Logger;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.springframework.orm.hibernate4.HibernateCallback;
import org.springframework.stereotype.Service;
import com.yeshi.buwan.dao.ResourceVideoDao;
import com.yeshi.buwan.dao.juhe.iqiyi.IqiyiVideoInfoDao;
import com.yeshi.buwan.dao.juhe.iqiyi.VideoIqiyiDao;
import com.yeshi.buwan.domain.AdminInfo;
import com.yeshi.buwan.domain.CategoryVideo;
import com.yeshi.buwan.domain.ResourceVideo;
import com.yeshi.buwan.domain.VideoDetailInfo;
import com.yeshi.buwan.domain.VideoInfo;
import com.yeshi.buwan.domain.VideoResource;
import com.yeshi.buwan.domain.VideoType;
import com.yeshi.buwan.domain.push.VideoPushHistory;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiVideoInfo;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil;
import com.yeshi.buwan.log.LogHelper;
import com.yeshi.buwan.service.imp.StatisticsService;
import com.yeshi.buwan.service.imp.push.PushService;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.TimeUtil;
import net.sf.json.JSONArray;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.*;
@Service
public class IqiyiService {
@@ -54,6 +41,9 @@
    @Resource
    private ResourceVideoDao resourceVideoDao;
    @Resource
    private RabbitmqManager rabbitmqManager;
    static {
@@ -292,7 +282,7 @@
                            String id = "";
                            if (same == null) {
                                id = session.save(vi).toString();
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, id, VideoDataChangeMQMsg.ACTION_ADD));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, id, VideoDataChangeMQMsg.ACTION_ADD));
                                // if(id!=null)
                                // session.createSQLQuery(String.format("CALL
                                // copyvideotemp(%s)",id)).executeUpdate();
@@ -323,7 +313,7 @@
                                }
                                session.update(same);// 更新原来的视频
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, same.getId(), VideoDataChangeMQMsg.ACTION_UPDATE));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, same.getId(), VideoDataChangeMQMsg.ACTION_UPDATE));
                                Object obj = session
                                        .createQuery(
@@ -336,7 +326,7 @@
                                    rv.setResource(new VideoResource("13"));
                                    rv.setVideo(new VideoInfo(same.getId()));
                                    session.persist(rv);
                                    CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                    rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                }
                            }
@@ -352,7 +342,7 @@
                                rv.setResource(new VideoResource("13"));
                                rv.setVideo(new VideoInfo(id));
                                session.persist(rv);
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                            }
                            VideoIqiyi vii = new VideoIqiyi();
@@ -402,7 +392,7 @@
                            }
                            session.update(list.get(0).getVideo());
                            vid = list.get(0).getVideo().getId();
                            CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, vid, VideoDataChangeMQMsg.ACTION_UPDATE));
                            rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, vid, VideoDataChangeMQMsg.ACTION_UPDATE));
                            Object obj = session
                                    .createQuery(
@@ -416,7 +406,7 @@
                                rv.setResource(new VideoResource("13"));
                                rv.setVideo(new VideoInfo(list.get(0).getVideo().getId()));
                                session.persist(rv);
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                            }
                        }
                        // 更新tag
@@ -443,7 +433,7 @@
                            if (same == null) {
                                vi.setVideocount(videoDetailCount);
                                videoid = session.save(vi).toString();
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoid, VideoDataChangeMQMsg.ACTION_ADD));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoid, VideoDataChangeMQMsg.ACTION_ADD));
                            } else
                                videoid = same.getId();
@@ -464,7 +454,7 @@
                                rv.setResource(new VideoResource("13"));
                                rv.setVideo(new VideoInfo(videoid));
                                session.persist(rv);
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                            }
                            if (same != null)// 更新内容
@@ -489,7 +479,7 @@
                                                + "");
                                session.update(same);
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoid, VideoDataChangeMQMsg.ACTION_UPDATE));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_INFO, videoid, VideoDataChangeMQMsg.ACTION_UPDATE));
                                obj = session
                                        .createQuery(
@@ -503,7 +493,7 @@
                                    rv.setResource(new VideoResource("13"));
                                    rv.setVideo(same);
                                    session.persist(rv);
                                    CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                    rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                }
                            }
@@ -567,7 +557,7 @@
                                rv.setResource(new VideoResource("13"));
                                rv.setVideo(list.get(0).getVideo());
                                session.persist(rv);
                                CMQManager.getInstance().addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                                rabbitmqManager.addVideoDataChanged(new VideoDataChangeMQMsg(VideoDataChangeMQMsg.TYPE_VIDEO_RESOURCE, rv.getVideo().getId(), VideoDataChangeMQMsg.ACTION_ADD));
                            }
                            vid = list.get(0).getVideo().getId();
                        }
src/main/java/com/yeshi/buwan/service/imp/juhe/PPTVServiceImpl.java
@@ -12,16 +12,16 @@
import com.yeshi.buwan.domain.VideoType;
import com.yeshi.buwan.domain.video.AlbumVideoMap;
import com.yeshi.buwan.dto.mq.PPTVMQMsg;
import com.yeshi.buwan.videos.pptv.PPTVApiUtil;
import com.yeshi.buwan.videos.pptv.PPTVQuery;
import com.yeshi.buwan.videos.pptv.PPTVUtil;
import com.yeshi.buwan.videos.pptv.entity.*;
import com.yeshi.buwan.service.imp.CategoryVideoService;
import com.yeshi.buwan.service.imp.ResourceVideoService;
import com.yeshi.buwan.service.imp.VideoInfoService;
import com.yeshi.buwan.service.imp.VideoTypeService;
import com.yeshi.buwan.service.inter.juhe.PPTVService;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.videos.pptv.PPTVApiUtil;
import com.yeshi.buwan.videos.pptv.PPTVQuery;
import com.yeshi.buwan.videos.pptv.PPTVUtil;
import com.yeshi.buwan.videos.pptv.entity.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
@@ -57,6 +57,9 @@
    @Resource
    private VideoResourceMapExtraInfoDao videoResourceMapExtraInfoDao;
    @Resource
    private RabbitmqManager rabbitmqManager;
    @Override
@@ -157,7 +160,7 @@
            resourceVideoService.addVideoResource(newVideoInfo.getId(), PPTVUtil.RESOURCE_ID + "");
            //添加视频分类映射
            categoryVideoService.addCategoryVideo(newVideoInfo.getId(), videoType.getId());
            CMQManager.getInstance().addSolrMsg(newVideoInfo.getId());
            rabbitmqManager.addSolrMsg(newVideoInfo.getId());
        }
    }
@@ -237,7 +240,7 @@
                    series.setCreateTime(new Date());
                    pptvSeriesDao.save(series);
                    //处理新增/改变的剧集信息
                    CMQManager.getInstance().addPPTVSeriesUpdateMsg(new PPTVMQMsg(series.getInfoID(), PPTVMQMsg.TYPE_ADD_OR_UPDATE));
                    rabbitmqManager.addPPTVSeriesUpdateMsg(new PPTVMQMsg(series.getInfoID(), PPTVMQMsg.TYPE_ADD_OR_UPDATE));
                }
                System.out.println("addorupdate:" + series.getName());
@@ -261,7 +264,7 @@
        for (String id : infoIds) {
            //处理删除的剧集信息
            CMQManager.getInstance().addPPTVSeriesUpdateMsg(new PPTVMQMsg(id, PPTVMQMsg.TYPE_DELETE));
            rabbitmqManager.addPPTVSeriesUpdateMsg(new PPTVMQMsg(id, PPTVMQMsg.TYPE_DELETE));
        }
src/main/java/com/yeshi/buwan/util/SpringContext.java
@@ -1,28 +1,6 @@
package com.yeshi.buwan.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.yeshi.buwan.dao.juhe.iqiyi.VideoIqiyi2Dao;
import com.yeshi.buwan.domain.ResourceVideo;
import com.yeshi.buwan.domain.VideoInfo;
import com.yeshi.buwan.domain.VideoResource;
import com.yeshi.buwan.domain.video.InternetSearchVideo;
import com.yeshi.buwan.dto.mq.*;
import com.yeshi.buwan.videos.funtv.entity.FunTVAlbum2;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi2;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil2;
import com.yeshi.buwan.videos.pptv.entity.PPTVSeries;
import com.yeshi.buwan.service.imp.JobThreadExecutorServiceImpl;
import com.yeshi.buwan.service.imp.ResourceVideoService;
import com.yeshi.buwan.service.imp.VideoInfoService;
import com.yeshi.buwan.service.inter.juhe.FunTV2Service;
import com.yeshi.buwan.service.inter.juhe.InternetSearchVideoService;
import com.yeshi.buwan.service.inter.juhe.Iqiyi2Service;
import com.yeshi.buwan.service.inter.juhe.PPTVService;
import com.yeshi.buwan.service.manager.search.SolrAlbumVideoDataManager;
import com.yeshi.buwan.service.manager.search.SolrInternetSearchVideoDataManager;
import com.yeshi.buwan.util.mq.CMQManager;
import io.seata.rm.datasource.DataSourceProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +9,6 @@
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
 * 系统初始化
 *
@@ -42,27 +16,6 @@
 */
@Component
public class SpringContext implements ApplicationListener<ContextRefreshedEvent> {
    @Resource
    private VideoInfoService videoInfoService;
    @Resource
    private SolrAlbumVideoDataManager solrDataManager;
    @Resource
    private Iqiyi2Service iqiyi2Service;
    @Resource
    private IqiyiUtil2 iqiyiUtil2;
    @Resource
    private FunTV2Service funTV2Service;
    @Resource
    private ResourceVideoService resourceVideoService;
    @Resource
    private PPTVService pptvService;
    private static boolean isInited = false;
@@ -92,286 +45,6 @@
    private void init() {
        logger.error("初始化");
//        if (!Constant.JobTasker) {
            doSolrJob();
            doAddIqiyi2Video();
            doAddFunTV2Video();
            doAddPPTVVideo();
            doDeleteVideoResource();
            doUpdateVideoExtraInfo();
            doInternetSearchVideoUpdateJob();
            doUpdateResourceVideoJob();
//        }
//        doUpdateResourceVideoJob();
    }
    private void doSolrJob() {
        for (int i = 0; i < 5; i++) {
            new JobThreadExecutorServiceImpl().run(new Runnable() {
                @Override
                public void run() {
                    logger.info("doSolrJob");
                    List<SolrVideoMQMsg> solrMsgList = CMQManager.getInstance().consumeSolrMsg(16);
                    if (solrMsgList != null)
                        for (SolrVideoMQMsg solrVideo : solrMsgList) {
                            try {
                                VideoInfo videoInfo = videoInfoService.getVideoInfo(solrVideo.getId());
                                if (videoInfo != null) {
                                    if ("1".equalsIgnoreCase(videoInfo.getShow())) {
                                        List<VideoResource> resourceList = new ArrayList<>();
                                        List<ResourceVideo> rvList = resourceVideoService.getResourceList(videoInfo.getId());
                                        if (rvList != null)
                                            for (ResourceVideo rv : rvList)
                                                resourceList.add(rv.getResource());
                                        videoInfo.setResourceList(resourceList);
                                        solrDataManager.saveOrUpdate(videoInfo);
                                    } else
                                        solrDataManager.deleteById(videoInfo.getId());
                                } else {//视频已经删除
                                    solrDataManager.deleteById(solrVideo.getId());
                                }
                                CMQManager.getInstance().deleteSolrMsg(solrVideo.getHandler());
                            } catch (Exception e) {
                                logger.error("添加到搜索引擎出错", e);
                            }
                        }
                }
            });
        }
    }
    @Resource
    private InternetSearchVideoService internetSearchVideoService;
    @Resource
    private SolrInternetSearchVideoDataManager solrInternetSearchVideoDataManager;
    private void doInternetSearchVideoUpdateJob() {
        for (int i = 0; i < 1; i++) {
            new JobThreadExecutorServiceImpl().run(new Runnable() {
                @Override
                public void run() {
                    List<InternetSearchVideoMQMsg> solrMsgList = CMQManager.getInstance().consumeInternetSearchVideoUpdateMsg(16);
                    if (solrMsgList != null)
                        for (InternetSearchVideoMQMsg solrVideo : solrMsgList) {
                            try {
                                InternetSearchVideo video = internetSearchVideoService.selectByPrimaryKey(solrVideo.getId());
                                if (video != null)
                                    solrInternetSearchVideoDataManager.saveOrUpdate(video);
                                else
                                    solrInternetSearchVideoDataManager.deleteById(video.getId());
                                CMQManager.getInstance().deleteInternetSearchVideoUpdateMsg(solrVideo.getHandler());
                            } catch (Exception e) {
                                logger.error("更新全网搜搜索引擎出错", e);
                            }
                        }
                }
            });
        }
    }
    @Resource
    private RedisManager redisManager;
    @Resource
    private VideoIqiyi2Dao videoIqiyi2Dao;
    private void doUpdateResourceVideoJob() {
        for (int i = 0; i < 1; i++) {
            new JobThreadExecutorServiceImpl().run(new Runnable() {
                @Override
                public void run() {
                    System.out.println("消费结束:doUpdateResourceVideoJob");
                    List<UpdateResourceVideoMQMsg> msgList = CMQManager.getInstance().consumeUpdateResourceVideoMsg(16);
                    System.out.println("消费结束:doUpdateResourceVideoJob");
                    if (msgList != null)
                        for (UpdateResourceVideoMQMsg videoMQMsg : msgList) {
                            try {
                                if (videoMQMsg != null) {
                                    String key = "resourcevideo-update-" + StringUtil.Md5(videoMQMsg.getResourceId() + "#" + videoMQMsg.getId());
                                    //可以更新
                                    if (StringUtil.isNullOrEmpty(redisManager.getCommonString(key))) {
                                        //12小时内不再更新
                                        redisManager.cacheCommonString(key, "1", 60 * 60 * 12);
                                        switch (videoMQMsg.getResourceId()) {
                                            case IqiyiUtil2
                                                    .RESOURCE_ID:
                                                //redis查询是否更新过
                                                IqiyiAlbum2 iqiyiAlbum2 = IqiYiNewAPI.getAlbumOrVideoDetail(Long.parseLong(videoMQMsg.getId()));
                                                if (iqiyiAlbum2 == null) {
                                                    VideoIqiyi2 videoIqiyi2 = videoIqiyi2Dao.selectByIqiyiId(Long.parseLong(videoMQMsg.getId()));
                                                    if (videoIqiyi2 != null) {
                                                        //下架
                                                        resourceVideoService.delete(videoIqiyi2.getVideoId() + "", videoMQMsg.getResourceId() + "");
                                                    }
                                                    //删除爱奇艺资源
                                                    iqiyi2Service.offlineIqiyiAlbum(Long.parseLong(videoMQMsg.getId()));
                                                } else {
                                                    //保存
                                                    iqiyiUtil2.syncByAid(iqiyiAlbum2.getId(), false);
                                                }
                                                break;
                                        }
                                    }
                                }
                            } catch (Exception e) {
                            } finally {
                                CMQManager.getInstance().deleteUpdateResourceVideoMsg(videoMQMsg.getHandler());
                            }
                        }
                }
            });
        }
    }
    private void doAddIqiyi2Video() {
        new JobThreadExecutorServiceImpl().run(new Runnable() {
            @Override
            public void run() {
                logger.info("doAddIqiyi2Video");
                List<IqiyiAlbum2MQMsg> iqiyiAlbumMsgList = CMQManager.getInstance().consumeIqiyiAlbumUpdateMsg(16);
                if (iqiyiAlbumMsgList != null)
                    for (IqiyiAlbum2MQMsg iqiyiAlbum2MQMsg : iqiyiAlbumMsgList) {
                        try {
                            Long qikuID = iqiyiAlbum2MQMsg.getId();
                            IqiyiAlbum2 album2 = iqiyi2Service.selectAlbumById(qikuID);
                            logger.info("爱奇艺专辑:" + album2.getName());
                            if (album2 != null) {
                                iqiyi2Service.addToVideoInfo(album2);
                            }
                            CMQManager.getInstance().deleteIqiyiAlbumUpdateMsg(iqiyiAlbum2MQMsg.getHandler());
                        } catch (Exception e) {
                            logger.error("爱奇艺专辑添加到视频出错:" + e.getMessage());
                            logger.error("ID:" + iqiyiAlbum2MQMsg.getId());
                        }
                    }
            }
        });
    }
    private void doAddFunTV2Video() {
        new JobThreadExecutorServiceImpl().run(new Runnable() {
            @Override
            public void run() {
                List<FunTVAlbum2MQMsg> funTVAlbum2MsgList = CMQManager.getInstance().consumeFunTVAlbumUpdateMsg(16);
                if (funTVAlbum2MsgList != null)
                    for (FunTVAlbum2MQMsg funTVAlbum2MQMsg : funTVAlbum2MsgList) {
                        try {
                            String mediaId = funTVAlbum2MQMsg.getId();
                            FunTVAlbum2 album2 = funTV2Service.getAlbumDetail(mediaId);
                            if (album2 != null) {
                                funTV2Service.processAlbum(album2);
                            }
                            CMQManager.getInstance().deleteFunTVAlbumUpdateMsg(funTVAlbum2MQMsg.getHandler());
                        } catch (Exception e) {
                            logger.error("风行专辑添加到视频出错:" + e.getMessage());
                            logger.error("ID:" + funTVAlbum2MQMsg.getId());
                        }
                    }
            }
        });
    }
    private void doAddPPTVVideo() {
        new JobThreadExecutorServiceImpl().run(new Runnable() {
            @Override
            public void run() {
                List<PPTVMQMsg> pptvMsgList = CMQManager.getInstance().consumePPTVSeriesUpdateMsg(16);
                if (pptvMsgList != null)
                    for (PPTVMQMsg pptvmqMsg : pptvMsgList) {
                        try {
                            switch (pptvmqMsg.getType()) {
                                case PPTVMQMsg.TYPE_ADD_OR_UPDATE:
                                    PPTVSeries pptvSeries = pptvService.getSeriesDetail(pptvmqMsg.getInfoId());
                                    if (pptvSeries != null) {
                                        pptvService.addToVideoInfo(pptvSeries);
                                    }
                                    break;
                                case PPTVMQMsg.TYPE_DELETE:
                                    pptvService.offLineSeries(pptvmqMsg.getInfoId());
                                    break;
                            }
                            CMQManager.getInstance().deletePPTVSeriesUpdateMsg(pptvmqMsg.getHandler());
                        } catch (Exception e) {
                            e.printStackTrace();
                            logger.error("PPTV添加到视频出错:" + e.getMessage());
                            logger.error("infoId:" + pptvmqMsg.getInfoId());
                        }
                    }
            }
        });
    }
    private void doDeleteVideoResource() {
        new JobThreadExecutorServiceImpl().run(new Runnable() {
            @Override
            public void run() {
                List<CMQResult> cmqMsgList = CMQManager.getInstance().consumeVideoResourceDeleteMsg(16);
                if (cmqMsgList != null)
                    for (CMQResult msg : cmqMsgList) {
                        try {
                            String videoId = msg.getData() + "";
                            //查询资源列表
                            List<ResourceVideo> resourceVideoList = resourceVideoService.getResourceList(videoId);
                            //隐藏视频
                            if (resourceVideoList == null || resourceVideoList.size() == 0)
                                videoInfoService.hiddenVideo(videoId);
                            //更新搜索引擎
                            CMQManager.getInstance().addSolrMsg(videoId);
                            CMQManager.getInstance().deleteVideoResourceDeleteMsg(msg.getHandler());
                        } catch (Exception e) {
                            logger.error("视频资源删除处理出错:" + e.getMessage());
                            logger.error("ID:" + msg.getData());
                        }
                    }
            }
        });
    }
    private void doUpdateVideoExtraInfo() {
        new JobThreadExecutorServiceImpl().run(new Runnable() {
            @Override
            public void run() {
                List<CMQResult> cmqMsgList = CMQManager.getInstance().consumeUpdateVideoExtraInfoMsg(16);
                if (cmqMsgList != null)
                    for (CMQResult msg : cmqMsgList) {
                        try {
                            VideoExtraInfoChangeMQMsg videoExtraInfoChangeMQMsg = (VideoExtraInfoChangeMQMsg) msg.getData();
                            if (videoExtraInfoChangeMQMsg != null) {
                                switch (videoExtraInfoChangeMQMsg.getType()) {
                                    case VideoExtraInfoChangeMQMsg.TYPE_RESOURCE:
                                        if (VideoExtraInfoChangeMQMsg.ACTION_DELETE.equalsIgnoreCase(videoExtraInfoChangeMQMsg.getAction())) {//删除视频源
                                            CMQManager.getInstance().addVideoResourceDeleteMsg(videoExtraInfoChangeMQMsg.getVideoId());
                                        }
                                        break;
                                    case VideoExtraInfoChangeMQMsg.TYPE_CATEGORY:
                                        break;
                                }
                                videoInfoService.statisticVideoExtraInfo(videoExtraInfoChangeMQMsg.getVideoId());
                            }
                            CMQManager.getInstance().deleteUpdateVideoExtraInfoMsg(msg.getHandler());
                        } catch (Exception e) {
                            logger.error("视频资源删除处理出错:" + e.getMessage());
                            logger.error("ID:" + msg.getData());
                        }
                    }
            }
        });
    }
src/main/java/com/yeshi/buwan/util/mq/CMQManager.java
File was deleted
src/main/java/com/yeshi/buwan/util/mq/TDMQUtil.java
File was deleted
src/main/java/com/yeshi/buwan/util/mq/rabbit/DelayMsgInfo.java
New file
@@ -0,0 +1,65 @@
package com.yeshi.buwan.util.mq.rabbit;
/**
 * @author hxh
 * @title: DelayMsgInfo
 * @description: 延迟消息的数据
 * @date 2024/10/12 13:31
 */
public class DelayMsgInfo {
    private String queueName; // 队列名称
    private String exchangeName; // 交换机名称
    private String routingKey;
    private int delayMs; // 延迟的时间
    private int sendCount;// 发送次数
    private String msg;// 消息内容
    public String getRoutingKey() {
        return routingKey;
    }
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
    public String getQueueName() {
        return queueName;
    }
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
    public String getExchangeName() {
        return exchangeName;
    }
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }
    public int getDelayMs() {
        return delayMs;
    }
    public void setDelayMs(int delayMs) {
        this.delayMs = delayMs;
    }
    public int getSendCount() {
        return sendCount;
    }
    public void setSendCount(int sendCount) {
        this.sendCount = sendCount;
    }
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/QueueHelloWorldListener.java
New file
@@ -0,0 +1,23 @@
package com.yeshi.buwan.util.mq.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import java.io.UnsupportedEncodingException;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description: 测试消息
 * @date 2024/9/26 13:47
 */
public class QueueHelloWorldListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到消息:"+ new String(message.getBody(),"UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitDelayConsumeFailConsumer.java
New file
@@ -0,0 +1,38 @@
package com.yeshi.buwan.util.mq.rabbit;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * @author hxh
 * @title: RabbitDelayConsumer
 * @description: 消费重试队列处理
 * @date 2024/10/12 11:21
 */
public class RabbitDelayConsumeFailConsumer {
    private Logger logger = LoggerFactory.getLogger("infoLog");
    @Resource
    private RabbitTemplate rabbitTemplate;
//    @RabbitListener(queues = RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("RabbitDelayConsumeFailConsumer-{}", msg);
        DelayMsgInfo msgInfo = new Gson().fromJson(msg, DelayMsgInfo.class);
        Map<String, String> headers = new HashMap<>();
        headers.put("retry_count", (msgInfo.getSendCount() + 1) + "");
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, msgInfo.getQueueName(), msgInfo.getMsg(), headers);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitDelayConsumer.java
New file
@@ -0,0 +1,45 @@
package com.yeshi.buwan.util.mq.rabbit;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: RabbitDelayConsumer
 * @description: 延时队列消费处理
 * @date 2024/10/12 11:21
 */
//@Component
public class RabbitDelayConsumer  {
   private Logger logger = LoggerFactory.getLogger("infoLog");
    @Resource
    private RabbitTemplate rabbitTemplate;
//    @RabbitListener(queues = RabbitmqSenderUtil.DELAY_QUEUE_NAME, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg =  new String( message.getBody(), StandardCharsets.UTF_8);
        logger.info("RabbitDelayConsumer-{}",msg);
        DelayMsgInfo msgInfo =  new Gson().fromJson(msg, DelayMsgInfo.class);
        if(!StringUtil.isNullOrEmpty(msgInfo.getQueueName())){
          // 队列消息
            RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, msgInfo.getQueueName(), msgInfo.getMsg());
        }else{
            // 交换机消息
            RabbitmqSenderUtil.sendExchangeMsg(rabbitTemplate, msgInfo.getExchangeName(), msgInfo.getRoutingKey(), msgInfo.getMsg());
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqConfig.java
New file
@@ -0,0 +1,132 @@
package com.yeshi.buwan.util.mq.rabbit;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * @author hxh
 * @title: RabbitmqConfig
 * @description: TODO
 * @date 2024/10/12 11:10
 */
@Configuration
//@EnableRabbit
public class RabbitmqConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;
    private Queue createQueue(String queueName){
        Map<String, Object> headers=new HashMap<>();
        headers.put("x-message-ttl", 24*60*60*1000);
        return new Queue(queueName,true, false, false, headers);
    }
    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
        //第二个参数必须为x-delayed-message
        return new CustomExchange(RabbitmqSenderUtil.DELAY_EXCHANGE_NAME, "x-delayed-message", false, false, argMap);
    }
    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue() {
        return createQueue(RabbitmqSenderUtil.DELAY_QUEUE_NAME);
    }
    @Bean("pluginDelayConsumeFailQueue")
    public Queue pluginDelayConsumeFailQueue() {
        return createQueue(RabbitmqSenderUtil.CONSUME_FAIL_QUEUE_NAME);
    }
    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_COMMON_DELAY).noargs();
    }
    @Bean
    public Binding pluginDelayConsumeFailBinding(@Qualifier("pluginDelayConsumeFailQueue") Queue queue, @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(RabbitmqSenderUtil.ROUTING_KEY_CONSUME_FAIL_DELAY).noargs();
    }
    @Bean
    public Queue solrQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_SOLR);
    }
    @Bean
    public Queue updateVideoIqiyi2Queue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_UPDATE_IQIYI_2);
    }
    @Bean
    public Queue updateVideoFuntv2Queue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_UPDATE_FUNTV_2);
    }
    @Bean
    public Queue updateVideoPPTVQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_UPDATE_PPTV);
    }
    @Bean
    public Queue videoResourceDeleteQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_RESOURCE_DELETE);
    }
    @Bean("updateVideoExtraInfoQueue")
    public Queue updateVideoExtraInfoQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_UPDATE_VIDEO_EXTRAINFO);
    }
    @Bean
    public Queue updateInternetSearchQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_UPDATE_INTERNET_SEARCH);
    }
    @Bean
    public Queue updateResourceVideoQueue() {
        return createQueue(RabbitmqManager.QUEUENAME_UPDATE_RESOURCE_VIDEO);
    }
    @Bean("videoSyncDataV2Queue")
    public Queue videoSyncDataV2Queue() {
        return createQueue(RabbitmqManager.QUEUENAME_VIDEO_SYNCDATA_V2);
    }
    @Bean("videoExtraInfoExchange")
    public FanoutExchange videoExtraInfoExchange() {
        Map<String, Object> headers=new HashMap<>();
        headers.put("x-message-ttl", 24*60*60*1000);
        return new FanoutExchange(RabbitmqManager.TOPIC_VIDEO_EXTRAINFO, true, false, headers);
    }
    @Bean
    public Binding videoExtraInfoExchangeBinding(@Qualifier("updateVideoExtraInfoQueue") Queue queue, @Qualifier("videoExtraInfoExchange") FanoutExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange);
    }
    @Bean("videoInfoChangeExchange")
    public FanoutExchange videoInfoChangeExchange() {
        Map<String, Object> headers=new HashMap<>();
        headers.put("x-message-ttl", 24*60*60*1000);
        return new FanoutExchange(RabbitmqManager.TOPIC_VIDEO_INFO_CHANGE, true, false, headers);
    }
    @Bean
    public Binding videoInfoChangeExchangeBinding(@Qualifier("videoSyncDataV2Queue") Queue queue, @Qualifier("videoInfoChangeExchange") FanoutExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange);
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqManager.java
New file
@@ -0,0 +1,110 @@
package com.yeshi.buwan.util.mq.rabbit;
import com.google.gson.Gson;
import com.yeshi.buwan.dto.mq.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
 * @author hxh
 * @title: RabbitmqManager
 * @description: 消息管理
 * @date 2024/10/14 10:19
 */
@Component
public class RabbitmqManager {
    // 搜索引擎
    public static String QUEUENAME_SOLR = "buwan-solr-new";
    //视频更新-爱奇艺2
    public static String QUEUENAME_VIDEO_UPDATE_IQIYI_2 = "buwan-video-update-iqiyi2";
    //视频更新-风行2
    public static String QUEUENAME_VIDEO_UPDATE_FUNTV_2 = "buwan-video-update-funtv2";
    //视频更新-PPTV
    public static String QUEUENAME_VIDEO_UPDATE_PPTV = "buwan-video-update-pptv";
    //删除视频资源
    public static String QUEUENAME_VIDEO_RESOURCE_DELETE = "buwan-video-resource-delete";
    //更新视频附加信息
    public static String QUEUENAME_UPDATE_VIDEO_EXTRAINFO = "buwan-video-video-update-extrainfo";
    //全网搜
    public static String QUEUENAME_UPDATE_INTERNET_SEARCH = "buwan-video-update-internet-search";
    public static String QUEUENAME_UPDATE_RESOURCE_VIDEO = "buwan-video-update-resource-video";
    //同步V2版本的视频信息
    public static String QUEUENAME_VIDEO_SYNCDATA_V2 = "buwan-video-syncdata-v2";
    public static String TOPIC_VIDEO_EXTRAINFO = "buwan_topic_video_extrainfo_change";
    //视频信息修改
    public static String TOPIC_VIDEO_INFO_CHANGE = "buwan_topic_video_info_change";
    @Resource
    private RabbitTemplate rabbitTemplate;
    //添加专辑更新消息
    public void addIqiyiAlbumUpdateMsg(Long id) {
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, QUEUENAME_VIDEO_UPDATE_IQIYI_2, id + "");
    }
    //添加专辑更新消息
    public void addPPTVSeriesUpdateMsg(PPTVMQMsg msg) {
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, QUEUENAME_VIDEO_UPDATE_PPTV, new Gson().toJson(msg));
    }
    public void addSolrMsg(String id) {
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, QUEUENAME_SOLR, id);
    }
    public void addVideoResourceDeleteMsg(String videoId) {
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, QUEUENAME_VIDEO_RESOURCE_DELETE, videoId);
    }
    /**
     * 视频附加信息改变
     */
    public void addVideoExtraInfoChanged(VideoExtraInfoChangeMQMsg msg) {
        RabbitmqSenderUtil.sendExchangeMsg(rabbitTemplate, TOPIC_VIDEO_EXTRAINFO, "*", new Gson().toJson(msg));
    }
    /**
     * 全网搜
     *
     * @param msg
     */
    public void addInternetSearchVideoUpdateMsg(InternetSearchVideoMQMsg msg) {
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, QUEUENAME_UPDATE_INTERNET_SEARCH, new Gson().toJson(msg));
    }
    /**
     * 视频来源更新
     *
     * @param msg
     */
    public void addUpdateResourceVideoMsg(UpdateResourceVideoMQMsg msg) {
        RabbitmqSenderUtil.sendQueueMsg(rabbitTemplate, QUEUENAME_UPDATE_RESOURCE_VIDEO, new Gson().toJson(msg));
    }
    //视频数据更改
    public void addVideoDataChanged(VideoDataChangeMQMsg msg) {
        RabbitmqSenderUtil.sendExchangeMsg(rabbitTemplate, TOPIC_VIDEO_INFO_CHANGE, "*", new Gson().toJson(msg));
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqMsgConsumeUtil.java
New file
@@ -0,0 +1,35 @@
package com.yeshi.buwan.util.mq.rabbit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.io.IOException;
/**
 * @author hxh
 * @title: RabbitmqMsgConsumeUtil
 * @description: 消息消费工具
 * @date 2024/10/12 14:43
 */
public class RabbitmqMsgConsumeUtil {
    public interface  IMessageProcess{
        public void onProcess() throws Exception;
    }
    public static void processMessage(Message message,  Channel channel, RabbitTemplate rabbitTemplate,IMessageProcess messageProcessor) throws IOException {
        try {
            messageProcessor.onProcess();
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch(Exception e){
            if(RabbitmqSenderUtil.retryConsume(rabbitTemplate, message)){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }else{
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/RabbitmqSenderUtil.java
New file
@@ -0,0 +1,161 @@
package com.yeshi.buwan.util.mq.rabbit;
import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * @author hxh
 * @title: RabbitmqUtil
 * @description: mq消息发送工具
 * @date 2024/10/12 13:04
 */
public class RabbitmqSenderUtil {
    // 延迟交换机
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
    // 延迟队列
    public static final String DELAY_QUEUE_NAME = "delay_queue";
    //常规延迟队列的routingKey
    public static final String ROUTING_KEY_COMMON_DELAY = "delay";
    // 消费失败的延迟消息
    public static final String ROUTING_KEY_CONSUME_FAIL_DELAY = "consume_fail_delay";
    // 消费失败处理队列
    public static final String CONSUME_FAIL_QUEUE_NAME = "consume_fail_queue";
    /**
     * @return void
     * @author hxh
     * @description 发送队列消息
     * @date 13:06 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: message
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String message) {
        rabbitTemplate.convertAndSend(queueName, message);
    }
    /**
     * @return void
     * @author hxh
     * @description 发送带头部的消息
     * @date 13:54 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: msg
     * @param: headers
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, Map<String, String> headers) {
        MessageProperties messageProperties = new MessageProperties();
        if (headers != null) {
            for (String key : headers.keySet()) {
                messageProperties.setHeader(key, headers.get(key));//延迟5秒被删除
            }
        }
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(queueName, message);//交换机和路由键必须和配置文件类中保持一致
    }
    /**
     * @return void
     * @author hxh
     * @description 发送队列延迟消息
     * @date 13:07 2024/10/12
     * @param: rabbitTemplate
     * @param: queueName
     * @param: msg
     * @param: delayMS
     **/
    public static void sendQueueMsg(RabbitTemplate rabbitTemplate, String queueName, String msg, int delayMS) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMS);
        msgInfo.setQueueName(queueName);
        msgInfo.setMsg(msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayMS);//延迟5秒被删除
        Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);//交换机和路由键必须和配置文件类中保持一致
    }
    /**
     * @return void
     * @author hxh
     * @description 发送交换机消息
     * @date 13:13 2024/10/12
     * @param: rabbitTemplate
     * @param: exchangeName
     * @param: routingKey
     * @param: message
     **/
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
    /**
     * @return void
     * @author hxh
     * @description 发送延迟交换机消息
     * @date 13:39 2024/10/12
     * @param: rabbitTemplate
     * @param: exchangeName
     * @param: routingKey
     * @param: msg
     * @param: delayMs
     **/
    public static void sendExchangeMsg(RabbitTemplate rabbitTemplate, String exchangeName, String routingKey, String msg, int delayMs) {
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setDelayMs(delayMs);
        msgInfo.setExchangeName(exchangeName);
        msgInfo.setMsg(msg);
        msgInfo.setRoutingKey(routingKey);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayMs);
        Message message = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_COMMON_DELAY, message);
    }
    /**
     * @author hxh
     * @description 添加消息到重新消费队列
     * @date 14:10 2024/10/12
     * @param: rabbitTemplate
     * @param: message
     * @return boolean 加入重新消费队列
     **/
    public static boolean retryConsume(RabbitTemplate rabbitTemplate, Message message) {
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        int retryCount = 0;
        if (messageHeaders.containsKey("retry_count")) {
            retryCount = Integer.parseInt(messageHeaders.get("retry_count") + "");
        }
        // 重试5次
        if (retryCount >= 5) {
            return false;
        }
        // 发送延迟消息
        DelayMsgInfo msgInfo = new DelayMsgInfo();
        msgInfo.setQueueName(message.getMessageProperties().getConsumerQueue());
        msgInfo.setMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        msgInfo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        msgInfo.setSendCount(retryCount);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", 5*60*1000);
//        messageProperties.setHeader("x-delay", 5*1000);// 测试为5s
        Message messageNew = new Message(new Gson().toJson(msgInfo).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, ROUTING_KEY_CONSUME_FAIL_DELAY, messageNew);
        return true;
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/SolrNewListener.java
New file
@@ -0,0 +1,74 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.domain.ResourceVideo;
import com.yeshi.buwan.domain.VideoInfo;
import com.yeshi.buwan.domain.VideoResource;
import com.yeshi.buwan.service.imp.ResourceVideoService;
import com.yeshi.buwan.service.imp.VideoInfoService;
import com.yeshi.buwan.service.manager.search.SolrAlbumVideoDataManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class SolrNewListener {
    private final static Logger logger = LoggerFactory.getLogger(SolrNewListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private ResourceVideoService resourceVideoService;
    @Resource
    private VideoInfoService videoInfoService;
    @Resource
    private SolrAlbumVideoDataManager solrDataManager;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                try {
                    VideoInfo videoInfo = videoInfoService.getVideoInfo(result);
                    if (videoInfo != null) {
                        if ("1".equalsIgnoreCase(videoInfo.getShow())) {
                            List<VideoResource> resourceList = new ArrayList<>();
                            List<ResourceVideo> rvList = resourceVideoService.getResourceList(videoInfo.getId());
                            if (rvList != null)
                                for (ResourceVideo rv : rvList)
                                    resourceList.add(rv.getResource());
                            videoInfo.setResourceList(resourceList);
                            solrDataManager.saveOrUpdate(videoInfo);
                        } else
                            solrDataManager.deleteById(videoInfo.getId());
                    } else {//视频已经删除
                        solrDataManager.deleteById(result);
                    }
                } catch (Exception e) {
                    logger.error("添加到搜索引擎出错", e);
                    throw e;
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/UpdateIntenetSearchListener.java
New file
@@ -0,0 +1,57 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.domain.video.InternetSearchVideo;
import com.yeshi.buwan.dto.mq.InternetSearchVideoMQMsg;
import com.yeshi.buwan.service.inter.juhe.InternetSearchVideoService;
import com.yeshi.buwan.service.manager.search.SolrInternetSearchVideoDataManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class UpdateIntenetSearchListener {
    private final static Logger logger = LoggerFactory.getLogger(UpdateIntenetSearchListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private InternetSearchVideoService internetSearchVideoService;
    @Resource
    private SolrInternetSearchVideoDataManager solrInternetSearchVideoDataManager;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if(!StringUtil.isNullOrEmpty(result)) {
                InternetSearchVideoMQMsg solrVideo = new Gson().fromJson(result, InternetSearchVideoMQMsg.class);
                try {
                    InternetSearchVideo video = internetSearchVideoService.selectByPrimaryKey(solrVideo.getId());
                    if (video != null)
                        solrInternetSearchVideoDataManager.saveOrUpdate(video);
                    else
                        solrInternetSearchVideoDataManager.deleteById(video.getId());
                } catch (Exception e) {
                    logger.error("更新全网搜搜索引擎出错", e);
                    throw e;
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/UpdateResourceVideoListener.java
New file
@@ -0,0 +1,98 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.dao.juhe.iqiyi.VideoIqiyi2Dao;
import com.yeshi.buwan.dto.mq.UpdateResourceVideoMQMsg;
import com.yeshi.buwan.service.imp.ResourceVideoService;
import com.yeshi.buwan.service.inter.juhe.Iqiyi2Service;
import com.yeshi.buwan.util.RedisManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi2;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil2;
import net.sf.json.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class UpdateResourceVideoListener {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private RedisManager redisManager;
    @Resource
    private VideoIqiyi2Dao videoIqiyi2Dao;
    @Resource
    private ResourceVideoService resourceVideoService;
    @Resource
    private Iqiyi2Service iqiyi2Service;
    @Resource
    private IqiyiUtil2 iqiyiUtil2;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                Gson gson = new Gson();
                UpdateResourceVideoMQMsg videoMQMsg = null;
                try {
                    videoMQMsg = gson.fromJson(result, UpdateResourceVideoMQMsg.class);
                } catch (Exception e) {
                    JSONObject jsonObject = JSONObject.fromObject(result);
                    jsonObject.remove("date");
                    videoMQMsg = gson.fromJson(jsonObject.toString(), UpdateResourceVideoMQMsg.class);
                }
                if (videoMQMsg != null) {
                    try {
                        String key = "resourcevideo-update-" + com.yeshi.buwan.util.StringUtil.Md5(videoMQMsg.getResourceId() + "#" + videoMQMsg.getId());
                        //可以更新
                        if (com.yeshi.buwan.util.StringUtil.isNullOrEmpty(redisManager.getCommonString(key))) {
                            //12小时内不再更新
                            redisManager.cacheCommonString(key, "1", 60 * 60 * 12);
                            switch (videoMQMsg.getResourceId()) {
                                case IqiyiUtil2
                                        .RESOURCE_ID:
                                    //redis查询是否更新过
                                    IqiyiAlbum2 iqiyiAlbum2 = IqiYiNewAPI.getAlbumOrVideoDetail(Long.parseLong(videoMQMsg.getId()));
                                    if (iqiyiAlbum2 == null) {
                                        VideoIqiyi2 videoIqiyi2 = videoIqiyi2Dao.selectByIqiyiId(Long.parseLong(videoMQMsg.getId()));
                                        if (videoIqiyi2 != null) {
                                            //下架
                                            resourceVideoService.delete(videoIqiyi2.getVideoId() + "", videoMQMsg.getResourceId() + "");
                                        }
                                        //删除爱奇艺资源
                                        iqiyi2Service.offlineIqiyiAlbum(Long.parseLong(videoMQMsg.getId()));
                                    } else {
                                        //保存
                                        iqiyiUtil2.syncByAid(iqiyiAlbum2.getId(), false);
                                    }
                                    break;
                            }
                        }
                    } catch (Exception e) {
                    } finally {
                    }
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoResourceDeleteListener.java
New file
@@ -0,0 +1,63 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.domain.ResourceVideo;
import com.yeshi.buwan.service.imp.ResourceVideoService;
import com.yeshi.buwan.service.imp.VideoInfoService;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class VideoResourceDeleteListener {
    private final static Logger logger = LoggerFactory.getLogger(VideoResourceDeleteListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private ResourceVideoService resourceVideoService;
    @Resource
    private VideoInfoService videoInfoService;
    @Resource
    private RabbitmqManager rabbitmqManager;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                try {
                    String videoId = result;
                    //查询资源列表
                    List<ResourceVideo> resourceVideoList = resourceVideoService.getResourceList(videoId);
                    //隐藏视频
                    if (resourceVideoList == null || resourceVideoList.size() == 0)
                        videoInfoService.hiddenVideo(videoId);
                    //更新搜索引擎
                    rabbitmqManager.addSolrMsg(videoId);
                } catch (Exception e) {
                    logger.error("视频资源删除处理出错:" + e.getMessage());
                    logger.error("ID:" + result);
                    throw e;
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoSyncDataV2Listener.java
New file
@@ -0,0 +1,34 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class VideoSyncDataV2Listener {
    @Resource
    private RabbitTemplate rabbitTemplate;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            //TODO 处理消息
            if(!StringUtil.isNullOrEmpty(result)) {
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdateExtrainfoListener.java
New file
@@ -0,0 +1,65 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.dto.mq.VideoExtraInfoChangeMQMsg;
import com.yeshi.buwan.service.imp.VideoInfoService;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class VideoUpdateExtrainfoListener {
    private final static Logger logger = LoggerFactory.getLogger(VideoUpdateExtrainfoListener.class);
    @Resource
    private VideoInfoService videoInfoService;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private RabbitmqManager rabbitmqManager;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                try {
                    VideoExtraInfoChangeMQMsg videoExtraInfoChangeMQMsg = new Gson().fromJson(result, VideoExtraInfoChangeMQMsg.class);
                    if (videoExtraInfoChangeMQMsg != null) {
                        switch (videoExtraInfoChangeMQMsg.getType()) {
                            case VideoExtraInfoChangeMQMsg.TYPE_RESOURCE:
                                if (VideoExtraInfoChangeMQMsg.ACTION_DELETE.equalsIgnoreCase(videoExtraInfoChangeMQMsg.getAction())) {//删除视频源
                                    rabbitmqManager.addVideoResourceDeleteMsg(videoExtraInfoChangeMQMsg.getVideoId());
                                }
                                break;
                            case VideoExtraInfoChangeMQMsg.TYPE_CATEGORY:
                                break;
                        }
                        videoInfoService.statisticVideoExtraInfo(videoExtraInfoChangeMQMsg.getVideoId());
                    }
                } catch (Exception e) {
                    logger.error("视频资源删除处理出错:" + e.getMessage());
                    logger.error("ID:" + result);
                    throw e;
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdateFuntv2Listener.java
New file
@@ -0,0 +1,51 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.service.inter.juhe.FunTV2Service;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import com.yeshi.buwan.videos.funtv.entity.FunTVAlbum2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.yeshi.utils.StringUtil;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class VideoUpdateFuntv2Listener {
    private final static Logger logger = LoggerFactory.getLogger(VideoUpdateFuntv2Listener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private FunTV2Service funTV2Service;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                try {
                    String mediaId = result;
                    FunTVAlbum2 album2 = funTV2Service.getAlbumDetail(mediaId);
                    if (album2 != null) {
                        funTV2Service.processAlbum(album2);
                    }
                } catch (Exception e) {
                    logger.error("风行专辑添加到视频出错:" + e.getMessage());
                    logger.error("ID:" + result);
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdateIqiyi2Listener.java
New file
@@ -0,0 +1,53 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.service.inter.juhe.Iqiyi2Service;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class VideoUpdateIqiyi2Listener {
    private final static Logger logger = LoggerFactory.getLogger(VideoUpdateIqiyi2Listener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private Iqiyi2Service iqiyi2Service;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            if (!StringUtil.isNullOrEmpty(result)) {
                Long qikuID = Long.parseLong(result);
                try {
                    IqiyiAlbum2 album2 = iqiyi2Service.selectAlbumById(qikuID);
                    logger.info("爱奇艺专辑:" + album2.getName());
                    if (album2 != null) {
                        iqiyi2Service.addToVideoInfo(album2);
                    }
                } catch (Exception e) {
                    logger.error("爱奇艺专辑添加到视频出错:" + e.getMessage());
                    logger.error("ID:" + qikuID);
                    throw e;
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/util/mq/rabbit/consumer/VideoUpdatePPTVListener.java
New file
@@ -0,0 +1,63 @@
package com.yeshi.buwan.util.mq.rabbit.consumer;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.yeshi.buwan.dto.mq.PPTVMQMsg;
import com.yeshi.buwan.service.inter.juhe.PPTVService;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqMsgConsumeUtil;
import com.yeshi.buwan.videos.pptv.entity.PPTVSeries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
 * @author hxh
 * @title: QueueHelloWorldListener
 * @description:
 * @date 2024/9/26 13:47
 */
@Component
public class VideoUpdatePPTVListener {
    private final static Logger logger = LoggerFactory.getLogger(VideoUpdatePPTVListener.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private PPTVService pptvService;
    public void onMessage(Message message, Channel channel) throws Exception {
        RabbitmqMsgConsumeUtil.processMessage(message, channel, rabbitTemplate, () -> {
            String result = new String(message.getBody(), StandardCharsets.UTF_8);
            //TODO 处理消息
            if(!StringUtil.isNullOrEmpty(result)) {
                PPTVMQMsg pptvmqMsg = new Gson().fromJson(result, PPTVMQMsg.class);
                try {
                    switch (pptvmqMsg.getType()) {
                        case PPTVMQMsg.TYPE_ADD_OR_UPDATE:
                            PPTVSeries pptvSeries = pptvService.getSeriesDetail(pptvmqMsg.getInfoId());
                            if (pptvSeries != null) {
                                pptvService.addToVideoInfo(pptvSeries);
                            }
                            break;
                        case PPTVMQMsg.TYPE_DELETE:
                            pptvService.offLineSeries(pptvmqMsg.getInfoId());
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("PPTV添加到视频出错:" + e.getMessage());
                    logger.error("infoId:" + pptvmqMsg.getInfoId());
                }
            }
        });
    }
}
src/main/java/com/yeshi/buwan/videos/iqiyi/util/IqiyiUtil2.java
@@ -3,14 +3,14 @@
import com.yeshi.buwan.domain.AdminInfo;
import com.yeshi.buwan.domain.VideoDetailInfo;
import com.yeshi.buwan.domain.VideoType;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.vo.IqiyiAlbumListResult;
import com.yeshi.buwan.service.inter.juhe.Iqiyi2Service;
import com.yeshi.buwan.util.TimeUtil;
import com.yeshi.buwan.util.log.VideoLogFactory;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.util.video.VideoConstant;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.vo.IqiyiAlbumListResult;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
@@ -36,6 +36,9 @@
    public final static String RESOURCE_NAME = "爱奇艺.";
    @Resource
    private Iqiyi2Service iqiyi2Service;
    @Resource
    private RabbitmqManager rabbitmqManager;
    private void saveAlbumAndVideo(List<IqiyiAlbum2> list) {
        if (list == null)
@@ -94,7 +97,7 @@
                }
            }
            iqiyi2Service.saveIqiyiAlbum(album);
            CMQManager.getInstance().addIqiyiAlbumUpdateMsg(album.getId());
            rabbitmqManager.addIqiyiAlbumUpdateMsg(album.getId());
        }
    }
@@ -143,7 +146,7 @@
                iqiyi2Service.deleteByAid(album.getId());
            }else {
                iqiyi2Service.saveIqiyiAlbum(album);
                CMQManager.getInstance().addIqiyiAlbumUpdateMsg(album.getId());
                rabbitmqManager.addIqiyiAlbumUpdateMsg(album.getId());
            }
        }
    }
src/main/resources/consumer.xml
@@ -9,9 +9,9 @@
        <dubbo:parameter key="qos.port" value="33333"></dubbo:parameter>
    </dubbo:application>
    <dubbo:registry address="zookeeper://172.16.16.46:2181"/>
    <!--<dubbo:registry address="zookeeper://172.16.16.46:2181"/>-->
    <!--<dubbo:registry address="zookeeper://193.112.35.168:2182"/>-->
    <dubbo:registry address="zookeeper://193.112.35.168:2182"/>
    <dubbo:annotation package="com"/>
src/main/resources/env-dev/logback.xml
@@ -241,29 +241,9 @@
    </logger>
    <appender name="KAFKA_SEARCH_KEY" class="com.yeshi.buwan.log.KafkaAppender">
        <!-- encoder必须配置, 日志格式 -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!--<pattern>-->
            <!--%red(%d{yyyy-MM-dd HH:mm:ss.SSS}) %highlight(%-5level) %green([%thread]) %boldMagenta(%logger{10}) - %cyan(%msg%n)-->
            <!--</pattern>-->
            <!--为了便于分析将日志数据转为json格式-->
            <pattern>${log.pattern}</pattern>
            <!-- 控制台也要使用UTF-8,不要使用GBK,否则会中文乱码 -->
            <charset>UTF-8</charset>
        </encoder>
        <bootstrapServers>${log.config.kafka.bootstrapServers}</bootstrapServers>
        <topic>${log.config.kafka.topic}</topic>
        <batchSize>${log.config.kafka.batchSize}</batchSize>
        <lingerMs>${log.config.kafka.lingerMs}</lingerMs>
        <compressionType>${log.config.kafka.compressionType}</compressionType>
        <retries>${log.config.kafka.retries}</retries>
        <maxRequestSize>${log.config.kafka.maxRequestSize}</maxRequestSize>
        <isSend>${log.config.kafka.isSend}</isSend>
    </appender>
    <!-- 关键词搜索 -->
    <logger name="searchKey" level="INFO" additivity="false">
        <appender-ref ref="KAFKA_SEARCH_KEY"></appender-ref>
        <!--<appender-ref ref="KAFKA_SEARCH_KEY"></appender-ref>-->
    </logger>
src/main/resources/env-dev/rabbitmq.properties
@@ -2,4 +2,4 @@
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=yeshi2014
rabbitmq.virtual-host=myRabbitmq
rabbitmq.virtual-host=/buwan
src/main/resources/env-pro/rabbitmq.properties
@@ -2,4 +2,4 @@
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=yeshi2014
rabbitmq.virtual-host=myRabbitmq
rabbitmq.virtual-host=/buwan
src/main/resources/spring-rabbitmq-consumer.xml
@@ -1,27 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <context:annotation-config />
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
                               virtual-host="${rabbitmq.virtual-host}" />
    <!--基础部分-->
    <bean id="queueHelloWorldListener" class="com.yeshi.buwan.util.mq.consumer.QueueHelloWorldListener"/>
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="queueHelloWorldListener" queue-names="queue-hello-world"/>
    <context:component-scan base-package="com.yeshi.buwan.util.mq.rabbit" />
    <bean id="rabbitDelayConsumeFailConsumer" class="com.yeshi.buwan.util.mq.rabbit.RabbitDelayConsumeFailConsumer" />
    <bean id="rabbitDelayConsumer" class="com.yeshi.buwan.util.mq.rabbit.RabbitDelayConsumer" />
    <bean id="queueHelloWorldListener" class="com.yeshi.buwan.util.mq.rabbit.QueueHelloWorldListener" />
    <bean id="solrNewListener0" class="com.yeshi.buwan.util.mq.rabbit.consumer.SolrNewListener" />
    <bean id="solrNewListener1" class="com.yeshi.buwan.util.mq.rabbit.consumer.SolrNewListener" />
    <bean id="solrNewListener2" class="com.yeshi.buwan.util.mq.rabbit.consumer.SolrNewListener" />
    <bean id="updateIntenetSearchListener" class="com.yeshi.buwan.util.mq.rabbit.consumer.UpdateIntenetSearchListener" />
    <bean id="updateResourceVideoListener" class="com.yeshi.buwan.util.mq.rabbit.consumer.UpdateResourceVideoListener" />
    <bean id="videoResourceDeleteListener" class="com.yeshi.buwan.util.mq.rabbit.consumer.VideoResourceDeleteListener" />
    <bean id="videoSyncDataV2Listener" class="com.yeshi.buwan.util.mq.rabbit.consumer.VideoSyncDataV2Listener" />
    <bean id="videoUpdateExtrainfoListener" class="com.yeshi.buwan.util.mq.rabbit.consumer.VideoUpdateExtrainfoListener" />
    <bean id="videoUpdateFuntv2Listener" class="com.yeshi.buwan.util.mq.rabbit.consumer.VideoUpdateFuntv2Listener" />
    <bean id="videoUpdateIqiyi2Listener" class="com.yeshi.buwan.util.mq.rabbit.consumer.VideoUpdateIqiyi2Listener" />
    <bean id="videoUpdatePPTVListener" class="com.yeshi.buwan.util.mq.rabbit.consumer.VideoUpdatePPTVListener" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener ref="queueHelloWorldListener" queue-names="test" />
        <rabbit:listener ref="solrNewListener0" queue-names="buwan-solr-new" />
        <rabbit:listener ref="solrNewListener1" queue-names="buwan-solr-new" />
        <rabbit:listener ref="solrNewListener2" queue-names="buwan-solr-new" />
        <rabbit:listener ref="updateIntenetSearchListener" queue-names="buwan-video-update-internet-search" />
        <rabbit:listener ref="updateResourceVideoListener" queue-names="buwan-video-update-resource-video" />
        <rabbit:listener ref="videoResourceDeleteListener" queue-names="buwan-video-resource-delete" />
        <rabbit:listener ref="videoSyncDataV2Listener" queue-names="buwan-video-syncdata-v2" />
        <rabbit:listener ref="videoUpdateExtrainfoListener" queue-names="buwan-video-video-update-extrainfo" />
        <rabbit:listener ref="videoUpdateFuntv2Listener" queue-names="buwan-video-update-funtv2" />
        <rabbit:listener ref="videoUpdateIqiyi2Listener" queue-names="buwan-video-update-iqiyi2" />
        <rabbit:listener ref="videoUpdatePPTVListener" queue-names="buwan-video-update-pptv" />
        <rabbit:listener ref="videoUpdatePPTVListener" queue-names="buwan-video-update-pptv" />
        <rabbit:listener ref="rabbitDelayConsumeFailConsumer" queue-names="consume_fail_queue" />
        <rabbit:listener ref="rabbitDelayConsumer" queue-names="delay_queue" />
    </rabbit:listener-container>
</beans>
</beans>
src/main/resources/spring-rabbitmq-producer.xml
@@ -1,76 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
                               virtual-host="${rabbitmq.virtual-host}"  />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机. 默认交换机类型为direct,名字为:"",路由键为队列的名称-->
    <!--
        id:bean的名称
        name:queue的名称
        auto-declare:自动创建
        auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
        durable:是否持久化
    -->
    <!-- rabbitTemplate-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" />
    <!-- 简单模式 -->
    <!-- ignore-declaration-exceptions: 忽略队列已存在且声明时属性不一致的报错 -->
    <rabbit:queue id="queue-hello-world" name="queue-hello-world" auto-declare="true" durable="false" ignore-declaration-exceptions="true"/>
    <!-- Retry Template  -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="1000"/> <!-- 初始间隔 -->
                <property name="multiplier" value="3"/>      <!-- 乘数 -->
                <property name="maxInterval" value="30000"/>   <!-- 最大间隔 -->
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="5"></property>
            </bean>
        </property>
    </bean>
    <!--工作队列模式-->
    <rabbit:queue id="queue-work-queue" name="queue-work-queue" auto-declare="true" durable="false"/>
    <!-- 发布订阅模式 -->
    <!-- fanout: 广播, 所有绑定到交换机的队列都能收到消息 -->
    <rabbit:queue id="queue-fanout-1" name="queue-fanout-1" auto-declare="true" durable="false"/>
    <rabbit:queue id="queue-fanout-2" name="queue-fanout-2" auto-declare="true" durable="false"/>
    <rabbit:fanout-exchange id="exchange-fanout" name="exchange-fanout" auto-declare="true" durable="false">
    <rabbit:queue id="deadLetterQueue" name="deadLetterQueue" auto-declare="true">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="172800000" value-type="java.lang.Integer"/> <!-- 设置消息 TTL 为 2天 -->
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- 定义死信交换机和队列 -->
    <rabbit:fanout-exchange id="deadLetterExchange" name="dead.letter.exchange"  >
        <rabbit:bindings>
            <rabbit:binding queue="queue-fanout-1"/>
            <rabbit:binding queue="queue-fanout-2"/>
            <rabbit:binding queue="deadLetterQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <!-- 路由模式 -->
    <!-- direct: 定向, 绑定到交换机且指定的队列才能收到消息 -->
    <rabbit:queue id="queue-direct" name="queue-direct" auto-declare="true" durable="false"/>
    <rabbit:direct-exchange id="exchange-direct" name="exchange-direct">
        <rabbit:bindings>
            <!--key: 路由键-->
            <rabbit:binding queue="queue-direct" key="direct"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 主题模式 -->
    <!-- topic: 主题, 绑定到交换机且满足条件的队列才能收到消息, 路由键:*匹配一个单词,#匹配多个单词 -->
    <rabbit:queue id="queue-topic-1" name="queue-topic-1" auto-declare="true" durable="false"/>
    <rabbit:queue id="queue-topic-2" name="queue-topic-2" auto-declare="true" durable="false"/>
    <rabbit:queue id="queue-topic-3" name="queue-topic-3" auto-declare="true" durable="false"/>
    <rabbit:topic-exchange id="exchange-topic" name="exchange-topic" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="queue-topic-1" pattern="topic.*"/>
            <rabbit:binding queue="queue-topic-2" pattern="topic.#"/>
            <rabbit:binding queue="queue-topic-3" pattern="topic.three.*"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- rabbitTemplate-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
</beans>
src/main/resources/spring.xml
@@ -43,12 +43,17 @@
                <value>classpath:druid.properties</value>
                <value>classpath:solr.properties</value>
                <value>classpath:xxl-job-executor.properties</value>
                <value>classpath:rabbitmq.properties</value>
            </array>
        </property>
    </bean>
    <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
    <import resource="classpath:spring-rabbitmq-producer.xml"/>
    <import resource="classpath:spring-rabbitmq-consumer.xml"/>
    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
@@ -213,9 +218,12 @@
    </bean>
    <!-- Solr搜索引擎数据管理 -->
    <import resource="solr.xml"></import>
    <import resource="xxl-job.xml"></import>
    <import resource="consumer.xml"></import>
    <import resource="seata.xml"></import>
</beans>
src/test/java/com/hxh/spring/test/DES.java
@@ -9,7 +9,7 @@
public class DES {
    @org.junit.Test
    public void test1() {
        String st = "iDp+mknN2urZPWPAdmBTv1ME3YqW7QwsJH0zD9ebFH7kI5NtvSWSUl4+FvBTCeOb+Nk40C4Fn/d0u3Kmr8vDhx9+u8SNrBJtF9KAfFmBfP98oR1Z7YwBK4RijnPgnZcLnPIXcMTy9B/iTloTk3Meg2697fyeTOwzR9aXcTW7kTiwC/6Vu+dTe0ZGlQpzgRM4e0JEfq4bFMFDe9TV+exA6ikwrh10uow/YHVNonEfqt32nQB55NOHgBLFxEHcdgdJB9hfynN/Ixx3yoGoTiQwteerArZHj5FUZW+d2naxn2bxny+TI+WuBjtSkfsY+wMdSk4ewaukUejM5TrLta6hzwW0vzNpzaKFp/GZ+A2MpIcggJcZhgrx753WPb2FudcoHSBeYGL9YfEze5EKt6dB2Bbe4OLmw/HaQDLFlyjR2VIcOFTQ6JWVbQ==";
        String st = "iDp+mknN2urZPWPAdmBTv1ME3YqW7QwsO+mtFhn9CSuJsa0ddEmM7KSxz6j88MudfmuIYIKtu1CyGwDF9IOAlFfaOMQXMkfbYXo7dcgpfSFTm73++BarsfJ3BjpVXOFHCHMHBsrWnZYAwfUx+D5qg9sn1AJAaHgG7EmjOxhr8IS9ICzBR8P20HaTPq+np4XTybxUt1NatSdJwuxixnRRP7uLCA0RrSlDGbxDQFB0+jk2suc3p8OKUwCKQg7NxmZ8baDGIZCC4PjCTsLMYJCxkTzsDUWmugEMA3/1qBBIOHIxBafxHVcaAnbGo17KFj50RwwQGwUR8bLqTPpUDOIf8uTwvBDmFi69";
        System.out.println(DESUtil.decode(st));
    }
src/test/java/com/hxh/spring/test/Iqiyi2.java
@@ -8,11 +8,6 @@
import com.yeshi.buwan.domain.VideoInfo;
import com.yeshi.buwan.domain.VideoResource;
import com.yeshi.buwan.exception.video.IqiyiVideoSolrException;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil2;
import com.yeshi.buwan.videos.iqiyi.vo.IqiyiAlbumListResult;
import com.yeshi.buwan.job.video.Iqiyi2VideoUpdate;
import com.yeshi.buwan.query.Iqiyi2AlbumQuery;
import com.yeshi.buwan.service.imp.CategoryVideoService;
@@ -23,7 +18,12 @@
import com.yeshi.buwan.util.HtmlToolUtil;
import com.yeshi.buwan.util.StringUtil;
import com.yeshi.buwan.util.TimeUtil;
import com.yeshi.buwan.util.mq.CMQManager;
import com.yeshi.buwan.util.mq.rabbit.RabbitmqManager;
import com.yeshi.buwan.videos.iqiyi.IqiYiNewAPI;
import com.yeshi.buwan.videos.iqiyi.entity.IqiyiAlbum2;
import com.yeshi.buwan.videos.iqiyi.entity.VideoIqiyi;
import com.yeshi.buwan.videos.iqiyi.util.IqiyiUtil2;
import com.yeshi.buwan.videos.iqiyi.vo.IqiyiAlbumListResult;
import net.sf.json.JSONArray;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -221,6 +221,9 @@
        }
    }
    @Resource
    private RabbitmqManager rabbitmqManager;
    @Test
    public void addToMQ() {
@@ -237,7 +240,7 @@
            for (IqiyiAlbum2 album2 : album2List) {
                System.out.println(album2.getName());
                if (IqiyiUtil2.albumIsValid(album2)) {
                    CMQManager.getInstance().addIqiyiAlbumUpdateMsg(album2.getId());
                    rabbitmqManager.addIqiyiAlbumUpdateMsg(album2.getId());
                }
            }
        }
src/test/java/com/hxh/spring/test/mq/MQTest.java
@@ -1,6 +1,5 @@
package com.hxh.spring.test.mq;
import com.yeshi.buwan.util.mq.CMQManager;
import org.junit.Test;
/**
@@ -14,7 +13,6 @@
    @Test
    public void init(){
        CMQManager.getInstance();
    }