admin
2019-11-20 8b2191df2f7d94aa299bd43dcbe97c94e5a61bbd
RocketMQ的消息整改
11个文件已修改
81 ■■■■■ 已修改文件
fanli/src/main/java/com/yeshi/fanli/controller/CallBackController.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderMoneySettleServiceImpl.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderProcessServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/redpack/RedPackGiveRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderPayServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderServiceImpl.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoModifyRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserSystemCouponServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/integral/IntegralTaskRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/ThreeSaleSerivceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/order/InviteOrderSubsidyMessageListener.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/controller/CallBackController.java
@@ -35,6 +35,7 @@
import com.yeshi.fanli.service.inter.push.PushService;
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.shop.BanLiShopOrderUtil;
import com.yeshi.fanli.util.wx.BanLiShopWXPayUtil;
@@ -191,8 +192,8 @@
                    if (order != null) {// 支付成功消息推送
                        BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid(),
                                new BigDecimal(map.get("total_fee")).multiply(new BigDecimal("0.01")));
                        Message message = new Message(MQTopicName.TOPIC_ORDER.name(),
                                OrderTopicTagEnum.banLiShopOrderPaid.name(), new Gson().toJson(msg).getBytes());
                        Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                                OrderTopicTagEnum.banLiShopOrderPaid, msg);
                        producer.send(message);
                        Map<String, String> returnMap = new HashMap<>();
                        returnMap.put("return_code", "SUCCESS");
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderMoneySettleServiceImpl.java
@@ -60,6 +60,7 @@
import com.yeshi.fanli.util.TimeUtil;
import com.yeshi.fanli.util.cmq.HongBaoRecieveCMQManager;
import com.yeshi.fanli.util.factory.UserMoneyDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.taobao.TaoKeOrderApiUtil;
@@ -155,8 +156,8 @@
        // 邀请赚到账事务消息
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_INVITE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(),
                new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual,
                mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -219,7 +220,7 @@
        // 邀请赚到账事务消息
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_INVITE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -278,7 +279,7 @@
        // 邀请赚到账事务消息
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_INVITE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -319,7 +320,7 @@
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_SHARE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(),OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -364,7 +365,7 @@
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_SHARE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -405,7 +406,7 @@
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_SHARE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderProcessServiceImpl.java
@@ -88,6 +88,7 @@
import com.yeshi.fanli.util.factory.UserMoneyDetailFactory;
import com.yeshi.fanli.util.jd.JDApiUtil;
import com.yeshi.fanli.util.pinduoduo.PinDuoDuoApiUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTagConstant;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@@ -420,8 +421,8 @@
                mainUser.getId(), hongBaoOrder.getCommonOrder().getSourceType(),
                hongBaoOrder.getCommonOrder().getOrderNo(), null, new Date(),0);
        mqMsg.setOrderFirst(first);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(),
                new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual,
                mqMsg);
        String key = mainUser.getId() + "-" + UUID.randomUUID().toString();
        msg.setKey(key);
        try {
@@ -1220,8 +1221,8 @@
        if (resultCode == 1) {// 只发送新增消息
            OrderMQMsg mqMsg = new OrderMQMsg(orderId, sourceType, uid, OrderMQMsg.HANDLE_TYPE_ADD,
                    isCommonOrderValid(coList),0,new Date());
            Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderStatistic.name(),
                    new Gson().toJson(mqMsg).getBytes());
            Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderStatistic,
                    mqMsg);
            producer.send(msg);
        }
    }
fanli/src/main/java/com/yeshi/fanli/service/impl/redpack/RedPackGiveRecordServiceImpl.java
@@ -36,6 +36,7 @@
import com.yeshi.fanli.util.TokenUtil;
import com.yeshi.fanli.util.annotation.RequestSerializableByKeyService;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@Service
@@ -175,8 +176,8 @@
        UserRedPackGiftMQMsg msg = new UserRedPackGiftMQMsg();
        msg.setId(giveRecord.getId());
        msg.setUid(uid);
        Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.redPackGiftDrawback.name(),
                new Gson().toJson(msg).getBytes());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.redPackGiftDrawback,
                msg);
        try {
            producer.send(message);
        } catch (Exception e) {
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderPayServiceImpl.java
@@ -40,6 +40,7 @@
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.charge.FuLuChargeApiUtil;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.wx.BanLiShopWXPayUtil;
@@ -210,8 +211,8 @@
                    order.setMoneyPaymentState(BanLiShopOrder.PAY_STATE_REFUNDING);
                    // 发送退款消息
                    BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid());
                    Message message = new Message(MQTopicName.TOPIC_ORDER.name(),
                            OrderTopicTagEnum.banLiShopOrderRefund.name(), new Gson().toJson(msg).getBytes());
                    Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                            OrderTopicTagEnum.banLiShopOrderRefund, msg);
                    message.setStartDeliverTime(System.currentTimeMillis() + 1000 * 60 * 5L);// 延时24小时通知检测微信退款状态
                    SendResult result = producer.send(message);
                    if (result == null) {
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderServiceImpl.java
@@ -47,6 +47,7 @@
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.shop.BanLiShopOrderUtil;
import com.yeshi.fanli.util.wx.BanLiShopWXPayUtil;
@@ -170,8 +171,8 @@
    private void sendPlaceOrderMsg(Long orderId, Long uid) {
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.banLiShopOrderDelay.name(),
                new Gson().toJson(new BanLiShopOrderMQMsg(orderId, uid)).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.banLiShopOrderDelay,
                new BanLiShopOrderMQMsg(orderId, uid));
        msg.setKey(orderId + "");
        long delayTime = System.currentTimeMillis() + (Constant.IS_TEST ? 1000 * 30L : 1000 * 60 * 10);// 10分钟后通知
        msg.setStartDeliverTime(delayTime);
@@ -341,8 +342,8 @@
                    if (isS)// 支付成功,重新发送支付成功消息
                    {
                        BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid());
                        Message message = new Message(MQTopicName.TOPIC_ORDER.name(),
                                OrderTopicTagEnum.banLiShopOrderPaid.name(), new Gson().toJson(msg).getBytes());
                        Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                                OrderTopicTagEnum.banLiShopOrderPaid,msg);
                        producer.send(message);
                        return;
                    }
@@ -373,8 +374,8 @@
        if (order.getState() != BanLiShopOrder.STATE_PAID)
            throw new BanLiShopOrderException(1, "订单未处于待审核状态");
        Message message = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.banLiShopOrderRefund.name(),
                new Gson().toJson(new BanLiShopOrderMQMsg(order.getId(), order.getUid())).getBytes());
        Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.banLiShopOrderRefund,
            new BanLiShopOrderMQMsg(order.getId(), order.getUid()));
        try {
            orderTransactionProducer.send(message, new LocalTransactionExecuter() {
                @Override
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoModifyRecordServiceImpl.java
@@ -29,6 +29,7 @@
import com.yeshi.fanli.service.inter.user.tb.UserExtraTaoBaoInfoService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@Service
@@ -107,8 +108,8 @@
                        mqMsg = new UserAccountBindingMQMsg(uid, UserAccountBindingMQMsg.TYPE_ALIPAY, value,
                                new Date());
                    }
                    Message message = new Message(MQTopicName.TOPIC_USER.name(),
                            UserTopicTagEnum.userAccountBinding.name(), new Gson().toJson(mqMsg).getBytes());
                    Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_USER,
                            UserTopicTagEnum.userAccountBinding,mqMsg);
                    producer.send(message);
                }
            }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserSystemCouponServiceImpl.java
@@ -84,6 +84,7 @@
import com.yeshi.fanli.util.annotation.RequestSerializableByKeyService;
import com.yeshi.fanli.util.factory.UserMoneyDetailFactory;
import com.yeshi.fanli.util.factory.msg.MsgOtherSystemGiveDTOFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.taobao.TaoBaoUtil;
import com.yeshi.fanli.vo.msg.ClientTextStyleVO;
@@ -896,8 +897,8 @@
            UserSystemCouponUseMQMsg mqMsg = new UserSystemCouponUseMQMsg(userSystemCoupon.getId(), order.getOrderId(),
                    sourceType, systemCoupon.getType().name());
            // 事务消息
            Message msg = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.useSystemCoupon.name(),
                    new Gson().toJson(mqMsg).getBytes());
            Message msg =MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.useSystemCoupon,
                    mqMsg);
            try {
                SendResult result = orderTransactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
@@ -2082,8 +2083,8 @@
        UserSystemCouponUseMQMsg msg = new UserSystemCouponUseMQMsg();
        msg.setUserSystemCouponId(id);
        msg.setCouponType(type.name());
        Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.systemCouponDrawback.name(),
                new Gson().toJson(msg).getBytes());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.systemCouponDrawback,
                msg);
        try {
            producer.send(message);
        } catch (Exception e) {
fanli/src/main/java/com/yeshi/fanli/service/impl/user/integral/IntegralTaskRecordServiceImpl.java
@@ -42,6 +42,7 @@
import com.yeshi.fanli.service.inter.user.integral.IntegralTaskService;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.TimeUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@Service
@@ -369,8 +370,8 @@
        long count = integralTaskRecordMapper.getTotalGoldCoinByUid(record.getUid());
        if (count >= 200) {// 大于200发送消息
            IntegralTaskMQMsg msg = new IntegralTaskMQMsg(record.getUid(), count, new Date());
            Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.integralTaskFinish.name(),
                    new Gson().toJson(msg).getBytes());
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.integralTaskFinish,
                    msg);
            producer.send(message);
        }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/ThreeSaleSerivceImpl.java
@@ -37,6 +37,7 @@
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.VersionUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import net.sf.json.JSONArray;
@@ -618,8 +619,8 @@
            }
        });
        UserInviteMQMsg msg = new UserInviteMQMsg(bossId, workerId, null, new Date(), UserInviteMQMsg.STATE_SUCCESS);
        Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.inviteSuccess.name(),
                new Gson().toJson(msg).getBytes());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.inviteSuccess,
                msg);
        producer.send(message);
    }
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/order/InviteOrderSubsidyMessageListener.java
@@ -31,6 +31,7 @@
import com.yeshi.fanli.service.inter.order.OrderMoneySettleService;
import com.yeshi.fanli.service.inter.user.UserSystemCouponRecordService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
/**
@@ -150,8 +151,8 @@
     */
    private boolean sendOrderSubsidyUpdateLater(OrderMoneyRecievedMQMsg dto, int day) {
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiDelay.name(),
                new Gson().toJson(dto).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiDelay,
                dto);
        msg.setKey(dto.getSourceType() + "_" + dto.getOrderId());
        long delayTime = System.currentTimeMillis() + (Constant.IS_TEST ? 1000 * 30L : 1000 * 60 * 60 * 24L * day);
        msg.setStartDeliverTime(delayTime);