文章目录
- 生产者消息确认机制
- 概述
- confirm 代码实现
- return 代码实现
生产者消息确认机制
概述
为了保证信息 从生产者 发送到 队列,因此引入了生产者的消息确认机制.
RabbitMQ 提供了两种解决方案:
- 通过事务机制实现.
- 通过发送确认机制(confirm 和 return)实现.
因为事务机制比较消耗性能,在实际工作中用的也不多,因此这里主要介绍 confirm 和 return
机制来实现发送放的确认.
a)confirm 确认模式
如上图,confirm 确认模式主要保障于 生产者 到 交换机 的消息可靠性.
具体的,在生产者发送消息之前,给 RabbitTemplate 设置一个 ConfirmCallback 回调监听:
- 如果 Exchange
成功
收到消息,那么 ConfirmCallback 这个回调 ack 参数就为true
- 如果 Exchange
没有
收到消息,那么 ConfirmCallback 这个回调 ack 参数就为false
b)return 退回模式
如上图,confirm 确认模式主要保障于 交换机 到 队列 的消息可靠性.
具体的,在生产者发送消息之前,给 RabbitTemplate 设置一个 ReturnsCallback 回调监听:
- 如果 Queue
成功
收到 Exchange 的消息,那么 ReturnsCallback 回调监听就不会收到任何消息
. - 如果 Queue
没有
收到 Exchange 的消息,那么 ReturnsCallback 回调监听就会收到该消息
.
confirm 代码实现
a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
publisher-confirm-type: correlated # 开启发送方确认机制
b)交换机、队列、绑定配置
@Bean("confirmExchange")
fun confirmExchange() = DirectExchange(MQConst.CONFIRM_EXCHANGE)
@Bean("confirmQueue")
fun confirmQueue() = Queue(MQConst.CONFIRM_QUEUE)
@Bean("confirmBinding")
fun confirmBinding(
@Qualifier("confirmExchange") exchange: DirectExchange,
@Qualifier("confirmQueue") queue: Queue,
): Binding = BindingBuilder
.bind(queue)
.to(exchange)
.with(MQConst.CONFIRM_BINDING)
c)confirmRabbitTemplate Bean 配置
@Configuration
class MQTemplateConfig {
/**
* 这个配置一定要有!!!(或者有大于等于 2 个的 RabbitTemplate Bean)
*
* 这是由于 Autowired 注解自身的原因(以 rabbitmq 为例):
* 如果配置文件中配置 rabbitmq 相关连接信息,那么 spring 会自动为其创建 RabbitTemplate Bean 对象
* 如果配置文件中配置 rabbitmq 相关连接信息,而且代码中也配置了一个 RabbitTemplate 的 Bean(名称为 confirmRabbitTemplate),那么 Spring 将不会自动配置默认的 RabbitTemplate Bean 对象
* 这就导致,我们无论代码写的注入的是 rabbitTemplate 还是 confirmRabbitTemplate,但实际上注入的都是 confirmRabbitTemplate
*/
@Bean("rabbitTemplate")
fun rabbitTemplate(
connectionFactory: ConnectionFactory
): RabbitTemplate {
return RabbitTemplate(connectionFactory)
}
@Bean("confirmRabbitTemplate")
fun confirmRabbitTemplate(
connectionFactory: ConnectionFactory
): RabbitTemplate {
val tpl = RabbitTemplate(connectionFactory)
tpl.setConfirmCallback(RabbitTemplate.ConfirmCallback { correlationData, ack, cause ->
println("执行了 confirm ...")
if (ack) {
println("confirm ack: { 消息id: ${correlationData?.id} }")
} else {
println("confirm nack: { 消息id: ${correlationData?.id}, cause: $cause }")
//进行相应的业务处理...
}
})
return tpl
}
}
d)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val confirmRabbitTemplate: RabbitTemplate
) {
@RequestMapping("/confirm")
fun confirm(): String {
val data = CorrelationData("1")
confirmRabbitTemplate.convertAndSend(MQConst.CONFIRM_EXCHANGE, MQConst.CONFIRM_BINDING, "confirm msg 1", data)
return "ok"
}
}
此处演示无需消费者…
e)消息正确的路由到交换机,效果如下:
f)消息没有找到交换机(发送消息时,写了一个不存在的交换机的名字),效果如下:
return 代码实现
a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
publisher-confirm-type: correlated # 开启发送方确认机制
b)bean 的配置
@Configuration
class MQTemplateConfig {
/**
* 这个配置一定要有!!!(或者有大于等于 2 个的 RabbitTemplate Bean)
*
* 这是由于 Autowired 注解自身的原因(以 rabbitmq 为例):
* 如果配置文件中配置 rabbitmq 相关连接信息,那么 spring 会自动为其创建 RabbitTemplate Bean 对象
* 如果配置文件中配置 rabbitmq 相关连接信息,而且代码中也配置了一个 RabbitTemplate 的 Bean(名称为 confirmRabbitTemplate),那么 Spring 将不会自动配置默认的 RabbitTemplate Bean 对象
* 这就导致,我们无论代码写的注入的是 rabbitTemplate 还是 confirmRabbitTemplate,但实际上注入的都是 confirmRabbitTemplate
*/
@Bean("rabbitTemplate")
fun rabbitTemplate(
connectionFactory: ConnectionFactory
): RabbitTemplate {
return RabbitTemplate(connectionFactory)
}
@Bean("confirmRabbitTemplate")
fun confirmRabbitTemplate(
connectionFactory: ConnectionFactory
): RabbitTemplate {
val tpl = RabbitTemplate(connectionFactory)
tpl.setConfirmCallback(RabbitTemplate.ConfirmCallback { correlationData, ack, cause ->
println("执行了 confirm ...")
if (ack) {
println("confirm ack: { 消息id: ${correlationData?.id} }")
} else {
println("confirm nack: { 消息id: ${correlationData?.id}, cause: $cause }")
//进行相应的业务处理...
}
})
//这里可以和 confirm模式 一起配置
//mandatory = true 属性是在告诉 rabbitmq,如果一个消息无法被任何队列消费,那么该消息就会返回给发送者,此时 ReturnCallback 就会被触发
//mandatory 相当于是开启 ReturnsCallback 前提
tpl.setMandatory(true)
tpl.setReturnsCallback(RabbitTemplate.ReturnsCallback { returned ->
println("执行了 return ...")
println("return: $returned")
})
return tpl
}
}
c)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val confirmRabbitTemplate: RabbitTemplate
) {
@RequestMapping("/confirm")
fun confirm(): String {
val data = CorrelationData("1")
confirmRabbitTemplate.convertAndSend(MQConst.CONFIRM_EXCHANGE, MQConst.CONFIRM_BINDING, "confirm msg 1", data)
return "ok"
}
}
d)正确的路由到队列,效果如下:
可以看到只有 confirm模式 被触发.
e)没有路由到队列(发送消息时,我改成了一个不存在的 routingKey 名字),效果如下: