前言
消息堆积是Mq消费时常见的问题,这里我们展开说一下消息堆积的原因,以及RabbitMq 中是如何解决这个问题的。
1. 消息堆积问题
当生产者发送消息时的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早收到的消息,可能会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种思路:
- 增加更多的消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
RabbitMq 中解决消息堆积的思路是扩大队列容积,但是它把消息存储从内存中剥离出来,使用了磁盘。
2. 惰性队列
在 RabbitMq 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是惰性队列,惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者消费消息时才会从磁盘中读取并加载到内存
- 支持百万条的消息存储
2.1 命令行设置
设置一个队列为惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可。可以通过命令将一个运行中的队列修改为惰性队列
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
- rabbitmqctl: RabbitMq 的命令行工具
- set_policy: 添加一个策略
- Lazy: 策略名称,可以自定义
- ^lazy-queue$: 用正则表达式匹配队列的名字
- {“queue-mode”:“lazy”}: 设置队列模式位 lazy 模式
- –apply-to queues: 策略的作用对象,是所有的队列
而 SpringAMQP 中声明惰性队列分为两种方式, 一种是@Bean 的方式,一种是基于注解的方式
2.2 基于@Bean 的方式
使用 .lazy( ) 开启x-queue-mode为lazy
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** 惰性队列配置 */
@Configuration
public class LazyConfig {
/** 惰性队列 - 增加了.lazy()属性 */
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 开启x-queue-mode为lazy
.build();
}
/** 正常队列 */
@Bean
public Queue normalQueue()
{
return QueueBuilder.durable("normal.queue")
.build();
}
}
2.3 基于注解方式:
在消费者端的消息监听中新增方法
使用 @Argument 来标注使用参数的key 和对应参数的值 value
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
log.info("接收到 lazy.queue 的消息:{}", msg);
}
3. 案例测试
这里对正常队列和惰性队列进行十万条数据的测试,启动consumer服务,在publisher服务的SpringAmqpTest类中新增下列两个方法:
/** 测试惰性队列 */
@Test
public void testLazyQueue() throws InterruptedException {
// 模拟发送十万条数据
long b = System.nanoTime();
for (int i = 0; i < 1000000; i++) {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, LazyQueue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 改成非持久化,可以看一下LazyQueue的效果
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("lazy.queue", message);
}
long e = System.nanoTime();
System.out.println(e - b);
}
/** 测试正常队列 */
@Test
public void testNormalQueue() throws InterruptedException {
// 模拟发送十万条数据
long b = System.nanoTime();
for (int i = 0; i < 1000000; i++) {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 改成非持久化,可以看一下正常队列的效果
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("normal.queue", message);
}
long e = System.nanoTime();
System.out.println(e - b);
}
RabbitMQ控制台队列的数据变化:
-
初始化:
-
运行两个生产者方法后,看一下两个队列总览
-
惰性队列数据变化 (全部一进来就会存储到磁盘,内存中只有需要消费的)
-
正常队列数据变化:
而正常队列都在内存中(这里是没有开启持久化的情况)
4. 总结
消息堆积问题的解决方案?
- 队列上绑定多个消费者,提高消费速度
- 给消费者开启线程池,提高消费速度
- 使用惰性队列,可以在 mq 中保存更多消息
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间隙性的 page-out, 性能比较稳定
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的 IO