|
|
|
@ -31,7 +31,7 @@ public class OrderDelayConsumer {
|
|
|
|
|
String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
|
|
|
|
|
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
|
|
|
|
// 消费者消费消息时幂等性处理
|
|
|
|
|
if (redisCache.getCacheMap(UNPAID_ORDER_CONSUMER_MAP.getKey()).containsKey(msgId)) {
|
|
|
|
|
if (redisCache.getCacheObject(UNPAID_ORDER_CONSUMER_MAP.getKey(msgId)) != null) {
|
|
|
|
|
// redis中包含该 key,说明该消息已经被消费过
|
|
|
|
|
log.error("msgId: {},消息已经被消费", msgId);
|
|
|
|
|
channel.basicAck(deliveryTag, false);// 确认消息已消费
|
|
|
|
@ -41,7 +41,7 @@ public class OrderDelayConsumer {
|
|
|
|
|
mobileApi.unpaidOrder(body);
|
|
|
|
|
// multiple参数:确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
|
|
|
|
|
channel.basicAck(deliveryTag, false);
|
|
|
|
|
redisCache.setCacheObject(UNPAID_ORDER_CONSUMER_MAP.getKey(), msgId, ORDER_CONSUMER_MAP.getExpireSecond());
|
|
|
|
|
redisCache.setCacheObject(UNPAID_ORDER_CONSUMER_MAP.getKey(msgId), msgId, ORDER_CONSUMER_MAP.getExpireSecond());
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
channel.basicNack(deliveryTag, false, false);
|
|
|
|
|
log.error(e.getMessage(), e);
|
|
|
|
|