目录
- 一、RabbitMQ 消息应答
- 二、RabbitMQ 持久化
- 1.交换机的持久化
- 2.队列的持久化
- 3.消息的持久化
- 4.持久化问题
官网地址:https://www.rabbitmq.com/
下载地址:https://www.rabbitmq.com/download.html
一、RabbitMQ 消息应答
执行一个任务可能需要花费几秒钟的时间,你可能会担心如果一个消费者在执行任务过程中挂掉了。基于现在的代码,一旦RabbitMQ将消息分给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者上未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经接受并且处理完毕了。RabbitMQ可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完毕,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ 才会重新投递。即使处理一条消息会花费很长的时间。消息应答是默认打开的。我们明确地把它们关掉了(autoAck=false)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。
boolean authAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
二、RabbitMQ 持久化
-
持久化:是为了提高 RabbitMQ 消息的可靠性,放置在异常情况(重启,关闭,宕机)下数据的丢失
-
RabbitMQ 持久化分为三个部分:
交换机的持久化
、队列的持久化
、消息的持久化
1.交换机的持久化
交换机的持久化是通过声明队列时,将 durable
参数设置为 true 实现的。如果交换器不设置持久化,那么 RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了,建议将交换器设置为持久化。
设置交换器的持久化:
// 三个参数分别为 交换器名、交换器类型、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
2.队列的持久化
队列的持久化是通过声明队列时,将 durable
参数设置为 true 实现的。如果队列不设置持久化,那么 RabbitMQ 服务重启之后,相关的队列元数据将会丢死,而消息似乎存车处在队列中的,所以队列中的消息也会被丢失。
设置队列的持久化:
// 参数1 queue:队列名
// 参数2 durable:是否持久化
// 参数3 exclusive:仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete:当所有消费客户端断开连接后,是否自动删除队列
// 参数5 arguments
channel.queueDecalre(QUEUE_NAME, true, false, false, null);
3.消息的持久化
队列的持久化只能保证其队列本身的元数据不会被丢失,但是不能保证消息不会被丢失。所以消息本身也需要被持久化,可以在投递消息前设置 AMQ.BasicProperties
的属性 deliveryMode
的值为 2 即可。
// 参数1 exchange:交换器
// 参数2 routingKey:路由键
// 参数3 props:消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT
// 参数4 body:消息体
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
4.持久化问题
描述:将交换机、队列和消息都设置持久化之后既能保证数据不会被丢失吗?当然不能,多个方面:
-
问题1: 消费者端,消费者订阅队列将autoAck设置为 true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那数据也会丢失。
解决方案: 以手动确认接收消息,待处理完消息之后,手动删除消息。
-
问题2: 在 RabbitMQ 服务器,如果消息正确被发送,但是 RabbitMQ 未来得及持久化,没有将数据写入磁盘,服务异常而导致数据丢失。
解决方案: 可以通过 RabbitMQ 集群的方式实现消息中间件的高可用。