订阅模式Publish/Subscribe
- 基于API的方式
- 1.使用AmqpAdmin定制消息发送组件
- 2.消息发送者发送消息
- 3.消息消费者接收消息
- 基于配置类的方式
- 基于注解的方式
- 总结
SpringBoot整合RabbitMQ中间件实现消息服务,主要围绕3个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息。其中,定制中间件是比较麻烦的工作,且必须预先定制。
下面以用户注册成功后,同时发送邮件通知和短信通知这一场景为例, 分别使用基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合。
基于API的方式
基于API的方式,是指使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息的发送。这种定制消息发送组件的方式,与在RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后组装消息队列供应用程序调用,从而实现消息服务。下面我们就对这种基于API的方式进行讲解和演示。
1.使用AmqpAdmin定制消息发送组件
我们先打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件。
package com.ytx;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class Chapter08ApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Test
void contextLoads() {
}
/** 使用AmqpAdmin管理员API定制消息组件 */
@Test
public void amqpAdmin() {
// 1.定义fanout类型的交换器
amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
// 2.定义两个默认持久化队列,分别处理email和sms
amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
// 3.将队列分别与交换器进行绑定
amqpAdmin.declareBinding(new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
amqpAdmin.declareBinding(new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
}
}
执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。单元测试方法执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。
从上图可以看出,在RabbitMQ可视化管理页面的Exchanges面板中,新出现了一个名称为fanout_exchange的交换器(其他7个交换器是RabbitMQ自带的),且其类型是我们设置的fanout类型。我们可以单击fanout_exchange交换器进入查看。
从上图可以看出,在fanout_exchange交换器详情页面中展示有该交换器的具体信息,还有与之绑定的两个消息队列fanout_queue_email和fanout_queue_sms,并且与程序中设置的绑定规则一致。切换到Queues面板页面,查看定制生成的消息队列信息。
从上图可以看出,在Queues队列面板页面中,展示有定制的消息队列信息,这与程序中定制的消息队列一致,我们可以单击消息队列名称查看每个队列的详情。
通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin,定制消息组件和在管理页面上手动定制消息组件的本质是一样的。
2.消息发送者发送消息
完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。发送消息时,我们可以借助一个实体类传递消息,需要预先创建一个实体类对象。
首先,在chapter08项目中创建名为com.cy.domain的包,并在该包下创建一个实体类User。
package com.ytx.domain;
/** 发布消息的实体类可以通过实现Serializable序列化接口进行发布 */
public class User {
private Integer id;
private String username;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", username='" + username + '\'' +
'}';
}
}
其次,我们在项目测试类Chapter08ApplicationTests中,使用Spring框架提供的RabbitTemplate模板类实现消息发送。
@Autowired
private RabbitTemplate rabbitTemplate;
/** 1.Publish/Subscribe工作模式消息发送端 */
@Test
public void subPublisher() {
User user = new User();
user.setId(1);
user.setUsername("小明");
rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}
上述代码中,我们先使用@Autowired注解,引入消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。此方法中的第1个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第2个参数表示路由键,因为实现的是Publish/Subscribe工作模式,所以不需要指定;第3个参数是发送的消息内容,接收Object类型。
然后,执行上述消息发送的测试方法subPublisher(),控制台执行效果见下图所示。
从上图可以看出,发送实体类对象消息时程序发生异常,从异常信息“SimpleMessageConverter only supports String, byte[] and Serializable payloads”可以看出,消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持字符串或实体类对象序列化后的消息。而测试类中发送的是User实体类对象消息,所以发生异常。
如果要解决上述消息中间件发送实体类消息出现的异常,我们通常可以采用两种解决方案:第一种是执行JDK自带的Serializable序列化接口;第二种是定制其他类型的消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般使用第二种方式。
接着我们在chapter08项目中创建名为com.ytx.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig。
package com.ytx.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** RabbitMQ消息配置类 */
@Configuration
public class RabbitMQConfig {
/** 定制JSON格式的消息转换器 */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
代码中创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。
再次执行subPublisher()方法,该方法执行成功后,查看RabbitMQ可视化管理页面Queues面板信息。
从上图可以看出,消息发送完成后,Publish/Subscribe工作模式下绑定的两个消息队列中各自拥有一条待接收的消息, 由于目前尚未提供消息。
3.消息消费者接收消息
在chapter08项目中创建名为com.ytx.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService。
package com.ytx.chapter08.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/** RabbitMQ消息接收处理的业务类 */
@Service
public class RabbitMQService {
/** Publish/Subscribe工作模式接收,处理邮件业务 */
@RabbitListener(queues = "fanout_queue_email")
public void subConsumerEmail(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("邮件业务接收到消息:" + msg);
}
/** Publish/Subscribe工作模式接收,处理短信业务 */
@RabbitListener(queues = "fanout_queue_sms")
public void subConsumerSms(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("短信业务接收到消息:" + msg);
}
}
上述代码中,创建了一个接收处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解,我们可以监听队列名称为fanout_queue_email和fanout_queue_sms的消息,监听的这两个队列是前面指定发送并存储消息的消息队列。
需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列中有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法就会立即接收并消费队列中的消息。另外,在接收消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用与消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的消息参数信息MessageProperties。
启动chapter08项目,控制台显示的消息消费效果如下图所示。
从上图可以看出,项目启动成功后,消息消费者监听到消息队列中存在的两条消息,并进行了各自的消费。与此同时,通过RabbitMQ可视化管理页面的Queues面板查看队列消息情况,会发现两个队列中存储的消息已经被消费。至此,一条完整的消息发送、 消息中间件存储、消息消费的Publish/Subscribe(发布订阅模式)工作模式的业务案例已经实现。
注意,如果没有引入Spring Web模块的依赖,启动chapter08项目时,消息消费者接收消息会报以下错误。
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.amqp.support.converter.MessageConverter]: Factory method 'messageConverter' threw exception; nested exception is java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.3.25.jar:5.3.25]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:653) ~[spring-beans-5.3.25.jar:5.3.25]
... 18 common frames omitted
小提示:
上述代码中,使用的是开发中常用的@RabbitListener注解,来监听指定名称队列的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,我们还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。
基于配置类的方式
基于配置类的方式,主要讲的是使用SpringBoot框架提供的@Configuration注解,配置定制消息发送组件,并进行消息发送。下面我们来对这种基于配置类的方式进行讲解和演示。
打开RabbitMQ消息配置类RabbitMQConfig,在该配置类中使用基于配置类的方式定制消息发送相关组件。
package com.ytx.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** RabbitMQ消息配置类 */
@Configuration
public class RabbitMQConfig {
/** 定制JSON格式的消息转换器 */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/** 使用基于配置类的方式定制消息中间件 */
// 1.定义fanout类型的交换器
@Bean
public Exchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
}
// 2.定义两个不同名称的消息队列
@Bean
public Queue fanoutQueueEmail() {
return new Queue("fanout_queue_email");
}
@Bean
public Queue fanoutQueueSms() {
return new Queue("fanout_queue_sms");
}
// 3.将两个不同名称的消息队列与交换器进行绑定
@Bean
public Binding bindingEmail() {
return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs();
}
@Bean
public Binding bindingSms() {
return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs();
}
}
上述代码中,使用@Bean注解定制了3种类型的Bean组件,这3种组件分别表示交换器、消息队列和消息队列与交换器的绑定。这种基于配置类方式定制的消息组件,其实现和基于API方式定制的消息组件完全一样,只不过是实现方式不同而已。
按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现了消息发送者和消息消费者,并且基于配置类方式定制的消息组件名称,和之前测试用的消息发送和消息消费组件名称都是一致的,所以这里我们可以直接重复使用。
重新运行消息发送者测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。
基于注解的方式
基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息发送组件并发送消息。
在消息接收和处理的业务类RabbitMQService中,将针对邮件业务和短信业务处理的消息消费者方法进行注释,使用@RabbitListener注解及其相关属性定制消息发送组件。
package com.ytx.chapter08.service;
import com.ytx.chapter08.domain.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/** RabbitMQ消息接收处理的业务类 */
@Service
public class RabbitMQService {
/** Publish/Subscribe工作模式接收,处理邮件业务 */
/*
@RabbitListener(queues = "fanout_queue_email")
public void subConsumerEmail(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("邮件业务接收到消息:" + msg);
}
*/
/** Publish/Subscribe工作模式接收,处理短信业务 */
/*
@RabbitListener(queues = "fanout_queue_sms")
public void subConsumerSms(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("短信业务接收到消息:" + msg);
}
*/
/** 使用基于注解的方式实现消息服务 */
// 1.1 Publish/Subscribe工作模式接收,处理邮件业务
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout_queue_email"),
exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
public void subConsumerEmailAno(User user) {
System.out.println("邮件业务接收到消息:" + user);
}
// 1.2 Publish/Subscribe工作模式接收,处理短信业务
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout_queue_sms"),
exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
public void subConsumerSmsAno(User user) {
System.out.println("短信业务接收到消息:" + user);
}
}
上述代码中,使用@RabbitListener注解及其相关属性定制了两个消息组件的消费者,这两个消费者都接收实体类User并消费。在@RabbitListener注解中,bindings属性用于创建并绑定交换器和消息队列组件,需要注意的是,为了能使两个消息组件的消费者接收到实体类User,需要我们在定制交换器时将交换器类型type设置为fanout。另外,bindings属性的@QueueBinding注解除了有value、exchange属性外,还有key属性用于定制路由键routingKey(当前发布订阅模式不需要)。
重启测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。
至此,我们就在SpringBoot中完成了基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合讲解。在这3种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。
在实际开发中,使用基于配置类的方式和基于注解的方式较为常见,基于API的方式则偶尔使用,当然大家要根据实际情况进行具体选择。
总结
今天介绍了基于API方式、配置类方式和注解的3种消息队列,并展示了实现发布订阅Publish/Subscribe模式的整合及代码实现,基于注解方式的实现需要重点掌握。有关RabbitMQ的其他内容,袁老后续更新