admin
2020-07-01 c54fb6a88876be994906d57d2d18e844686964d0
rcoketmq集中管理
24个文件已修改
205 ■■■■ 已修改文件
fanli/src/main/java/com/yeshi/fanli/aspect/ActiveUserAspect.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/controller/CallBackController.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/job/MQJob.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/money/UserMoneyServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/CommonOrderServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderProcessServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/tb/TaoBaoWeiQuanOrderServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/redpack/RedPackGiveRecordServiceImpl.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderPayServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderServiceImpl.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserActiveLogServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserCustomSettingsServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoExtraServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoModifyRecordServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserSystemCouponServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/cloud/UserCloudGoodsServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/cloud/UserCloudServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/integral/IntegralTaskRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/ThreeSaleSerivceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/UserInviteValidNumServiceImpl.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/tb/UserExtraTaoBaoInfoServiceImpl.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/vip/UserVIPInfoServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/vip/UserVIPPreInfoServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/manger/msg/RocketMQManager.java 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/aspect/ActiveUserAspect.java
@@ -58,9 +58,6 @@
    @Resource
    private IntegralGetService integralGetService;
    
    @Resource(name = "producer")
    private Producer producer;
    private ExpressionParser parser = new SpelExpressionParser();
fanli/src/main/java/com/yeshi/fanli/controller/CallBackController.java
@@ -8,7 +8,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -24,14 +23,12 @@
import org.yeshi.utils.wx.WXUtil;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.yeshi.fanli.dto.ConfigParamsDTO;
import com.yeshi.fanli.dto.mq.order.OrderTopicTagEnum;
import com.yeshi.fanli.dto.mq.order.body.BanLiShopOrderMQMsg;
import com.yeshi.fanli.entity.bus.activity.ActivityRuleUser;
import com.yeshi.fanli.entity.bus.activity.ActivityUser;
import com.yeshi.fanli.entity.bus.msg.MsgDeviceReadState;
import com.yeshi.fanli.entity.bus.user.cloud.UserCloud;
import com.yeshi.fanli.entity.dynamic.ImgInfo;
import com.yeshi.fanli.entity.dynamic.ImgInfo.ImgEnum;
import com.yeshi.fanli.entity.dynamic.SimpleGoods;
@@ -50,6 +47,7 @@
import com.yeshi.fanli.service.inter.push.PushService;
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.service.inter.user.cloud.UserCloudService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.factory.goods.GoodsDetailVOFactory;
@@ -82,8 +80,8 @@
    @Resource
    private MsgDeviceReadStateService msgDeviceReadStateService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private BanLiShopOrderService banLiShopOrderService;
@@ -233,7 +231,7 @@
                                new BigDecimal(map.get("total_fee")).multiply(new BigDecimal("0.01")));
                        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                                OrderTopicTagEnum.banLiShopOrderPaid, msg);
                        producer.send(message);
                        rocketMQManager.sendNormalMsg(message, null,null);
                        Map<String, String> returnMap = new HashMap<>();
                        returnMap.put("return_code", "SUCCESS");
                        returnMap.put("return_msg", "OK");
fanli/src/main/java/com/yeshi/fanli/job/MQJob.java
@@ -9,11 +9,11 @@
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.yeshi.fanli.entity.mq.MQUnSendInfo;
import com.yeshi.fanli.log.LogHelper;
import com.yeshi.fanli.service.inter.mq.MQUnSendInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
@@ -28,8 +28,10 @@
    @Resource
    private MQUnSendInfoService mqUnSendInfoService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    public MQJob() {
    }
@@ -50,7 +52,7 @@
                    msg.setKey(sendInfo.getKey());
                if (sendInfo.getDeliverTime() != null)
                    msg.setStartDeliverTime(sendInfo.getDeliverTime().getTime());
                SendResult sendResult = producer.send(msg);
                SendResult sendResult =rocketMQManager.sendNormalMsg(msg, null,null);
                if (sendResult != null) {
                    mqUnSendInfoService.deleteByPrimaryKey(sendInfo.getId());
                    LogHelper.mqInfo("消息重发成功",sendResult.getMessageId(), sendInfo.getTopic(), sendInfo.getTag(), sendInfo.getBody());
fanli/src/main/java/com/yeshi/fanli/service/impl/money/UserMoneyServiceImpl.java
@@ -20,6 +20,7 @@
import com.yeshi.fanli.entity.money.UserMoneyDetail;
import com.yeshi.fanli.log.LogHelper;
import com.yeshi.fanli.service.inter.money.UserMoneyService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.mq.cmq.UserMoneyChangeCMQManager;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
@@ -37,8 +38,8 @@
    @Resource
    private UserMoneyDetailMapper userMoneyDetailMapper;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Override
    public BigDecimal getMoneyToday(Long uid) {
@@ -97,7 +98,7 @@
        if (!Constant.IS_TEST) {
            UserMoneyChangeMQMsg msg = new UserMoneyChangeMQMsg(uid, money, new Date());
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userMoneyAdd, msg);
            producer.send(message);
            rocketMQManager.sendNormalMsg(message, null);
        }
    }
fanli/src/main/java/com/yeshi/fanli/service/impl/order/CommonOrderServiceImpl.java
@@ -18,7 +18,6 @@
import org.yeshi.utils.taobao.TbImgUtil;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.yeshi.fanli.dao.mybatis.order.CommonOrderGoodsMapper;
import com.yeshi.fanli.dao.mybatis.order.CommonOrderMapper;
import com.yeshi.fanli.dao.mybatis.order.CommonOrderTradeIdMapMapper;
@@ -74,6 +73,7 @@
import com.yeshi.fanli.service.inter.user.UserSystemCouponService;
import com.yeshi.fanli.service.inter.user.invite.UserInviteService;
import com.yeshi.fanli.service.inter.user.vip.UserVIPInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.service.manger.user.UserLevelManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.MoneyBigDecimalUtil;
@@ -160,8 +160,8 @@
    @Resource
    private TaoBaoOrderGoodsMapper taoBaoOrderGoodsMapper;
    @Resource(name = "producer")
    private Producer orderProducer;
    @Resource
    private RocketMQManager rocketMQManager;
    // 奖励订单图片
    public final static String PIC_REWARD = "http://img.flqapp.com/resource/order/order_state_reward.png";
@@ -1097,7 +1097,7 @@
                            settlement, firstOrder.getUserInfo().getId(), placeDate, new Date());
                    Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderConfirm,
                            mqMsg);
                    orderProducer.send(msg);
                    rocketMQManager.sendNormalMsg(msg, null);
                }
            }
        }
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderProcessServiceImpl.java
@@ -86,6 +86,7 @@
import com.yeshi.fanli.service.inter.taobao.TaoBaoUnionConfigService;
import com.yeshi.fanli.service.inter.user.UserSystemCouponService;
import com.yeshi.fanli.service.inter.user.tb.UserExtraTaoBaoInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.service.manger.order.HongBaoV2AddManager;
import com.yeshi.fanli.util.CMQManager;
import com.yeshi.fanli.util.Constant;
@@ -169,8 +170,8 @@
    @Resource(name = "orderTransactionProducer")
    private TransactionProducer orderTransactionProducer;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private OrderMoneySettleService orderMoneySettleService;
@@ -1699,7 +1700,7 @@
                OrderMQMsg mqMsg = new OrderMQMsg(orderId, sourceType, uid, OrderMQMsg.HANDLE_TYPE_ADD,
                        isCommonOrderValid(coList), 0, new Date(), result.isMiandan(),coList.get(0).getThirdCreateTime());
                Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderStatistic, mqMsg);
                producer.send(msg);
                rocketMQManager.sendNormalMsg(msg, null);
            }
    }
fanli/src/main/java/com/yeshi/fanli/service/impl/order/tb/TaoBaoWeiQuanOrderServiceImpl.java
@@ -24,6 +24,7 @@
import com.yeshi.fanli.service.inter.order.msg.MsgOrderDetailService;
import com.yeshi.fanli.service.inter.order.msg.UserOrderMsgNotificationService;
import com.yeshi.fanli.service.inter.order.tb.TaoBaoWeiQuanOrderService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@@ -45,8 +46,8 @@
    @Resource
    private UserOrderWeiQuanRecordService userOrderWeiQuanRecordService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Transactional(rollbackFor = Exception.class)
    @Override
@@ -119,7 +120,7 @@
                        Constant.SOURCE_TYPE_TAOBAO);
                Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.taoBaoOrderWeiQuan,
                        msg);
                producer.send(message);
                rocketMQManager.sendNormalMsg(message, null);
            }
        }
fanli/src/main/java/com/yeshi/fanli/service/impl/redpack/RedPackGiveRecordServiceImpl.java
@@ -37,6 +37,7 @@
import com.yeshi.fanli.service.inter.redpack.RedPackGiveRecordService;
import com.yeshi.fanli.service.inter.user.TokenRecordService;
import com.yeshi.fanli.service.inter.user.UserInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.TokenUtil;
@@ -70,8 +71,8 @@
    @Resource
    private RedPackForbidService redPackForbidService;
    
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    
    @Resource
    private UserInfoService userInfoService;
@@ -191,9 +192,10 @@
            msg.setId(giveRecord.getId());
            msg.setUid(uid);
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.redPackGiftDrawback,    msg);
            message.setStartDeliverTime(endTime.getTime() + 1000 * 60); // 延迟一分钟
            try {
                producer.send(message);
                rocketMQManager.sendNormalMsg(message,1000 * 60L, null);
            } catch (Exception e) {
                LogHelper.errorDetailInfo(e);
                throw new RedPackGiveRecordException(1, "红包创建失败");
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderPayServiceImpl.java
@@ -41,6 +41,7 @@
import com.yeshi.fanli.service.inter.shop.BanLiShopGoodsSetService;
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderPayService;
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.ThreadUtil;
@@ -80,8 +81,8 @@
    @Resource
    private AdminUserService adminUserService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Transactional(rollbackFor = Exception.class)
    @Override
@@ -243,8 +244,7 @@
                    BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid());
                    Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                            OrderTopicTagEnum.banLiShopOrderRefund, msg);
                    message.setStartDeliverTime(System.currentTimeMillis() + 1000 * 60 * 5L);// 延时5分钟通知检测微信退款状态
                    SendResult result = producer.send(message);
                    SendResult result = rocketMQManager.sendNormalMsg(message, 1000 * 60 * 5L,null);
                    if (result == null) {
                        throw new BanLiShopOrderException(8, "消息发送失败");
                    }
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderServiceImpl.java
@@ -46,6 +46,7 @@
import com.yeshi.fanli.service.inter.shop.BanLiShopGoodsSetPayService;
import com.yeshi.fanli.service.inter.shop.BanLiShopGoodsSetService;
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
@@ -74,8 +75,8 @@
    @Resource
    private RedPackBalanceService redPackBalanceService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private MQUnSendInfoService mqUnSendInfoService;
@@ -180,7 +181,8 @@
        msg.setKey(orderId + "");
        long delayTime = System.currentTimeMillis() + (Constant.IS_TEST ? 1000 * 30L : 1000 * 60 * 10);// 10分钟后通知
        msg.setStartDeliverTime(delayTime);
        SendResult sendResult = producer.send(msg);
        SendResult sendResult =    rocketMQManager.sendNormalMsg(msg,  Constant.IS_TEST ? 1000 * 30L : 1000 * 60 * 10L, orderId + "");
        if (sendResult == null) {
            MQUnSendInfo info = new MQUnSendInfo();
            info.setBody(new String(msg.getBody()));
@@ -349,7 +351,7 @@
                        BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid());
                        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                                OrderTopicTagEnum.banLiShopOrderPaid, msg);
                        producer.send(message);
                        rocketMQManager.sendNormalMsg(message, null);
                        return;
                    }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserActiveLogServiceImpl.java
@@ -17,6 +17,7 @@
import com.yeshi.fanli.service.inter.user.UserActiveLogService;
import com.yeshi.fanli.service.inter.user.UserInfoExtraService;
import com.yeshi.fanli.service.inter.user.UserInfoRegisterService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@@ -33,8 +34,8 @@
    @Resource
    private UserInfoRegisterService userInfoRegisterService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Override
    public void addUserActiveLog(UserActiveLog userActiveLog) {
@@ -91,7 +92,7 @@
        UserActiveMQMsg msg = new UserActiveMQMsg(uid, new Date());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userActve, msg);
        message.setStartDeliverTime(System.currentTimeMillis() + 1000 * 5L);// 5s后发送活跃消息
        producer.send(message);
        rocketMQManager.sendNormalMsg(message,  1000 * 5L, null);
    }
    @Override
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserCustomSettingsServiceImpl.java
@@ -18,6 +18,7 @@
import com.yeshi.fanli.entity.bus.user.UserInfo;
import com.yeshi.fanli.exception.user.UserCustomSettingsException;
import com.yeshi.fanli.service.inter.user.UserCustomSettingsService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@@ -29,8 +30,8 @@
    @Resource
    private UserCustomSettingsMapper userCustomSettingsMapper;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    
    @Override
    @Transactional
@@ -79,8 +80,8 @@
        
        if (mineTypeNum == UserSettingTypeEnum.noDisplayPhoneNum && !Constant.IS_TEST) {
            UserPhoneOpenMQMsg msg = new UserPhoneOpenMQMsg(uid, state == 1? false: true, new Date());
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userPhoneOpen, msg);
            producer.send(message);
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userPhoneOpen, msg);
            rocketMQManager.sendNormalMsg(message, null);
        }
    }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoExtraServiceImpl.java
@@ -55,6 +55,7 @@
import com.yeshi.fanli.service.inter.user.invite.ThreeSaleSerivce;
import com.yeshi.fanli.service.inter.user.msg.UserAccountMsgNotificationService;
import com.yeshi.fanli.service.inter.user.notify.UserActivedRecordService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.FilePathEnum;
import com.yeshi.fanli.util.InviteCodeFilterUtil;
@@ -107,8 +108,8 @@
    @Resource
    private UserAccountMsgNotificationService userAccountMsgNotificationService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Override
    public UserInfoExtraVO getRankInfo(Long uid) throws UserInfoExtraException, Exception {
@@ -591,7 +592,7 @@
        if (!Constant.IS_TEST) { // 发送激活成功消息
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.inviteCodeActive,
                    new InviteCodeActiveMQMsg(uid, new Date()));
            producer.send(message);
            rocketMQManager.sendNormalMsg(message, null);
        }
        
        return inviteCode;
@@ -838,7 +839,7 @@
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userInfoUpdate,
                    new UserInfoUpdateMQMsg(uid, UserInfoUpdateTypeEnum.inviteCode, inviteCodeVip,
                            new Date()));
            producer.send(message);
            rocketMQManager.sendNormalMsg(message, null);
        }
        
        // 消息
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoModifyRecordServiceImpl.java
@@ -29,6 +29,7 @@
import com.yeshi.fanli.service.inter.user.UserInfoService;
import com.yeshi.fanli.service.inter.user.integral.IntegralGetService;
import com.yeshi.fanli.service.inter.user.tb.UserExtraTaoBaoInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
@@ -55,8 +56,8 @@
    @Resource
    private UserExtraTaoBaoInfoService userExtraTaoBaoInfoService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Async()
    @Transactional
@@ -113,7 +114,7 @@
                    Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER,
                            UserTopicTagEnum.userAccountBinding, mqMsg);
                    if (!Constant.IS_TEST)
                        producer.send(message);
                        rocketMQManager.sendNormalMsg(message, null);
                } else {// 修改
                    UserInfoUpdateMQMsg mqMsg = null;
@@ -127,7 +128,7 @@
                    if (mqMsg != null) {
                        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER,
                                UserTopicTagEnum.userInfoUpdate, mqMsg);
                        producer.send(message);
                        rocketMQManager.sendNormalMsg(message, null);
                    }
                }
            }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserSystemCouponServiceImpl.java
@@ -74,6 +74,7 @@
import com.yeshi.fanli.service.inter.user.UserSystemCouponService;
import com.yeshi.fanli.service.inter.user.invite.ThreeSaleSerivce;
import com.yeshi.fanli.service.inter.user.invite.UserInviteService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.MoneyBigDecimalUtil;
import com.yeshi.fanli.util.RedisManager;
@@ -178,8 +179,8 @@
    @Resource
    private UserSystemCouponActivateService userSystemCouponActivateService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private UserInviteService userInviteService;
@@ -2009,9 +2010,8 @@
        msg.setCouponType(type.name());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.systemCouponDrawback, msg);
        // 延迟一分钟
        message.setStartDeliverTime(endTime.getTime() + 1000 * 60);
        try {
            producer.send(message);
            rocketMQManager.sendNormalMsg(message,1000 * 60L, null);
        } catch (Exception e) {
            throw new UserSystemCouponException(1, "创建赠送信息失败");
        }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/cloud/UserCloudGoodsServiceImpl.java
@@ -27,6 +27,7 @@
import com.yeshi.fanli.service.inter.goods.CommonGoodsService;
import com.yeshi.fanli.service.inter.user.cloud.UserCloudGoodsService;
import com.yeshi.fanli.service.inter.user.cloud.UserCloudManageService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.RedisManager;
import com.yeshi.fanli.util.cache.JDGoodsCacheUtil;
@@ -58,8 +59,8 @@
    @Resource
    private UserCloudManageService userCloudManageService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Override
    public void deleteByPrimaryKeyAndUid(Long id, Long uid) {
@@ -196,7 +197,7 @@
            UserCloudMQMsg msg = new UserCloudMQMsg(uid, cloudGoods.getId() + "", UserCloudMQMsg.TYPE_STORE);
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userCloud, msg);
            message.setStartDeliverTime(deliverTime);
            producer.send(message);
            rocketMQManager.sendNormalMsg(message, null);
            // 每5分钟查询一次
            deliverTime = deliverTime + 1000 * 60 * 5;
        }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/cloud/UserCloudServiceImpl.java
@@ -73,6 +73,7 @@
import com.yeshi.fanli.service.inter.user.cloud.UserCloudService;
import com.yeshi.fanli.service.inter.user.tb.UserExtraTaoBaoInfoService;
import com.yeshi.fanli.service.manger.goods.ConvertLinkManager;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.ImageToBase64;
import com.yeshi.fanli.util.MoneyBigDecimalUtil;
@@ -161,8 +162,8 @@
    @Resource
    private UserCloudManageService userCloudManageService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private UserOtherMsgNotificationService userOtherMsgNotificationService;
@@ -1290,7 +1291,7 @@
                        UserCloudMQMsg msg = new UserCloudMQMsg(uid, evaluateId, UserCloudMQMsg.TYPE_EVALUATE);
                        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userCloud,
                                msg);
                        producer.send(message);
                        rocketMQManager.sendNormalMsg(message, null);
                    }
                } catch (Exception e) {
                    LogHelper.errorDetailInfo(e);
@@ -1324,7 +1325,7 @@
            UserCloudMQMsg msg = new UserCloudMQMsg(uid, UserCloudMQMsg.TYPE_PUSH);
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userCloud, msg);
            message.setStartDeliverTime(java.lang.System.currentTimeMillis() + 1000 * 60 * 10);
            producer.send(message);
            rocketMQManager.sendNormalMsg(message, null);
        }
        // 更新已提醒
fanli/src/main/java/com/yeshi/fanli/service/impl/user/integral/IntegralTaskRecordServiceImpl.java
@@ -35,6 +35,7 @@
import com.yeshi.fanli.service.inter.user.integral.IntegralTaskClassService;
import com.yeshi.fanli.service.inter.user.integral.IntegralTaskRecordService;
import com.yeshi.fanli.service.inter.user.integral.IntegralTaskService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.TimeUtil;
@@ -64,8 +65,8 @@
    @Resource
    private ConfigService configService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Override
    public Integer getTotalGoldCoin(long uid, Long cid, Integer dateType) {
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/ThreeSaleSerivceImpl.java
@@ -34,6 +34,7 @@
import com.yeshi.fanli.service.inter.user.invite.ThreeSaleExtraInfoSerivce;
import com.yeshi.fanli.service.inter.user.invite.ThreeSaleSerivce;
import com.yeshi.fanli.service.inter.user.invite.UserInviteMsgNotificationService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
@@ -84,8 +85,8 @@
    @Resource(name = "taskExecutor")
    private TaskExecutor executor;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    public UserInfo getBoss(final long uid) {
        return threeSaleMapper.selectBoss(uid);
@@ -441,7 +442,7 @@
                    UserInviteMQMsg.STATE_SUCCESS);
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.inviteSuccess, msg);
            message.setStartDeliverTime(java.lang.System.currentTimeMillis() + 5000);
            producer.send(message);
            rocketMQManager.sendNormalMsg(message,5000L, null);
        }
        executor.execute(new Runnable() {
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/UserInviteValidNumServiceImpl.java
@@ -25,6 +25,7 @@
import com.yeshi.fanli.service.inter.user.UserInfoService;
import com.yeshi.fanli.service.inter.user.invite.ThreeSaleSerivce;
import com.yeshi.fanli.service.inter.user.invite.UserInviteValidNumService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
@@ -50,8 +51,8 @@
    @Resource
    private CommonOrderService commonOrderService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Override
    public UserInviteValidNum selectByPrimaryKey(Long id) {
@@ -205,7 +206,7 @@
                BeComeValidUserMQMsg msg = new BeComeValidUserMQMsg(uid, new Date());
                Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.becomeValidUser,
                        msg);
                producer.send(message);
                rocketMQManager.sendNormalMsg(message, null);
            }
            // 是否上上级
fanli/src/main/java/com/yeshi/fanli/service/impl/user/tb/UserExtraTaoBaoInfoServiceImpl.java
@@ -22,6 +22,7 @@
import com.yeshi.fanli.service.inter.user.ForbiddenUserIdentifyCodeService;
import com.yeshi.fanli.service.inter.user.UserAccountBindingHistoryService;
import com.yeshi.fanli.service.inter.user.tb.UserExtraTaoBaoInfoService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
@@ -39,8 +40,8 @@
    @Resource
    private UserAccountBindingHistoryService userAccountBindingHistoryService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Transactional(rollbackFor = Exception.class)
    @Override
@@ -202,7 +203,8 @@
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.taobaoBindSuccess,
                new TaoBaoBindSuccessMQMsg(uid, fromUid, taoBaoUid, new Date()));
        producer.send(message);
        rocketMQManager.sendNormalMsg(message, null);
    }
    /**
fanli/src/main/java/com/yeshi/fanli/service/impl/user/vip/UserVIPInfoServiceImpl.java
@@ -39,6 +39,7 @@
import com.yeshi.fanli.service.inter.user.vip.UserVIPInfoService;
import com.yeshi.fanli.service.inter.user.vip.UserVIPPreInfoService;
import com.yeshi.fanli.service.inter.user.vip.UserVipConfigService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.TimeUtil;
@@ -80,8 +81,8 @@
    @Resource
    private UserInviteMsgNotificationService userInviteMsgNotificationService;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private ConfigService configService;
fanli/src/main/java/com/yeshi/fanli/service/impl/user/vip/UserVIPPreInfoServiceImpl.java
@@ -38,6 +38,7 @@
import com.yeshi.fanli.service.inter.user.vip.UserLevelUpgradedNotifyService;
import com.yeshi.fanli.service.inter.user.vip.UserVIPPreInfoService;
import com.yeshi.fanli.service.inter.user.vip.UserVipConfigService;
import com.yeshi.fanli.service.manger.msg.RocketMQManager;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.TimeUtil;
@@ -88,8 +89,8 @@
    @Resource(name = "taskExecutor")
    private TaskExecutor executor;
    @Resource(name = "producer")
    private Producer producer;
    @Resource
    private RocketMQManager rocketMQManager;
    @Resource
    private TeamUserLevelStatisticService teamUserLevelStatisticService;
@@ -132,8 +133,7 @@
                    UserLevelUtil.getByLevel(info.getProcess()), new Date());
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.userLevelChanged, msg);
            //延时10s发送
            message.setStartDeliverTime(System.currentTimeMillis()+1000*10L);
            producer.send(message);
            rocketMQManager.sendNormalMsg(message,1000*10L, null);
        }
    }
fanli/src/main/java/com/yeshi/fanli/service/manger/msg/RocketMQManager.java
@@ -6,6 +6,7 @@
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
@@ -30,8 +31,8 @@
     * void 返回类型
     * @throws
     */
    public void sendNormalMsg(Message message, String key) {
        sendNormalMsg(message, null, key);
    public SendResult sendNormalMsg(Message message, String key) {
        return sendNormalMsg(message, null, key);
    }
    /**
@@ -43,14 +44,14 @@
     * void 返回类型
     * @throws
     */
    public void sendNormalMsg(Message message, Long delayTimeMS, String key) {
    public SendResult sendNormalMsg(Message message, Long delayTimeMS, String key) {
        if (message == null)
            return;
            return null;
        if (key != null)
            message.setKey(key);
        if (delayTimeMS != null)
            message.setStartDeliverTime(System.currentTimeMillis() + delayTimeMS);// 10s后发送活跃消息
        producer.send(message);
        return producer.send(message);
    }
    /**
@@ -65,7 +66,8 @@
     * void 返回类型
     * @throws
     */
    public void sendTransactionalMsg(Message message, Long delayTimeMS, String key,ITransactionalMQEvent mqEvent) throws Exception{
    public void sendTransactionalMsg(Message message, Long delayTimeMS, String key, ITransactionalMQEvent mqEvent)
            throws Exception {
        if (key != null)
            message.setKey(key);
        if (delayTimeMS != null)
@@ -74,13 +76,13 @@
        orderTransactionProducer.send(message, new LocalTransactionExecuter() {
            @Override
            public TransactionStatus execute(Message arg0, Object arg1) {
                if(mqEvent!=null)
                if (mqEvent != null)
                    mqEvent.excute();
                return TransactionStatus.CommitTransaction;
            }
        }, null);
    }
    /**
     * 发送事务消息
     * @Title: sendTransactionalMsg
@@ -92,12 +94,11 @@
     * void 返回类型
     * @throws
     */
    public void sendTransactionalMsg(Message message, String key,ITransactionalMQEvent mqEvent) throws Exception{
        sendTransactionalMsg(message,null,key,mqEvent);
    public void sendTransactionalMsg(Message message, String key, ITransactionalMQEvent mqEvent) throws Exception {
        sendTransactionalMsg(message, null, key, mqEvent);
    }
    interface ITransactionalMQEvent{
    interface ITransactionalMQEvent {
        public void excute();
    }
}