From 97f1284c1acd29801332c4964428b7c8835f0544 Mon Sep 17 00:00:00 2001 From: wayn <1669738430@qq.com> Date: Thu, 5 Aug 2021 00:09:17 +0800 Subject: [PATCH] =?UTF-8?q?refactor(=E5=95=86=E5=9F=8E):=20=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=A4=84=E7=90=86=E6=A8=A1=E5=9D=97=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/wayn/data/redis/manager/RedisCache.java | 8 ++++++++ .../wayn/message/reciver/EmailDirectReceiver.java | 14 +++++++++----- .../wayn/message/reciver/OrderDirectReceiver.java | 15 ++++++++++----- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/waynboot-data/waynboot-data-redis/src/main/java/com/wayn/data/redis/manager/RedisCache.java b/waynboot-data/waynboot-data-redis/src/main/java/com/wayn/data/redis/manager/RedisCache.java index 94e0519..3d8210a 100644 --- a/waynboot-data/waynboot-data-redis/src/main/java/com/wayn/data/redis/manager/RedisCache.java +++ b/waynboot-data/waynboot-data-redis/src/main/java/com/wayn/data/redis/manager/RedisCache.java @@ -197,6 +197,14 @@ public class RedisCache { redisTemplate.opsForHash().put(key, hKey, value); } + public void delCacheMapValue(final String key, final String hKey) { + redisTemplate.opsForHash().delete(key, hKey); + } + + public long incrByCacheMapValue(final String key, final String hKey, long value) { + return redisTemplate.opsForHash().increment(key, hKey, value); + } + /** * 获取Hash中的数据 * diff --git a/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/EmailDirectReceiver.java b/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/EmailDirectReceiver.java index 6b50561..f8390e6 100644 --- a/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/EmailDirectReceiver.java +++ b/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/EmailDirectReceiver.java @@ -20,7 +20,6 @@ import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.util.Map; -import java.util.concurrent.TimeUnit; @Slf4j @RabbitListener(queues = "EmailDirectQueue") @@ -38,9 +37,15 @@ public class EmailDirectReceiver { String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消费者消费消息时幂等性处理 - if (redisCache.getCacheMap("email_consumer_set").containsKey(msgId)) { + if (redisCache.getCacheMap("email_consumer_map").containsKey(msgId)) { // redis中包含该 key,说明该消息已经被消费过 - log.info(msgId + ":消息已经被消费"); + log.error("msgId: {},消息已经被消费", msgId); + channel.basicAck(deliveryTag, false);// 确认消息已消费 + return; + } + int retryCount = 3; + if (redisCache.incrByCacheMapValue("email_consumer_map", msgId, 1) > retryCount) { + log.error("msgId: {},已经消费{}次,超过最大消费次数!", msgId, retryCount); channel.basicAck(deliveryTag, false);// 确认消息已消费 return; } @@ -68,8 +73,7 @@ public class EmailDirectReceiver { } // multiple参数:确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息 channel.basicAck(deliveryTag, false); - redisCache.setCacheMapValue("email_consumer_set", msgId, "email has send"); - redisCache.expire("email_consumer_set", 180, TimeUnit.SECONDS); + redisCache.delCacheMapValue("email_consumer_map", msgId); } catch (Exception e) { channel.basicNack(deliveryTag, false, true); log.error(e.getMessage(), e); diff --git a/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/OrderDirectReceiver.java b/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/OrderDirectReceiver.java index 598132d..61a9a7a 100644 --- a/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/OrderDirectReceiver.java +++ b/waynboot-message-consumer/src/main/java/com/wayn/message/reciver/OrderDirectReceiver.java @@ -19,7 +19,6 @@ import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestTemplate; import java.io.IOException; -import java.util.concurrent.TimeUnit; @Slf4j public class OrderDirectReceiver { @@ -42,9 +41,16 @@ public class OrderDirectReceiver { String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消费者消费消息时幂等性处理 - if (redisCache.getCacheMap("order_consumer_set").containsKey(msgId)) { + if (redisCache.getCacheMap("order_consumer_map").containsKey(msgId)) { // redis中包含该 key,说明该消息已经被消费过 - log.info(msgId + ":消息已经被消费"); + log.error("msgId: {},消息已经被消费", msgId); + channel.basicAck(deliveryTag, false);// 确认消息已消费 + return; + } + int retryCount = 3; + if (redisCache.incrByCacheMapValue("order_consumer_map", msgId, 1) > retryCount) { + // redis中包含该 key,说明该消息已经被消费过 + log.error("msgId: {},已经消费{}次,超过最大消费次数!", msgId, retryCount); channel.basicAck(deliveryTag, false);// 确认消息已消费 return; } @@ -70,8 +76,7 @@ public class OrderDirectReceiver { } // multiple参数:确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息 channel.basicAck(deliveryTag, false); - redisCache.setCacheMapValue("order_consumer_set", msgId, "order done"); - redisCache.expire("order_consumer_set", 180, TimeUnit.SECONDS); + redisCache.delCacheMapValue("order_consumer_map", msgId); } catch (Exception e) { channel.basicNack(deliveryTag, false, true); log.error(e.getMessage(), e);