fanli/src/main/java/com/yeshi/fanli/service/impl/hongbao/ThreeSaleSerivceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
fanli/src/main/java/com/yeshi/fanli/util/SpringContext.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
fanli/src/main/java/com/yeshi/fanli/util/ThreeSaleCMQManager.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utils/src/main/java/org/yeshi/utils/CMQUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
fanli/src/main/java/com/yeshi/fanli/service/impl/hongbao/ThreeSaleSerivceImpl.java
@@ -34,6 +34,7 @@ import com.yeshi.fanli.service.inter.user.UserInfoService; import com.yeshi.fanli.util.Constant; import com.yeshi.fanli.util.StringUtil; import com.yeshi.fanli.util.ThreeSaleCMQManager; import net.sf.json.JSONArray; import net.sf.json.JSONObject; @@ -196,6 +197,8 @@ dao.update(threeSale); if (inviter != null) reComputeUserRank(inviter.getId()); //添加到队列 ThreeSaleCMQManager.getInstance().addThreeSaleMsg(threeSale); // 通知 userInviteMsgNotificationService.inviteSuccess(inviter.getId(), threeSale); // } fanli/src/main/java/com/yeshi/fanli/util/SpringContext.java
@@ -17,6 +17,7 @@ import com.yeshi.fanli.entity.bus.user.AlipayTransferResultInfo; import com.yeshi.fanli.entity.bus.user.HongBaoV2; import com.yeshi.fanli.entity.bus.user.ThreeSale; import com.yeshi.fanli.entity.push.PushQueueRecord; import com.yeshi.fanli.entity.taobao.TaoBaoOrder; import com.yeshi.fanli.entity.taobao.TaoBaoWeiQuanOrder; @@ -386,6 +387,59 @@ } } }); } /** * 邀请关系变化后券的更新 */ public void doThreeSaleUserCouponJob() { // 采用2个线程做更新 for (int i = 0; i < 2; i++) executor.execute(new Runnable() { @Override public void run() { while (true) { try { Map<String, ThreeSale> map = ThreeSaleCMQManager.getInstance() .consumeQueueMsg(ThreeSaleCMQManager.QUEUE_USER_COUPON, 16); if (map != null) { Iterator<String> its = map.keySet().iterator(); while (its.hasNext()) { String key = its.next(); try { ThreeSale threeSale = map.get(key); if (threeSale != null) { if (threeSale.getState() != null && threeSale.getState() == true) { // 邀请成功 // TODO 券激活生效 // threeSale.getBoss().getId(); } } ThreeSaleCMQManager.getInstance() .deleteQueueMsg(ThreeSaleCMQManager.QUEUE_USER_COUPON, key); } catch (Exception e) { try { LogHelper.errorDetailInfo(e); } catch (Exception e1) { e1.printStackTrace(); } } } } } catch (Exception e) { LogHelper.error("更新商品出错:" + e.getMessage()); } } } }); } } fanli/src/main/java/com/yeshi/fanli/util/ThreeSaleCMQManager.java
New file
@@ -0,0 +1,88 @@
package com.yeshi.fanli.util;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.yeshi.utils.CMQUtil;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.qcloud.cmq.Message;
import com.yeshi.fanli.entity.bus.user.ThreeSale;
import com.yeshi.fanli.entity.taobao.TaoBaoOrder;

import net.sf.json.JSONObject;

/**
 * Publishes {@link ThreeSale} changes to the CMQ topic {@code topic_three_sale}
 * and reads them back from the queues subscribed to that topic.
 * <p>
 * When the class is initialised it creates the topic, the user-coupon queue and
 * the subscription that links them, all through {@link CMQUtil}.
 */
public class ThreeSaleCMQManager {

	// NOTE(review): credentials are hard-coded in source; they should be moved to
	// external configuration and rotated. Kept here because no configuration
	// mechanism is visible from this file.
	private static final String SECRET_ID = "AKIDTlpgJhLjOozvd6QI2XnpfGbgV4NQJk25";
	private static final String SECRET_KEY = "xhCSUHo55oHUQ6XicFcmfIgspX0EEzWo";

	private static final String TOPIC_NAME = "topic_three_sale";

	/** Name of the queue that receives topic messages for the user-coupon consumer. */
	public static final String QUEUE_USER_COUPON = TOPIC_NAME + "_" + "user_coupon";

	/** Name of the subscription that routes the topic to {@link #QUEUE_USER_COUPON}. */
	public static final String SUBSCRIBE_USER_COUPON = "user_coupon";

	// A single shared instance replaces the per-call "new Gson()".
	private static final Gson GSON = new Gson();

	private static final CMQUtil CMQ_UTIL;

	static {
		CMQ_UTIL = CMQUtil.getInstance(SECRET_ID, SECRET_KEY);
		// Create the topic, then the queue and subscription for the user-coupon consumer.
		CMQ_UTIL.createTopic(TOPIC_NAME);
		CMQ_UTIL.createQueue(QUEUE_USER_COUPON);
		CMQ_UTIL.subscribeTopic(TOPIC_NAME, SUBSCRIBE_USER_COUPON, QUEUE_USER_COUPON);
	}

	// Created during class initialisation, so concurrent first callers of
	// getInstance() can no longer race on an unsynchronised null check.
	private static final ThreeSaleCMQManager INSTANCE = new ThreeSaleCMQManager();

	/**
	 * Returns the shared manager instance.
	 *
	 * @return the single {@code ThreeSaleCMQManager}
	 */
	public static ThreeSaleCMQManager getInstance() {
		return INSTANCE;
	}

	/**
	 * Publishes the given record to the topic as JSON. The boolean result of the
	 * underlying publish call is not inspected.
	 *
	 * @param threeSale
	 *            record to publish; {@code null} is ignored
	 */
	public void addThreeSaleMsg(ThreeSale threeSale) {
		if (threeSale == null) {
			return;
		}
		CMQ_UTIL.publishTopicMessage(TOPIC_NAME, GSON.toJson(threeSale));
	}

	/**
	 * Receives up to {@code count} messages from a queue and decodes each body
	 * from JSON.
	 *
	 * @param queueName
	 *            queue to read from
	 * @param count
	 *            maximum number of messages to receive
	 * @return map from receipt handle to decoded record; empty (never
	 *         {@code null}) when nothing was received
	 */
	public Map<String, ThreeSale> consumeQueueMsg(String queueName, int count) {
		Map<String, ThreeSale> byReceiptHandle = new HashMap<>();
		List<Message> received = CMQ_UTIL.recieveMsg(count, queueName);
		if (received == null) {
			return byReceiptHandle;
		}
		for (Message msg : received) {
			byReceiptHandle.put(msg.receiptHandle, GSON.fromJson(msg.msgBody, ThreeSale.class));
		}
		return byReceiptHandle;
	}

	/**
	 * Deletes a received message from a queue.
	 *
	 * @param queueName
	 *            queue the message was received from
	 * @param receiptHandle
	 *            receipt handle of the message, as returned in the key set of
	 *            {@link #consumeQueueMsg(String, int)}
	 */
	public void deleteQueueMsg(String queueName, String receiptHandle) {
		CMQ_UTIL.deleteMsg(queueName, receiptHandle);
	}
}
utils/src/main/java/org/yeshi/utils/CMQUtil.java
@@ -7,6 +7,7 @@ import com.qcloud.cmq.Message; import com.qcloud.cmq.Queue; import com.qcloud.cmq.QueueMeta; import com.qcloud.cmq.Topic; //腾讯CMQ消息 public class CMQUtil { @@ -26,10 +27,12 @@ // 内网 http://cmq-queue-gz.api.tencentyun.com // 外网 http://cmq-queue-gz.api.qcloud.com private static String endpoint = "http://cmq-queue-gz.api.qcloud.com"; private static String topicEndPoint = "https://cmq-topic-gz.api.qcloud.com"; // private static String endpoint = // "http://cmq-queue-gz.api.tencentyun.com"; private Account account; private Account topicAccount; static { // if (SystemUtil.getSystemType() == SystemUtil.SYSTEM_LINUX) @@ -42,6 +45,7 @@ this.secretId = secretId; this.secretKey = secretKey; account = new Account(endpoint, this.secretId, this.secretKey); topicAccount = new Account(topicEndPoint, this.secretId, this.secretKey); } public boolean existQueue(String queueName) { @@ -192,17 +196,17 @@ */ public List<Message> recieveMsg(int count, String queueName) { Queue queue = getQueue(queueName); if (queue == null ) { if (queue == null) { return null; } List<Message> msgList = null; try { msgList = queue.batchReceiveMessage(count, 20); return msgList; } catch (Exception e) { if (e.getMessage()!=null&& !e.getMessage().contains("no message")) if (e.getMessage() != null && !e.getMessage().contains("no message")) e.printStackTrace(); } return null; @@ -230,4 +234,118 @@ } return false; } /** * 订阅消息相关 */ /** * 创建订阅主题 * * @param topicName-主题名称 * @param maxMsgSize-消息最大长度 * @param filterType-过滤类型 * @return */ public boolean createTopic(String topicName, int maxMsgSize, int filterType) { try { topicAccount.createTopic(topicName, maxMsgSize, filterType); return true; } catch (Exception e) { e.printStackTrace(); } return false; } /** * 创建默认参数的主题 * * @param topicName * @return */ public boolean createTopic(String topicName) { try { topicAccount.createTopic(topicName, 65536); return true; } catch (Exception e) { e.printStackTrace(); } return false; } /** * 订阅主题 * * @param 
topicName-主题名称 * @param subscriptionName-订阅名称 * @param queueName-接受消息的队列名称 * @return */ public boolean subscribeTopic(String topicName, String subscriptionName, String queueName) { try { topicAccount.createSubscribe(topicName, subscriptionName, queueName, "queue"); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 删除订阅 * * @param topicName * @param subscriptionName * @return */ public boolean deleteSubscribeTopic(String topicName, String subscriptionName) { try { topicAccount.deleteSubscribe(topicName, subscriptionName); return true; } catch (Exception e) { e.printStackTrace(); } return false; } /** * 发布订阅消息 * * @param topicName * @param message * @return */ public boolean publishTopicMessage(String topicName, String message) { try { Topic topic = topicAccount.getTopic(topicName); if (topic == null) return false; topic.publishMessage(message); return true; } catch (Exception e) { e.printStackTrace(); } return false; } /** * 批量发布消息 * * @param topicName * @param msgList * @return */ public boolean batchPublishTopicMessage(String topicName, List<String> msgList) { try { Topic topic = topicAccount.getTopic(topicName); if (topic == null) return false; topic.batchPublishMessage(msgList); return true; } catch (Exception e) { e.printStackTrace(); } return false; } }