RabbitMQ高级特性
RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客
RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客
引言
RabbitMQ 作为一款强大的消息队列中间件,在分布式系统中发挥着至关重要的作用。除了基本的消息收发功能外,它还具备许多高级特性,如 TTL、死信队列、延迟队列、事务和消息分发等。本文将详细介绍这些高级特性。
1. TTL(Time to Live,过期时间)
1.1 概念
TTL 即过期时间,RabbitMQ 可以对消息和队列设置 TTL。当消息到达存活时间之后,还没有被消费,就会被自动清除。这在很多业务场景中都非常有用,比如网上购物时,下单超过 24 小时未付款,订单会被自动取消;申请退款之后,超过 7 天未被处理,则自动退款。
1.2 设置消息的 TTL
有两种方法可以设置消息的 TTL,一是设置队列的 TTL,队列中所有消息都有相同的过期时间;二是对消息本身进行单独设置,每条消息的 TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。
1.2.1 针对每条消息设置 TTL
针对每条消息设置 TTL 的方法是在发送消息的方法中加入 expiration
的属性参数,单位为毫秒。
配置交换机和队列
// TTL
public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE_NAME = "ttl_exchange";
// 1. 交换机
@Bean("ttlExchange")
public Exchange ttlExchange() {
return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();
}
// 2. 队列
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
// 3. 队列和交换机绑定 Binding
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange,
@Qualifier("ttlQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
发送消息
@RequestMapping("/ttl")
public String ttl() {
String ttlTime = "10000"; // 10s
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration(ttlTime);
return messagePostProcessor;
});
return "发送成功!";
}
运行结果
调用接口发送消息后,可以看到 Ready
消息为 1。10 秒钟之后,刷新页面,发现消息已被删除。如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
1.3 设置队列的 TTL
设置队列 TTL 的方法是在创建队列时,加入 x-message-ttl
参数实现的,单位是毫秒。
配置队列和绑定关系
public static final String TTL_QUEUE2 = "ttl_queue2";
// 设置 ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {
// 设置 20 秒过期
return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();
}
// 3. 队列和交换机绑定 Binding
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange,
@Qualifier("ttlQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
发送消息
@RequestMapping("/ttl")
public String ttl() {
// 发送不带 ttl 的消息
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");
return "发送成功!";
}
运行结果
运行之后发现,新增了一个队列,队列 Features
有一个 TTL
标识。调用接口发送消息后,可以看到 Ready
消息为 1。采用发布订阅模式,所有与该交换机绑定的队列(ttl_queue
和 ttl_queue2
)都会收到消息。20 秒钟之后,刷新页面,发现 ttl_queue2
中的消息已被删除,由于 ttl_queue
队列未设置过期时间,所以该队列的消息未删除。
1.4 两者区别
设置队列 TTL 属性的方法,一旦消息过期,就会从队列中删除;设置消息 TTL 的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判定的。
这是因为设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。
而设置消息 TTL 的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。
2. 死信队列
2.1 死信的概念
死信(dead message)简单理解就是因为种种原因,无法被消费的信息,就是死信。当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX(Dead Letter Exchange),绑定 DLX 的队列,就称为死信队列(Dead Letter Queue,简称 DLQ)。
消息变成死信一般是由于以下几种情况:
- 消息被拒绝(
Basic.Reject/Basic.Nack
),并且设置requeue
参数为false
。- 消息过期。
- 队列达到最大长度。
2.2 代码示例
2.2.1 声明队列和交换机
包含两部分:声明正常的队列和正常的交换机;声明死信队列和死信交换机。死信交换机和死信队列和普通的交换机、队列没有区别。
// 死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
@Configuration
public class DLXConfig {
// 死信交换机
@Bean("dlxExchange")
public Exchange dlxExchange() {
return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();
}
// 2. 死信队列
@Bean("dlxQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
// 3. 死信队列和交换机绑定 Binding
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,
@Qualifier("dlxQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
// 正常交换机
@Bean("normalExchange")
public Exchange normalExchange() {
return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();
}
// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
// 正常队列和交换机绑定 Binding
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange,
@Qualifier("normalQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
}
2.2.2 正常队列绑定死信交换机
当这个队列中存在死信时,RabbitMQ 会自动的把这个消息重新发布到设置的 DLX 上,进而被路由到另一个队列,即死信队列。可以监听这个死信队列中的消息以进行相应的处理。
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
.deadLetterRoutingKey("dlx").build();
}
2.2.3 制造死信产生的条件
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
.deadLetterRoutingKey("dlx")
.ttl(10 * 1000)
.maxLength(10L)
.build();
}
2.2.4 发送消息
@RequestMapping("/dlx")
public void dlx() {
// 测试过期时间,当时间达到 TTL,消息自动进入到死信队列
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
// 测试队列长度
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
// }
// 测试消息拒收
// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
2.2.5 测试死信
- 程序启动之后,观察队列:队列
Features
说明:D
是durable
的缩写,表示设置持久化;TTL
表示Time to Live
,队列设置了 TTL;Lim
表示队列设置了长度(x-max-length
);DLX
表示队列设置了死信交换机(x-dead-letter-exchange
);DLK
表示队列设置了死信RoutingKey
(x-dead-letter-routing-key
)。- 测试过期时间:调用接口发送消息,10 秒后,消息进入到死信队列。生产者首先发送一条消息,然后经过交换器(
normal_exchange
)顺利地存储到队列(normal_queue
)中。由于队列normal_queue
设置了过期时间为 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息过期。由于设置了 DLX,过期之时,消息会被丢给交换器(dlx_exchange
)中,这时根据RoutingKey
匹配,找到匹配的队列(dlx_queue
),最后消息被存储在queue.dlx
这个死信队列中。- 测试达到队列长度:队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列。发送前,死信队列只有一条数据,发送 20 条消息后,运行后可以看到死信队列变成了 11 条。过期之后,正常队列的 10 条也会进入到死信队列。
- 测试消息拒收:写消费者代码,并强制异常,测试拒绝签收。
@Component
public class DlxQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.NORMAL_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 模拟处理失败
int num = 3 / 0;
System.out.println("处理完成");
// 3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 4. 异常了就拒绝签收
Thread.sleep(1000);
// 第三个参数 requeue,是否重新发送,如果为 true,则会重新发送,若为 false,则直接丢弃,若设置死信,会进入到死信队列
channel.basicNack(deliveryTag, true, false);
}
}
// 指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("死信队列接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
}
}
发送消息,观察运行结果:
接收到消息: dlx test..., deliveryTag: 1
死信队列接收到消息: dlx test..., deliveryTag: 1
2.3 常见问题
- 死信队列的概念:死信(Dead Letter)是消息队列中的一种特殊消息,它指的是那些无法被正常消费或处理的消息。在消息队列系统中,如 RabbitMQ,死信队列用于存储这些死信消息。
- 死信的来源:
- 消息过期:消息在队列中存活的时间超过了设定的 TTL。
- 消息被拒绝:消费者在处理消息时,可能因为消息内容错误、处理逻辑异常等原因拒绝处理该消息。如果拒绝时指定不重新入队(
requeue=false
),消息也会成为死信。- 队列满了:当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信。
- 死信队列的应用场景:对于 RabbitMQ 来说,死信队列是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。比如:用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列中,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。其他应用场景还有消息重试、消息丢弃、日志收集等。
3. 延迟队列
3.1 概念
延迟队列(Delayed Queue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
3.2 应用场景
延迟队列的使用场景有很多,比如:
- 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
- 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议。
- 用户注册成功后,7 天后发送短信,提高用户活跃度等。
3.3 TTL + 死信队列实现
RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 TTL + 死信队列的方式组合模拟出延迟队列的功能。
代码实现
声明队列:
// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 绑定死信队列
arguments.put("x-dead-letter-routing-key", "dlx"); // 设置发送给死信队列的 RoutingKey
return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
生产者:发送两条消息,一条消息 10s 后过期,第二条 20s 后过期。
@RequestMapping("/delay")
public String delay() {
// 发送带 ttl 的消息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..." + new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("10000"); // 10s 过期
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..." + new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("20000"); // 20s 过期
return messagePostProcessor;
});
return "发送成功!";
}
消费者:
// 指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf ("% tc 死信队列接收到消息: % s, deliveryTag: % d% n", new Date (), new String (message.getBody (),"UTF-8"),
message.getMessageProperties ().getDeliveryTag ());
}
**运行程序**:调用接口发送数据`http://127.0.0.1:8080/product/delay`,通过控制台观察死信队列消费情况:
周三 5 月 22 11:59:00 CST 2024 死信队列接收到消息: ttl test 10s...Wed May 22 11:58:50 CST 2024, deliveryTag: 1
周三 5 月 22 11:59:10 CST 2024 死信队列接收到消息: ttl test 20s...Wed May 22 11:58:50 CST 2024, deliveryTag: 2
可以看到,两条消息按照过期时间依次进入了死信队列。延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里,这样消费者一直消费死信队列里的消息就可以了。
**存在问题**:当把生产消息的顺序修改为先发送20s过期数据,再发送10s过期数据时:
```java
@RequestMapping("/delay")
public String delay() {
// 发送带ttl的消息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 20s..."+new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("20000");
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 10s..."+new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("10000");
return messagePostProcessor;
});
return "发送成功!";
}
通过控制台观察死信队列消费情况:
周三 5月 22 12:14:22 CST 2024 死信队列接收到消息: ttl test 20s...Wed May 22 12:14:02 CST 2024, deliveryTag: 3
周三 5月 22 12:14:22 CST 2024 死信队列接收到消息: ttl test 10s...Wed May 22 12:14:02 CST 2024, deliveryTag: 4
这时会发现:10s 过期的消息,也是在 20s 后才进入到死信队列。这是因为 RabbitMQ 只会检查队首消息是否过期,如果过期则丢到死信队列。如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。所以在考虑使用 TTL + 死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
3.4 延迟队列插件
RabbitMQ 官方提供了一个延迟的插件来实现延迟的功能。
安装延迟队列插件
- 下载并上传插件:根据自己的 RabbitMQ 版本从插件下载地址选择相应版本的延迟插件,下载后上传到服务器。插件上传目录参考installing Additional Plugins | RabbitMQ,
/usr/lib/rabbitmq/plugins
是一个附加目录,RabbitMQ 包本身不会在此安装任何内容,如果没有这个路径,可以自己进行创建。如果为 docker 操作,使用docker cp
命令复制文件到 docker 容器,例如:docker cp 宿主机文件 容器名称或ID:容器目录
。- 启动插件:在服务器命令行中,使用
rabbitmq - plugins list
查看插件列表,使用rabbitmq - plugins enable rabbitmq_delayed_message_exchange
启动插件,之后重启 RabbitMQ 服务。如果为 docker 操作,进入容器后同样执行这两个命令来查看和启动插件,最后重启 docker 容器。- 验证插件:在 RabbitMQ 管理平台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了。
基于插件延迟队列实现
声明交换机、队列、绑定关系:
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayedConfig {
@Bean("delayedExchange")
public Exchange delayedExchange() {
return ExchangeBuilder.directExchange(Constant.DELAYED_EXCHANGE_NAME).durable(true).delayed().build();
}
//2. 队列
@Bean("delayedQueue")
public Queue delayedQueue() {
return QueueBuilder.durable(Constant.DELAYED_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("delayedBinding")
public Binding delayedBinding(@Qualifier("delayedExchange") Exchange exchange,
@Qualifier("delayedQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("delayed").noargs();
}
}
生产者:发送两条消息,并设置延迟时间。
@RequestMapping("/delay2")
public String delay2() {
// 发送带ttl的消息
rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed",
"delayed test 20s..."+new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelayLong(20000L);
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed",
"delayed test 10s..."+new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelayLong(10000L);
return messagePostProcessor;
});
return "发送成功!";
}
消费者:
import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayedQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.DELAYED_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
System.out.printf("%tc 死信队列接收到消息: %s%n", new Date(), new String(message.getBody(),"UTF-8"));
}
}
运行程序,并测试:程序启动后,调用接口发送消息http://127.0.0.1:8080/product/delay2
,观察控制台:
周三 5月 22 15:42:02 CST 2024 死信队列接收到消息: delayed test 10s...Wed May 22 15:41:52 CST 2024
周三 5月 22 15:42:12 CST 2024 死信队列接收到消息: delayed test 20s...Wed May 22 15:41:52 CST 2024
从结果可以看出,使用延迟队列,可以保证消息按照延迟时间到达消费者。
介绍下 RabbitMQ 的延迟队列:延迟队列是一个特殊的队列,消息发送之后,并不立即给消费者,而是等待特定的时间,才发送给消费者。延迟队列的应用场景有很多,比如订单在十分钟内未支付自动取消、用户注册成功后 3 天后发调查问卷、用户发起退款 24 小时后商家未处理则默认同意自动退款等。但 RabbitMQ 本身并没直接实现延迟队列,通常有两种方法:
- TTL + 死信队列组合的方式:优点是灵活不需要额外的插件支持;缺点是存在消息顺序问题,需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性。
- 使用官方提供的延迟插件实现延迟功能:优点是通过插件可以直接创建延迟队列,简化延迟消息的实现,避免了 DLX 的时序问题;缺点是需要依赖特定的插件,有运维工作,只适用于特定版本。
4. 事务
RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。
4.1 配置事务管理器
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TransactionConfig {
@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}
4.2 声明队列
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable("trans_queue").build();
}
4.3 生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/trans")
@RestController
public class TransactionProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
@RequestMapping("/send")
public String send() {
rabbitTemplate.convertAndSend("","trans_queue", "trans test 1...");
int a = 5/0;
rabbitTemplate.convertAndSend("","trans_queue", "trans test 2...");
return "发送成功";
}
}
4.4 测试
- 不加
@Transactional
,会发现消息 1 发送成功。- 添加
@Transactional
,消息 1 和消息 2 全部发送失败,体现了事务的原子性。
5. 消息分发
5.1 概念
RabbitMQ 队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表里的一个消费者。默认情况下,RabbitMQ 是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式不太合理,比如某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。
我们可以使用
channel.basicQos(int prefetchCount)
方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量。比如消费端调用了channel.basicQos(5)
,RabbitMQ 会为该消费者计数,发送一条消息计数 + 1,消费一条消息计数 - 1,当达到了设定的上限,RabbitMQ 就不会再向它发送消息了,直到消费者确认了某条消息,类似 TCP/IP 中的 “滑动窗口”。prefetchCount
设置为 0 时表示没有上限,basicQos
对拉模式的消费无效。
5.2 应用场景
5.2.1 限流
在订单系统中,正常情况下每秒可处理 5000 请求,但在秒杀时请求瞬间达每秒 1 万个,若全部通过 MQ 发送会压垮订单系统。通过设置prefetchCount
参数并将消息应答方式设为手动应答,可实现限流。
配置 prefetch 参数和应答方式:
listener:
simple:
acknowledge-mode: manual
prefetch: 5
配置交换机和队列:
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QosConfig {
@Bean("qosExchange")
public Exchange qosExchange() {
return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();
}
//2. 队列
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder.durable(Constant.QOS_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange,
@Qualifier("qosQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
}
}
发送消息:一次发送 20 条消息。
@RequestMapping("/qos")
public String qos() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME, "qos", "qos test..."+i);
}
return "发送成功!";
}
消费者监听:
@Component
public class QosQueueListener {
@RabbitListener(queues = Constant.QOS_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);
}
}
测试:调用接口发送消息,控制台只打印 5 条消息,管理平台显示待发送 15 条,未确认 5 条。取消prefetch
配置,消费者会一次性接收 20 条消息。
5.2.2 负载均衡
在有两个消费者的情况下,若一个消费者处理任务快,一个慢,会导致负载不均衡。通过设置prefetch = 1
,可让 RabbitMQ 一次只给一个消费者一条消息,处理并确认前一条消息后再发送新消息,实现负载均衡。
配置 prefetch 参数和应答方式:
listener:
simple:
acknowledge-mode: manual
prefetch: 1
启动两个消费者:使用Thread.sleep(100)
模拟消费慢。
@Component
public class QosQueueListener {
@RabbitListener(queues = Constant.QOS_QUEUE)
public void ListenerQosQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);
channel.basicAck(deliveryTag, true);
}
@RabbitListener(queues = Constant.QOS_QUEUE)
public void ListenerQueue2(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);
Thread.sleep(100);
channel.basicAck(deliveryTag, true);
}
}
测试:调用接口发送消息,通过日志观察两个消费者消费消息情况,可看到消息在两个消费者间均衡分配。
感谢阅览。