RabbitMQ学习(六):发布确认

news2024/11/18 4:35:15

一、发布确认的原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队 列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消 息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

二、发布确认的策略

2.1 开启发布确认的方法

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布 确认,都需要在 channel 上调用该方法 。

Channel channel = connection.createChannel();
channel.confirmSelect();

2.2 单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢。因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

核心代码:

for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = i + "";
    channel.basicPublish("", queueName, null, message.getBytes());
    //服务端返回 false 或超时时间内未返回,生产者可以消息重发
    boolean flag = channel.waitForConfirms();
    if(flag){
        System.out.println("消息发送成功");
    }
}

2.3 批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。

核心代码:

for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = i + "";
    channel.basicPublish("", queueName, null, message.getBytes());
    outstandingMessageCount++;
    //每发batchSize条消息,就确认一次
    if (outstandingMessageCount == batchSize) {
        channel.waitForConfirms();
        outstandingMessageCount = 0;
    }
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
    channel.waitForConfirms();
}

2.4 异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

我们不再在接收消息之后立即确认,而是将确认的任务交给broker。不管消息是否正常消费,消息队列都将收到消息。

核心代码:

/**
 * 确认收到消息的一个回调
 * 1.消息序列号
 * 2.true 可以确认小于等于当前序列号的消息
 * false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
    if (multiple) {
        //返回的是小于等于当前序列号的未确认消息 是一个 map
        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
        //清除该部分未确认消息
        confirmed.clear();
    }else{
        //只清除当前序列号的消息
        outstandingConfirms.remove(sequenceNumber);
    }
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
    String message = outstandingConfirms.get(sequenceNumber);
    System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};

/**
 * 添加一个异步确认的监听器
 * 1.确认收到消息的回调
 * 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = "消息" + i;
    /**
     * channel.getNextPublishSeqNo()获取下一个消息的序列号
     * 通过序列号与消息体进行一个关联
     * 全部都是未确认的消息体
    */
    outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
    channel.basicPublish("", queueName, null, message.getBytes());
}

2.5 如何处理异步未确认消息

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue。这个队列在 confirm callbacks 与发布线程之间进行消息的传 递。

2.6 以上 3 种发布确认速度对比

  • 单独发布

消息 同步等待确认,简单,但吞吐量非常有限

  • 批量发布

消息 批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难推断出是哪条 消息出现了问题

  • 异步处理

最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/348781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

V4l2框架基础知识(一)

V4L2框架-v4l2 device V4l2视频设备驱动基础 1.V4L2是专门为linux设备设计的整套视频框架&#xff08;其主要核心在linux内核&#xff0c;相当于操作系统上层的视频源捕获驱动框架&#xff09;&#xff0c;为上层访问系统底层的视频设备提供了一个统一的标准接口&#xff0c;…

【LeetCode】剑指 Offer 05. 替换空格 p50 -- Java Version

题目链接&#xff1a; https://leetcode.cn/problems/ti-huan-kong-ge-lcof/ 1. 题目介绍&#xff08;05. 替换空格&#xff09; 请实现一个函数&#xff0c;把字符串 s 中的每个空格替换成"%20"。 【测试用例】&#xff1a; 示例1&#xff1a; 输入&#xff1a;s …

TransH模型原理

从TransE到TransH模型 在之前知识图谱模型中&#xff0c;我们介绍了TransE模型的基本原理&#xff0c;对于TransE模型其基本原理为&#xff1a; hrth r thrt 其中hhh是头实体向量&#xff0c;rrr是关系向量&#xff0c;ttt是尾实体向量。根据这个核心公式&#xff0c;我们不…

AI工衣工服智能识别检测算法 yolov7

AI工衣工服智能识别检测算法通过yolov7网络模型深度学习算法&#xff0c;AI工衣工服智能识别检测算法对场人员穿戴进行实时不间断监测&#xff0c;发现现场人员未按要求穿戴时&#xff0c;立即抓拍告警。YOLO 的核心思想就是把目标检测转变成一个回归问题&#xff0c;利用整张图…

Unity 编辑器工具之批量设置图片压缩

一个简单的工具,对Unity下的图片做批量压缩处理,主要有以下功能:自动取消 "Generte Mip Maps" 勾选;针对文件夹批量自动(或手动选择压缩格式)设置图片压缩并自动保存;单个图片文件的压缩设置;使用方法,右键单张图片(或者包含图片的文件夹)会打开一个设置窗口 如下,窗…

Vue笔记(2)——页面渲染与数据收集

一、条件渲染 v-show v-if 1. v-show 2. v-if v-else的块和v-if的块间不能有中断&#xff0c;否则无效 3. v-if与template配合 当同时条件渲染多个元素时&#xff0c;可以将v-if与template的配合使用&#xff0c;若条件值为false&#xff0c;vue模板解析时会直接去掉这一块…

AcWing语法基础课笔记 第二章 printf语句与C++中的判断结构

第二章 printf语句与C中的判断结构 学习语言最好的方式就是实践&#xff0c;每当掌握一个新功能时&#xff0c;就要立即将这个功能应用到实践中。 ——闫学灿 一、printf输出格式 注意&#xff1a;使用printf 时最好添加头文件 #include <cstdio>。 Int、float、double、…

基于共聚焦显微技术的显微镜和荧光显微镜的区别

荧光显微镜主要应用在生物领域及医学研究中&#xff0c;能得到细胞或组织内部微细结构的荧光图像&#xff0c;在亚细胞水平上观察诸如Ca2 、PH值&#xff0c;膜电位等生理信号及细胞形态的变化&#xff0c;是形态学&#xff0c;分子生物学&#xff0c;神经科学&#xff0c;药理…

GEE学习笔记 八十九:在自己的APP中使用绘制矢量(中)

这一篇先讲一下ui.Map.GeometryLayer(...)&#xff0c;也就是生成显示的绘制矢量图形图层&#xff0c;具体来讲就是地图上左上角绘制的图形后添加的图层。 1、什么是GeometryLayer&#xff1f; &#xff08;1&#xff09;直接在地图上加载定义的图层 //1. add normal layer …

基于springboot+bootstrap+mysql+redis搭建一套完整的权限架构【二】【整合springSecurity】

1、创建数据库 注意&#xff1a;mysql默认字符集为utf8&#xff0c;默认排序规则为utf8_general_ci。一般我们也会选择字符集为utf-8 MySQL在5.5.3之后增加了这个utf8mb4的编码&#xff0c;utf8mb4完全向下兼容utf8&#xff0c;为了节省空间&#xff0c;一般情况下使用utf8也就…

中国国家级地面气象站基本气象要素日值数据集(V3.0)

数据集摘要 数据集包含了中国基本气象站、基准气候站、一般气象站在内的主要2474个站点1951年1月以来本站气压、气温、降水量、蒸发量、相对湿度、风向风速、日照时数和0cm地温要素的日值数据。数据量为21.3GB。 (1)SURF_CLI_CHN_MUL_DAY-TEM-12001-201501.TXT 气温数据TEM, 包…

央行数据-一款查逆回购 LPR 货币供应量 资产负债表 Shibor 数据的专业工具

自己开发的APP, App Store搜索"央行数据" 即可下载欢迎大家下载,给修改意见逆回购、正回购、MLF、票据&#xff0c;俗称央行发钱房贷基准利率多少? M2/M1/M0, 资产负债表,Shibor 了解下这款APP是经济,投资理财,股市,房价分析参考利器适用于关注经济、货币政策的用户…

第五章.与学习相关技巧—权重初始值(随机初始值,Xavier初始值,He初始值)

第五章.与学习相关技巧 5.2 权重初始值 本节将介绍权重初始值的推荐值&#xff0c;并通过实验确认神经网络的学习是否会快速进行。 1.权值衰减 权值衰减就是一种以减少权重参数的值为目的进行学习的方法&#xff0c;通过减少权重参数值来抑制过拟合的情况发生。 2.权重初始值不…

表现良好的最长时段[前缀和思想子数组]

前缀和与最长子数组前言一、表现良好的最长时间段二、前缀和思想&子数组1、前缀和&map2、前缀和&单调栈总结参考文献前言 对于子数组/子串问题&#xff0c;紧密连续前缀和/滑动窗口/单调栈&#xff1b;挖掘内在规律&#xff0c;可以简化代码&#xff0c;降低时空复…

Python多进程同步——文件锁

多个进程共享同一份资源&#xff08;共享内存、文件等&#xff09;时&#xff0c;会涉及到资源竞争问题。为了解决这种问题&#xff0c;一般采取的措施是进程在访问资源前加锁保护&#xff0c;避免多个进程同时读写。本文介绍的Python文件锁可以用来解决多进程的同步问题。 目录…

天荒地老修仙功-第六部第二篇:Spring Cloud Eureka自我保护机制

Eureka Server 在运行期间会去统计心跳失败比例在 15 分钟之内是否低于 85%&#xff0c;如果低于 85%&#xff0c;Eureka Server 会将这些实例保护起来&#xff0c;让这些实例不会过期&#xff0c;但是在保护期内如果服务刚好这个服务提供者非正常下线了&#xff0c;此时服务消…

[SCTF2019]babyre 题解

对未来的真正慷慨&#xff0c;是把一切献给现在。 ——加缪 目录 1.查壳 2.处理花指令&#xff0c;找到main函数 这一操作过程可以参考下面的视频&#xff1a; 3.静态分析第一部分,psword1 4.静态分析第二部分,psword2 5.静态分析第五部分&#xff0c;psword3 6.根据ps…

国产Linux操作系统读写RFID、NFC、IC卡示例源码

Windows系统应该是我们接触最多、最为熟悉的电脑端操作系统。Windows操作系统只能安装在x86指令集的CPU电脑中&#xff0c;x64是x86的升级版&#xff0c;Intel、Amd是x86指令集CPU最大的2个生产商。Windows系统下&#xff0c;外设接口驱动一般都封装成DLL动态库内&#xff0c;通…

基于springboot开发众筹平台前后台管理系统【完整源码+数据库+运行指导】

一、项目简介 本项目是一套基于springboot开发众筹平台前后台管理系统&#xff0c;主要针对计算机相关专业的正在做bishe的学生和需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目可以直接作为bishe使用。 项目都经过严格调试&#…

2023csoj寒假训练10

csoj寒假训练10 A 并查集 两个黑球之间距离不够这个白球通过的话&#xff0c;视为一个集合 考虑怎样维护这样两两之间的关系&#xff0c;我们使用并查集 同时黑球与直线的关系也要做一次维护 最后可以直接判断是否上下两条直线是否在一个集合里面 如果在一个集合里面说明…