RabbitMQ高级特性 - 生产者消息确认机制

news2025/1/20 16:54:29

文章目录

  • 生产者消息确认机制
    • 概述
    • confirm 代码实现
    • return 代码实现

生产者消息确认机制


概述

在这里插入图片描述
为了保证信息 从生产者 发送到 队列,因此引入了生产者的消息确认机制.

RabbitMQ 提供了两种解决方案:

  • 通过事务机制实现.
  • 通过发送确认机制(confirm 和 return)实现.

因为事务机制比较消耗性能,在实际工作中用的也不多,因此这里主要介绍 confirm 和 return 机制来实现发送放的确认.

a)confirm 确认模式
在这里插入图片描述
如上图,confirm 确认模式主要保障于 生产者 到 交换机 的消息可靠性.

具体的,在生产者发送消息之前,给 RabbitTemplate 设置一个 ConfirmCallback 回调监听:

  • 如果 Exchange 成功收到消息,那么 ConfirmCallback 这个回调 ack 参数就为 true
  • 如果 Exchange 没有收到消息,那么 ConfirmCallback 这个回调 ack 参数就为 false

b)return 退回模式
在这里插入图片描述
如上图,confirm 确认模式主要保障于 交换机 到 队列 的消息可靠性.

具体的,在生产者发送消息之前,给 RabbitTemplate 设置一个 ReturnsCallback 回调监听:

  • 如果 Queue 成功收到 Exchange 的消息,那么 ReturnsCallback 回调监听 就不会收到任何消息.
  • 如果 Queue 没有收到 Exchange 的消息,那么 ReturnsCallback 回调监听 就会收到该消息.

confirm 代码实现

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    publisher-confirm-type: correlated # 开启发送方确认机制

b)交换机、队列、绑定配置

    @Bean("confirmExchange")
    fun confirmExchange() = DirectExchange(MQConst.CONFIRM_EXCHANGE)
    @Bean("confirmQueue")
    fun confirmQueue() = Queue(MQConst.CONFIRM_QUEUE)
    @Bean("confirmBinding")
    fun confirmBinding(
        @Qualifier("confirmExchange") exchange: DirectExchange,
        @Qualifier("confirmQueue") queue: Queue,
    ): Binding = BindingBuilder
        .bind(queue)
        .to(exchange)
        .with(MQConst.CONFIRM_BINDING)

c)confirmRabbitTemplate Bean 配置

@Configuration
class MQTemplateConfig {

    /**
     * 这个配置一定要有!!!(或者有大于等于 2 个的 RabbitTemplate Bean)
     *
     * 这是由于 Autowired 注解自身的原因(以 rabbitmq 为例):
     * 如果配置文件中配置 rabbitmq 相关连接信息,那么 spring 会自动为其创建 RabbitTemplate Bean 对象
     * 如果配置文件中配置 rabbitmq 相关连接信息,而且代码中也配置了一个 RabbitTemplate 的 Bean(名称为 confirmRabbitTemplate),那么 Spring 将不会自动配置默认的 RabbitTemplate Bean 对象
     * 这就导致,我们无论代码写的注入的是 rabbitTemplate 还是 confirmRabbitTemplate,但实际上注入的都是 confirmRabbitTemplate
     */
    @Bean("rabbitTemplate")
    fun rabbitTemplate(
        connectionFactory: ConnectionFactory
    ): RabbitTemplate {
        return RabbitTemplate(connectionFactory)
    }

    @Bean("confirmRabbitTemplate")
    fun confirmRabbitTemplate(
        connectionFactory: ConnectionFactory
    ): RabbitTemplate {
        val tpl = RabbitTemplate(connectionFactory)
        tpl.setConfirmCallback(RabbitTemplate.ConfirmCallback { correlationData, ack, cause ->
            println("执行了 confirm ...")
            if (ack) {
                println("confirm ack: { 消息id: ${correlationData?.id} }")
            } else {
                println("confirm nack: { 消息id: ${correlationData?.id}, cause: $cause }")
                //进行相应的业务处理...
            }
        })
        return tpl
    }

}

d)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(
    val confirmRabbitTemplate: RabbitTemplate
) {

    @RequestMapping("/confirm")
    fun confirm(): String {
        val data = CorrelationData("1")
        confirmRabbitTemplate.convertAndSend(MQConst.CONFIRM_EXCHANGE, MQConst.CONFIRM_BINDING, "confirm msg 1", data)
        return "ok"
    }

}

此处演示无需消费者…

e)消息正确的路由到交换机,效果如下:
在这里插入图片描述
在这里插入图片描述

f)消息没有找到交换机(发送消息时,写了一个不存在的交换机的名字),效果如下:
在这里插入图片描述
在这里插入图片描述

return 代码实现

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    publisher-confirm-type: correlated # 开启发送方确认机制

b)bean 的配置

@Configuration
class MQTemplateConfig {

    /**
     * 这个配置一定要有!!!(或者有大于等于 2 个的 RabbitTemplate Bean)
     *
     * 这是由于 Autowired 注解自身的原因(以 rabbitmq 为例):
     * 如果配置文件中配置 rabbitmq 相关连接信息,那么 spring 会自动为其创建 RabbitTemplate Bean 对象
     * 如果配置文件中配置 rabbitmq 相关连接信息,而且代码中也配置了一个 RabbitTemplate 的 Bean(名称为 confirmRabbitTemplate),那么 Spring 将不会自动配置默认的 RabbitTemplate Bean 对象
     * 这就导致,我们无论代码写的注入的是 rabbitTemplate 还是 confirmRabbitTemplate,但实际上注入的都是 confirmRabbitTemplate
     */
    @Bean("rabbitTemplate")
    fun rabbitTemplate(
        connectionFactory: ConnectionFactory
    ): RabbitTemplate {
        return RabbitTemplate(connectionFactory)
    }

    @Bean("confirmRabbitTemplate")
    fun confirmRabbitTemplate(
        connectionFactory: ConnectionFactory
    ): RabbitTemplate {
        val tpl = RabbitTemplate(connectionFactory)
        tpl.setConfirmCallback(RabbitTemplate.ConfirmCallback { correlationData, ack, cause ->
            println("执行了 confirm ...")
            if (ack) {
                println("confirm ack: { 消息id: ${correlationData?.id} }")
            } else {
                println("confirm nack: { 消息id: ${correlationData?.id}, cause: $cause }")
                //进行相应的业务处理...
            }
        })
        //这里可以和 confirm模式 一起配置
        //mandatory = true 属性是在告诉 rabbitmq,如果一个消息无法被任何队列消费,那么该消息就会返回给发送者,此时 ReturnCallback 就会被触发
        //mandatory 相当于是开启 ReturnsCallback 前提
        tpl.setMandatory(true)
        tpl.setReturnsCallback(RabbitTemplate.ReturnsCallback { returned ->
            println("执行了 return ...")
            println("return: $returned")
        })
        return tpl
    }

}

c)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(
    val confirmRabbitTemplate: RabbitTemplate
) {

    @RequestMapping("/confirm")
    fun confirm(): String {
        val data = CorrelationData("1")
        confirmRabbitTemplate.convertAndSend(MQConst.CONFIRM_EXCHANGE, MQConst.CONFIRM_BINDING, "confirm msg 1", data)
        return "ok"
    }

}

d)正确的路由到队列,效果如下:
可以看到只有 confirm模式 被触发.
在这里插入图片描述
e)没有路由到队列(发送消息时,我改成了一个不存在的 routingKey 名字),效果如下:
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1978709.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CPU利用率100%该怎么办

1 节拍率 Linux 作为一个多任务操作系统,将每个 CPU 的时间划分为很短的时间片,再通过调度器轮流分配给各个任务使用,因此造成多任务同时运行的错觉。 为了维护 CPU 时间,Linux 通过事先定义的节拍率(内核中表示为 H…

AI大模型定级体系

前言:一直以来人们对通用人工智能(AGI)的定义始终缺乏一个具体的衡量标准,而现在OpenAI已创建了一套分级系统。 AI大模型定级 OpenAI对于其大模型的定级有一个独特的分级体系,旨在描述其人工智能系统的发展阶段以及距…

网络安全埋头干活,也要抬头看路

24年的Hvv大家干的正是热火朝天,也有的干的无可奈何,确实Hvv的核心其实是在Hvv前的准备阶段,Hvv中可能更多的是盯监控。 7月份我也出了两趟差,看了一下在Hvv项目上的小伙伴,30%的时间是在处理误报,60%的时…

51 for 循环与 while 循环

Python 主要有 for 循环和 while 循环两种形式的循环结构,多个循环可以嵌套使用,并且还经常和选择结构嵌套使用来实现复杂的业务逻辑。 while 循环一般用于循环次数难以提前确定的情况,当然也可以用于循环次数确定的情况。 for 循环一般用于…

共享`pexlinux`数据文件的网络服务

实验环境准备: 1.红帽7主机 2.要全图形安装 3.配置网络为手动,配置网络可用 4.关闭vmware DHCP功能 一、kickstart自动安装脚本制作 1.安装图形化生成kickstart自动脚本安装工具 2.启动图形制作工具 3.图形配置脚本 这里使用的共享方式是http&#xff0…

SpringBoot + Vue + ElementUI 的人力资源管理系统-附项目源码与配套文档

摘 要 在如今这个人才需求量大的时代,各方企业为了永葆企业的活力与生机,在不断开 拓进取的同时,又广泛纳用人才,为企业的长久发展奠定了基础。于是,各个企业与部 门机构,都不可避免地会接触到人力资源管理…

微信小程序之behaviors

目录 概括 Demo演示 进阶演示 1. 若具有同名的属性或方法 2. 若有同名的数据 3. 若有同名的生命周期函数 应用场景 最后 属性&方法 组件中使用 代码示例: 同名字段的覆盖和组合规则 概括 一句话总结: behaviors是用于组件间代码共享的特性, 类似一…

03 RabbitMQ:HelloWorld

03 RabbitMQ:HelloWorld 1. 目标2. 实现2.1. 新建Spring Boot 项目2.1.1. 新建生产者(producer)项目2.1.2. 新建生产者(consumer)项目 2.2. 导入依赖2.3. 代码2.3.1. 发送消息(producer)2.3.2. …

TiKV Raft 快照全流程丨TiKV 源码解读(二十二)

导读 TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会的顶级项目。它通过 Raft 协议实现数据的高可用性和强一致性,是 TiDB 分布式数据库系统的重要组成部分。本文作为 TiKV 源码解读系列的增补,详细介绍了 TiKV 8.2.…

2024华数杯C题保姆级分析完整思路+代码+数据教学

2024华数杯C题保姆级分析完整思路代码数据教学 C题题目:老外游中国 接下来我们将按照题目总体分析-背景分析-各小问分析的形式来 1 总体分析: 题目要求本题目基于中国境内旅游景点数据,旨在通过数学建模解决外国游客在中国旅游时可能遇到的…

安装pytorch GPU方法

参考全网最详细的安装pytorch GPU方法,一次安装成功!!包括安装失败后的处理方法!-CSDN博客 整体来看,一共下面三个安装步骤: 显卡驱动(nvidia-smi)-》显卡深度学习驱动&#xff08…

三十种未授权访问漏洞复现 合集( 四 )

未授权访问漏洞介绍 未授权访问可以理解为需要安全配置或权限认证的地址、授权页面存在缺陷,导致其他用户可以直接访问,从而引发重要权限可被操作、数据库、网站目录等敏感信息泄露。---->目录遍历 目前主要存在未授权访问漏洞的有:NFS服务&a…

日志系统——整体框架

日志等级模块: 该模块描述了日志消息的各种等级debug,info,warn,error,fatal,off(off为最高等级,屏蔽一切日志消息),并提供描述日志等级的方法 日志消息模块: 该模块负责构建日志消息对象,此对象管理着一条日志中的各项…

【Python网络爬虫案例】python爬虫之爬取豆瓣电影信息

🔗 运行环境:PYTHON 🚩 撰写作者:左手の明天 🥇 精选专栏:《python》 🔥 推荐专栏:《算法研究》 #### 防伪水印——左手の明天 #### 💗 大家好🤗&#x1f91…

【C语言】计算四则运算,中缀表达式转换为后缀表达式

C语言编程—中缀表达式转换为后缀表达式 思路: 中缀转后缀保存结果栈:stack,保存数据和-*/ 操作符栈:op_stack,保存-*/() 场景一:遇到数据,直接入栈stack 场景二:遇到"(&qu…

海康笔试题

1. 2. 块设备:磁盘设备驱动、SD设备驱动 字符设备:终端设备驱动 网络设备:网络设备驱动 (1)linux操作系统驱动程序分为三大类:字符设备驱动、快设备驱动和网络设备驱动 (2)字符设…

2024 年华数杯全国大学生数学建模竞赛C 题 老外游中国 完整思路 源代码 模型结果(仅供学习)

最近,“city 不 city”这一网络流行语在外国网红的推动下备受关注。随着我国过境免签政策的落实,越来越多外国游客来到中国,通过网络平台展示他们在华旅行的见闻,这不仅推动了中国旅游业的发展,更是在国际舞台上展现了…

基于X86+FPGA助力实现电力系统的智能监测与高效管理

电力监控 信迈提供基于Intel平台、Xilinx平台、Rockchip平台、NXP平台、飞腾平台的Mini-ITX主板、Micro-ATX主板、ATX主板、嵌入式准系统/工业整机等计算机硬件。产品算力强大,支持高速存储,提供丰富串口、USB、LAN、PCIe扩展接口、显示接口等I/O接口&am…

【python】数据类型之列表类型(上)

本篇文章将讲解列表类型。 列表(list),是一个有序且可变的容器,在里面可以存放多个不同类型的元素。 列表中的元素之间用逗号(英文中的逗号)相隔。 1、定义: 例如: user_list[]…

stl容器 vector的基本操作

目录 1.vector构造 1.1默认构造函数 1.2 fill 填充构造函数 ​编辑 1.3 范围构造函数(Range Constructor) 1.4拷贝构造函数 2.initializer_list初始化vector 3.迭代器 4.常用的几个成员 4.1 size()统计当前有效字符个数 4.2 capacity ve…