yujian
2019-11-20 a2f028be7690813b828e2aeecc0bd1a1f5d25600
Merge remote-tracking branch 'origin/div' into div
2个文件已添加
12个文件已修改
245 ■■■■ 已修改文件
fanli/src/main/java/com/yeshi/fanli/controller/CallBackController.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderMoneySettleServiceImpl.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderProcessServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/redpack/RedPackGiveRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderPayServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderServiceImpl.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoModifyRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserSystemCouponServiceImpl.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/integral/IntegralTaskRecordServiceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/ThreeSaleSerivceImpl.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/MQMsgBodyFactory.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/order/InviteOrderSubsidyMessageListener.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/order/OrderMoneyRecievedMessageListener.java 115 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/resource/rocket/consumer.xml 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/controller/CallBackController.java
@@ -35,6 +35,7 @@
import com.yeshi.fanli.service.inter.push.PushService;
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.shop.BanLiShopOrderUtil;
import com.yeshi.fanli.util.wx.BanLiShopWXPayUtil;
@@ -191,8 +192,8 @@
                    if (order != null) {// 支付成功消息推送
                        BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid(),
                                new BigDecimal(map.get("total_fee")).multiply(new BigDecimal("0.01")));
                        Message message = new Message(MQTopicName.TOPIC_ORDER.name(),
                                OrderTopicTagEnum.banLiShopOrderPaid.name(), new Gson().toJson(msg).getBytes());
                        Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                                OrderTopicTagEnum.banLiShopOrderPaid, msg);
                        producer.send(message);
                        Map<String, String> returnMap = new HashMap<>();
                        returnMap.put("return_code", "SUCCESS");
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderMoneySettleServiceImpl.java
@@ -60,6 +60,7 @@
import com.yeshi.fanli.util.TimeUtil;
import com.yeshi.fanli.util.cmq.HongBaoRecieveCMQManager;
import com.yeshi.fanli.util.factory.UserMoneyDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.taobao.TaoKeOrderApiUtil;
@@ -155,8 +156,8 @@
        // 邀请赚到账事务消息
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_INVITE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(),
                new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual,
                mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -219,7 +220,7 @@
        // 邀请赚到账事务消息
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_INVITE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -278,7 +279,7 @@
        // 邀请赚到账事务消息
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_INVITE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -319,7 +320,7 @@
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_SHARE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(),OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -364,7 +365,7 @@
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_SHARE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
@@ -405,7 +406,7 @@
        OrderMoneyRecievedMQMsg mqMsg = new OrderMoneyRecievedMQMsg(OrderMoneyRecievedMQMsg.TYPE_SHARE, uid,
                sourceType, null, null, new Date(),0);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(), new Gson().toJson(mqMsg).getBytes());
        Message msg =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual, mqMsg);
        String taskKey = getTaskKey(uid);
        msg.setKey(taskKey);
        // 添加事务消息
fanli/src/main/java/com/yeshi/fanli/service/impl/order/OrderProcessServiceImpl.java
@@ -88,6 +88,7 @@
import com.yeshi.fanli.util.factory.UserMoneyDetailFactory;
import com.yeshi.fanli.util.jd.JDApiUtil;
import com.yeshi.fanli.util.pinduoduo.PinDuoDuoApiUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTagConstant;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@@ -420,8 +421,8 @@
                mainUser.getId(), hongBaoOrder.getCommonOrder().getSourceType(),
                hongBaoOrder.getCommonOrder().getOrderNo(), null, new Date(),0);
        mqMsg.setOrderFirst(first);
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiActual.name(),
                new Gson().toJson(mqMsg).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiActual,
                mqMsg);
        String key = mainUser.getId() + "-" + UUID.randomUUID().toString();
        msg.setKey(key);
        try {
@@ -1220,8 +1221,8 @@
        if (resultCode == 1) {// 只发送新增消息
            OrderMQMsg mqMsg = new OrderMQMsg(orderId, sourceType, uid, OrderMQMsg.HANDLE_TYPE_ADD,
                    isCommonOrderValid(coList),0,new Date());
            Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderStatistic.name(),
                    new Gson().toJson(mqMsg).getBytes());
            Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderStatistic,
                    mqMsg);
            producer.send(msg);
        }
    }
fanli/src/main/java/com/yeshi/fanli/service/impl/redpack/RedPackGiveRecordServiceImpl.java
@@ -36,6 +36,7 @@
import com.yeshi.fanli.util.TokenUtil;
import com.yeshi.fanli.util.annotation.RequestSerializableByKeyService;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@Service
@@ -175,8 +176,8 @@
        UserRedPackGiftMQMsg msg = new UserRedPackGiftMQMsg();
        msg.setId(giveRecord.getId());
        msg.setUid(uid);
        Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.redPackGiftDrawback.name(),
                new Gson().toJson(msg).getBytes());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.redPackGiftDrawback,
                msg);
        try {
            producer.send(message);
        } catch (Exception e) {
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderPayServiceImpl.java
@@ -40,6 +40,7 @@
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.charge.FuLuChargeApiUtil;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.wx.BanLiShopWXPayUtil;
@@ -210,8 +211,8 @@
                    order.setMoneyPaymentState(BanLiShopOrder.PAY_STATE_REFUNDING);
                    // 发送退款消息
                    BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid());
                    Message message = new Message(MQTopicName.TOPIC_ORDER.name(),
                            OrderTopicTagEnum.banLiShopOrderRefund.name(), new Gson().toJson(msg).getBytes());
                    Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                            OrderTopicTagEnum.banLiShopOrderRefund, msg);
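                    message.setStartDeliverTime(System.currentTimeMillis() + 1000 * 60 * 5L);// Deliver after a 5-minute delay to check the WeChat refund status (the previous comment said 24 hours, which does not match this value)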
                    SendResult result = producer.send(message);
                    if (result == null) {
fanli/src/main/java/com/yeshi/fanli/service/impl/shop/BanLiShopOrderServiceImpl.java
@@ -47,6 +47,7 @@
import com.yeshi.fanli.service.inter.shop.BanLiShopOrderService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.factory.RedPackDetailFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.shop.BanLiShopOrderUtil;
import com.yeshi.fanli.util.wx.BanLiShopWXPayUtil;
@@ -170,8 +171,8 @@
    private void sendPlaceOrderMsg(Long orderId, Long uid) {
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.banLiShopOrderDelay.name(),
                new Gson().toJson(new BanLiShopOrderMQMsg(orderId, uid)).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.banLiShopOrderDelay,
                new BanLiShopOrderMQMsg(orderId, uid));
        msg.setKey(orderId + "");
        long delayTime = System.currentTimeMillis() + (Constant.IS_TEST ? 1000 * 30L : 1000 * 60 * 10);// 10分钟后通知
        msg.setStartDeliverTime(delayTime);
@@ -341,8 +342,8 @@
                    if (isS)// 支付成功,重新发送支付成功消息
                    {
                        BanLiShopOrderMQMsg msg = new BanLiShopOrderMQMsg(order.getId(), order.getUid());
                        Message message = new Message(MQTopicName.TOPIC_ORDER.name(),
                                OrderTopicTagEnum.banLiShopOrderPaid.name(), new Gson().toJson(msg).getBytes());
                        Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                                OrderTopicTagEnum.banLiShopOrderPaid,msg);
                        producer.send(message);
                        return;
                    }
@@ -373,8 +374,8 @@
        if (order.getState() != BanLiShopOrder.STATE_PAID)
            throw new BanLiShopOrderException(1, "订单未处于待审核状态");
        Message message = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.banLiShopOrderRefund.name(),
                new Gson().toJson(new BanLiShopOrderMQMsg(order.getId(), order.getUid())).getBytes());
        Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.banLiShopOrderRefund,
            new BanLiShopOrderMQMsg(order.getId(), order.getUid()));
        try {
            orderTransactionProducer.send(message, new LocalTransactionExecuter() {
                @Override
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserInfoModifyRecordServiceImpl.java
@@ -29,6 +29,7 @@
import com.yeshi.fanli.service.inter.user.tb.UserExtraTaoBaoInfoService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@Service
@@ -107,8 +108,8 @@
                        mqMsg = new UserAccountBindingMQMsg(uid, UserAccountBindingMQMsg.TYPE_ALIPAY, value,
                                new Date());
                    }
                    Message message = new Message(MQTopicName.TOPIC_USER.name(),
                            UserTopicTagEnum.userAccountBinding.name(), new Gson().toJson(mqMsg).getBytes());
                    Message message =MQMsgBodyFactory.create(MQTopicName.TOPIC_USER,
                            UserTopicTagEnum.userAccountBinding,mqMsg);
                    producer.send(message);
                }
            }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/UserSystemCouponServiceImpl.java
@@ -84,6 +84,7 @@
import com.yeshi.fanli.util.annotation.RequestSerializableByKeyService;
import com.yeshi.fanli.util.factory.UserMoneyDetailFactory;
import com.yeshi.fanli.util.factory.msg.MsgOtherSystemGiveDTOFactory;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import com.yeshi.fanli.util.taobao.TaoBaoUtil;
import com.yeshi.fanli.vo.msg.ClientTextStyleVO;
@@ -896,8 +897,8 @@
            UserSystemCouponUseMQMsg mqMsg = new UserSystemCouponUseMQMsg(userSystemCoupon.getId(), order.getOrderId(),
                    sourceType, systemCoupon.getType().name());
            // 事务消息
            Message msg = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.useSystemCoupon.name(),
                    new Gson().toJson(mqMsg).getBytes());
            Message msg =MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.useSystemCoupon,
                    mqMsg);
            try {
                SendResult result = orderTransactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
@@ -2082,8 +2083,8 @@
        UserSystemCouponUseMQMsg msg = new UserSystemCouponUseMQMsg();
        msg.setUserSystemCouponId(id);
        msg.setCouponType(type.name());
        Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.systemCouponDrawback.name(),
                new Gson().toJson(msg).getBytes());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.systemCouponDrawback,
                msg);
        try {
            producer.send(message);
        } catch (Exception e) {
fanli/src/main/java/com/yeshi/fanli/service/impl/user/integral/IntegralTaskRecordServiceImpl.java
@@ -42,6 +42,7 @@
import com.yeshi.fanli.service.inter.user.integral.IntegralTaskService;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.TimeUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
@Service
@@ -369,8 +370,8 @@
        long count = integralTaskRecordMapper.getTotalGoldCoinByUid(record.getUid());
        if (count >= 200) {// 大于200发送消息
            IntegralTaskMQMsg msg = new IntegralTaskMQMsg(record.getUid(), count, new Date());
            Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.integralTaskFinish.name(),
                    new Gson().toJson(msg).getBytes());
            Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.integralTaskFinish,
                    msg);
            producer.send(message);
        }
fanli/src/main/java/com/yeshi/fanli/service/impl/user/invite/ThreeSaleSerivceImpl.java
@@ -37,6 +37,7 @@
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.StringUtil;
import com.yeshi.fanli.util.VersionUtil;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
import net.sf.json.JSONArray;
@@ -618,8 +619,8 @@
            }
        });
        UserInviteMQMsg msg = new UserInviteMQMsg(bossId, workerId, null, new Date(), UserInviteMQMsg.STATE_SUCCESS);
        Message message = new Message(MQTopicName.TOPIC_USER.name(), UserTopicTagEnum.inviteSuccess.name(),
                new Gson().toJson(msg).getBytes());
        Message message = MQMsgBodyFactory.create(MQTopicName.TOPIC_USER, UserTopicTagEnum.inviteSuccess,
                msg);
        producer.send(message);
    }
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/MQMsgBodyFactory.java
New file
@@ -0,0 +1,18 @@
package com.yeshi.fanli.util.rocketmq;
import java.nio.charset.StandardCharsets;
import com.aliyun.openservices.ons.api.Message;
import com.google.gson.Gson;
import com.yeshi.fanli.dto.mq.order.OrderTopicTagEnum;
import com.yeshi.fanli.dto.mq.user.UserTopicTagEnum;
public class MQMsgBodyFactory {
    public static Message create(MQTopicName topic,OrderTopicTagEnum tag,Object body){
        return new Message(topic.name(), tag.name(), new Gson().toJson(body).getBytes());
    }
    public static Message create(MQTopicName topic,UserTopicTagEnum tag,Object body){
        return new Message(topic.name(), tag.name(), new Gson().toJson(body).getBytes());
    }
}
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/order/InviteOrderSubsidyMessageListener.java
@@ -31,6 +31,7 @@
import com.yeshi.fanli.service.inter.order.OrderMoneySettleService;
import com.yeshi.fanli.service.inter.user.UserSystemCouponRecordService;
import com.yeshi.fanli.util.Constant;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
/**
@@ -150,8 +151,8 @@
     */
    private boolean sendOrderSubsidyUpdateLater(OrderMoneyRecievedMQMsg dto, int day) {
        Message msg = new Message(MQTopicName.TOPIC_ORDER.name(), OrderTopicTagEnum.orderFanLiDelay.name(),
                new Gson().toJson(dto).getBytes());
        Message msg = MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER, OrderTopicTagEnum.orderFanLiDelay,
                dto);
        msg.setKey(dto.getSourceType() + "_" + dto.getOrderId());
        long delayTime = System.currentTimeMillis() + (Constant.IS_TEST ? 1000 * 30L : 1000 * 60 * 60 * 24L * day);
        msg.setStartDeliverTime(delayTime);
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/order/OrderMoneyRecievedMessageListener.java
New file
@@ -0,0 +1,115 @@
package com.yeshi.fanli.util.rocketmq.consumer.order;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.Producer;
import com.yeshi.fanli.dto.mq.order.OrderTopicTagEnum;
import com.yeshi.fanli.dto.mq.order.body.OrderMoneyRecievedMQMsg;
import com.yeshi.fanli.entity.bus.user.HongBaoV2;
import com.yeshi.fanli.entity.order.HongBaoOrder;
import com.yeshi.fanli.entity.order.HongBaoV2SettleTemp;
import com.yeshi.fanli.service.inter.hongbao.HongBaoV2SettleTempService;
import com.yeshi.fanli.service.inter.order.HongBaoOrderService;
import com.yeshi.fanli.service.inter.order.HongBaoV2Service;
import com.yeshi.fanli.util.rocketmq.MQMsgBodyFactory;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
/**
 * 订单到账消费
 *
 * @author Administrator
 *
 */
@Component
public class OrderMoneyRecievedMessageListener implements MessageListener {
    @Resource
    private HongBaoV2SettleTempService hongBaoV2SettleTempService;
    @Resource
    private HongBaoV2Service hongBaoV2Service;
    @Resource
    private HongBaoOrderService hongBaoOrderService;
    @Resource
    private Producer producer;
    private int getHongBaoType(int type) {
        switch (type) {
        case HongBaoV2.TYPE_YIJI:
        case HongBaoV2.TYPE_ERJI:
        case HongBaoV2.TYPE_SHARE_YIJI:
        case HongBaoV2.TYPE_SHARE_ERJI:
            return OrderMoneyRecievedMQMsg.TYPE_INVITE;
        case HongBaoV2.TYPE_SHARE_GOODS:
            return OrderMoneyRecievedMQMsg.TYPE_SHARE;
        case HongBaoV2.TYPE_ZIGOU:
            return OrderMoneyRecievedMQMsg.TYPE_ZIGOU;
        }
        return 0;
    }
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // 根据红包做订单分发
        if (MQTopicName.TOPIC_ORDER.name().equalsIgnoreCase(message.getTopic())) {
            if (OrderTopicTagEnum.orderFanLiActual.name().equalsIgnoreCase(message.getTag())) {
                String key = message.getKey();
                List<HongBaoV2SettleTemp> list = hongBaoV2SettleTempService.listByKey(key);
                Map<String, BigDecimal> moneyMap = new HashMap<>();
                // 临时订单消息
                for (HongBaoV2SettleTemp temp : list) {
                    long hongBaoId = temp.getHongBaoId();
                    HongBaoV2 v2 = hongBaoV2Service.selectByPrimaryKey(hongBaoId);
                    if (v2 != null) {
                        long mainHongBaoId = v2.getId();
                        if (v2.getParent() != null)
                            mainHongBaoId = v2.getParent().getId();
                        HongBaoOrder hongBaoOrder = hongBaoOrderService.selectDetailByHongBaoId(mainHongBaoId);
                        if (hongBaoOrder != null && hongBaoOrder.getCommonOrder() != null) {
                            String mapKey = String.format("%s#%s#%s#%s", hongBaoOrder.getCommonOrder().getOrderNo(),
                                    hongBaoOrder.getCommonOrder().getSourceType(), v2.getUserInfo().getId(),
                                    getHongBaoType(v2.getType()));
                            if (moneyMap.get(mapKey) == null)
                                moneyMap.put(mapKey, new BigDecimal(0));
                            moneyMap.put(mapKey, moneyMap.get(mapKey).add(v2.getMoney()));
                        }
                    }
                }
                for (Iterator<String> its = moneyMap.keySet().iterator(); its.hasNext();) {
                    String mapKey = its.next();
                    String[] mapKeys = mapKey.split("#");
                    String orderNo = mapKeys[0];
                    String sourceType = mapKeys[1];
                    String uid = mapKeys[2];
                    String type = mapKeys[3];
                    OrderMoneyRecievedMQMsg msg = new OrderMoneyRecievedMQMsg(Integer.parseInt(type),
                            Long.parseLong(uid), Integer.parseInt(sourceType), orderNo, moneyMap.get(mapKey),
                            new Date(), 0);
                    producer.send(MQMsgBodyFactory.create(MQTopicName.TOPIC_ORDER,
                            OrderTopicTagEnum.orderFanLiSeparateByOrderNo, msg));
                }
                return Action.CommitMessage;
            }
        }
        return Action.CommitMessage;
    }
}
fanli/src/main/resource/rocket/consumer.xml
@@ -147,6 +147,37 @@
            </map>
        </property>
    </bean>
    <!-- Subscription for "order money received" messages -->
    <bean id="orderMoneyRecievedMessageListener"
        class="com.yeshi.fanli.util.rocketmq.consumer.order.OrderMoneyRecievedMessageListener"></bean>
    <!-- Several ConsumerBeans may be created for Group IDs that subscribe to the same topic -->
    <!-- NOTE(review): this consumer uses GROUP_ID GID_FANLI; confirm that every other consumer
        sharing this Group ID declares the same topic/tag subscription -->
    <bean id="orderMoneyRecievedConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean"
        init-method="start" destroy-method="shutdown">
        <property name="properties"> <!-- Consumer configuration -->
            <props>
                <prop key="AccessKey">${rocketmq.AccessKey}</prop>
                <prop key="SecretKey">${rocketmq.SecretKey}</prop>
                <prop key="GROUP_ID">GID_FANLI</prop>
                <prop key="NAMESRV_ADDR">${rocketmq.NAMESRV_ADDR}</prop>
                <prop key="ConsumeThreadNums">50</prop>
            </props>
        </property>
        <property name="subscriptionTable">
            <map>
                <!-- Order money received -->
                <entry value-ref="orderMoneyRecievedMessageListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="TOPIC_ORDER" />
                            <property name="expression" value="orderFanLiActual" />
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>