目录:
- Spring Boot 整合 "RabbitMQ" ( 消息中间件 )实现 :
- 一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )
- 1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用
- (1) 创建项目,"全局配置文件" 中配置信息
- (2) 使用 AmqpAdmin "定制消息发送组件"
- (3) 查看 "RabbitMQ消息组件" 的定制效果
- (4) "消息发送者" 发送消息 :
- ① 创建 "实体类对象" ( 存储发送的"消息内容" )
- ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
- ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
- (5) "消息消费者" 接收消息
- 1.2 基于 "配置类" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用
- (1) 创建项目,"全局配置文件" 中配置信息
- (2) 使用 "配置类"的方式 "定制消息发送组件"
- (3) "消息发送者" 发送消息 :
- ① 创建 "实体类对象" ( 存储发送的"消息内容" )
- ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
- ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
- (4) "消息消费者" 接收消息
- 1.3 基于 "注解" 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用
- (1) 创建项目,"全局配置文件" 中配置信息
- (2) 使用 "注解" 的方式定制 "消息发送组件" 和 "消息消费者" "接收消息"
- (3) "消息发送者" 发送消息 :
- ① 创建 "实体类对象" ( 存储发送的"消息内容" )
- ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
- ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
- (4) 控制台查看 "消费者" 消费信息情况
- 二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式
- 2.1 基于 "注解" 的方式 ( 实现 Routing "路由模式"工作模式 )
- (1) 创建项目,"全局配置文件" 中配置信息
- (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
- (3) "消息发送者" 发送消息 :
- ① 创建 "实体类对象" ( 存储发送的"消息内容" )
- ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
- ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
- (4) 控制台 查看 "消费者" 消费信息情况
- 三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式
- 3.1 基于 "注解" 的方式 ( 实现 Topics"通配符模式"工作模式 )
- (1) 创建项目,"全局配置文件" 中配置信息
- (2) 使用 "注解" 的方式 "定制消息发送组件" 和 "消息消费者" "接收消息"
- (3) "消息发送者" 发送消息 :
- ① 创建 "实体类对象" ( 存储发送的"消息内容" )
- ② 使用 "RabbitTemplate模板类" 实现 "消息发送"
- ③ 自定义 "消息转换器" ( 对"消息"进行转换,保持较好的"可视化效果")
- (4) 控制台 查看 "消费者" 消费信息情况
Spring Boot 整合 “RabbitMQ” ( 消息中间件 )实现 :
作者简介 :一只大皮卡丘,计算机专业学生,正在努力学习、努力敲代码中! 让我们一起继续努力学习!
该文章参考学习教材为:
《Spring Boot企业级开发教程》 黑马程序员 / 编著
文章以课本知识点 + 代码为主线,结合自己看书学习过程中的理解和感悟 ,最终成就了该文章文章用于本人学习使用 , 同时希望能帮助大家。
欢迎大家点赞👍 收藏⭐ 关注💖哦!!!(侵权可联系我,进行删除,如果雷同,纯属巧合)
- Spring Boot 可对 RabbitMQ 的 工作模式 ( 6种工作模式 )进行了 整合,并支持 多种整合方式,包括 : ① 基于API 的方式、 ② 基于配置类的方式、 ③ 基于注解的方式。
- 下面将 选取常用 的 ① Publish/Subscribe ( 发布订阅 )工作模式、 ② Routing ( 路由 )工作模式 和 Topics ( 通配符模式 ) 工作模式 3种工作模式完成在 Spring Boot 项目 中的 消息服务整合实现 ( 整合实现"消息中间件" )。
一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )
- Spring Boot 整合 RabbitMQ 中间件实现消息服务,主要围绕3个部分的工作进行展开 :
定制中间件、消息发送者发送消息、消息消费者接收消息。其中,定制中间件是 比较麻烦的工作,且必须预先定制。- 下面我们以 用户注册成功后 同时 “发送邮件通知” 和 “发送短信通知” 这一场景为例,分别使用 : ① 基于API 的方式、 ② 基于配置类的方式、 ③ 基于注解的方式 这 3种 方式实现 Publish/Subscribe ( 发布订阅 )工作模式 的整合。
1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用
- 基于API 的方式主要讲的是使用 Spring 框架提供的 API管理类 : AmqpAdmin 定制 消息发送组件,并进行消息发送。
- 这种 定制消息发送组件的方式与在 RabbitMQ 可视化界面上通过 对应面板 进行 组件操作的 实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后 “组装消息队列” 供应用程序调用,从而 实现消息服务。下面我们就对这种 基于 API的方式进行 讲解和演示 。
(1) 创建项目,“全局配置文件” 中配置信息
创建项目,"全局配置文件"中 配置信息 :
application.properties ( 全局配置文件 ):
#配置RabbitMQ消息中间件的"连接配置" spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虚拟主机路径/ , 默认可省略 spring.rabbitmq.virtual-host=/
pom.xml :
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.myh</groupId> <artifactId>chapter_22</artifactId> <version>0.0.1-SNAPSHOT</version> <name>chapter_22</name> <description>chapter_22</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <!-- <build>--> <!-- <plugins>--> <!-- <plugin>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-maven-plugin</artifactId>--> <!-- </plugin>--> <!-- </plugins>--> <!-- </build>--> </project>
(2) 使用 AmqpAdmin “定制消息发送组件”
在项目的测试类 : Chapter22ApplicationTests , 在该测试类先引入 AmqpAdmin 管理类 定制 Publish/Subscribe 工作模式 所需的 消息组件,代码如下所示 :
Chapter22ApplicationTests.Java ( 测试类 ) :
import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter22ApplicationTests { @Test void contextLoads() { } //自动注入 RabbitMQ消息中间件的API操作类: AmqpAdmin @Autowired private AmqpAdmin amqpAdmin; @Test public void amqpAdmin() { //1.定义"fanout"类型的交换器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); //2.定义两个"默认持久化队列" , 分别处理email 和 sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); //该队列处理"邮件信息" amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));//该队列处理"短信信息" //3.将两个队列分别与"交换器"进行绑定 amqpAdmin.declareBinding(new Binding("fanout_queue_email", Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE,"fanout_exchange","",null)); } }
在上面的代码中,使用 Spring 框架提供的消息管理组件 AmqpAdmin定制了消息组件。其中定义了一个 fanout 类型的交换器 : fanout_exchange ;
还定义了两个消息队列 : fanout_queue_email 和 fanout_queue_sms,分别用来处理邮件信息和短信信息 ;最后 将定义的两个队列分别与 交换器绑定。
(3) 查看 “RabbitMQ消息组件” 的定制效果
执行上述单元测试方法 : amgpAdmin(),验证 RabbitMQ 消息组件的定制效果。单元测试方法执行成功后,可通过 RabbitMQ 可视化管理页面 ,具体操作为: 打开RabbitMQ安装目录中的 sbin目录,双击 rabbitmq-server.bat ,后在浏览器访问访问 : http://localhost:15672 ,输入密码和密码 : guest , 此时 可在可视化界面 中 查询 “RabbitMQ消息组件” 的 定制效果 ,如下图所示 :
通过上面图片看出,在 Queues 队列面板页面中,展示有定制的消息队列信息,这与程序中 "定制"的消息队列一致。可以单击消息队列名称查看每个队列的详情。
通过上述操作可以发现,在管理页面中提供了 消息组件交换器、队列的定制功能。在程序中使用 Spring 框架提供的管理员 API组件 AmqpAdmin 定制消息组件和在管理页面上手动定制消息组件的 本质是一样 的。
(4) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
完成消息组件的 定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain包,在其中创建实体类。
User.java
public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容" private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
在 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送,示例代码如下 ( 在原测试类中加入以下"主要代码") :
Chapter22ApplicationTests.java :
import com.myh.chapter_22.domain.User; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter22ApplicationTests { @Test void contextLoads() { } @Autowired //引入进行"消息中间件"管理的 RabbitTemplate组件对象 private RabbitTemplate rabbitTemplate; /** * "消息发送者" 发送信息 */ @Test public void psubPublisher() { User user = new User(); //消息发送者 要发送的"消息内容" user.setId(1); user.setUsername("石头"); /* 使用 RabbitTemplate中的 convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为 String exchange : 表示发送信息的"交换器" String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定" Object object : 表示要"发送的信息内容" */ //执行哪个"交换器" 和 指定给该"交换器"的"消息内容" rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容" } }
上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理 的 RabbitTemplate 组件对象,然后使用该模板工具类的
convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的第 1个参数表示发送消息 的 交换器,这个参数值要与之前定制的交换器名称一致 ;第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式,所以不需要指定 ;第3 个参数是发送的消息内容,接收 Object 类型。
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
执行上述 消息发送 的 测试方法 : psubPublisher( ),控制台执行效果如 下图所示 :
如果要 解决 上述 消息中间件发送 实体类消息出现的 异常,通常可以采用两种解决方案 :
① 第一种 是执行 JDK 自带 的 serializable 序列化接口 ;
② 第二种是 定制其他类型的 消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般 推荐使用第二种方式。配置代码如下所示 :
自定义 “消息转换器” 的配置代码如下所示 :
RabbitMQConfig.java :
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration //标记该类为"配置类" public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类" @Bean //将该类的返回值对象加入到IOC容器中 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" } }
再次执行 psubPublisher( ) 方法,该方法 执行成功后,查看 RabbitMQ可视化管理页面 中的 Queues面板信息,具体如下图所示 :
进入 队列详情页面查看消息 , 如下图所示 :
从上图看出,在 消息队列中 存储有指定发送 的 消息详情和 其他参数信息,这与程序指定发送的信息完全一致。
(5) “消息消费者” 接收消息
在 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService,
代码如下所示 :RabbitMQService.java :
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service //加入到IOC容器中 public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类 /** * Publish/Subscribe 工作模式接收、处理 "邮件业务" * * @RabbitListener(queues = "fanout_queue_email") : * 使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。 * */ @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作 public void psubConsumerEmail(Message message) { byte[] body = message.getBody(); //转化为字符串 String str = new String(body); System.out.println("邮件业务接收到的信息为: "+str); } /** * Publish/Subscribe 工作模式接收、处理 "短信业务" */ @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作 public void psubConsumerSms(Message message) { byte[] body = message.getBody(); //转化为字符串 String str = new String(body); System.out.println("短信业务接收到的信息为: "+str); } }
在上面的代码中,创建了一个 接收处理 RabbitMQ消息的 业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解 来 监听队列 中 名称为 fanout_queue_email 和 fanout_queue_sms 的 消息,监听 的这 两个 队列是 前面 “指定发送并存储” 消息 的 消息队列。
需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听到指定的队列中有 消息存在 ( 目前两个队列中都存在消息 ),对应注解的方法 会立即接收并消费队列中的 消息 ( 即 注解对应 的 方法会被调用 )。另外,在接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型和 Message 类型。如果使用与消息类型对应的参数接收消息的话,只能够得到 具体的消息体信息 ; 如果使用 Object或者 Message 类型参数接收消息的话,还可以获得除了消息体外的消息参数信息 MessageProperties。
此时 成功启动项目后,控制台显示的消息消费效果如下图所示 :
从上图可以看出,项目启动成功后,消息消费者 “监听” 到 “消息队列” 中存在的 两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面的 Queues 面板查看队列消息情况,会发现两个队列中存储的消息 已经 被消费,如下图所示 :
至此,一条完整的消息发送、消息中间件存储消息消费的 Publish/Subscribe 工作模式的业务案例 已经实现。
ps :
使用的是开发中常用的 @RabbitListener注解 监听指定名称队列的 消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类 的 receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息。
1.2 基于 “配置类” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用
- 基于 配置类 的 方式 主要讲的是使用 Spring Boot 框架提供的 @Configuration 注解 配置类 定制消息发送组件,并进行消息发送。例子代码如下所示。
(1) 创建项目,“全局配置文件” 中配置信息
创建项目,"全局配置文件"中 配置信息 :
application.properties ( 全局配置文件 ):
#配置RabbitMQ消息中间件的"连接配置" spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虚拟主机路径/ , 默认可省略 spring.rabbitmq.virtual-host=/
pom.xml :
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.myh</groupId> <artifactId>chapter_22</artifactId> <version>0.0.1-SNAPSHOT</version> <name>chapter_22</name> <description>chapter_22</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <!-- <build>--> <!-- <plugins>--> <!-- <plugin>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-maven-plugin</artifactId>--> <!-- </plugin>--> <!-- </plugins>--> <!-- </build>--> </project>
(2) 使用 "配置类"的方式 “定制消息发送组件”
使用 "配置类"的方式 “定制消息发送组件” (Publish/Subscribe 工作模式) 代码如下所示 :
RabbitMQConfig.Java ( 配置类 ) :
import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration //标记该类为"配置类" public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类" /** * 使用基于"配置类"的方式定义"消息发送组件" */ //1.定义定义fanout类型的交换器 @Bean public Exchange fanout_exchange() { //创建一个名为"fanout_exchange" 的 "交换器" return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } //2.定义两个不同名称的消息队列 @Bean public Queue fanout_queue_email() { //创建消息队列 //创建一个名为 "fanout_queue_email" 的 "消息队列" return new Queue("fanout_queue_email"); } @Bean public Queue fanout_queue_sms() { //创建消息队列 //创建一个名为 "fanout_queue_sms" 的 "消息队列" return new Queue("fanout_queue_sms"); } //3.将两个不同名称的"消息队列" @Bean public Binding bindingEmail() { //将"fanout_queue_email"消息队列 和 "fanout_exchange"交换器 进行"绑定" return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs(); } @Bean public Binding bindingSms() { //将"fanout_queue_sms"消息队列 和 "fanout_exchange"交换器 进行"绑定" return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs(); } }
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
完成消息组件的 定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain包,在其中创建实体类。
User.java
public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容" private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
在 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送,示例代码如下 ( 在原测试类中加入以下"主要代码") :
Chapter22ApplicationTests.java :
import com.myh.chapter_22.domain.User; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter22ApplicationTests { @Test void contextLoads() { } @Autowired //引入进行"消息中间件"管理的 RabbitTemplate组件对象 private RabbitTemplate rabbitTemplate; /** * "消息发送者" 发送信息 */ @Test public void psubPublisher() { User user = new User(); //消息发送者 要发送的"消息内容" user.setId(1); user.setUsername("石头"); /* 使用 RabbitTemplate中的 convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为 String exchange : 表示发送信息的"交换器" String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定" Object object : 表示要"发送的信息内容" */ //执行哪个"交换器" 和 指定给该"交换器"的"消息内容" rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容" } }
上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理 的 RabbitTemplate 组件对象,然后使用该模板工具类的
convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的第 1个参数表示发送消息 的 交换器,这个参数值要与之前定制的交换器名称一致 ;第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式,所以不需要指定 ;第3 个参数是发送的消息内容,接收 Object 类型。
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
执行上述 消息发送的测试方法 : psubPublisher( ),控制台执行效果如 下图所示 :
如果要 解决 上述 消息中间件发送 实体类消息出现的 异常,通常可以采用两种解决方案 :
① 第一种 是执行 JDK 自带 的 serializable 序列化接口 ;
② 第二种是 定制其他类型的 消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般 推荐使用第二种方式。配置代码如下所示 :
自定义 “消息转换器” 的配置代码如下所示 :
RabbitMQConfig.java :
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration //标记该类为"配置类" public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类" @Bean //将该类的返回值对象加入到IOC容器中 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" } }
(4) “消息消费者” 接收消息
在 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService,
代码如下所示 :RabbitMQService.java :
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service //加入到IOC容器中 public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类 /** * Publish/Subscribe 工作模式接收、处理 "邮件业务" * * @RabbitListener(queues = "fanout_queue_email") : * 使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。 * */ @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作 public void psubConsumerEmail(Message message) { byte[] body = message.getBody(); //转化为字符串 String str = new String(body); System.out.println("邮件业务接收到的信息为: "+str); } /** * Publish/Subscribe 工作模式接收、处理 "短信业务" */ @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作 public void psubConsumerSms(Message message) { byte[] body = message.getBody(); //转化为字符串 String str = new String(body); System.out.println("短信业务接收到的信息为: "+str); } }
在上面的代码中,创建了一个 接收处理 RabbitMQ消息的 业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解 来 监听队列 中 名称为 fanout_queue_email 和 fanout_queue_sms 的 消息,监听的这 两个 队列是 前面 “指定发送并存储” 消息 的 消息队列。
需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听到指定的队列中有 消息存在 ( 目前两个队列中都存在消息 ),对应注解的方法会**立即接收并消费队列中的 消息 ( 即注解对应的 方法会被调用 )。另外,在 接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型和 Message 类型。如果 使用与消息类型对应的参数**接收消息的话,只能够得到 具体的消息体信息 ; 如果 使用 Object 或者 Message 类型参数 接收消息 的话,还可以获得 除了 消息体外的消息参数信息 MessageProperties。
此时 成功启动项目后,控制台显示的消息消费效果如下图所示 :
从上图可以看出,项目启动成功后,消息消费者 “监听” 到 “消息队列” 中存在的 两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面的 Queues 面板查看队列消息情况,会发现两个队列中存储的消息 已经 被消费,如下图所示 :
至此,一条完整的消息发送、消息中间件存储消息消费的 Publish/Subscribe 工作模式的业务案例 已经实现。
ps :
使用的是开发中常用的 @RabbitListener注解 监听指定名称队列的 消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类 的 receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息。
1.3 基于 “注解” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用
- 基于注解的方式指的是使用 Spring框架的 @RabbitListener注解 来 ① 定制消息发送组件并 ② "消息消费者" 接收信息 , 当然还要结合之前的代码,才能完整实现这个功能需求 。
(1) 创建项目,“全局配置文件” 中配置信息
创建项目,"全局配置文件"中 配置信息 :
application.properties ( 全局配置文件 ):
#配置RabbitMQ消息中间件的"连接配置" spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虚拟主机路径/ , 默认可省略 spring.rabbitmq.virtual-host=/
pom.xml :
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.myh</groupId> <artifactId>chapter_22</artifactId> <version>0.0.1-SNAPSHOT</version> <name>chapter_22</name> <description>chapter_22</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <!-- <build>--> <!-- <plugins>--> <!-- <plugin>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-maven-plugin</artifactId>--> <!-- </plugin>--> <!-- </plugins>--> <!-- </build>--> </project>
(2) 使用 “注解” 的方式定制 “消息发送组件” 和 “消息消费者” “接收消息”
使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下 :
RabbitMQService.java :
import com.myh.chapter_24.domain.User; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service //加入到IOC容器中 public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类 /** * 使用基于注解的方式实现消息服务 ( 使用"注解"的方式 ①定制消息组件 ②接收信息) * 1.1 Publish/Subscribe工作模式接收、处理"邮件业务" */ @RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_email"),exchange = @Exchange(value = "fanout_exchange",type = "fanout"))) public void psubConsumerEmailAno(User user) { System.out.println("邮件业务接收到消息: "+user); } /** * * 1.2 Publish/Subscribe工作模式接收、处理"短信业务" */ @RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_sms"),exchange = @Exchange(value = "fanout_exchange",type = "fanout"))) public void psubConsumerSmsAno(User user) { System.out.println("短信业务接收到消息: "+user); } }
在上面代码中,使用 @RabbitListener 注解及 其相关属性定制了两个消息组件的消费者,这两个消费者都接收实体类 User 并消费。在 @RabbitListener 注解中,bindings 属性用于创建并绑定交换器和消息队列组件,需要注意的是,为了能使两个消息组件的消费者接受到实体类User,需要我们在 定制交换器时将交换器类型 type 设置为 fanout。另外,bindings 属性的 @QueueBinding 注解除了有 value、type 属性外,还有key 属性用于定制路由键 : routingKey (当前发布订阅模式不需要)。
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
完成消息组件的 定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain包,在其中创建实体类。
User.java
public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容" private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
在 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送,示例代码如下 :
Chapter22ApplicationTests.java :
import com.myh.chapter_22.domain.User; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter22ApplicationTests { @Test void contextLoads() { } @Autowired //引入进行"消息中间件"管理的 RabbitTemplate组件对象 private RabbitTemplate rabbitTemplate; /** * "消息发送者" 发送信息 */ @Test public void psubPublisher() { User user = new User(); //消息发送者 要发送的"消息内容" user.setId(1); user.setUsername("石头"); /* 使用 RabbitTemplate中的 convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为 String exchange : 表示发送信息的"交换器" String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定" Object object : 表示要"发送的信息内容" */ //执行哪个"交换器" 和 指定给该"交换器"的"消息内容" rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容" } }
上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理 的 RabbitTemplate 组件对象,然后使用该模板工具类的
convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的第 1个参数表示发送消息 的 交换器,这个参数值要与之前定制的交换器名称一致 ;第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式,所以不需要指定 ;第3 个参数是发送的消息内容,接收 Object 类型。
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
执行上述 消息发送的测试方法 : psubPublisher( ),控制台执行效果如 下图所示 :
如果要 解决上述 消息中间件 发送 实体类消息出现的 异常,通常可以采用两种解决方案 :
① 第一种 是执行 JDK 自带 的 serializable 序列化接口 ;
② 第二种是 定制其他类型的 消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的 可视化效果较差,转换后的消息无法辨识,所以一般推荐使用第二种方式。配置代码如下所示 :
自定义 “消息转换器” 的配置代码如下所示 :
RabbitMQConfig.java :
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration //标记该类为"配置类" public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类" @Bean //将该类的返回值对象加入到IOC容器中 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" } }
(4) 控制台查看 “消费者” 消费信息情况
此时 成功启动项目后,执行psubPublisher( )方法,控制台显示的消息消费效果如下图所示 :
至此,在 Spring Boot 中完成了 使用基于 API、基于配置类 和 基于注解 这 3 种方式来实现 Publish/Subscribe 工作模式 的 整合讲解 在这 3 种实现消息服务的方式中,
上面讲述过的 三种配置方式的 区别 :
① 基于 API 的方式相对简单、直观,但容易与业务代码产生 耦合;② 基于 配置类 的 方式相对隔离 、容易统一管理、符合Spring Boot 框架思想;
③ 基于 注解的方式 清晰明了、方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用 基于配置类的方式 和 基于注解的方式定制组件 实现消息服务较为常见。使用 基于 API的方式偶尔使用,具体还需要根据实际情况进行选择。
二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式
2.1 基于 “注解” 的方式 ( 实现 Routing "路由模式"工作模式 )
(1) 创建项目,“全局配置文件” 中配置信息
创建项目,"全局配置文件"中 配置信息 :
application.properties ( 全局配置文件 ):
#配置RabbitMQ消息中间件的"连接配置" spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虚拟主机路径/ , 默认可省略 spring.rabbitmq.virtual-host=/
pom.xml :
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.myh</groupId> <artifactId>chapter_22</artifactId> <version>0.0.1-SNAPSHOT</version> <name>chapter_22</name> <description>chapter_22</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <!-- <build>--> <!-- <plugins>--> <!-- <plugin>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-maven-plugin</artifactId>--> <!-- </plugin>--> <!-- </plugins>--> <!-- </build>--> </project>
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下 :
RabbitMQService.java :
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service //加入到IOC容器中 public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类 //使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息" /** * 2.1路由模式消息接收、处理error级别日志信息 * (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息") */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = "error_routing_key")) public void routingConsumerError(String message) { System.out.println("接收到error级别日志信息: "+message); } /** * 2.2路由模式消息接收、处理info、error、warning级别日志信息 * (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息") */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = {"error_routing_key","info_routing_key","warning_routing_key"})) public void routingConsumerAll(String message) { System.out.println("接收到info、error、warning级别日志信息: "+message); } }
上述代码中,在消息业务处理类 : RabbitMQService 中编写了两个用来 处理 Routing 路由模式的消息消费者方法,在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制了 路由模式下的 消息服务组件。
Routing路由模式下的交换器类型 : type, 属性为 direct,而且还必须 指定 key 属性 。
( 每个消息队列可以映射多个路由键,但在 Spring Boot 1.X版本中,@QueueBinding 中的 key 属性只接收 Spring 类型而不接收 Spring [ ] 类型 )。
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
完成消息组件的 定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain包,在其中创建实体类。
User.java
public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容" private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
在 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送,示例代码如下 :
Chapter24ApplicationTests.java : ( 项目测试类 )
import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter24ApplicationTests { @Test void contextLoads() { } @Autowired //引入进行"消息中间件"管理的 RabbitTemplate组件对象 private RabbitTemplate rabbitTemplate; //Routing工作模式消息发送端 ( "消息发布者" "发送信息" ) @Test public void routingPublisher() { //发送信息 //给指定的"交换器" "发送信息" rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send 错误信息..."); } }
上述代码中,通过调用 RabbitTemplate的 converAndSend ( String exchange , String routingKey , Object object )方法来 "发送消息"。在 Routing 工作模式下发送消息时,必须指定路由键参数,该参数要与消息队列映射的路由键保持一致,否则发送的消息将会丢失。
本次示例中使用的是error_routing_key 路由键,根据定制规则,编写的两个消息消费者方法应该都可以正常接收并消费该发送端发送的消息。
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
执行上述 消息发送的测试方法 : psubPublisher( ),控制台执行效果如 下图所示 :
如果要 解决上述 消息中间件 发送 实体类消息出现的 异常,通常可以采用两种解决方案 :
① 第一种 是执行 JDK 自带 的 serializable 序列化接口 ;
② 第二种是 定制其他类型的 消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般 推荐使用第二种方式。配置代码如下所示 :
自定义 “消息转换器” 的配置代码如下所示 :
RabbitMQConfig.java :
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration //标记该类为"配置类" public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类" @Bean //将该类的返回值对象加入到IOC容器中 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" } }
(4) 控制台 查看 “消费者” 消费信息情况
此时 成功启动项目后,执行routingPublisher( )方法,控制台显示的消息消费效果如下图所示 :
此时,修改 routingPublisher( )方法中的消息发送参数,调整发送info 级别的 日志信息( 注意同修改 info_routing_key 路由键 ),再次启动 routingPublisher( )方法,控制台效果如下图所示 :
如上图所示,控制台打印出使用 info_routing_key 路由键发送 info 级别的日志信息,说明只有配置映射 info_routing_key 路由键的 消息消费者 的方法 消费了消息。
三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式
3.1 基于 “注解” 的方式 ( 实现 Topics"通配符模式"工作模式 )
(1) 创建项目,“全局配置文件” 中配置信息
创建项目,"全局配置文件"中 配置信息 :
application.properties ( 全局配置文件 ):
#配置RabbitMQ消息中间件的"连接配置" spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虚拟主机路径/ , 默认可省略 spring.rabbitmq.virtual-host=/
pom.xml :
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.myh</groupId> <artifactId>chapter_22</artifactId> <version>0.0.1-SNAPSHOT</version> <name>chapter_22</name> <description>chapter_22</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <!-- <build>--> <!-- <plugins>--> <!-- <plugin>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-maven-plugin</artifactId>--> <!-- </plugin>--> <!-- </plugins>--> <!-- </build>--> </project>
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下 :
RabbitMQService.java :
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service //加入到IOC容器中 public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类 //使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息" /** * 3.1使用"通配符模式"消息接收、进行"邮件业务"订阅处理 * ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" ) */ @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"),exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.email.#")) public void topicConsumerEmail(String message) { System.out.println("接收到邮件订阅需求处理信息: "+message); } /** * 3.2使用"通配符模式"消息接收、进行"短信业务"订阅处理 * ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" ) */ @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"),exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.sms.#")) public void topicConsumerSms(String message) { System.out.println("接收到短信订阅需求处理信息: "+message); } }
上述代码中,在消息业务处理类 :RabbitMQService 中编写了两个处理 Topics 通配符模式的 消息消费者方法 ( 这两个方法即创建了"消息组件",又创建了"消息消费者来接收且消费信息 "),在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制了通配符模式下的 消息组件。
从上述示例可以看出,Topics 通配符模式下注解使用方式与 Routing 路由模式的使用基本一样,主要是将交换器类型 type 修改为了 topic,还分别使用通配符的样式指定路由键 routingKey。
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
完成消息组件的 定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain包,在其中创建实体类。
User.java
public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容" private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
针对不同的用户订阅需求,使用 RabbitTemplate 模板工具类的 convertAndSend ( Stringexchange,String routingKey,Object object )方法发送不同的消息,发送消息时,必须 根据具体需求 和 已经定制的路由键通配符 设置 具体的路由键。
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
在 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送,示例代码如下 :
Chapter24ApplicationTests.java : ( 项目测试类 )
import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter24ApplicationTests { @Test void contextLoads() { } @Autowired //引入进行"消息中间件"管理的 RabbitTemplate组件对象 private RabbitTemplate rabbitTemplate; //3.Topcis工作模式下的"消息发布者" "发送消息" @Test public void topicPublisher() { //(1)只发送"邮件"订阅用户信息 ---(给指定的"交换器" "发送信息") rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message..."); //(2)只发送"短信"订阅用户信息 rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send sms message..."); //(3)"邮件"订阅用户 和 "短信"订阅用户 都发送消息 rabbitTemplate.convertAndSend("topic_exchange","info.email.sms","topics send email and sms message..."); } }
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
执行上述 消息发送的测试方法 : psubPublisher( ),控制台执行效果如 下图所示 :
如果要 解决上述 消息中间件 发送 实体类消息出现的 异常,通常可以采用两种解决方案 :
① 第一种 是执行 JDK 自带 的 serializable 序列化接口 ;
② 第二种是 定制其他类型的 消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般 推荐使用第二种方式。配置代码如下所示 :
自定义 “消息转换器” 的配置代码如下所示 :
RabbitMQConfig.java :
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration //标记该类为"配置类" public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类" @Bean //将该类的返回值对象加入到IOC容器中 public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" } }
(4) 控制台 查看 “消费者” 消费信息情况
先执行topicPublisher( ) 方法 中的 步骤(1) 中 邮件订阅用户的消息发送,控制台效果如下图所示 :
将 topicPublisher( )方法 中 步骤(1) 进行 注释,打开步骤(2) 中只进行 短信订阅用户的 消息发送方法,并 再次启动该测试方法,控制台效果如下图所示 :
为了查看 topicPublisher( )方法 中 步骤(3)的效果,我们需要把 步骤(2) 的代码注释,步骤(3) 的代码主要进行 邮件 和 短信订阅用户 的消息发送方法,项目重新启动 后的效果如下图所示 :