四、发布确认

news2025/3/10 5:24:20

1、发布确认原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了;

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理kao

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息

2、发布确认策略

2.1 开启发布确认的方法

发布确认模式时没有开启的,如果需要开启,则需要在channel上调用 confirmSelect() 方法

//开启发布确认
channel.confirmSelect();

2.2 单个发布确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。

waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
(也就是说调用 channel.waitForConfirms() 后,信道会一直等待消息进行确认后才会返回true,否则一直阻塞,直到超时发生异常)

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

//1、单个确认模式
    public static void pulishMessageSingleConfirm() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //开启发布确认模式
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("消息发送成功");
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
        }
    }

2.3 批量发布确认

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布

思路:攒100个消息一起进行发布,当发布完第一百个消息时,监听消息是否被确认

 //2、批量发布确认模式
    public static void pulishMessageBatchConfirm() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //开启发布确认模式
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            //生产者每次发布100个消息,确认一次
            int batchConfirmSize = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                batchConfirmSize++;
                if (batchConfirmSize % 100 == 0) {
                    boolean flag = channel.waitForConfirms();
                    if (flag) {
                        System.out.println("消息发送成功");
                    }
                    batchConfirmSize = 0;
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
        }
    }

2.4 异步发布确认

维护一个所有已发布消息的map,通过回调函数传递回来当前确认的消息,然后从map中移除掉已经确认的消息,剩下的就是已经发布但是没有确认的消息
在这里插入图片描述

//3、异步发布确认模式
    public static void pulishMessageAsyncConfirm() throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            //创建一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //开启发布确认
            channel.confirmSelect();
            /**
             * 线程安全有序的一个哈希表,适用于高并发的情况
             * 1、轻松的将序号和消息进行关联
             * 2、轻松的批量删除条目  只需要给到序列号
             * 3、支持并发访问
             * */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
            /**
             * 确认收到消息的一个回调
             * 1、参数1:当前收到的消息的序列号
             * 2、参数2:是否批量确认
             */
            ConfirmCallback ackCallBack = (sequenceNumber, mutiple) -> {
                if (mutiple) {
                    System.out.println("生产者发布的消息" + outstandingConfirms.get(sequenceNumber) + "被确认,序列号" + sequenceNumber);
                    //返回的是小于等于当前序列号的未确认消息,是一个map
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
                    //消除该部分未确认消息
                    confirmed.clear();
                } else {
                    System.out.println("生产者发布的消息" + outstandingConfirms.get(sequenceNumber) + "被确认,序列号" + sequenceNumber);
                    //只消除当前序号的消息
                    outstandingConfirms.remove(sequenceNumber);
                }

            };
            //消息未确认的回调
            ConfirmCallback nackCallBack = (sequenceNumber, mutiple) -> {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("生产者发布的消息" + message + "未被确认,序列号" + sequenceNumber);
            };
            /*
              添加一个异步确认的监听器
                1、确认收到消息的回调
                2、未收到消息的回调
             */
            channel.addConfirmListener(ackCallBack, nackCallBack);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
        }
    }

2.5 如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列

2.6 上述三种发布确认模式比较

  • 单个发布确认:同步等待确认,简单,但吞吐量非常有限。
  • 批量发布确认:简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题
  • 异步发布确认:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386279.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

某小公司面试记录

记录一次面试过程&#xff0c;还有一些笔试题&#xff0c;挺简单的&#xff0c;排序&#xff0c;去重&#xff0c;this指向&#xff0c;深浅拷贝&#xff0c;微任务的执行顺序&#xff0c;变量提升等。 ES6数组新增的方法 Array.from&#xff1a; 将两类对象转为真正的数组&am…

微信又变天!

大家好&#xff0c;我是良许。 不知道大家有没发现&#xff0c;过去两周&#xff0c;微信又双叒改版了&#xff01; 这个改版&#xff0c;喜欢看公众号的小伙伴可能会不习惯&#xff0c;作为公众号的作者更为难受&#xff0c;用一个变天来形容都不为过。 微信又搞啥幺蛾子呢…

软件测试---测试分类

一 : 按测试对象划分 1.1 可靠性测试 可靠性&#xff08;Availability&#xff09;即可用性&#xff0c;是指系统正常运行的能力或者程度&#xff0c;一般用正常向用户提供软件服务的时间占总时间的百分比表示。 1.2 容错性测试 行李箱 , 四个轮子 , 坏了一个 , 说明这个容错…

如何在香港BGP服务器上进行安全性和隐私性配置?

​  香港BGP服务器是在香港运营的&#xff0c;它是基于BGP多线路的网络拓扑所构建的服务器&#xff0c;主要面向于中国内地和海外地域。香港BGP服务器庞大的市场扩张&#xff0c;引来了国内外企业的眼光。然而&#xff0c;如果想要确保香港BGP服务器上的数据安全可靠&#xf…

Tapdata Cloud 基础课:新功能详解之「微信告警」,更及时的告警通知渠道

【前言】作为中国的 “Fivetran/Airbyte”, Tapdata 是一个以低延迟数据移动为核心优势构建的现代数据平台&#xff0c;内置 60 数据连接器&#xff0c;拥有稳定的实时采集和传输能力、秒级响应的数据实时计算能力、稳定易用的数据实时服务能力&#xff0c;以及低代码可视化操作…

MFC界面控件BCGControlBar v33.4 - 支持Win 11 Mica material主题

BCGControlBar库拥有500多个经过全面设计、测试和充分记录的MFC扩展类。 我们的组件可以轻松地集成到您的应用程序中&#xff0c;并为您节省数百个开发和调试时间。BCGControlBar专业版和BCGSuite for MFC v33.4已正式发布了&#xff0c;该版本包含了对Windows 11 Mica materia…

小Redis:开源一款迷你C++17 KV内存型数据库

A KV high-performance mini-database based on memory and C17 This project is inspired by Redis source code. 部分模仿Redis源码。 https://github.com/ZYunfeii/MiniKV Command line tools Developed command line tool kvctl. value type:string yunfeiubuntu:~/Min…

JavaScript函数之prototype原型和原型链

文章目录1. 原型2. 显式和隐式原型3. 原型链3.1 访问顺序4. instanceof4.1 如何判断1. 原型 函数的prototype属性 每个函数都有一个prototype属性&#xff0c;它默认指向一个Object空对象&#xff08;即&#xff1a;原型对象&#xff09;。原型对象中有一个属性constructor&a…

【C++从入门到放弃】类和对象(中)———类的六大默认成员函数

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《C从入门到放弃》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; 类和对…

Python | 蓝桥杯进阶第一卷——字符串

欢迎交流学习~~ 专栏&#xff1a; 蓝桥杯Python组刷题日寄 蓝桥杯进阶系列&#xff1a; &#x1f3c6; Python | 蓝桥杯进阶第一卷——字符串 &#x1f50e; Python | 蓝桥杯进阶第二卷——递归&#xff08;待续&#xff09; &#x1f49d; Python | 蓝桥杯进阶第三卷——动态…

论文阅读-End-to-End Open-Domain Question Answering with BERTserini

论文链接&#xff1a;https://aclanthology.org/N19-4013.pdf 目录 摘要 1 简介 2 背景及相关工作 3 系统架构 3.1 Anserini Retriever 3.2 BERT 阅读器 4 实验结果 5演示 6结论 摘要 我们展示了一个端到端的问答系统&#xff0c;它将 BERT 与开源 Anserini 信息检索…

MSYS2安装

最近在学习windows上编译FFmpeg&#xff0c;需要用到msys2&#xff0c;在此记录一下安装和配置过程。 点击如下链接&#xff0c;下载安装包&#xff1a; Index of /msys2/distrib/x86_64/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 我下载的是&#xff1a;ms…

相信人还是相信ChatGPT,龙测首席AI专家给出了意料之外的答案

最近&#xff0c;关于ChatGPT的话题太火了&#xff01;各大社交软件都是他的消息&#xff01;从去年12月份ChatGPT横空出世&#xff0c;再到近期百度文心一言、复旦Moss的陆续宣布&#xff0c;点燃了全球对AIGC&#xff08;内容人工智能自动生成&#xff09;领域的热情&#xf…

搭建Bitbucket项目管理工具详细教程

目录 1.安装前准备 2.jdk安装 2.1.rpm安装方式&#xff1a; 3.创建bitbucket数据库 4.安装Git 5.安装bitbucket 5.1下载完成上传至服务器的 /usr/atlassian/ 目录下 5.2安装atlassian-bitbucket-7.21.0 5.3安装MySQL驱动 5.4破解激活bitbucket 1.安装前准备 首先查看操…

Python 之网络式编程

一 客户端/服务器架构 即C/S架构&#xff0c;包括 1、硬件C/S架构&#xff08;打印机&#xff09; 2、软件B/S架构&#xff08;web服务&#xff09; C/S架构与Socket的关系&#xff1a; 我们学习Socket就是为了完成C/S的开发 二 OSI七层 引子&#xff1a;   计算机组成…

【Spark分布式内存计算框架——Spark Streaming】13. 偏移量管理(下)MySQL 存储偏移量

6.3 MySQL 存储偏移量 此处将偏移量数据存储到MySQL表中&#xff0c;数据库及表的DDL和DML语句如下&#xff1a; -- 1. 创建数据库的语句 CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci; USE db_spark ; -- 2. 创建表的语句 CRE…

蓝牙资讯|2022 年 Q4 全球 TWS 耳机出货量 7900 万部

Canalys 最新数据显示&#xff0c;2022 年第四季度&#xff0c;全球个人智能音频设备出货量下降 26%&#xff0c;跌至 1.1 亿部。所有品类的出货量都面临不一的下滑趋势&#xff0c;甚至是一直支撑市场的 TWS 品类也遭遇 23% 两位数的下降至 7900 万部。 全球市场方面&#x…

MySQL中varchar(M)存储字符串过长

最近写项目&#xff0c;数据库报了一个错&#xff0c;错误原因是MySQL中存储的字符串过长最近在学MySQL的基础&#xff0c;刚好学到了关于varchar类型要存储的字符串是 “<p>12121212121212</p>\n<p><img src\"https://zzjzzjzzjbucket.oss-cn-hangz…

附录5-大事件项目前端

目录 1 前言 2 用到的插件 2.1 截取图像 cropper 2.2 富文本编辑器 tinymce 3 项目结构 4 config.js 5 主页 5.1 iframe 5.2 页面的宽高 5.3 修改文章 6 个人中心-基本资料 7 个人中心-更换头像 8 个人中心-更换密码 9 文章管理-文章分类 10 文章…

Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

Springboot集成kafka一、前言&#x1f525;二、环境说明&#x1f525;三、概念&#x1f525;四、CentOS7安装kafka&#x1f525;1.下载kafka安装包2.下载好后&#xff0c;进行解压六、kafka项目集成&#x1f525;1️⃣pom引入2️⃣配置kafka3️⃣一个kafka消息发送端4️⃣定义一…