文章目录
- RabbitMQ 消息确认机制
- 背景
- 消费者消息确认机制
- 概述
- 手动确认(RabbitMQ 原生 SDK)
- 手动确认(Spring-AMQP 封装 RabbitMQ SDK)
- AcknowledgeMode.NONE
- AcknowledgeMode.AUTO(默认)
- AcknowledgeMode.MANUAL
- MANUAL 可能会引发的问题
RabbitMQ 消息确认机制
背景
上图中可以看出,从生产者发送消息
到消费者接收到消息并正确处理
,这些里路线都可能会出现问题,那么为了保证这些消息最后能被正确处理,RabbitMQ 就提供了消息确认机制.
消费者消息确认机制
概述
为了保证消息从 队列 到 消费者正确消费,那么就引入了消费者消息确认机制.
a)消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种(以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK,非 Spring 提供).
- 自动确认:当 autoAck = true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后不管消费者是否真正的消费这些消息,都会从内存中删除.(适合对消息可靠性要求不高的场景).
- 手动确认:当 autoAck = false 时,RabbitMQ 会等待消费者显示的调用 Basic.Ack 命令(波安排时间哦且确认消息),然后才会从 内存或磁盘 中删除消息.(适合对消息可靠性要求高的场景).
Ps:可靠性高了,性能也就下降了,所以请综合考虑.
b)对于 MQ队列 中的消息,在 MQ管理平台上可以看到以下两种类别:
Ready:队列已经准备好消息,随时准备发送给消费者 的消息数量(只要消费者来要,就立刻发送).
Unacked:消息已经发送给消费者,但是消费者没有返回消息确认 的消息数量(消息确认包括 ack肯定确认
和 nack否定确认
)
手动确认(RabbitMQ 原生 SDK)
消费者在收到消息之后,可以选择确认,也可以选择拒绝或者跳过,RabbitMQ因此提供了不同的确认应答方式,消费者客户端可通过调用 channel 的相关方法实现.
a)肯定确认:消费者已经接收到消息,并且成功处理消息,可以将其丢弃了.
Channel.basicAck(long deliveryTag, boolean multiple)
- deliveryTag:消息的唯一标识(单调递增的 long),特点如下:
- deliveryTag 是每个 Channel 通道独立维护,所以每个通道上的都是唯一的(生产者 和 Broker 建立一个 channel 会生成一个 deliverTag,消费者 和 Broker 建立一个 channel 会生成一个 deliverTag,这俩 deliverTag 是不同的).
- 当消费者 ack确认 一条消息时,必须使用对应的通道上 deliveryTag 进行确认.
- multiple:是否批量确认. 如果值为 true,那么就会一次性 ack确认 所有小于或等于指定的 deliveryTag 的消息,大大减少了网络开销
- 假设 deliveryTag = 8,multiple = true:那么 deliveryTag <= 8 的消息都会被确认.
- 假设 deliveryTag = 8,multiple = false:只确认 8.
Ps:deliveryTag 确保了消息传递的可靠性和顺序性.
b)否定确认(单个):用来拒绝这个消息. 被拒绝的消息如何处理,具体要看 requeue 参数.
Channel.basicReject(long deliveryTag, boolean requeue)
- requeue:标识拒绝后,这条消息如何处理.
- requeue = true:消息会重新存入队列,将来会发送给下一个订阅的消费者.
- requeue = false:消息会从队列中移除,因此不会发送给消费者.
c)否定确认(批量):Channel.basicReject 只能拒绝一条消息,如果要批量拒绝消息,就可以使用 Channel.basicNack.
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
multiple:参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.
d)MQ 的管理平台上也提供了几种确认方式.
手动确认(Spring-AMQP 封装 RabbitMQ SDK)
Spring-AMQP 对消息确认提供了三种策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
这里根 RabbitMQ 原生 SDK 是有些不同的.
AcknowledgeMode.NONE
不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,然后从 队列 中移除消息.
a)配置手动确认
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
listener:
simple:
acknowledge-mode: none
b)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/ack")
fun ack(): String {
rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
return "ok"
}
}
c)消费者
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
//业务处理...
println("业务逻辑处理完成")
}
}
d)效果演示
触发接口之后,回到 MQ 管理平台,可以看到队列中消息已经被删除.
AcknowledgeMode.AUTO(默认)
分为以下情况:
- 消费者处理消息过程中
没有抛出异常
,则自动确认消息,然后从 队列 中移除消息. - 消费者处理消息过程中
抛出异常
,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).
a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
listener:
simple:
acknowledge-mode: auto
b)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/ack")
fun ack(): String {
rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
return "ok"
}
}
c)消费者(正常处理消息)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
//业务处理...
println("业务逻辑处理完成")
}
}
效果如下:
d)消费者(异常处理消息)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
//业务处理...
val a = 1 / 0
println("业务逻辑处理完成")
}
}
效果如下:
消息未被确认,会不断重返队列,进行重试,因此 IDEA 中会循环报错输出.
AcknowledgeMode.MANUAL
分为以下情况:
- 消费者在处理完消息后显示调用 basicAck 方法 来确认消息,然后从 队列 中移除消息.
- 消费者在处理完消息后显示调用 basicNack 方法 来否定确认消息,是否从队列中移除消息需要看 requeue 参数的值
- requeue = true:重返队列,不断重试.
- requeue = false:丢弃消息.
- 消费者在处理完消息后什么都不做,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).
a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
listener:
simple:
acknowledge-mode: manual
b)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/ack")
fun ack(): String {
rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
return "ok"
}
}
c)消费者(异常处理消息,requeue = true)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
val deliveryTag = message.messageProperties.deliveryTag
try {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
//业务处理...
val a = 1 / 0
println("业务逻辑处理完成")
channel.basicAck(deliveryTag, false)
} catch (e: Exception) {
channel.basicNack(deliveryTag, false, true) //requeue: true
}
}
}
由于消息处理异常,发送 nack,并且 requeue = true,因此消息会重返队列,不断重试.
d)消费者(异常处理消息,requeue = false)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
val deliveryTag = message.messageProperties.deliveryTag
try {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
//业务处理...
val a = 1 / 0
println("业务逻辑处理完成")
channel.basicAck(deliveryTag, false)
} catch (e: Exception) {
channel.basicNack(deliveryTag, false, false) //requeue: false
}
}
}
由于消息处理异常,发送 nack,并且 requeue = false,因此消息不会重返队列,消息被丢弃.
d)消费者(正常处理消息)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
val deliveryTag = message.messageProperties.deliveryTag
try {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
//业务处理...
println("业务逻辑处理完成")
channel.basicAck(deliveryTag, false)
} catch (e: Exception) {
channel.basicNack(deliveryTag, false, false) //requeue: false
}
}
}
消息被正常处理,返回 ack.
MANUAL 可能会引发的问题
如果这里捕获的不是 Exception 异常,那么消费者处理消息的时候,可能会引发一些不会被捕获的异常,就会导致没有返回 nack.
也就意味着,没有进行确认应答,那么 mq管理平台 上就会显示 Unacked 数值 +1.
Ps:具体还是需要根据业务场景而定