yujian
2020-04-18 b803abf95ed0cc721f8f9e767283e5ac6b8ca090
活跃消息
1个文件已修改
1个文件已添加
88 ■■■■■ 已修改文件
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/user/SyncBeforeInfoMessageListener.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/resource/rocket/consumer.xml 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fanli/src/main/java/com/yeshi/fanli/util/rocketmq/consumer/user/SyncBeforeInfoMessageListener.java
New file
@@ -0,0 +1,58 @@
package com.yeshi.fanli.util.rocketmq.consumer.user;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.google.gson.Gson;
import com.yeshi.fanli.dto.mq.user.UserTopicTagEnum;
import com.yeshi.fanli.dto.mq.user.body.UserActiveMQMsg;
import com.yeshi.fanli.log.LogHelper;
import com.yeshi.fanli.service.inter.user.UserInfoModifyRecordService;
import com.yeshi.fanli.service.inter.user.vip.TeamUserLevelStatisticService;
import com.yeshi.fanli.util.rocketmq.MQTopicName;
/**
 * 同步消息
 *
 * @author Administrator
 *
 */
@Component
public class SyncBeforeInfoMessageListener implements MessageListener {
    @Resource
    private UserInfoModifyRecordService userInfoModifyRecordService;
    @Resource
    private TeamUserLevelStatisticService teamUserLevelStatisticService;
    @Override
    public Action consume(Message message, ConsumeContext context) {
        LogHelper.mqInfo("consumer-SyncBeforeInfoMessageListener", message.getMsgID(), message.getTopic(), message.getTag(),
                new String(message.getBody()));
        String tag = message.getTag();
        if (tag == null)
            tag = "";
        // 邀请相关
        if (MQTopicName.TOPIC_USER.name().equalsIgnoreCase(message.getTopic())) {
            // 用户活跃
            if (tag.equalsIgnoreCase(UserTopicTagEnum.userActve.name())) {
                UserActiveMQMsg msg = new Gson().fromJson(new String(message.getBody()), UserActiveMQMsg.class);
                Long uid = msg.getUid();
                // 老用户同步绑定信息
                userInfoModifyRecordService.syncBeforeInfo(uid);
            }
        }
        return Action.CommitMessage;
    }
}
fanli/src/main/resource/rocket/consumer.xml
@@ -283,7 +283,35 @@
    </bean>
    <!-- 同步老用户信息 -->
    <bean id="syncBeforeInfoMessageListener"
        class="com.yeshi.fanli.util.rocketmq.consumer.user.SyncBeforeInfoMessageListener"></bean>
    <!-- Group ID 订阅同一个 Topic,可以创建多个 ConsumerBean -->
    <bean id="syncBeforeInfoConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean"
        init-method="start" destroy-method="shutdown">
        <property name="properties"> <!--消费者配置信息 -->
            <props>
                <prop key="AccessKey">${rocketmq.AccessKey}</prop>
                <prop key="SecretKey">${rocketmq.SecretKey}</prop>
                <prop key="GROUP_ID">GID_SYNC_BEFORE_INFO</prop>
                <prop key="NAMESRV_ADDR">${rocketmq.NAMESRV_ADDR}</prop>
                <prop key="ConsumeThreadNums">50</prop>
            </props>
        </property>
        <property name="subscriptionTable">
            <map>
                <!--用户活跃 -->
                <entry value-ref="syncBeforeInfoMessageListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="TOPIC_USER" />
                            <property name="expression" value="userActve" />
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>