消息中间件:RabbitMQ
- 前言
- 安装
- Window安装
- Linux安装
- 管理页面
- 什么是RabbitMQ?
- 入门
- 基本概念
- 简单队列
- 工作队列(Work Queues)
- 发布/订阅(Publish/Subscribe)
- 临时队列
- 路由(Routing)
- 主题(Topics)
- 标题(Headers)
- 远程过程调用 (RPC)
- 死信队列
- 延迟队列
- 防止重复消费
- 持久化
- 交换机自动删除
- 可靠传输
- 唯一标识
- 面试题:如何保证MQ顺序消费?
前言
RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,提供了可靠的消息传递机制,用于应用解耦、异步提速、流量削锋、数据分发、错峰流控、日志收集等等。RabbitMQ 的主要特点包括:
-
消息队列:RabbitMQ 采用消息队列模型,允许生产者将消息发送到队列中,消费者可以从队列中获取消息进行处理。这种模型使得应用程序可以进行解耦,提高了系统的灵活性和可扩展性。
-
可靠性:RabbitMQ 提供了多种机制来确保消息的可靠传递,包括持久化、确认机制、事务等,保证消息不会丢失或重复传递。
-
灵活的路由:RabbitMQ 支持多种消息路由方式,例如直连、主题、广播等,可以根据业务需求灵活地进行消息的路由和过滤。
-
可扩展性:RabbitMQ 支持集群部署,可以通过增加节点来实现横向扩展,提高消息处理能力和系统的可用性。
-
管理界面:RabbitMQ 提供了一个用户友好的管理界面,可以通过界面监控队列、交换器、连接等信息,并进行配置管理。
-
广泛的支持:RabbitMQ 支持多种编程语言,包括 Java、Python、Ruby、C# 等,开发者可以使用不同语言的客户端库与 RabbitMQ 进行交互。
解耦应用:通过引入消息队列,不同的应用程序可以实现解耦,发送方和接收方之间不需要直接通信,降低了系统的耦合度。
异步通信:消息队列支持异步通信,发送方发送消息后即可继续处理其他任务,接收方可以在合适的时间处理消息,提高系统的响应速度和并发能力。
流量削锋:是一种网络管理技术,用于平稳控制网络流量的传输速率,以避免网络拥堵或过载。
安装
进入官网看下RabbitMQ对应的Erlang版本,免得到时候版本不对安装错误。
Window安装
- 先安装Erlang
进入官网。右边点击下载Download Windows installer,下拉可以选择不同位数的系统(下载速度那叫一个慢)。
下载后安装,有需要更换下安装目录,直至下一步即可。
然后配置环境变量,找到你的安装目录,比如:D:\Program Files\Erlang OTP
将目录配置的Path中
打开cmd,输入erl
,弹出版本号安装成功。
- 再安装RabbitMQ
然后进入官网https://www.rabbitmq.com/docs/download#downloads-on-github,点击Windows Installer。
下载完后点击安装,可能等的时间比较久,需要检查是否安装Erlang。
如有需要更换下安装目录,直至下一步即可。
打开RabbitMQ的命令窗口
输入rabbitmq-plugins enable rabbitmq_management
安装插件。
重启服务后,输入http://127.0.0.1:15672/
,安装完成。
默认账号:guest/guest
Linux安装
- 先安装Erlang
再github上找到对应版本(点击进入),复制链接地址(如果是centOS7就选el7,否则安装时容易出问题)
使用命令下载(你也可以本地下载后上传到服务器):
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.2/erlang-26.2.2-1.el8.x86_64.rpm
如图所示
执行命令
rpm -ivh erlang-26.2.2-1.el8.x86_64.rpm
(1)-i
:表示安装软件包。
(2)-v
:在安装过程中显示详细的信息,即 verbose 模式。
(3)-h
:以哈希标记的方式显示安装进度。
输入命令查看版本
erl -v
安装完毕。
- 安装RabbitMQ
然后进入官网。找到RHEL, CentOS Stream 9.x, CentOS 8.x,复制链接,如图所示:
执行命令下载(你也可以本地下载后上传到服务器):
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm
如图所示
- 执行命令安装
rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm
Rabbitmq默认安装路径再/usr/lib/rabbitmq
目录。
- 开启界面管理
rabbitmq-plugins enable rabbitmq_management
如果你是云服务器,还需要开启防火墙
然后执行启动命令,有两种方法:
(1)传统的 SysVinit 命令格式
#启动
service rabbitmq-server start
#停止
service rabbitmq-server stop
#重启
service rabbitmq-server restart
(2)systemd 命令格式
#启动
systemctl start rabbitmq-server
#停止
systemctl start rabbitmq-server
#重启
systemctl restart rabbitmq-server
#查看状态
systemctl status rabbitmq-server
输入http://ip地址:15672/
,安装完成。
- 添加用户
默认账号:guest/guest,由于RabbitMQ的默认账号,仅允许本机访问,我们需要远程添加一个用户
rabbitmqctl add_user 用户名 密码
- 设置角色
执行以下命令
rabbitmqctl set_user_tags 用户 角色
用户角色说明:
(1)administrator
:可以登录控制台、查看所有信息、并对rabbitmq进行管理
(2)monToring
:监控者;登录控制台,查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
(3)policymaker
:策略制定者;可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息
(4)managment
:普通管理员;无法看到节点信息,也无法对策略进行管理。
(5)none
:无法登陆管理控制台,通常就是普通的生产者和消费者。
- 设置权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
三个".*"
分别代表配置、写入和读取的权限。
然后登录
再来讲讲其他几个常用命令
# 修改用户密码
rabbitmqctl change_password 用户名 密码
# 查看当前所有用户
rabbitmqctl list_users
# 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
rabbitmqctl delete_user 用户名
管理页面
下面来了解下RabbitMQ再管理页面上如何使用的。
- 连接:这里,可以查看、管理和关闭当前所有的TCP连接。
- 通道:展示了所有当前打开的通道以及它们的详细信息。
- 交换器:查看、创建和删除的交换机。
添加交换机(Add a new exchange):
(1)Name(名称):交换机的唯一标识符,用于在RabbitMQ中识别交换机。
(2)Type(类型):指定交换机的类型,决定了交换机的路由策略。常见的交换机类型有Direct、Fanout、Topic和Headers。不同类型的交换机对应不同的路由规则。
(3)Durability(持久化):指定交换机是否持久化到磁盘。如果将该参数设置为Durable,交换机将在RabbitMQ服务器重启后仍然存在,Transient来描述不持久化的消息或队列。
(4)Auto delete(自动删除):交换机在不被使用时是否自动删除。如果将该参数设置为yes,当没有与之绑定的队列或连接时,交换机将被自动删除。
(5)Internal(内部交换机):指定交换机是否为内部交换机。内部交换机只能被直接连接到的交换机使用,而无法通过路由键绑定到队列。该参数为可选参数,用于特定的高级使用场景。
(6)Arguments(参数):创建交换机时指定一些额外的自定义参数。这些参数可以根据特定的需求来定义交换机的行为和特性。Arguments参数是一个键值对的字典,其中键和值的类型可以是字符串、数字、布尔值等。
- 队列:展示了所有当前的队列以及它们的详细信息,还可以添加队列。
添加队列:
(1)Virtual host(虚拟主机):
(2)Type(类型):传统队列(classic queue)、Quorum 队列(quorum queue)以及流队列(stream queue)。
(3)Name(名称):队列的唯一标识符。
(4)Durability(持久化):指定交换机是否持久化到磁盘。
(5)Arguments(参数):键值对形式,自定义参数,输入框下面有常用参数。
比如现在创建一个队列
然后点击列表中的Name,进入详情
- 用户:查看系统中所有的操作用户。
添加用户:
(1)username:用户名。
(2)password:可以在下拉处选择no password。
(3)tags:设置权限。
点击Name,可以进入详情。可以对用户进行,修改密码、修改权限、删除用户等操作。
什么是RabbitMQ?
RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把想要寄出的邮件放进邮箱时,你可以确定邮递员最终会把邮件送到你的收件人手中。在这个类比中,RabbitMQ是一个邮箱、一个邮局和一个信件载体。
RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据
RabbitMQ和一般的消息传递使用了一些术语。
- 生产只不过意味着发送。发送消息的程序是生产者:
- 队列是RabbitMQ中邮箱的名称。虽然消息流经RabbitMQ和你的应用程序,但它们只能存储在队列中。队列只受主机的内存和磁盘限制,它本质上是一个大的消息缓冲区。
许多生产者可以向一个队列发送消息,而许多消费者可以尝试从一个队列接收数据。下面是我们表示队列的方式:
- 消费和接受有着相似的含义。consumer是一个主要等待接收消息的程序:
注意,生产者、消费者和代理不必驻留在同一主机上;事实上,在大多数应用程序中,它们不需要。应用程序既可以是生产者,也可以是消费者。
入门
我们以Maven项目为例,先引入官方依赖,示例代码如下:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
SpringBoot提供依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.3.2</version>
</dependency>
我们将用Java编写“hello world”
;发送单个消息的生产者和接收消息并将其打印出来的消费者。
- 生产方发送消息
如果我们想连接到不同机器上的一个节点,我们只需在setHost()
方法中指定它的主机名或IP地址。使用try-with-resources
语句,不需要在代码中显式地关闭它们(注:15672
是页面访问端口号,5672
为消息队列连接端口号,请确保防火墙开启此端口号)。
public class MQProduct {
//创建队列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
//创建连接且创建新通道
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()){
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
用于管理与 RabbitMQ 代理的连接的中心组件是 ConnectionFactory 接口。 有三种连接工厂可供选择:
- PooledChannelConnectionFactory:该工厂基于Apache Pool2管理一个连接和两个通道池。一个池用于事务性通道,另一个用于非事务性通道。池是具有默认配置的GenericObjectPools;提供了一个回调来配置池
public static void main(String[] args) {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
}
- ThreadChannelConnectionFactory:这个工厂管理一个连接和两个ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。该工厂确保同一线程上的所有操作使用相同的通道(只要通道保持打开状态)。为了避免内存泄漏,如果应用程序使用许多短寿命线程,则必须调用工厂的
closeThreadChannel()
来释放通道资源。从2.3.7版本开始,线程可以将其通道传输给另一个线程。
public static void main(String[] args) {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
ThreadChannelConnectionFactory tcf = new ThreadChannelConnectionFactory(rabbitConnectionFactory);
tcf.closeThreadChannel();
}
- CachingConnectionFactory:默认情况下,它建立一个可以由应用程序共享的单一连接代理。CachingConnectionFactory实现支持对这些通道进行缓存,并根据通道是否为事务性通道维护单独的缓存。要配置通道缓存的大小(默认为25),可以调用
setChannelCacheSize()
方法。
public static void main(String[] args) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setChannelCacheSize(128);
Connection connection = connectionFactory.createConnection();
}
然后创建队列,调用queueDeclare()
方法:
//声明一个队列是幂等的——只有当它不存在时才会被创建。
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
它一共有五个参数:
(1)queue
:这是要声明的队列的名称。
(2)durable
:表示是否将队列持久化。如果设置为true,RabbitMQ将会将队列存储到磁盘上,以便在RabbitMQ服务器重启后能够恢复。
(3)exclusive
:表示是否是一个排他队列。如果设置为true,该队列只能被当前连接使用,连接断开时会自动删除该队列。
(4)autoDelete
:表示是否在不再使用时自动删除队列。如果设置为true,当最后一个消费者断开连接之后,队列会自动删除。
(5)arguments
:额外的参数。
然后,我们可以向队列发布一条消息,调用basicPublish()
方法:
String message = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
它一共有四个参数:
(1)exchange
:表示消息发送到的交换机的名称。根据消息的routingKey
和交换机的类型,交换机将消息路由到一个或多个队列(设置为""
表示使用默认交换机)。
(2)routingKey
:是消息路由的关键词。交换机根据这个关键词将消息路由到相应的队列。消息的路由规则是由交换机的类型决定的。
(3)props
:是一个AMQP.BasicProperties
对象,用于设置消息的属性。消息的属性包括持久性、优先级、过期时间、消息标识符等。
(4)body
:是消息的实际内容,以字节数组的形式传输。这个字节数组通常包含了需要传输的业务数据,例如JSON数据、文本、二进制数据等。
- 消费方接收消息
消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将让消费者运行以侦听消息并将它们打印出来。
发布方相同;我们打开一个连接和一个通道,并声明我们要从中消费的队列。
为什么不使用
try-with-resource
语句来自动关闭通道和连接?我们希望该过程保持活动状态,而消费者正在异步侦听消息到达。
public class MQConsumer {
//创建队列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
//创建连接且创建新通道
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");//如果我们想连接到不同机器上的一个节点,我们只需在这里指定它的主机名或IP地址。
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//再次创建队列确保队列存在
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 取消消费的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
如果先启动生产方,先创建一个队列包含未消费的消息,如图所示
然后启动消费方就会被消费,执行结果如下:
若最先启动消费方,但是此时队列并未创建,就会报队列找不到的错误信息
声明队列是幂等的 - 仅当队列不存在时才会创建队列。
由于我们可能在发布者之前启动使用者,因此我们希望在尝试使用来自其中的消息之前确保队列存在。
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//再次创建队列确保队列存在
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
执行结果如图所示
再RabbitMQ页面可以看到连接的信息、通道、交换机、队列等信息,如图所示
基本概念
经过简单的示例介绍,我们知道了RabbitMQ的基本用法,AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个面向消息的中间件协议,其重要的组成部分包括:
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
- Broker:接收客户端的连接,用于接收和分发消息,实现 AMQP 实体服务。
- Connection:连接,生产者/消费者与Broker之间的TCP网络连接。
- Channel:信道,在 Connection内部建立的逻辑连接,客户端可以建立多个信道,每个信道代表一个会话任务,Channel之间是完全隔离的。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个不相同的Exchange和Queue。
- Message:消息,服务与应用程序之间传送的数据,消息可以很简单,也可以很复杂,由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Exchange:交换嚣,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。
- Queue:消息队列,用来保存消息,转发消费者消费。
- Binding:绑定,Exchange和Queue之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- Routingkey:路由规则,生产者将消息发送给Exchange,通过路由规则发送给指定队列。
流程如图所示:
简单队列
往后的内容我们以Spring Boot为例进行讲解和使用,Spring Boot 提供了许多功能,但我们在这里只重点介绍一些。首先,Spring Boot 应用程序可以选择通过 application.properties
或 application.yml
文件提供其属性(还有更多选项,但这将让我们继续前进)。
# 主机名
spring.rabbitmq.host=192.168.2.101
# 端口
spring.rabbitmq.port=5672
# 虚拟主机
spring.rabbitmq.virtual-host=/
# 用户名
spring.rabbitmq.username=admin
# 密码
spring.rabbitmq.password=admin
在下图中,“P”
是我们的生产者,“C”
是我们的受害者。中间的框是一个队列 - RabbitMQ 代表消费者保留的消息缓冲区。
我们将创建一个 Java 配置文件,描述Spring bean:
@Configuration
public class MQConfig {
@Bean
public Queue queue(){
return new Queue("hello");
}
@Bean
public MQReceiver receiver(){
return new MQReceiver();
}
@Bean
public MQSender sender(){
return new MQSender();
}
}
现在,需要进入发送方和接收方类的代码非常少。
- 发送方
您会注意到 Spring AMQP 删除了样板代码,只留下需要关注的消息传递逻辑。在 MQConfig 类中的 bean 定义中配置的队列中自动连线,并且像许多 Spring 连接抽象一样,通过RabbitTemplate
类可以自动连接到发送器中。剩下的就是创建一个消息并调用模板 convertAndSend()
的方法,从我们定义的 bean 和我们刚刚创建的消息中传入队列名称。
public class MQSender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
convertAndSend()
方法有很多种传参方式:
(1)void convertAndSend(Object message);
:只接受一个参数,即要发送的消息对象。消息会被默认的消息转换器(MessageConverter)转换成消息体后发送到默认的交换机和路由键。
(2)void convertAndSend(String exchange, String routingKey, Object message);
:指定消息要发送到的交换机和路由键,以及要发送的消息对象。
(3)void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
:这种方式在前两种的基础上增加了messagePostProcessor(修改消息属性的接口)和correlationData(关联数据,用于消息的确认处理),
- 接收方
接收器同样简单。我们用 @RabbitListener
队列的名称来注释我们的接收器类,并传入队列的名称。然后,我们通过传入已推送到队列的有效负载来 @RabbitHandler
注释我们 receive
的方法。
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
调用发送方send()
方法,运行结果如图
@RabbitListener
和 @RabbitHandler
是 Spring AMQP(高级消息队列协议)中用于处理RabbitMQ消息的两个重要注解:
@RabbitListener
注解:它可以被用在类级别或者方法级别。被用在类级别时,这个类中的所有带有@RabbitHandler
注解的方法都会被视为消息处理方法。如果被用在方法级别,那么这个方法就直接作为消息处理方法,而不需要再额外使用@RabbitHandler
。@RabbitHandler
注解:用于指定一个方法作为消息处理器,但它通常与@RabbitListener
注解一起使用在类级别。
工作队列(Work Queues)
在本文中,我们将创建一个工作队列,用于在多个工作线程之间分配耗时的任务。也就是一个生产者、一个队列、多个消费者,让多个消费者绑定到一个队列,共同消费队列中的消息。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并不得不等待它完成。相反,我们安排任务稍后完成。我们将任务封装为消息,并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作线程时,任务将在它们之间共享。
再Spring Bean中定义第二个消费方:
@Configuration
public class MQConfig {
@Bean
public Queue queue(){
return new Queue("hello");
}
@Bean
public MQReceiver receiver(){
return new MQReceiver();
}
@Bean
public MQReceiver2 receiver2(){
return new MQReceiver2();
}
@Bean
public MQSender sender(){
return new MQSender();
}
}
- 发送方
比如,循环10次,向队列中不停发送消息,模拟消息堆积。
public class MQSender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
public void send() {
String message = "Hello World!";
//循环发送10条消息
for (int i = 1; i <= 10; i++) {
this.template.convertAndSend(queue.getName(), message+","+i);
}
}
}
- 消费方
两个消费方监听同一个队列,进行消费:
//消费方1
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
//消费方2
@RabbitListener(queues = "hello")
public class MQReceiver2 {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received2 '" + in + "'");
}
}
调用发送方send()
方法,执行两次后,执行结果如图所示:
由图可知,一个消息只会被一个消费者消费,消费方式也不一定按照队列顺序进行消费。工作队列(Work Queues)通常是通过轮询(round-robin)的方式进行消费的。
默认情况下 AbstractMessageListenerContainer 的值 DEFAULT_PREFETCH_COUNT
默认为 250,则会告诉 RabbitMQ 一次不要向工作线程提供超过 250 条消息。相反,它会将其分派给下一个尚未忙碌的工作人员。所以发送10条消息是有可能只被一个队列消费的。
- 公平派单与循环派单
默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。在这种模式下,调度不一定完全按照我们的要求工作。例如,在有两个工作人员的情况下,当所有奇数消息都很重,偶数消息都很轻时,一个工作人员将一直忙碌,而另一个工作人员几乎不做任何工作。
“公平调度”是 Spring AMQP 的默认配置。将 AbstractMessageListenerContainer 的值
DEFAULT_PREFETCH_COUNT
定义为 250。
循环派单还有一个弊端,因为每个消费者都会收到相同数量的消息,所以当任意一个消费者执行时间过长,导致后面的消息无法及时被消费,其它执行速度快的消费者已经空闲下来,示例代码如下
我们将任意消费方设置延迟时间,模拟消费时间过长的问题:
@RabbitListener(queues = "hello")
public class MQReceiver2 {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received2 '" + in + "'");
Thread.sleep(1500);
}
}
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
执行结果如图:
那么,如何将执行过慢的消费者的剩余消息,交给其它执行速度快的消费者处理?
答:我们可以通过
prefetchCount
(预取值)属性,预取值是消息缓存区允许存在未确认消息的最大数量。如果消费者收到消息后未及时应答,那么就会认为该消息区已满,就不会接收其它消息,反之应答后,消息区空闲才会接收新的消息。
设置预取值的方式有两种:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(2); // 设置预取数量
return factory;
}
或者使用yaml(properties)配置文件:
spring.rabbitmq.listener.simple.prefetch=2
将预取值设置为2,执行结果如下:
我们可以看到消费者1消费了8条数据,消费者2只消费了2条数据。
如果将DEFAULT_PREFETCH_COUNT
设置为 1,则行为将是最开始所述的循环传递。
在大多数情况下,
prefetchCount
等于 1 过于保守,并严重限制了消费者的吞吐量。如果所有工作人员都很忙,您的队列可能会填满。你会想要密切关注这一点,也许可以增加更多的工人,或者有一些其他的策略。
发布/订阅(Publish/Subscribe)
在本部分中,我们将实现交换机模式,将消息传递给多个消费者,此模式也称为“发布/订阅”。现在是时候在 RabbitMQ 中引入完整的消息传递模型了。
RabbitMQ 中消息传递模型的核心思想是,生产者从不直接向队列发送任何消息。实际上,很多时候,生产者甚至根本不知道消息是否会被传递到任何队列。
相反,生产者只能向Exchange发送消息。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,它将它们推送到队列。交易所必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列中?是否应该将其附加到多个队列中?或者它应该被丢弃。其规则由交换类型定义。
从本质上讲,发布的消息将被广播到所有接收者。
Exchange也有一个由ExchangeTypes中定义的常量表示的“类型”。基本类型有:fanout(扇出)
、direct(直连)
、 topic(主题)
和 headers(标题)
。
我们先定义一个Fanout交换机,然后定义两个队列,示例代码如下:
@Configuration
public class MQConfig {
/**
* 队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("hello");
}
/**
* 队列2
* @return
*/
@Bean
public Queue queue2(){
return new Queue("hello2");
}
/**
* 交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange");
}
}
交换机和队列之间的这种关系称为 绑定,通过BindingBuilder.bind()
方法将队列和交换机进行绑定,示例代码如下:
/**
* 绑定队列到指定交换机
* @return
*/
@Bean
public Binding bindingFanout(){
return BindingBuilder.bind(queue()).to(fanoutExchange());
}
/**
* 绑定队列到指定交换机
* @return
*/
@Bean
public Binding bindingFanout2(){
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
我们在页面上看一下交换机绑定队列是否绑定成功,如图所示:
Queue 类表示消息使用者从中接收消息的组件。
消费方,监听不同两个队列代码如下:
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(String message) {
System.out.println(" [x] Received '" + message + "'");
}
}
@Component
@RabbitListener(queues = "hello2")
public class MQReceiver2 {
@RabbitHandler
public void receive(String message) {
System.out.println(" [x] Received2 '" + message + "'");
}
}
发送方向交换机发送消息,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
// 指定交换机,发生消息
this.template.convertAndSend("fanout.exchange", "", msg);
}
}
调用发送方send()
方法,执行结果如图:
可以看到监听两个不同队列绑定同一个交换机的消费方都收到了消息。若其中某一个队列有多个监听,则消息会按顺序进行消费。
临时队列
每当我们连接到 Rabbit 时,我们都需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者 - 甚至更好的 - 让服务器为我们选择一个随机的队列名称。为了使用 Spring AMQP 客户端执行此操作,我们定义了一个 AnonymousQueue,它创建一个非持久的、排他性的、具有生成名称的自动删除队列:
@Configuration
public class MQConfig {
@Bean
public Queue hello(){
return new AnonymousQueue();
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(hello()).to(fanoutExchange());
}
}
临时队列会随机生成一串字符串当作队列名称,如图所示:
或者你也可以通过设置autoDelete
为true,指定队列自动删除,源码如图所示:
示例代码如下:
@Bean
public Queue queue() {
return new Queue("normal_queue", true, false, true);
}
一旦我们断开了消费者的连接,队列会自动删除。
路由(Routing)
在前面的教程中,我们构建了一个简单的 fanout
交换。我们能够向许多接收者广播消息。它没有给我们太多的灵活性 - 它只能进行无意识的广播。
Direct Exchange(直连交换机)是RabbitMQ中的一种消息交换机类型,其主要特点是基于路由键(routingKey)的精确匹配来进行消息的路由(使用相同的routingKey绑定多个队列是完全可以的,在这种情况下,直接交换的行为将类似于 fanout
交换,并将消息广播到所有匹配的队列)。
我们将消息发送到DirectExchange ,示例代码如下:
@Configuration
public class MQConfig {
/**
* 队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("hello");
}
/**
* 队列
* @return
*/
@Bean
public Queue queue2(){
return new Queue("hello2");
}
/**
* 交换机
* @return
*/
@Bean
public DirectExchange direct(){
return new DirectExchange("direct.exchange");
}
/**
* 绑定队列到指定交换机,通过routingKey=direct.routing进行匹配
* @return
*/
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(queue()).to(direct()).with("direct.routing");
}
/**
* 绑定队列到指定交换机,通过routingKey=direct.routing进行匹配
* @return
*/
@Bean
public Binding bindingDirect2(){
return BindingBuilder.bind(queue2()).to(direct()).with("direct.routing2");
}
}
我们可以再页面上确认交换机是否绑定队列,如图所示:
创建两个消费方,监听不同队列,示例代码如下:
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
@Component
@RabbitListener(queues = "hello2")
public class MQReceiver2 {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received2 '" + in + "'");
}
}
SpringAMQP注解提供了简写(这样你就不需要声明创建交换机和队列),示例代码如下:
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "hello"),
exchange = @Exchange(value = "direct.exchange", type = ExchangeTypes.DIRECT),
key = "direct.routing"
))
public class MQReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
然后创建生产方,指定交换机和RoutingKey:direct.routing
,发送消息,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String message = "Hello World!";
this.template.convertAndSend("direct.exchange", "direct.routing", message);
}
}
调用发送方send()
方法,监听改队列的消费方收到消息,执行结果如下:
主题(Topics)
Topic(主题)模式允许发送者(生产者)将消息发送到特定的主题上,这种模式通过使用通配符来匹配路由键(Routing Key),从而实现了比Direct模式更灵活的路由机制。
Topic模式支持两种通配符:
*
(星号)和#
(井号)。其中,*
可以匹配一个单词,而#
可以匹配零个或多个单词。这种机制使得Topic模式能够基于复杂的路由键进行灵活的匹配。
配置示例代码如下(你也可以直接使用@RabbitListener
注解):
@Configuration
public class MQConfig {
/**
* 队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("hello");
}
@Bean
public Queue queue2(){
return new Queue("hello2");
}
/**
* 交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topic.exchange");
}
/**
* 绑定队列到指定交换机
* @return
*/
@Bean
public Binding bindingTopic(){
return BindingBuilder.bind(queue()).to(topicExchange()).with("*.*.rabbit");
}
/**
* 绑定队列到指定交换机
* @return
*/
@Bean
public Binding bindingTopic2(){
return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.#");
}
}
交换机绑定队列信息,如图所示
创建两个消费方,监听不同队列,示例代码如下:
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
@Component
@RabbitListener(queues = "hello2")
public class MQReceiver2 {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received2 '" + in + "'");
}
}
然后创建生成,指定交换机和路由键,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
this.template.convertAndSend("topic.exchange", "topic.routing.rabbit", msg);
}
}
调用发送方send()
方法,执行结果如图:
路由键设置为 “topic.routing.rabbit”
的消息将被传送到两个队列。如果设置为“test.routing.rabbit”
只会进入第一个队列,而 “topic.routing.test”
只会进入第二个队列。“quick.brown.fox”
不匹配任何绑定,因此它将被丢弃。
主题交换功能强大,可以像其他交换一样运行。
当队列与 “#”
(哈希) 绑定键绑定时 - 它将接收所有消息,而不管路由键如何 - 就像在Fanout交换中一样。
当绑定中不使用特殊字符 “*”
(星号) 和 “#”
(哈希) 时,Topic的行为将与Direct主题交换类似。
标题(Headers)
Headers(标题)模式,它不像Direct(直连)模式和Topic(主题)模式那样依赖于路由键(routing key)进行消息路由,而是依赖于消息的Headers属性进行路由决策。
创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。
- 全部匹配
创建Headers交换机 ,然后将队列和Headers交换机进行绑定,通过调用whereAll()
方法,对定义的Header进行全部匹配,配置示例代码如下:
@Configuration
public class MQConfig {
/**
* 队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("hello");
}
/**
* 交换机
* @return
*/
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("header.exchange");
}
/**
* 绑定队列到指定交换机,全部匹配
* @return
*/
@Bean
public Binding bindingHeader(){
Map<String, Object> headerMap= new HashMap<>();
headerMap.put("key", "1");
headerMap.put("key2", "2");
return BindingBuilder.bind(queue()).to(headersExchange()).whereAll(headerMap).match();
}
}
然后定义MessageProperties对象,调用setHeader()
方法设置请求头信息,发送方代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
MessageProperties messageProperties = new MessageProperties();
//消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加请求头
messageProperties.setHeader("key", "1");
messageProperties.setHeader("key2", "2");
//设置消息
String msg = "hello world";
Message message = new Message(msg.getBytes(), messageProperties);
this.rabbitTemplate.convertAndSend("header.exchange", "", message);
}
}
Message 类的目的是将 body
和 properties
封装在单个实例中,以便 API 反过来可以更简单。以下示例显示了 Message 类定义:
消费方代码如下:
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public void receive(byte[] message) {
System.out.println(" [x] Received '" + new String(message) + "'");
}
}
你也可以使用注解方式节省你的配置代码,示例代码如下:
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "hello"),
exchange = @Exchange(value = "header.exchange", type = ExchangeTypes.HEADERS),
arguments = {
@Argument(name = "key", value = "1"),
@Argument(name = "key2", value = "2"),
})
)
public class MQReceiver {
@RabbitHandler
public void receive(byte[] message) {
System.out.println(" [x] Received '" + new String(message) + "'");
}
}
要注意接受的消息类型保持一致,否则会报Failed to convert message,消息转换错误。
调用发送方send()
方法,执行结果如图:
(2)部分匹配
Headers交换机和队列绑定后,通过调用whereAny()
方法达到部分匹配,配置示例代码如下:
@Configuration
public class MQConfig {
/**
* 队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("hello");
}
/**
* 交换机
* @return
*/
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("header.exchange");
}
/**
* 绑定队列到指定交换机,部分匹配
* @return
*/
@Bean
public Binding bindingHeader(){
Map<String, Object> args = new HashMap<>();
args.put("key", "1");
args.put("key2", "2");
return BindingBuilder.bind(queue()).to(headersExchange()).whereAny(args).match();
}
}
发送方,MessageProperties对象,设置部分请求头信息,发送方代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
MessageProperties messageProperties = new MessageProperties();
//消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加请求头
messageProperties.setHeader("key", "1");
//设置消息
String msg = "hello world";
Message message = new Message(msg.getBytes(), messageProperties);
this.rabbitTemplate.convertAndSend("header.exchange", "", message);
}
}
消费方代码如下:
@Component
@RabbitListener(queues = "hello2")
public class MQReceiver2 {
@RabbitHandler
public void receive(byte[] message) {
System.out.println(" [x] Received2 '" + new String(message) + "'");
}
}
调用发送方send()
方法,执行结果如图:
最后我们看一下交换机绑定队列的信息,如图
远程过程调用 (RPC)
如果我们需要在远程计算机上运行一个函数并等待结果?这种模式通常称为远程过程调用 (RPC)。
在本教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。
如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道 - 而不是类似 RPC 的阻塞,而是将结果异步推送到下一个计算阶段。
通过 RabbitMQ 执行 RPC 很容易。客户端发送请求消息,服务器使用响应消息进行回复。Spring AMQP 的 RabbitTemplate 当我们使用上述 convertSendAndReceive()
方法时,它会为我们处理回调队列。
我们以Direct交换机为例,配置信息如下:
@Configuration
public class MQConfig {
/**
* 队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("hello");
}
/**
* 交换机
* @return
*/
@Bean
public DirectExchange topicExchange(){
return new DirectExchange("direct.exchange");
}
/**
* 绑定队列到指定交换机
* @return
*/
@Bean
public Binding bindingTopic(){
return BindingBuilder.bind(queue()).to(topicExchange()).with("rpc");
}
}
发送方,调用convertSendAndReceive()
方法,接受回调参数,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
String res = (String) template.convertSendAndReceive("direct.exchange", "rpc", msg);
System.out.println(res);
}
}
消费方只需要定义返回类型即可,示例代码如下:
@Component
@RabbitListener(queues = "hello")
public class MQReceiver {
@RabbitHandler
public String receive(String message) {
System.out.println(" [x] Received '" + message + "'");
return "hi";
}
}
调用发送方send()
方法,执行结果如图:
死信队列
死信队列(Dead Letter Queue)是RabbitMQ中一个特殊的队列,用于存储因消息消费失败而被标记为死信的消息。
当消息变成一个死信之后,如果这个消息所在的队列存在
x-dead-letter-exchange
参数,那么它会被发送到x-dead-letter-exchange
对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。
死信队列的原因有如下几种:
1.消费者显式地拒绝了消息(通过basic.reject()
或basic.nack()
方法,并且设置了requeue参数为false
)
先将消费者设置为手动应答方式:
#手动应答
spring.rabbitmq.listener.simple.acknowledge-mode = manual
先定义死信队列和死信交换机并绑定,绑定后可以队列详情页面里看到配置信息,如图所示:
然后定义普通队列和普通交换机并绑定,在普通队列里面设置死信队列和死信队列的routingKey,示例代码如下:
@Configuration
public class MQConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_queue");
}
/**
* 死信队列交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead.exchange");
}
/**
* 死信队列和死信交换机绑定
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.build();
}
/**
* 普通交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
监听消费方普通队列,调用basicReject()
方法手动拒绝消息,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
@RabbitHandler
public void receive(String msg, Message message, Channel channel) throws IOException {
System.out.println("收到消息:"+msg);
// 参数一:当前消息标签,参数二:false不重新放回队列,true重新放回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为拒绝(小于等于DeliveryTag的消息),false只确认当前消息,参数三:false不重新放回队列,true重新放回队列
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, false);
}
}
监听消费方死信队列,调用basicAck()
方法接收消息,示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
@RabbitHandler
public void receive(String msg, Message message, Channel channel) throws IOException {
System.out.println("死信队列收到消息:" + msg + "'");
// 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费(小于等于DeliveryTag的消息),false只确认当前消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
发送方向普通交换机发送消息,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
template.convertAndSend("normal.exchange", "normal", msg);
}
}
调用发送方send()
方法,执行结果如图:
- 由于每个消息的TTL而过期
如果是之前创建的队列,请重新创建队列,否则会报队列不存在x-message-ttl
属性,如图所示:
设置好的队列,除了上述介绍的详情里面可以看到配置的信息,外面也可以看到,如图所示:
我们需要先将手动确认改为自动确认或者删除配置(默认手动确认),示例代码如下:
#spring.rabbitmq.listener.simple.acknowledge-mode = auto
(1)当你在队列级别设置TTL时,这意味着该队列中的所有消息都将有一个统一的过期时间(TTL和死信队列是两个独立的配置,但是他们可以配合使用)。
@Configuration
public class MQConfig {
// 省略(参考前面示例)死信队列绑定代码... ...
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-message-ttl", 10000);
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.ttl(10000)
.build();
}
// 省略(参考前面示例)普通交换机和队列绑定代码... ...
}
监听消费方普通队列,模拟业务处理过程中出现异常情况,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg) {
log.info("收到消息:"+msg);
throw new RuntimeException();
}
}
监听消费方死信队列,示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg) {
log.info("死信队列收到消息:{}",msg);
}
}
发送方向普通交换机发送消息,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
template.convertAndSend("normal.exchange", "normal", msg);
}
}
调用发送方send()
方法,执行结果如图:
你会发现就算TTL到期后,并不会立即进入死信队列
(2)另一方面,你也可以在消息级别设置TTL。这意味着每个消息都可以有自己的过期时间。
其他代码不动(参考上述示例),发送方示例代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
String msg = "hello world";
log.info("发送消息:"+msg);
// 消息后处理对象,可以设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息过期时间5秒(单位:ms)
message.getMessageProperties().setExpiration("5000");
return message;
}
};
template.convertAndSend("normal.exchange", "normal", msg);
}
}
调用发送方send()
方法,执行结果如图:
当同时指定每个队列和每个消息的 TTL 时,将选择两者之间的较低值。
在某些队列实现中,过期的消息不会立即被删除,过期的消息可能会排在未过期消息之后,直到这些未过期消息被消费或也过期。这种延迟处理方式可能导致过期消息占用队列资源。
- 队列达到最大长度
再普通队列里设置最大长度(自动确认模式),将原来的队列删除后,重新创建队列,示例代码如下:
@Configuration
public class MQConfig {
// 省略(参考前面示例)死信队列绑定代码... ...
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
// normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.maxLength(5)
.build();
}
// 省略(参考前面示例)普通交换机和队列绑定代码... ...
}
为了达到演示的效果我们将MQ的预期值设置为1(不然要发送很多条消息,才可以有效果),示例代码如下:
spring.rabbitmq.listener.simple.prefetch=1
普通队列和死信队列的消费者监听,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg) throws IOException, InterruptedException {
log.info("收到消息:"+msg);
}
}
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg) {
log.info("死信队列收到消息:{}",msg);
}
}
发送方向普通交换机发送10条消息,示例代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
for (int i = 0; i < 10; i++) {
String msg = "hello world_"+ i;
template.convertAndSend("normal.exchange", "normal", msg);
}
}
}
调用发送方send()
方法,执行结果如图:
有时因为消费速度的问题,不一定等于你设置的最大长度,但基本上可以保证设置了最大长度,在队列满的情况下按溢出方式进行处理。
当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面删除或死信消息(即队列中最早的消息)。
要修改此行为,请使用下面描述的
overflow
设置:
默认drop-head
,如果overflow
设置为reject-publish
或reject-publish-dlx
,则最近发布的消息将被丢弃。此外,如果启用了发布者确认,则将通过 basic.nack 消息通知发布者拒绝。
延迟队列
延迟队列是一种特殊的队列,用于存放那些需要在指定时间之后才能被消费者消费的消息。这种机制在很多场景下都非常有用,比如订单超时自动取消、定时任务调度、延迟发送通知等。
RabbitMQ 本身并不直接支持延迟队列,但可以通过一些插件或者特定的消息属性来实现。最常用的方式是使用 RabbitMQ 的 Dead Letter Exchanges(DLX)结合消息的 TTL(Time-To-Live)属性来实现(侧重点就是延迟,与死信队列略有不同),示例代码如下:
@Configuration
public class MQConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_queue");
}
/**
* 死信队列交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead.exchange");
}
/**
* 死信队列和死信交换机绑定
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-message-ttl", 10000);//设置消息过期时间
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.ttl(10000)
.build();
}
/**
* 普通交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
只需要监听死信队列即可,示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg) {
log.info("死信队列收到消息:{}",msg);
}
}
发送方示例代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
String msg = "hello world";
log.info("发送消息");
template.convertAndSend("normal.exchange", "normal", msg);
}
}
调用发送方send()
方法,执行结果如图:
另外通过RabbitMQ插件实现延迟队列,下载 rabbitmq_delayed_message_exchange
插件,然后选择想要下载的版本进行下载:
安装完成后,新建交换机类型多了一种延迟交换机x-delayed-message
,这里就不过多的演示,有需求了解的可以自行百度学习。
防止重复消费
消息重复消费的根本原因在于消息队列的可靠性保证机制,即确保消息至少被消费一次。这种机制确保了消息不会因为网络问题或消费者崩溃而丢失,但也可能导致消息被多次投递给消费者。
在消息被消费者处理完成后,消费者通过调用消息队列提供的basic.ack()
方法确认接口来告知消息队列该消息已被成功处理,消息队列随后会删除该消息,从而避免其他消费者再次处理这条消息(当然你也可以使用自动应答模式)。
你需要先开启手动确认模式,示例代码如下:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
创建普通交换机和队列并绑定,示例代码如下:
@Configuration
public class MQConfig {
/**
* 普通队列
*
* @return
*/
@Bean
public Queue queue() {
return new Queue("normal_queue");
}
/**
* 普通交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
*
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
消费方再方法上定义Message对象和Channel对象,然后调用Channel对象的basicAck()
或basicNack()
、basicReject()
方法,进行消息确认或拒绝,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg,Message message,Channel channel) throws IOException, InterruptedException {
log.info("收到消息:"+msg);
try{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
发送方示例代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
String msg = "hello world";
log.info("发送消息");
template.convertAndSend("normal.exchange", "normal", msg);
}
}
调用发送方send()
方法,执行结果如图:
另外一种情况,业务处理过程中出现异常,重复消费导致死循环, Spring Boot 提供了消息重试的一个属性:spring.rabbitmq.template.retry.max-attempts
,示例代码如下:
# 开启消费重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 设置消费最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
现在假设再自动应答模式中,业务处理过程中出现了异常,消费方示例代码如下:
// 省略(参考前面示例)绑定代码和发送方代码... ...
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg) {
log.info("收到消息:"+msg);
throw new RuntimeException();
}
}
调用发送方send()
方法,执行结果如图:
你还可以设置每次重发的间隔时间,示例代码如下:
# 设置间隔时间(单位:秒)
spring.rabbitmq.listener.simple.retry.initial-interval=10000
调用发送方send()
方法,执行结果如图:
手动拒绝并重新入队的行为并不直接受
spring.rabbitmq.listener.simple.retry.max-attempts
配置的控制。这个配置项主要控制的是监听器在自动模式下,对消息处理失败后的自动重试次数。当消息通过手动方式被拒绝并重新入队时,它被视为一个新的消费请求,而不是之前失败尝试的延续。
持久化
RabbitMQ的持久化机制主要用于确保消息在队列中即使在Broker重启后也不会丢失。持久化分为三个部分:交换机持久化、队列持久化、消息持久化。
交换机和队列的持久化将durable
设置为true,前面我们讲解的所有内容中创建的交换机和队列,默认都是持久化的,如图所示:
当服务器重启后不会被丢失,下面我们模拟一下服务器重启的情况,现在有一个持久化的交换机和队列(含D字母为持久化),如图所示:
然后我们执行命令systemctl restart rabbitmq-server
,让RabbitMQ重启,如图所示(稍等一会):
然后刷新页面后发现交换机和队列依然存在。
下面我们来演示一下,不设置为持久化(先将之前队列删除),示例代码如下:
@Configuration
public class MQConfig {
@Bean
public Queue queue() {
return new Queue("normal_queue", false);
}
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange", false, false);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
新创建的队列和交换机(不含字母D),如图所示:
再次执行重启命令systemctl restart rabbitmq-server
,刷新页面后交换机和队列就不存在了(如果有被监听的情况下,重启后是不会删除的)。
- 消息持久化
消息持久化并不能完全保证消息不丢失,因为将消息写入磁盘比仅在内存中存储要慢。这可能导致生产者的发送速度降低。另一方面再存储到磁盘过程中消息还在缓存的一个间隔点,此时宕机导致消息丢失。
如果队列本身被定义为持久化(durable),那么队列可以在RabbitMQ重启后保留其状态。这意味着,队列的元数据和其结构会被保留。
如果消息的持久化是通过队列来确认,那消息持久化就完全没啥必要了,但是还是介绍下,通过setDeliveryMode()
方法,设置为MessageDeliveryMode.PERSISTENT,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 持久化设置
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
template.convertAndSend("normal.exchange", "normal", msg, messagePostProcessor);
}
}
交换机自动删除
在临时队列章节中,若断开了消费者的连接,队列会自动删除。我们同样也可以将交换机的autodelete
属性被设置为true,当所有与该交换机绑定的队列都断开连接并且不再存在时,该交换机将会自动被RabbitMQ删除。
源码如图所示:
示例代码如下:
@Configuration
public class MQConfig {
public Queue queue() {
return new Queue("normal_queue");
}
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normal.exchange", true, true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
创建成功后可以看到交换机对应的配置标明AD标签,如图所示:
当没有队列绑定时,对应的交换机将自动删除(可以设置为临时队列或者删除队列后查看效果)。
可靠传输
确保RabbitMQ中的消息可靠传输,有很多种策略,比如前面介绍的:持久化、死信队列、确认机制,还有一种方式是生产方确认机制,它有两种方式:confirm机制和return机制。
- Confirm机制
Confirm机制主要用于确认消息是否已经被RabbitMQ代理成功接收并路由到指定的Exchange(交换机)。
开启Confirm确认机制,示例代码如下:
# 开启生产方confirm确认机制 SIMPLE: 使用同步的 Confirm 模式 CORRELATED: 使用异步的 Confirm 模式
spring.rabbitmq.publisher-confirm-type=correlated
定义一个全局Bean,通过setConfirmCallback()
方法接收消息回调,示例代码如下:
@Configuration
public class MQConfig {
private static final Logger log = LoggerFactory.getLogger(MQConfig.class);
// 省略绑定交换机和队列代码(参考前面示例)... ...
/**
* 统一配置
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 表示如果消息无法被路由到一个或多个队列(例如,因为指定的路由键没有匹配的队列),那么消息代理(如RabbitMQ)将返回一个未路由的消息给生产者(发送者)。
rabbitTemplate.setMandatory(true);
// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息已经到达Exchange");
} else {
log.info("消息没有到达Exchange");
}
if (correlationData != null) {
log.info("相关数据:" + correlationData);
}
if (cause != null) {
log.info("原因:" + cause);
}
});
return rabbitTemplate;
}
}
生产方和消费方的代码就不展示了(参考前面的示例),启动后将交换机删除后,调用send()
方法,执行结果如图:
- Return机制
Return机制用于处理那些由于路由问题而无法到达任何队列的消息。
开启Return确认机制,示例代码如下:
# 开启生产方return确认机制
spring.rabbitmq.publisher-returns=true
在全局Bean里,通过setReturnsCallback()
方法接收消息回调,示例代码如下:
@Configuration
public class MQConfig {
private static final Logger log = LoggerFactory.getLogger(MQConfig.class);
// 省略绑定交换机和队列代码(参考前面示例)... ...
/**
* 统一配置
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 表示如果消息无法被路由到一个或多个队列(例如,因为指定的路由键没有匹配的队列),那么消息代理(如RabbitMQ)将返回一个未路由的消息给生产者(发送者)。
rabbitTemplate.setMandatory(true);
// 设置返回回调
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("消息无法到达队列时触发");
log.info("ReturnCallback: " + "消息:" + returnedMessage.getMessage());
log.info("ReturnCallback: " + "回应码:" + returnedMessage.getReplyCode());
log.info("ReturnCallback: " + "回应信息:" + returnedMessage.getReplyText());
log.info("ReturnCallback: " + "交换机:" + returnedMessage.getExchange());
log.info("ReturnCallback: " + "路由键:" + returnedMessage.getRoutingKey());
});
return rabbitTemplate;
}
}
发送方发送一个不匹配的RoutingKey,示例代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
String msg = "hello world";
template.convertAndSend("normal.exchange", "normal2", msg);
}
}
调用send()
方法,执行结果如图:
通过合理使用这两个机制(一起使用),生产者可以确保消息在发送到RabbitMQ时的可靠性,并及时处理那些由于路由问题而无法到达队列的消息。
唯一标识
在 RabbitMQ 中设置消息的唯一标识(Message ID)可以帮助你跟踪消息的处理状态。你可以通过在发送消息时设置 MessageProperties 来实现这一点。
在发送消息的时候给消息设置指定标识,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
String msg = "hello world";
MessageProperties messagePostProcessor = new MessageProperties();
messagePostProcessor.setMessageId("202409300000001"); // 设置消息标识
Message message = null;
try {
message = new Message(msg.getBytes("utf-8"), messagePostProcessor);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
template.convertAndSend("normal.exchange", "normal", message);
}
}
消费方示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(byte[] msg,Message message,Channel channel) throws IOException {
log.info("收到消息:"+new String(msg)+",消息标识"+message.getMessageProperties().getMessageId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
执行结果如图:
面试题:如何保证MQ顺序消费?
答:可以在生产者端将同一组的消息发送到一个特定的队列。然后,每个队列由一个单独的消费者处理,使用basicAck()
来确认消息,从而保持组内消息的顺序。