文章目录
- 消息分发
- 不公平分发
- 限流-basic.qos
- 主要功能
- 使用场景
- 示例代码
- 消费者默认concurrency数量
- prefetch和concurrency结合?
- spring.rabbitmq.template.retry.enabled=true和spring.rabbitmq.listener.simple.retry.enabled=true有什么区别
- 1. `spring.rabbitmq.template.retry.enabled=true`
- 2. `spring.rabbitmq.listener.simple.retry.enabled=true`
- 总结
- 多机器(集群部署)同时消费某个队列的消息理解
- 负载均衡
- 高可用性
- 和Fanout发布订阅对比理解
- TODO--持续更新
这篇文章主要是汇总一些杂七杂八的问题,核心内容参考前三章节
RabbitMQ系列文章 |
---|
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念 |
不止于纸上谈兵,用代码案例分析如何确保RabbitMQ消息可靠性? |
不止于方案,用代码示例讲解RabbitMQ顺序消费 |
RabbitMQ常见问题持续汇总 |
消息分发
不公平分发
RabbitMQ 默认分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1); 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。
限流-basic.qos
通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。
prefetch
默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch
值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。
basic.qos
是 RabbitMQ 的一个方法,用于设置消息消费的流控(Quality of Service, QoS)。具体来说,它允许你限制在消费者确认(acknowledge)消息之前,RabbitMQ 能够推送给该消费者的消息数量。这样可以防止消费者因为处理不过来而被淹没在大量未处理的消息中。
com.rabbitmq.client.Channel#basicQos(int, int, boolean)
/**
* 请求对此通道应用特定的prefetchCount“服务质量”设置
*
* @param prefetchCount 服务器将交付的最大消息数,如果不限制则为0
* @throws java.io.IOException 如果遇到错误
*
* 注意: 该方法是基本服务质量机制的一部分,用于流量控制
* 客户端可以通过设置prefetchCount来避免被服务器压垮
*/
void basicQos(int prefetchCount) throws IOException;
主要功能
配置如下
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
concurrency: 1
max-concurrency: 3
# 消费者预取1条数据到内存,默认为250条
# 消费端限流:每个消费者未确认的未处理消息的最大数量
prefetch: 1
-
设置预取计数(prefetch count):
prefetch_count
是basic_qos
方法中的一个参数,用于指定消费者可以接收的未确认消息的最大数量。在 RabbitMQ 中,prefetch_count
的值可以是 0 到 65535 之间的任意整数,其中 0 表示无限制 。RabbitMQ 允许为每个消费者独立设置prefetch_count
,而不是像 AMQP 0-9-1 协议中那样在通道级别共享 。basic.qos
允许设置每个消费者在未确认的情况下能接收的最大消息数量。比如,basic.qos(1)
表示每个消费者在没有确认上一条消息之前不会再接收新的消息。这有助于实现公平调度,确保消费者不会被淹没。
-
限制消息分发:
- 当有多个消费者时,
basic.qos
通过限制每个消费者处理消息的数量,确保消息分发更均衡。消费者处理完消息并发送确认后,RabbitMQ 才会将新的消息发送给它。
- 当有多个消费者时,
-
避免消息堆积:
- 通过合理设置
prefetch count
,可以防止消费者端消息堆积,避免因为过多未处理的消息导致的内存消耗问题。
- 通过合理设置
使用场景
- 资源密集型任务: 如果你的消费者在处理某些需要消耗较多资源(如CPU、内存)的任务时,需要限制其一次处理的消息数量,以避免资源耗尽。
- 分布式系统: 在分布式系统中,
basic.qos
可以帮助实现更公平的负载均衡,确保每个消费者都能公平地获取消息处理。
示例代码
// 设置每个消费者最多处理3条未确认的消息
channel.basicQos(3);
通过上述设置,RabbitMQ 在每个消费者确认消息前只会推送3条消息,这样就能有效控制消息处理的并发量。
消费者默认concurrency数量
首先说一下concurrency配置,这个配置是设置listener初始化时的线程数,即消费者的数量,即消费者同时消费消息的数量。
那么如果没有显性设置concurrency时,默认的线程数是多少呢,答案是1。
具体的我们可以在源码org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
看到
prefetch和concurrency结合?
prefetch
默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch
值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
# 全局并发设置
concurrency: 1
max-concurrency: 3
# 消费者预取1条数据到内存,默认为250条
# 消费端限流:每个消费者未确认的未处理消息的最大数量
prefetch: 10
若一个消费者配置prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个线程到内存中(注意不是两个线程去共享内存中抓取的消息)。
concurrency在注解里面可以配置,配置了以后以注解为准,yml里面相当于是全局的
@Component
public class MyConsumer {
//会覆盖配置文件中的参数。
@RabbitListener(queues = {"myQueue"},concurrency ="2")
public void receiver(Message msg, Channel channel) throws InterruptedException {
//...业务处理
}
}
spring.rabbitmq.template.retry.enabled=true和spring.rabbitmq.listener.simple.retry.enabled=true有什么区别
spring.rabbitmq.template.retry.enabled=true
和 spring.rabbitmq.listener.simple.retry.enabled=true
是 Spring AMQP 中配置 RabbitMQ 消息重试机制的两个不同选项,分别用于不同的场景。
1. spring.rabbitmq.template.retry.enabled=true
- 作用范围: 这个配置用于通过
RabbitTemplate
进行消息发送时的重试机制。 - 适用场景: 当你在使用
RabbitTemplate
主动发送消息到 RabbitMQ 时,如果消息发送失败(如网络问题、连接超时等),Spring 会自动进行重试。 - 默认行为: 如果开启了这个选项,
RabbitTemplate
会按照配置的重试策略(如重试次数、重试间隔等)自动重试消息发送操作,直到成功或者达到重试上限。 - 常见使用: 适用于主动调用
RabbitTemplate.convertAndSend()
或类似方法进行消息发送的场景。
2. spring.rabbitmq.listener.simple.retry.enabled=true
- 作用范围: 这个配置用于在使用消息监听器(Message Listener)时的重试机制,特别是在使用简单消息监听容器(SimpleMessageListenerContainer)时。
- 适用场景: 当你使用
@RabbitListener
或其他监听机制从队列中接收和处理消息时,如果消息处理失败(如业务逻辑异常),Spring 会按照配置的重试策略自动进行重试。 - 默认行为: 如果开启了这个选项,当消费者处理消息时抛出异常,Spring 会自动进行重试,直到处理成功或达到最大重试次数。
- 常见使用: 适用于使用
@RabbitListener
注解来监听队列消息,并希望在处理失败时自动重试的场景。
总结
spring.rabbitmq.template.retry.enabled=true
是用于发送消息失败后的重试机制,针对的是消息的生产者(发送端)。spring.rabbitmq.listener.simple.retry.enabled=true
是用于消费消息时处理失败后的重试机制,针对的是消息的消费者(监听端)。
多机器(集群部署)同时消费某个队列的消息理解
对于这个代码,我们的代码在实际业务中是集群部署的,不同的机器都可能去消费这个队列的消息!
public class MessageConsumer {
@RabbitListener(queues = "queue1")
public void receiveMessageFromQueue1(String message) {
// 处理来自queue1的消息
System.out.println("Received from queue1: " + message);
}
}
负载均衡
通过多个消费者实例共同消费同一个队列的消息,负载可以均匀分布在所有消费者上,提高整体系统的处理能力。当多个消费者监听同一个队列时,RabbitMQ 会按照轮询的方式将消息分发给这些消费者。每条消息只会被一个消费者消费,这样可以有效地分摊负载,提高系统的吞吐量和可靠性。
假设有两个消费者(Consumer A 和 Consumer B),都监听同一个队列 queue1
。RabbitMQ 将会按如下方式分发消息:
- 消息 1 发送给 Consumer A
- 消息 2 发送给 Consumer B
- 消息 3 发送给 Consumer A
- 依此类推…
高可用性
如果某个消费者实例宕机,RabbitMQ 会自动将新的消息发送给其他存活的消费者,确保消息不会丢失,业务处理不中断。
和Fanout发布订阅对比理解
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:
也就是说发送一条消息A,假设有队列1、队列2都订阅了这个消息A,那么这监听这两个队列的消费者都会去消费消息A,同时,在集群部署的情况下,对应监听队列1、2的消费者都会有多个,但是每次MQ都会采用负载均衡策略(默认轮询),这样每个队列对应实际只会有一个消费者去消费!
下面展示了消息A被两个队列订阅,并且每个队列有多个消费者,但在负载均衡策略下,每次只有一个消费者去消费消息A的情况。
如图所示:
- 消息A被发送到两个队列:队列1和队列2。
- 每个队列有多个消费者:队列1有消费者1、消费者2和消费者3,队列2有消费者4、消费者5和消费者6。
- 在负载均衡策略下,每次只有一个消费者去消费消息A。例如,队列1的消息A可能会被消费者1、消费者2或消费者3中的一个消费,队列2的消息A可能会被消费者4、消费者5或消费者6中的一个消费。