RabbitMQ 高级特性——发送方确认

news2025/1/5 9:10:55

在这里插入图片描述

文章目录

  • 前言
  • 发送方确认
    • confirm 确认模式
    • return 退回模式
  • 常见面试题

前言

前面我们学习了 RabbitMQ 中交换机、队列和消息的持久化,这样能够保证存储在 RabbitMQ Broker 中的交换机和队列中的消息实现持久化,就算 RabbitMQ 服务发生了重启或者是宕机,也不会导致交换机和消息的丢失。那么这个机制是保证存储在 RabbitMQ Broker 中的可靠性,但是对于生产者发送的消息如果都到达不了 RabbitMQ 的话,那么这些持久化操作也就没有意义了,那么对于生产者发送的消息,生产者如何知道消息是否已经成功到达 RabbitMQ Broker 了呢?这里就需要用到 RabbitMQ 发送发确认这个特性了,前面我们大概的讲了一下 RabbitMQ Java Client 中的 Publisher/confirm 发送方确认,那么这篇文章我们将学习在 SpringBoot 中如何实现发送方确认。

发送方确认

其实对于上面的问题,RabbitMQ 为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认机制实现

因为使用事务机制的话比较消耗性能,在实际工作中使用的不多,所以我们就主要介绍发送方确认的机制来实现发送方的确认。并且对于发送方确认的机制 RabbitMQ 也为我们提供了两个方式来控制消息的可靠性。

  1. confirm 确认模式
  2. return 退回模式

confirm 模式是确认消息是否到达指定的 Exchange 交换机的,而 return 退回模式则是确认消息是否到达指定队列的。

confirm 确认模式

Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行,如果 Exchange 成功收到,ACK (Ackonwledge character 确认字符)为 true,如果没有收到消息,ACK 就为 false。

那么下面我们就来看看在 SpringBoot 中如何实现 confirm 确认模式:

首先在配置文件中配置信息:

spring:
	rabbitmq:
		publisher-confirm-type: correlated #消息发送确认

然后设置确认回调函数的内容并且发送消息:

无论消息是否成功送到,都会执行这个回调函数,确认消息是否成功送达的判断依据就是 ACK 的值:

public class Constants {
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_QUEUE = "confirm.queue";
}
@Configuration
public class RabbitConfig {
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
    }

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();
    }

    @Bean("confirmBinding")
    public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("confirm");
    }

    @Bean("confirmRabbitTemplate")
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
    	//创建新的RabbitTemplate对象,并且设置confirm回调函数
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.printf("消息接收成功,id:%s\n",correlationData.getId());
                }else {
                    System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);
                }
            }
        });
        return rabbitTemplate;
    }
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/confirm")
    public String confirm() {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","rabbitmq confirm",correlationData);
        return "消息确认成功";
    }
}

在这里插入图片描述
然后我们指定交换机的时候,如果指定一个不存在的交换机,也就是消息无法到达指定的交换机,那么看看时候会执行确认回调函数:

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + 1,"confirm","rabbitmq confirm",correlationData);

2024-08-13 14:47:52.646 ERROR 11252 — [3.57.1.114:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)
消息接受失败,id:1,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)

可以看到,如果消息没有到达指定的交换机,那么也是会执行相应的回调函数的。

public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消
息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调
试和错误处理.
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}

RabbitTemplate.ConfirmCallback 和 ConfirmListener 的区别:

  • RabbitTemplate.ConfirmCallback:这是Spring AMQP库提供的一个回调接口,主要用于在使用RabbitTemplate发送消息时,接收来自RabbitMQ服务器的确认信息。这些确认信息表明消息是否已成功发送到RabbitMQ的交换机(Exchange)。
  • ConfirmListener:这个接口或功能更多是直接与RabbitMQ的Channel相关,而不是直接通过Spring AMQP的RabbitTemplate来使用的。它用于监听RabbitMQ Channel上的消息确认事件,包括消息的ACK(确认)和NACK(不确认)。这种方式通常需要更底层的操作,直接处理RabbitMQ的Channel和连接。

return 退回模式

当消息成功到达 Exchange 交换机的时候,交换机会根据路由规则匹配对应的队列,将消息路由到指定的队列,在消息从 Exchange 到 Queue 的过程中,如果一条消息无法被任何队列消费(即没有队列与消息的 Routing Key 匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理。

那么使用 SpringBoot 如何实现 return 退回模式呢?

首先还是需要进行配置,配置和上面的 confirm 模式是一样的:

spring:
	rabbitmq:
		publisher-returns: true #设置回退

设置返回回调逻辑并发送消息:

@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.printf("消息接收成功,id:%s\n",correlationData.getId());
            }else {
                System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);
            }
        }
    });
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.printf("消息被退回:%s",returnedMessage);
        }
    });
    return rabbitTemplate;
}

setConfirmCallback() 和 setReturnCallback() 方法可以同时存在也可以单独设置。

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm1","rabbitmq confirm",correlationData);

发送消息的时候,我们的 Routing Key 设置为没有 Binding Key 与之匹配的,然后来看看这个 returnCallback 是否会被执行:

消息被退回:ReturnedMessage [message=(Body:‘rabbitmq confirm’ MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm1]
消息接收成功,id:1

消息成功到达了 Exhcange,但是没有到达指定的队列,所以执行了 returnCallback 方法。

public class ReturnedMessage {
	//返回的消息对象,包含了消息体和消息属性
	private final Message message;
	//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.
	private final int replyCode;
	//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
	private final String replyText;
	//消息被发送到的交换机名称
	private final String exchange;
	//消息的路由键,即发送消息时指定的键
	private final String routingKey;
}

常见面试题

如何保证 RabbitMQ 消息的可靠传输:

在这里插入图片描述

从这个图中可以看出,消息可能丢失的场景以及解决方案:

  1. 生产者将消息发送到RabbitMQ失败
    a. 可能原因: 网络问题等
    b. 解决办法: 参考本章节[发送方确认-confirm确认模式]

  2. 消息在交换机中无法路由到指定队列:
    a. 可能原因: 代码或者配置层面错误,导致消息路由失败
    b. 解决办法: 参考本章节[发送方确认-return模式]

  3. 消息队列自身数据丢失
    a. 可能原因: 消息到达RabbitMQ之后,RabbitMQ Server宕机导致消息丢失。
    b. 解决办法: 参考本章节[持久性]。开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会自动读取之前存储的数据。(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)

  4. 消费者异常,导致消息丢失
    a. 可能原因: 消息到达消费者,还没来得及消费,消费者宕机。消费者逻辑有问题。
    b. 解决办法: 参考本章节[消息确认]。RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制(参考下一章节),当消息消费异常时,通过消息重试确保消息的可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2151200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Android】浅析MVC与MVP

【Android】浅析MVC与MVP 什么是架构&#xff1f; 架构&#xff08;Architecture&#xff09;在软件开发中指的是软件系统的整体设计和结构&#xff0c;它描述了系统的高层组织方式&#xff0c;包括系统中各个组件之间的关系、依赖、交互方式&#xff0c;以及这些组件如何协同…

基于OpenCV的YOLOv5图片检测

利用OpenCV的DNN模块加载onnx模型文件进行图片检测。 1、使用的yolov5工程代码&#xff0c;调用export.py导出onnx模型。 2、下载opencv版本&#xff0c;https://opencv.org/releases/ 使用opencv版本4.5.3或以上&#xff0c;本文使用的opencv4.6.0 3、使用vc20…

4.使用 VSCode 过程中的英语积累 - View 菜单(每一次重点积累 5 个单词)

前言 学习可以不局限于传统的书籍和课堂&#xff0c;各种生活的元素也都可以做为我们的学习对象&#xff0c;本文将利用 VSCode 页面上的各种英文元素来做英语的积累&#xff0c;如此做有 3 大利 这些软件在我们工作中是时时刻刻接触的&#xff0c;借此做英语积累再合适不过&a…

STM32 使用 CubeMX 实现按键外部中断

目录 问题背景知识参考需要改什么注意尽量不要在中断函数使用 循环函数做延时中断函数中延时方法调试 问题 我想实现按钮触发紧急停止类似功能&#xff0c;需要使用按键中断功能。 背景知识 GPIO 点亮 LED。stm32cubemx hal学习记录&#xff1a;GPIO输入输出。STM32—HAL库 …

【实战篇】MySQL是怎么保证高可用的?

背景 在一个主备关系中&#xff0c;每个备库接收主库的 binlog 并执行。正常情况下&#xff0c;只要主库执行更新生成的所有 binlog&#xff0c;都可以传到备库并被正确地执行&#xff0c;备库就能达到跟主库一致的状态&#xff0c;这就是最终一致性。 但是&#xff0c;MySQL…

免费在线压缩pdf 压缩pdf在线免费 推荐简单好用

压缩pdf在线免费&#xff1f;在日常生活和工作学习中&#xff0c;处理PDF文件是常见任务。但有时PDF文件体积较大&#xff0c;给传输、存储和分享带来不便。因此&#xff0c;学习PDF文件压缩技巧十分必要。压缩PDF文件是指通过技术手段减小文件占用的存储空间&#xff0c;同时尽…

[Redis][Hash]详细讲解

目录 0.前言1.常见命令1.HSET2.HGET3.HEXISTS4.HDEL5.HKEYS6.HVALS7.HGETALL8.HMGET9.HLEN10.HSETNX11.HINCRBY12.HINCRBYFLOAT 2.内部编码1.ziplist(压缩链表)2.hashtable(哈希表) 3.使用场景4.缓存方式对比1.原⽣字符串类型2.序列化字符串类型3.哈希类型 0.前言 在Redis中&am…

CSS - 通用左边图片,右边内容,并且控制长度溢出处理模板(vue | uniapp | 微信小程序)

前言 通用模板&#xff0c;可适用于任意前端项目。 如下图所示&#xff0c;手机电脑通用。 示例代码 根据自己的需求修改即可。 <body><div class"container"><!-- 头像图片 --><img class"avatar" src"https://cdn.uviewui.com…

C++初阶学习——探索STL奥秘——标准库中的priority_queue与模拟实现

1.priority_queque的介绍 1.priority_queue中文叫优先级队列。优先队列是一种容器适配器&#xff0c;根据严格的弱排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大的。 2. 此上下文类似于堆&#xff0c;在堆中可以随时插入元素&#xff0c;并且只能检索最大堆元…

学习大数据DAY59 全量抽取和增量抽取实战

目录 需求流程&#xff1a; 需求分析与规范 作业 作业2 需求流程&#xff1a; 全量抽取 增量抽取 - DataX Kettle Sqoop ... 场景: 业务部门同事或者甲方的工作人员给我们的部门经理和你提出了新的需 求 流程: 联系 > 开会讨论 > 确认需求 > 落地 需求文档( 具体…

Vue 项目中引入 Axios 详解

Vue 项目中引入 Axios 详解 在 Vue 项目中&#xff0c;axios 是一个非常流行的 HTTP 客户端&#xff0c;用于向服务器发送请求并处理响应。本文将详细说明如何在 Vue 项目中引入 Axios 插件&#xff0c;以及如何进行基本的配置&#xff0c;包括构建、配置域名、设置全局错误拦…

WEB攻防-JS项目Node.js框架安全识别审计验证绕过

知识点&#xff1a; 1、原生JS&开发框架-安全条件 2、常见安全问题-前端验证&未授权 详细点&#xff1a; 1、什么是JS渗透测试&#xff1f; 在JavaScript中也存在变量和函数&#xff0c;当存在可控变量及函数调用即可参数漏洞 2、流行的Js框架有哪些&#xff1f; …

CC1链的第二种方式-LazyMap版调用链

文章目录 CC1链的第二种方式-LazyMap版调用链LazyMap构造payloadCC1的调用链 CC1链的第二种方式-LazyMap版调用链 CC1链的第一种方式可以参考另一篇文章&#xff1a;CC1链_全网最菜的分析思路 LazyMap 在之前的CC1链中分析&#xff0c;其实是其中一种方式&#xff08;国内版本…

全面解析流量态势感知与网络性能监控:IT运维中的核心技术

在现代IT运维中&#xff0c;网络的稳定性和业务的连续性是企业赖以生存的基石。随着数字化转型的深入&#xff0c;网络流量日益复杂&#xff0c;安全威胁愈加严峻&#xff0c;运维人员不仅需要确保网络的顺畅运行&#xff0c;还必须及时发现潜在风险并快速响应。流量态势感知与…

如何查看Android设备的dpi

adb shell getprop ro.sf.lcd_density adb shell cat /system/build.prop > build_prop.txt shell cat system/build.prop 结果&#xff1a;参考&#xff1a; 如何查看Android设备的dpi_安卓 查看手机dpi-CSDN博客

ABAP-Swagger 一种公开 ABAP REST 服务的方法

ABAP-Swagger An approach to expose ABAP REST services 一种公开 ABAP REST 服务的方法 Usage 1: develop a class in ABAP with public methods 2: implement interface ZIF_SWAG_HANDLER, and register the public methods(example method zif_swag_handler~meta) 3: …

ElementUI 用span-method实现循环el-table组件的合并行功能

需要把指定列的相同数据合并起来&#xff08;项目中用的是updateTime&#xff09; 后端返回的数据格式&#xff1a; html&#xff1a; <el-tab-pane label"执行记录概览" name"fourth" v-loading"loading"><el-timeline v-if"re…

单元测试和unittest框架(超详细总结)

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;薪资嘎嘎涨 单元测试的定义 1. 什么是单元测试&#xff1f; 单元测试是指&#xff0c;对软件中的最小可测试单元在与程序其他部分相隔离的情况下进行检查和验证的工作&am…

电子烟智能化创新体验:WTK6900P语音交互芯片方案,融合频谱计算、精准语音识别与流畅音频播报

一&#xff1a;开发背景 在这个科技日新月异的时代&#xff0c;每一个细节的创新都是对传统的一次超越。今天&#xff0c;我们自豪地宣布一项革命性的融合——将先进的语音识别技术与电子烟相结合&#xff0c;通过WTK6900P芯片的卓越性能&#xff0c;为您开启前所未有的个性化…

【有啥问啥】摄像头成像质量量化标准解读与测试方法

摄像头成像质量量化标准解读与测试方法 在自动驾驶和智能驾驶舱领域&#xff0c;摄像头是关键的感知设备&#xff0c;直接关系到系统的环境感知能力。为确保摄像头在实际应用中表现出色&#xff0c;需明确了解其成像质量标准和测试方法。本文将围绕成像质量的核心指标、测试方…