| | |
| | | package com.ks.push; |
| | | |
| | | import com.ks.push.consumer.mq.PushTaskConsumer; |
| | | import com.ks.push.dto.mq.InvalidDeviceTokenInfo; |
| | | import com.ks.push.manager.CMQManager; |
| | | import com.ks.push.manager.PushDeviceTokenManager; |
| | | import com.ks.push.pojo.DO.BPushDeviceToken; |
| | | import com.ks.push.pojo.DO.PushPlatform; |
| | | import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; |
| | | import org.mybatis.spring.annotation.MapperScan; |
| | | import org.slf4j.Logger; |
| | |
| | | import org.springframework.context.ApplicationListener; |
| | | import org.springframework.context.event.ContextRefreshedEvent; |
| | | import org.springframework.transaction.annotation.EnableTransactionManagement; |
| | | import org.yeshi.utils.mq.JobThreadExecutorServiceImpl; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.List; |
| | | |
| | | @SpringBootApplication |
| | | @EnableTransactionManagement |
| | |
| | | @EnableDubbo(scanBasePackages = "com.ks.push.service.remote") |
| | | public class PushApplication implements ApplicationListener<ContextRefreshedEvent> { |
| | | private final static Logger logger = LoggerFactory.getLogger(PushApplication.class); |
| | | |
| | | @Resource |
| | | private PushTaskConsumer pushTaskConsumer; |
| | | |
| | | @Resource |
| | | private PushDeviceTokenManager pushDeviceTokenManager; |
| | | |
| | | public static void main(String[] args) { |
| | | SpringApplication.run(PushApplication.class, args); |
| | |
| | | @Override |
| | | public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { |
| | | logger.info("容器加载完毕"); |
| | | initMQMsgConsumer(); |
| | | } |
| | | |
| | | private void initMQMsgConsumer() { |
| | | final int THREAD_NUM = 3; |
| | | for (PushPlatform pushPlatform : PushPlatform.values()) { |
| | | //创建三条队列处理 |
| | | for (int i = 0; i < THREAD_NUM; i++) { |
| | | new JobThreadExecutorServiceImpl().run(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | pushTaskConsumer.consumeMsg(pushPlatform); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | //清理无效token |
| | | new JobThreadExecutorServiceImpl().run(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | List<CMQManager.MQMsgConsumeResult> list = CMQManager.getInstance().consumeInvalidDeviceTokenQueue(16); |
| | | if (list != null) { |
| | | logger.info("清理无效token数量:" + list.size()); |
| | | for (CMQManager.MQMsgConsumeResult result : list) { |
| | | InvalidDeviceTokenInfo tokenInfo = (InvalidDeviceTokenInfo) result.getData(); |
| | | List<BPushDeviceToken> tokenList = pushDeviceTokenManager.list(tokenInfo.getAppCode(), tokenInfo.getPushPlatform(), tokenInfo.getToken(), 1, 20); |
| | | if (tokenList != null) { |
| | | for (BPushDeviceToken token : tokenList) { |
| | | logger.info("清理无效token id-{}" + token.getId()); |
| | | //删除 |
| | | pushDeviceTokenManager.deleteByPrimaryKey(token.getId()); |
| | | } |
| | | } |
| | | CMQManager.getInstance().deleteMsg(CMQManager.PUSH_TOKEN_INVALID, result.getReceiptHandle()); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | |
| | | } |