RabbitMQ 发布确认机制

news2024/11/16 9:40:55

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段

发布确认模式

  • 原理说明
  • 实现方式
    • 开启confirm(确认)模式
    • 阻塞确认
    • 异步确认
  • 总结

原理说明

  生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
  confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
在这里插入图片描述

  如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
  RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存
  事务机制和发布确认机制都存在以下注意点:

  • 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
  • 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
  • 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。

  上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。

实现方式

  RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。

开启confirm(确认)模式

  确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
  RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:

channel.confirmSelect();

阻塞确认

  阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:

    /**
     * Wait until all messages published since the last call have been
     * either ack'd or nack'd by the broker.  Note, when called on a
     * non-Confirm channel, waitForConfirms throws an IllegalStateException.
     * @return whether all the messages were ack'd (and none were nack'd)
     * @throws java.lang.IllegalStateException
     */
    boolean waitForConfirms() throws InterruptedException;

  自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
  根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
  阻塞等待确认的方式如下代码所示:

//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){
    channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

 阻塞批量确认的方式如下代码所示:

        // 存储未应带消息队列
        List<String> messages = new ArrayList<>();
        for (int i = 1; i < 20000 ;  i++){
            String msg = String.valueOf(i);
            messages.add(msg);
            channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
            // 每发送十条消息进行一次确认
            if(i > 0 && i % 10 == 0 ){
                // 如果确认不通过则将消息重新发送
                if(!channel.waitForConfirms()){
                    for (String e : messages) {
                        channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());
                    }
                }else{
                    // 如果确认成功则将这些消息从未应答队列中移除
                    messages.clear();
                }
            }
        }

异步确认

  客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。

/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {
  void handleAck(long deliveryTag, boolean multiple)
      throws IOException;

  void handleNack(long deliveryTag, boolean multiple)
      throws IOException;
}

  异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:

        // 存储未确认消息,其中key为消息序号,value为消息实体
        HashMap<Long,String> msgMap = new HashMap<>();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                msgMap.remove(deliveryTag);
            }

            /**
             * 如果消息应带结果为nack则重新发送该消息
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                String msg = msgMap.get(deliveryTag);
                if(msg != null){
                    channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
                }
            }
        });
        for (int i = 1; i < 20000 ;  i++){
            String msg = String.valueOf(i);
            // 将消息序号和消息存储map中
            msgMap.put(channel.getNextPublishSeqNo(),msg);
            channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
        }

  上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。

总结

  发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:

比较事务机制发布确认机制
实现方式通过AMQP协议层面实现轻量级实现,采用RabbitMQ应答机制
命令详解Tx.Select
Basic.Publish
Tx.Commit
Commit.OK
Basic.Publish
Basic.Ack
性能同步,性能较慢可异步实现也可同步实现,性能快,AMQP命令交互少
消息到达队列时机事务提交后消息才会进入队列,消息入队存在滞后性消息发送后就进入队列,发布确认模式不影响消息进入队列时机
事务提交成功或消息应答时机消息被交换机处理完成后,或消息不可达同事务
实现复杂度简单相对复杂
适合场景批量发送消息,实现批量消息的原子性和一致性确保消息发送到交换机

  发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:

比较内容阻塞等待批量等待异步确认
性能
实现复杂度
确认范围每条消息批量消息每条消息
是否可以精准确认每条消息

  根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/854783.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用postman做接口测试

1.接口测试&#xff1a;针对软件对外提供服务的接口的输入输出进行测试&#xff0c;以及接口间相互逻辑的测试&#xff0c;验证接口功能与接口描述文档的一致性 2.接口测试流程&#xff1a; 1&#xff09;获取接口信息&#xff1a;通过接口文档或抓包来获取接口的基本调用方式和…

【脚踢数据结构】内核链表

(꒪ꇴ꒪ )&#xff0c;Hello我是祐言QAQ我的博客主页&#xff1a;C/C语言,Linux基础,ARM开发板&#xff0c;软件配置等领域博主&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff0c;让我们成为一个强大的攻城狮&#xff01;送给自己和读者的一句鸡汤&#x1f914;&…

【iOS安全】开启任意app的WebView远程调试

参考&#xff1a;https://mp.weixin.qq.com/s/bNKxQaVrPaXsZ5BPbsXy7w &#xff08;来自周智老师的公众号&#xff09; 概述 Safari 有一个内置的前端调试器&#xff0c; 在iPhone通过局域网或者USB连接MacBook 并启用Safari 远程调试之后&#xff0c;前端调试器默认情况下对…

【机器学习1】什么是机器学习机器学习的重要性

什么是机器学习? 简而言之&#xff0c;机器学习就是训练机器去学习。 机器学习作为人工智能(Artificial Intelligence,AI)的一个分支&#xff0c;以其最基本的形式来使用算法通过从数据中获取知识来进行预测。 不同于人类通过分析大量数据手动推导规则和模型&#xff0c;机…

释放AI创作潜能:从大模型训练到高产力应用

文章目录 每日一句正能量前言什么是人工智能生成内容&#xff08;AIGC&#xff09;人工智能生成内容&#xff08;AIGC&#xff09;能做什么为什么要用人工智能生成内容&#xff08;AIGC&#xff09;创作成果用Java实现冒泡排序算法学生信息收集系统学生请假管理系统需求分析教务…

SpringBoot 依赖管理

Spring Boot 依赖管理 1. 父项目做依赖管理 无需关注版本号&#xff0c;自动版本仲裁机制 <!-- 依赖管理 --> <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version&g…

利用 PHP 特性绕 WAF 测试

在测试绕过 WAF 执行远程代码之前&#xff0c;首先构造一个简单的、易受攻击的远程代码执行脚本&#xff0c;内容如图&#xff1a; 第 6 行是一个比较明显的命令执行代码&#xff0c;第 3 行尝试拦截 system、exec 或 passthru 等函数&#xff08;PHP 中有许多其他函数可以执行…

CTF REVERSE练习之脱壳分析

今天要介绍脱壳分析的实验。壳&#xff0c;在自然界中&#xff0c;植物用壳来保护种子&#xff0c;动物用壳来保护身体等。同样&#xff0c;在一些计算机软件里也有一段专门负责保护软件不被非法修改或反编译的程序。他们附加在原程序上通过Windows加载器载入内存后&#xff0c…

FreeRTOS(任务管理的创建、删除、挂起、恢复)

目录 一、任务的基本概念 二、任务状态的概念 1、Running—运行态&#xff1a; 2、Ready—就绪态 3、Blocked—阻塞态 4、Suspended—挂起态 三、任务状态的切换 四、系统启动 1、vTaskStartScheduler()函数 1.1 作用 1.2 启动函数介绍 2、空闲任务 2.1 空闲任务的作…

mac安装vscode 配置git

1、安装vscode 官网地址 下载mac稳定版安装很慢的解决办法 (转自) mac电脑如何解决下载vscode慢的问题 选择谷歌浏览器右上角的3个点&#xff0c;选择下载内容&#xff0c;右键选择复制链接地址&#xff0c;在新窗口粘贴地址&#xff0c; 把地址中的一段替换成下面的cscode.sd…

新的里程碑!纪念正月十六工作室博客总访问量突破两百万

时值盛夏&#xff0c;清风徐徐&#xff0c;不觉间我们的博客访问量又迈入了新的里程碑——访问量突破两百万&#xff01; 总访问量突破百万&#xff1a; 个人成就&#xff1a; 记得上次突破重大里程碑还是去年夏天&#xff0c;那时我们重修岳阳楼&#xff0c;追往忆&#…

小程序商品如何设置阶梯价?

阶梯价在电商小程序中是一种常见的销售策略&#xff0c;可以吸引更多的消费者并提高销售额。下面将介绍一些怎么设置小程序产品的阶梯价的方法。 1. 添加/修改商品的时候&#xff0c;点击阶梯价&#xff0c;会弹出阶梯价设置界面。 2. 设置阶梯价规则。例如&#xff0c;当消费者…

http相关知识点

文章目录 长链接http周边会话保持方案1方案2 基本工具postmanFiddlerFiddler的原理 长链接 一张网页实际上可能会有多种元素组成&#xff0c;这也就说明了网页需要多次的http请求。可由于http是基于TCP的&#xff0c;而TCP创建链接是有代价的&#xff0c;因此频繁的创建链接会…

gSpan算法执行步骤详解示例

目录 1. 问题描述2. gSpan算法步骤2.1 数据预处理2.2 深度递归挖掘频繁子图2.2.1 获取所有的频繁边2.2.2 深度递归挖掘频繁子图 参考文献 1. 问题描述 gSpan 是一款图规则挖掘算法&#xff0c;目标是从现有的图集中挖掘频繁子图。如下图中包含三个图&#xff1a; 其中圆圈为顶…

centos安装python3的多个版本

标题 原本安装了python3.6&#xff0c;但是又有一个项目需要py3.7&#xff0c;所以需要让两个版本共存 操作 1、下载py3.7 wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz2、解压 tar -xzvf Python-3.7.3.tgz进入到文件夹 cd Python-3.7.33、安装 本人c…

【Go 基础篇】Go语言初探:第一段代码与执行过程解析

介绍 Go语言&#xff08;也称为Golang&#xff09;作为一门现代化的编程语言&#xff0c;以其简洁的语法、高效的性能和丰富的标准库而受到了广泛关注和使用。对于初学者来说&#xff0c;编写和执行第一段Go代码是迈向这门语言的重要一步。本篇博客将带您深入了解Go语言的第一…

BLE蓝牙协议栈分析

BLE——协议层次结构 一、BLE Controller Controller实现射频相关的模拟和数字部分&#xff0c;完成最基本的数据发送和接收&#xff0c;Controller对外接口是天线&#xff0c;对内接口是主机控制器接口HCI&#xff08;Hostcontroller interface&#xff09;&#xff1b; 控制…

46.利用matlab绘制维安尼曲线(matlab程序)

1.代码 clear close all syms s t k u r; x12*sin(s)*cos(t);y12*sin(s)*sin(t);z12*cos(s); x2-2*cos(k)*cos(k);y22*sin(k)*cos(k);z2u; subplot(1,2,1);ezmeshc(x2,y2,z2,[0,pi,-2,2]); %绘制圆柱面 hold on; ezsurf(x1,y1,z1,[-pi,pi,0,pi]); %绘制球面 title( 球面与圆柱…

SpringBoot--发邮件的方法(有示例)

原文网址&#xff1a;SpringBoot--发邮件的方法(有示例)_IT利刃出鞘的博客-CSDN博客 简介 本文介绍SpringBoot发邮件的方法(有示例)。 依赖 pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-ma…

nginx简介与安装配置,目录结构和配置文件介绍

一.nginx简介 1.简介 2.特性 二.nginx安装 1.rpm包方式 &#xff08;1&#xff09;下载扩展源 &#xff08;2&#xff09;安装扩展rpm包&#xff0c;nginx -V查看配置参数&#xff0c;后面源码安装时要用到 2.源码方式 &#xff08;1&#xff09;建议提前下好所需要的部…