refactor(商城): 消息处理模块代码优化

master
wayn 3 years ago
parent c3b0493b8b
commit 97f1284c1a

@ -197,6 +197,14 @@ public class RedisCache {
redisTemplate.opsForHash().put(key, hKey, value);
}
public <T> void delCacheMapValue(final String key, final String hKey) {
redisTemplate.opsForHash().delete(key, hKey);
}
public long incrByCacheMapValue(final String key, final String hKey, long value) {
return redisTemplate.opsForHash().increment(key, hKey, value);
}
/**
* Hash
*

@ -20,7 +20,6 @@ import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@RabbitListener(queues = "EmailDirectQueue")
@ -38,9 +37,15 @@ public class EmailDirectReceiver {
String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 消费者消费消息时幂等性处理
if (redisCache.getCacheMap("email_consumer_set").containsKey(msgId)) {
if (redisCache.getCacheMap("email_consumer_map").containsKey(msgId)) {
// redis中包含该 key说明该消息已经被消费过
log.info(msgId + ":消息已经被消费");
log.error("msgId: {},消息已经被消费", msgId);
channel.basicAck(deliveryTag, false);// 确认消息已消费
return;
}
int retryCount = 3;
if (redisCache.incrByCacheMapValue("email_consumer_map", msgId, 1) > retryCount) {
log.error("msgId: {},已经消费{}次,超过最大消费次数!", msgId, retryCount);
channel.basicAck(deliveryTag, false);// 确认消息已消费
return;
}
@ -68,8 +73,7 @@ public class EmailDirectReceiver {
}
// multiple参数确认收到消息false只确认当前consumer一个消息收到true确认所有consumer获得的消息
channel.basicAck(deliveryTag, false);
redisCache.setCacheMapValue("email_consumer_set", msgId, "email has send");
redisCache.expire("email_consumer_set", 180, TimeUnit.SECONDS);
redisCache.delCacheMapValue("email_consumer_map", msgId);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
log.error(e.getMessage(), e);

@ -19,7 +19,6 @@ import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class OrderDirectReceiver {
@ -42,9 +41,16 @@ public class OrderDirectReceiver {
String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 消费者消费消息时幂等性处理
if (redisCache.getCacheMap("order_consumer_set").containsKey(msgId)) {
if (redisCache.getCacheMap("order_consumer_map").containsKey(msgId)) {
// redis中包含该 key说明该消息已经被消费过
log.info(msgId + ":消息已经被消费");
log.error("msgId: {},消息已经被消费", msgId);
channel.basicAck(deliveryTag, false);// 确认消息已消费
return;
}
int retryCount = 3;
if (redisCache.incrByCacheMapValue("order_consumer_map", msgId, 1) > retryCount) {
// redis中包含该 key说明该消息已经被消费过
log.error("msgId: {},已经消费{}次,超过最大消费次数!", msgId, retryCount);
channel.basicAck(deliveryTag, false);// 确认消息已消费
return;
}
@ -70,8 +76,7 @@ public class OrderDirectReceiver {
}
// multiple参数确认收到消息false只确认当前consumer一个消息收到true确认所有consumer获得的消息
channel.basicAck(deliveryTag, false);
redisCache.setCacheMapValue("order_consumer_set", msgId, "order done");
redisCache.expire("order_consumer_set", 180, TimeUnit.SECONDS);
redisCache.delCacheMapValue("order_consumer_map", msgId);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
log.error(e.getMessage(), e);

Loading…
Cancel
Save