一、持久化概念
在上一章内容中我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉后消 息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它将忽视队列 和消息,除非告知它不要这样做。
确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。
二、实现持久化
2.1 队列实现持久化
如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化 。
boolean durable = true;
channel.queueDeclare(ACK_QUEUE_NAME, durable, false, false, null);
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新 创建一个持久化的队列,不然就会出现错误 。
当我们将durable参数位置传递true之后,即使重启 rabbitmq 队列也依然存在。
2.2 消息实现持久化
要想让消息实现持久化需要在消息生产者修改代码,需要像channel的basicPublish方法中props属性中传递MessageProperties.PERSISTENT_TEXT_PLAIN 参数。
将消息标记为持久化并不能保证完全不丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘,但是还没有存储完,事实上消息还在缓存的一个时间点。在这个时候消息并没有真正写入磁盘,持久性保证并不强。但是对于我们的简单任务队列而言,这已经绰绰有余了。后面的发布确认章节将介绍更强有力的持久化策略。
没有持久化:
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
实现持久化:
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
三、不公平分发
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是 很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的话就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下就不合理,但是 RabbitMQ 并不知道这种情况,它依然很公平地进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个 任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完 成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加 新的 worker 或者改变其他存储任务的策略。
四、预取值
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此 缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。
设置预取值的方式和三中一样,即channel.basicQos(预取值)。
例如,假设在通道上有 未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何 消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知 这个情况并再发送一条消息。
消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取值将提高 向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理 的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)。应该小心使用具有无限预处理 的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的 内存消耗变大,所以找到合适的预取值是一个反复试验的过程,在不同的负载下该值取值也不同,100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这 将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。