文章目录
- RabbitMQ 持久化机制
- 概述
- 实现非持久化(交换机、队列、消息)
- 实现持久化(交换机、队列、消息)
RabbitMQ 持久化机制
概述
前面讲到了 生产者消息确认机制 和 消费者消息确认机制,保证了消息传输的可靠性,但是这还不够,试想如果 Broker 突然崩溃,那么所有的 交换机、队列、消息 不就全部都没了(RabbitMQ 以内存为主,硬盘为辅,默认交换机和队列都是非持久化的). 因此需要持久化机制.
a)交换机持久化
:
如果交换机不设置持久化,那么在 rabbitmq 服务重启之后,相关的交换机元数据就会丢失,对一个长期使用的交换机来说,建议设置成持久化的.
交换机的持久化是通过声明时将 druable 参数设置为 true 实现的,这样交换机的属性就会在 硬盘 保存,当 MQ 意外关闭之后,重启 MQ 时服务自动恢复交换机数据.
b)队列持久化
:
如果队列不设置持久化,那么再重启 MQ 之后,队列就会丢失,这也就意味着,无论队列中的消息是否持久化,也都会跟着丢失.
队列的持久化是通过声明时将 druable 参数设置为 true 实现的,但是这样并不能保证内部存储的消息一定不会丢失. 要确保消息不丢失,同时也需要将消息设置为持久化.
c)消息持久化
:
消息想要实现真正意义上的持久化,前提时是队列需要是持久化的,其次消息是持久化的(只设置消息持久化,队列不设置持久化,是毫无意义的).
rabbitmq 默认情况下会将消息设置为 持久化的,除非队列被声明为非持久化、或者消息在发送时被标记为非持久化.
实现非持久化(交换机、队列、消息)
a)交换机、队列、绑定
@Bean("durableExchange")
fun durableExchange(): DirectExchange = ExchangeBuilder
.directExchange(MQConst.DURABLE_EXCHANGE)
.durable(false) //非持久化
.build()
@Bean("durableQueue")
fun durableQueue(): Queue = QueueBuilder
.nonDurable(MQConst.DURABLE_QUEUE) //非持久化
.build()
@Bean("durableBinding")
fun durableBinding(): Binding = BindingBuilder
.bind(durableQueue())
.to(durableExchange())
.with(MQConst.DURABLE_BINDING)
b)生产者接口
import com.cyk.rabbitmq.constants.MQConst
import org.springframework.amqp.core.MessageBuilder
import org.springframework.amqp.core.MessageDeliveryMode
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/mq")
class MQApi(
private val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/durable")
fun durable(): String {
val msg = MessageBuilder
.withBody("no durable msg 1".toByteArray())
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build()
rabbitTemplate.convertAndSend(MQConst.DURABLE_EXCHANGE,MQConst.DURABLE_BINDING ,msg)
return "ok"
}
}
c)程序启动后,可以看到交换机和队列都是非持久化的
d)重启服务后发现 交换机 和 队列 都还在是怎么回事?
那是因为你的 SpringBoot 服务还开着,当服务器重新连接上 MQ 的时候就会进行初始化(但未持久化的消息还是会丢失的)
实现持久化(交换机、队列、消息)
a)交换机、队列、绑定
@Bean("durableExchange")
fun durableExchange(): DirectExchange = ExchangeBuilder
.directExchange(MQConst.DURABLE_EXCHANGE)
.durable(true) //持久化
.build()
@Bean("durableQueue")
fun durableQueue(): Queue = QueueBuilder
.durable(MQConst.DURABLE_QUEUE) //持久化
.build()
@Bean("durableBinding")
fun durableBinding(): Binding = BindingBuilder
.bind(durableQueue())
.to(durableExchange())
.with(MQConst.DURABLE_BINDING)
b)生产者接口
import com.cyk.rabbitmq.constants.MQConst
import org.springframework.amqp.core.MessageBuilder
import org.springframework.amqp.core.MessageDeliveryMode
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/mq")
class MQApi(
private val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/durable")
fun durable(): String {
val msg = MessageBuilder
.withBody("durable msg 1".toByteArray())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build()
//不用上述这样设计消息也可以,因为 RabbitMQ 消息默认就是持久化的
rabbitTemplate.convertAndSend(MQConst.DURABLE_EXCHANGE,MQConst.DURABLE_BINDING ,msg)
return "ok"
}
}
c)进入 mq 管理界面,可以看得到 交换机、队列、消息 都是持久化的