文章目录
- 消息和队列TTL
- 概述
- 实战开发
- 死信队列
- 概述
- 实战开发
消息和队列TTL
概述
a)TTL(Time To Live 过期时间),RabbitMQ 可以对消息和队列设置 TTL. 当消息到达存活时间之后,还没有被消费,就会被自动清除.
b)注意:
- 给
消息
设置TTL:MQ 扫描到队列头部消息(先进先出)过期了,就会剔除消息(这里会存在一个问题:当队列头部的消息还没过期,而队列头部之后的元素过期了却不会被清理,需要等到头部元素过期被清理后,才会依次
扫描后面的元素). - 给
队列
设置TTL:相当于是给当前队列中的所有消息设置了一个全局过期时间,消息的 ttl 最终取决于 消息 和 队列 设置的 ttl 中的最小值.(队列设置 ttl 并非到期就删除队列,而是作用于其中的消息) - 给
消息和队列
都设置TTL:只要队列过期,那么该队列中的所有消息也都会被清理. 假设队列 TTL 是 20s,消息 TTL 是 10s,那么消息的 TTL 会自动取最小值,也就是 10s.
c)消息过期了,但由于排在此消息前面的消息没有过期,,导致过期的消息不能被清理,可能会造成什么问题?
例如想通过 消息TTL + 死信队列
来实现 延迟队列
的效果,就可能存在漏洞:
当前消息过期了,但是前面的消息没有过期,于是此消息依然在队列中,进而导致消息不能在过期的时候被死信队列接收到,那么进一步导致消息不能被消费者处理(如果是实现订单过期删除业务,就要出大问题了 -> 锁库存
-> 商品即使有,也卖不出去).
实战开发
案例:给消息和队列都设置过期时间,队列的过期时间比消息时间短.
a)配置 交换机、队列、绑定
@Bean
fun ttlExchange(): DirectExchange = ExchangeBuilder
.directExchange(MQConst.TTL_EXCHANGE)
.build()
/**
* 消息设置过期时间有很多中方式,本质都是给扩展参数中添加 x-message-ttl
*/
@Bean
fun ttlQueue(): Queue = QueueBuilder
.durable(MQConst.TTL_QUEUE)
//.withArgument("x-message-ttl", 10 * 1000) //设置过期时间本质都是它
.ttl(10 * 1000) // 10s 过期
.build()
@Bean
fun ttlBinding(
@Qualifier("ttlExchange") exchange: DirectExchange,
@Qualifier("ttlQueue") queue: Queue,
): Binding = BindingBuilder
.bind(queue)
.to(exchange)
.with(MQConst.TTL_BINDING)
b)生产者接口
import com.cyk.rabbitmq.constants.MQConst
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/mq")
class MQApi(
private val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("ttl")
fun ttl(): String {
rabbitTemplate.convertAndSend(MQConst.TTL_EXCHANGE, MQConst.TTL_BINDING, "ttl msg 1") { msg ->
//给 msg 配置一些属性
val expire = 20 * 1000
msg.messageProperties.expiration = expire.toString()
return@convertAndSend msg
}
return "ok"
}
}
c)效果演示
SpringBoot 程序一启动,就可以看到交换机队列被创建,并且队列是带 ttl 的
此时向队列中发送一个过期时间是 20s 的消息,由于队列的过期时间是 10s,因此入队后,消息的过期时间变为 10s.
死信队列
概述
a)概念
死信(dead message):可以简单理解为因为种种原因,无法被消费的信息,就是死信.
死信交换机(DLX -> Dead Letter Exchange):如果普通队列设置了对应的死信交换机,那么队列中的生成的死信就会被路由到死信交换机.
死信队列(DLQ -> Dead Letter Queue):死信交换机会根据对应的普通队列设置的 routingKey ,将死信转发给死信队列,最后交给订阅的消费者消费.
b)消息变成死信的情况:
- 消息过期(消息TTL / 队列TTL) .
- 消息被拒绝(Basic.Reject / Basic.Nack),并且设置 requeue 参数为 false.
- 队列设置最大长度(maxLength).
c)死信队列解决消息积压问题:
消费者再处理消息时,可能会因为以下原因,异常处理消息:
- 网络短暂故障:通过
nack + requeue=true
可以实现消息重新入队,解决网路短暂故障. - 代码逻辑错误:如果还是
nack + requeue=true
,就会导致消息不断重复,不仅导致负载飙升,还是导致消息积压. 那么这里就可以使用nack + requeue=false + 死信队列
的方式,保证了消息可以被正确处理.
d)应用场景:
例如用户下订单,发送订单到普通队列,超过一定时间用户没有支付,消息就会变成死信. 此时就可以用一个消费者来处理死信,修改订单状态为取消.
e)注意事项(和之前 消息 TTL 同理):
例如想通过 消息TTL + 死信队列
来实现 延迟队列
的效果,就可能存在漏洞:
当前消息过期了,但是前面的消息没有过期,于是此消息依然在队列中,进而导致消息不能在过期的时候被死信队列接收到,那么进一步导致消息不能被消费者处理(如果是实现订单过期删除业务,就要出大问题了 -> 锁库存
-> 商品即使有,也卖不出去).
实战开发
a)分别指定 普通 和 死信 交换机、队列、绑定
@Configuration
class DLConfig {
@Bean
fun normalExchange(): DirectExchange = ExchangeBuilder
.directExchange(MQConst.NORMAL_EXCHANGE)
.build()
@Bean
fun normalQueue(): Queue = QueueBuilder
.durable(MQConst.NORMAL_QUEUE)
.deadLetterExchange(MQConst.DL_EXCHANGE)
.deadLetterRoutingKey(MQConst.DL_BINDING)
.ttl(10 * 1000)
//.maxLength(10L) //队列设置最大长度
.build()
@Bean
fun normalBinding(
@Qualifier("normalExchange") exchange: Exchange,
@Qualifier("normalQueue") queue: Queue,
): Binding = BindingBuilder
.bind(queue)
.to(exchange)
.with(MQConst.NORMAL_BINDING)
.noargs() //如果 交换机 是顶级接口,这里需要 noargs()
@Bean
fun dlExchange(): DirectExchange = ExchangeBuilder
.directExchange(MQConst.DL_EXCHANGE)
.build()
@Bean
fun dlQueue(): Queue = QueueBuilder
.durable(MQConst.DL_QUEUE)
.build()
@Bean
fun dlBinding(): Binding = BindingBuilder
.bind(dlQueue())
.to(dlExchange())
.with(MQConst.DL_BINDING)
}
b)生产者接口
@RequestMapping("dl")
fun dl(): String {
rabbitTemplate.convertAndSend(MQConst.NORMAL_EXCHANGE, MQConst.NORMAL_BINDING, "dl msg 1")
return "ok"
}
c)演示效果
触发生产者接口,可以看到前 10s,如下
消息过期之后,可以看到消息变为死信,最终路由到死信队列,如下: