From fa02ce3cabeba407240db14325c3e1cadfcd514f Mon Sep 17 00:00:00 2001 From: wayn <1669738430@qq.com> Date: Sun, 23 Jan 2022 20:09:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E5=95=86=E5=9F=8E):=20=E4=BC=98=E5=8C=96r?= =?UTF-8?q?abbitMQ=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 +-- .../wayn/message/config/WorkRabbitConfig.java | 4 +++ .../core/config/DirectRabbitConfig.java | 9 ++++--- .../core/config/RabbitTemplateConfig.java | 26 ++++++++++--------- .../message/core/constant/SysConstants.java | 8 ++++++ .../api/service/impl/OrderServiceImpl.java | 3 ++- 6 files changed, 35 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index e6aa187..bcbb100 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ ${java.version} 2.5.8 8.0.25 - 6.1.3.RELEASE + 6.1.6.RELEASE 7.13.4 3.5.0 1.2.8 @@ -50,7 +50,7 @@ 1.6.2 3.0.0 4.4.0 - 3.18.2 + 3.18.3 7.9.1 1.4.7 diff --git a/waynboot-message-consumer/src/main/java/com/wayn/message/config/WorkRabbitConfig.java b/waynboot-message-consumer/src/main/java/com/wayn/message/config/WorkRabbitConfig.java index e8e1714..f5d93d1 100644 --- a/waynboot-message-consumer/src/main/java/com/wayn/message/config/WorkRabbitConfig.java +++ b/waynboot-message-consumer/src/main/java/com/wayn/message/config/WorkRabbitConfig.java @@ -27,4 +27,8 @@ public class WorkRabbitConfig { return new OrderDirectReceiver(2); } + @Bean + public OrderDirectReceiver orderWorkReceiver3() { + return new OrderDirectReceiver(3); + } } diff --git a/waynboot-message-core/src/main/java/com/wayn/message/core/config/DirectRabbitConfig.java b/waynboot-message-core/src/main/java/com/wayn/message/core/config/DirectRabbitConfig.java index 65106cd..1358b6d 100644 --- a/waynboot-message-core/src/main/java/com/wayn/message/core/config/DirectRabbitConfig.java +++ b/waynboot-message-core/src/main/java/com/wayn/message/core/config/DirectRabbitConfig.java @@ -1,5 +1,6 @@ package com.wayn.message.core.config; +import com.wayn.message.core.constant.SysConstants; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; @@ -22,7 +23,7 @@ public class DirectRabbitConfig { */ @Bean public Queue EmailDirectQueue() { - return new Queue("EmailDirectQueue", true); + return new Queue(SysConstants.email_direct_queue, true); } /** @@ -32,7 +33,7 @@ public class DirectRabbitConfig { */ @Bean DirectExchange EmailDirectExchange() { - return new DirectExchange("EmailDirectExchange"); + return new DirectExchange(SysConstants.EMAIL_DIRECT_EXCHANGE); } /** @@ -48,12 +49,12 @@ public class DirectRabbitConfig { /************************************ 订单队列、交换机 begin *******************************************/ @Bean public Queue OrderDirectQueue() { - return new Queue("OrderDirectQueue", true); + return new Queue(SysConstants.ORDER_DIRECT_QUEUE, true); } @Bean DirectExchange OrderDirectExchange() { - return new DirectExchange("OrderDirectExchange"); + return new DirectExchange(SysConstants.ORDER_DIRECT_EXCHANGE); } @Bean diff --git a/waynboot-message-core/src/main/java/com/wayn/message/core/config/RabbitTemplateConfig.java b/waynboot-message-core/src/main/java/com/wayn/message/core/config/RabbitTemplateConfig.java index 7865b29..ea9b113 100644 --- a/waynboot-message-core/src/main/java/com/wayn/message/core/config/RabbitTemplateConfig.java +++ b/waynboot-message-core/src/main/java/com/wayn/message/core/config/RabbitTemplateConfig.java @@ -1,6 +1,7 @@ package com.wayn.message.core.config; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @@ -8,27 +9,28 @@ import org.springframework.stereotype.Component; /** * RabbitTemplate配置,设置生产者confirm确认和消费者手动确认 */ +@Slf4j @Component public class RabbitTemplateConfig { @Bean - public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { - RabbitTemplate rabbitTemplate = new RabbitTemplate(); - rabbitTemplate.setConnectionFactory(connectionFactory); + public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); + // 服务器收到消息确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { - System.out.println("ConfirmCallback: " + "相关数据:" + correlationData); - System.out.println("ConfirmCallback: " + "确认情况:" + ack); - System.out.println("ConfirmCallback: " + "原因:" + cause); + log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); }); + // 消息投递到队列失败回调处理 rabbitTemplate.setReturnsCallback(returned -> { - System.out.println("ReturnCallback: " + "消息:" + returned.getMessage()); - System.out.println("ReturnCallback: " + "回应码:" + returned.getReplyCode()); - System.out.println("ReturnCallback: " + "回应信息:" + returned.getReplyText()); - System.out.println("ReturnCallback: " + "交换机:" + returned.getExchange()); - System.out.println("ReturnCallback: " + "路由键:" + returned.getRoutingKey()); + log.info("ReturnCallback: " + "消息:" + returned.getMessage()); + log.info("ReturnCallback: " + "回应码:" + returned.getReplyCode()); + log.info("ReturnCallback: " + "回应信息:" + returned.getReplyText()); + log.info("ReturnCallback: " + "交换机:" + returned.getExchange()); + log.info("ReturnCallback: " + "路由键:" + returned.getRoutingKey()); }); + return rabbitTemplate; } } diff --git a/waynboot-message-core/src/main/java/com/wayn/message/core/constant/SysConstants.java b/waynboot-message-core/src/main/java/com/wayn/message/core/constant/SysConstants.java index 26f3586..2ecf408 100644 --- a/waynboot-message-core/src/main/java/com/wayn/message/core/constant/SysConstants.java +++ b/waynboot-message-core/src/main/java/com/wayn/message/core/constant/SysConstants.java @@ -3,4 +3,12 @@ package com.wayn.message.core.constant; public class SysConstants { public static final int RESULT_SUCCESS_CODE = 200; + + public static final String email_direct_queue = "EmailDirectQueue"; + public static final String EMAIL_DIRECT_EXCHANGE = "EmailDirectExchange"; + public static final String EMAIL_DIRECT_ROUTING = "EmailDirectRouting"; + + public static final String ORDER_DIRECT_QUEUE = "OrderDirectQueue"; + public static final String ORDER_DIRECT_EXCHANGE = "OrderDirectExchange"; + public static final String ORDER_DIRECT_ROUTING = "OrderDirectRouting"; } diff --git a/waynboot-mobile-api/src/main/java/com/wayn/mobile/api/service/impl/OrderServiceImpl.java b/waynboot-mobile-api/src/main/java/com/wayn/mobile/api/service/impl/OrderServiceImpl.java index cf0b09e..78f6acf 100644 --- a/waynboot-mobile-api/src/main/java/com/wayn/mobile/api/service/impl/OrderServiceImpl.java +++ b/waynboot-mobile-api/src/main/java/com/wayn/mobile/api/service/impl/OrderServiceImpl.java @@ -39,6 +39,7 @@ import com.wayn.common.util.R; import com.wayn.common.util.bean.MyBeanUtil; import com.wayn.common.util.ip.IpUtils; import com.wayn.data.redis.manager.RedisCache; +import com.wayn.message.core.constant.SysConstants; import com.wayn.message.core.messsage.OrderDTO; import com.wayn.mobile.api.domain.Cart; import com.wayn.mobile.api.mapper.OrderMapper; @@ -262,7 +263,7 @@ public class OrderServiceImpl extends ServiceImpl implements .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); - rabbitTemplate.convertAndSend("OrderDirectExchange", "OrderDirectRouting", message, correlationData); + rabbitTemplate.convertAndSend(SysConstants.ORDER_DIRECT_EXCHANGE, SysConstants.ORDER_DIRECT_ROUTING, message, correlationData); } catch (UnsupportedEncodingException e) { log.error(e.getMessage(), e); }