4. 发布确认

news2025/1/12 19:02:34

4.1. 发布确认原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的
消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队
列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传
给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信
道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调
方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消
息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
4.2. 发布确认的策略
4.2.1.
开启发布确认的方法
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布
确认,都需要在 channel 上调用该方法
在这里插入图片描述
4.2.2.
单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。

public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
"ms");
}
}

4.2.3.
批量确认发布
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地
提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现
问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种
方案仍然是同步的,也一样阻塞消息的发布。

public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
"ms");
}
}

4.2.4.
异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,
他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,
下面就让我们来详细讲解异步确认是怎么实现的。
在这里插入图片描述

public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long,
String>
outstandingConfirms
=
new
ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true 可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long,
String>
confirmed
=
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
"ms");
}
}

4.2.5.
如何处理异步未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传
递。

4.2.6.
以上 3 种发布确认速度对比
单独发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条
消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

public static void main(String[] args) throws Exception {
//这个消息数量设置为 1000 好些 不然花费时间太长
publishMessagesIndividually();
publishMessagesInBatch();
handlePublishConfirmsAsynchronously();
}
//运行结果
发布 1,000 个单独确认消息耗时 50,278 ms
发布 1,000 个批量确认消息耗时 635 ms
发布 1,000 个异步确认消息耗时 92 ms

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872393.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[python]RuntimeError: Can‘t decrement id ref count (unable to close file...

使用spectralspatial模型进行EEG分类时&#xff0c;出现以下错误 RuntimeError: Cant decrement id ref count (unable to close file, errno 5, error message Input/output error) Segmentation fault (core dumped) 猜测是因为存储的model太大了导致的&#xff0c;找到了…

QGIS3.28的二次开发八:显示shp的属性表

这里实现两个基本的 GIS 软件需求&#xff1a;矢量图层的属性表显示&#xff0c;以及根据属性筛选要素。 具体需求如下&#xff1a; 加载一个矢量图层并打开其属性表&#xff1b;输入筛选条件确认无误后&#xff0c;画布上和属性表中均只显示筛选后的要素。 QGIS 提供了若干…

V3s uboot 通过env 修改LCD 参数信息

实际项目中我们可能使用各种参数的LCD 显示器&#xff0c;有7吋&#xff0c;4.3 寸等等&#xff0c;我这里使用的uboot 版本是U-Boot 2017.01-rc2 &#xff0c;在make menuconfig 时候会填入lcd 配置信息&#xff0c;如下&#xff1a; 所以这里使用起来很不方便&#xff0c;查看…

Vulnhub: DriftingBlues: 3靶机

kali&#xff1a;192.168.111.111 靶机&#xff1a;192.168.111.192 信息收集 端口扫描 nmap -A -sC -v -sV -T5 -p- --scripthttp-enum 192.168.111.192 查看robots.txt得到提示 访问eventadmins提示littlequeenofspades.html 查看littlequeenofspades.html源码 base64解密…

掌握Python的X篇_35_用Python为美女打码_图像库Pillow

本篇将会介绍python中的一个图像库Pillow。 文章目录 1. Pillow与PIL的关系2. 调整大小3. 加滤镜4. 剪裁5. 生成验证码 1. Pillow与PIL的关系 我们在网上搜python的图像库的话&#xff0c;可能搜到的时PIL。实际上之前python发展的时候就是PIL&#xff0c;这个库比较好用&…

IL汇编语言做一个窗体

网上看到一段代码&#xff0c; .assembly extern mscorlib {} .assembly Classes { .ver 1:0:1:0 } .namespace MyForm { .class public TestForm extends [System.Windows.Forms]System.Windows.Forms.Form { .field private class [System]…

C语言快速回顾(二)

前言 在Android音视频开发中&#xff0c;网上知识点过于零碎&#xff0c;自学起来难度非常大&#xff0c;不过音视频大牛Jhuster提出了《Android 音视频从入门到提高 - 任务列表》&#xff0c;结合我自己的工作学习经历&#xff0c;我准备写一个音视频系列blog。C/C是音视频必…

房屋中介系统springboot框架jsp房产信息管理java源代码

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 房屋中介系统springboot框架 系统有1权限&#xff1a…

SAP MM学习笔记19- SAP中的库存处理 单纯的退货,交货不足和过量交货

下面这篇文章讲了 SAP中的库存类型。本篇讲一些库存处理场景。 SAP MM学习笔记19- SAP中的库存类型&#xff0c;以及 保留在库的利用场景_东京老树根的博客-CSDN博客 1&#xff0c;单纯的退货 收货之后发现不合格货物&#xff0c;然后就想退货。而且退货之后不想再要了&#…

RES 系列 GRES: Generalized Referring Expression Segmentation 论文阅读笔记

RES 系列 GRES: Generalized Referring Expression Segmentation 论文阅读笔记 一、Abstract二、引言三、相关工作有关的指代任务和数据集指代分割方法 四、任务设置及数据集4.1 GRES 设置RES 回顾一般化的 RES评估 4.2 gRefCOCO&#xff1a;一个大尺度的 GRES 数据集多目标样本…

使用腾讯云轻量服务器Matomo应用模板建网站流量统计系统

腾讯云百科分享使用腾讯云轻量应用服务器Matomo应用模板搭建网站流量统计系统&#xff0c;Matomo 是一款开源的网站数据统计软件&#xff0c;可以用于跟踪、分析您的网站的流量&#xff0c;同时充分保障数据安全性、隐私性。该镜像基于 CentOS 7.6 64位操作系统&#xff0c;已预…

Docker技术入门教程

Docker技术入门教程 一、docker概念 一款产品从开发到上线&#xff0c;从操作系统&#xff0c;到运行环境&#xff0c;再到应用配置。作为开发运维之间的协作我们需要关心很多东西&#xff0c;这也是很多互联网公司都不得不面对的问题&#xff0c;特别是各种版本的迭代之后&a…

Digital thread中文术语标准化|Digital thread何时是“数字主螺纹”的意思?

文章仅供个人学习使用&#xff0c;请勿传播&#xff01; 原文来源&#xff1a; 段海波 数字孪生体实验室 2021-12-30 18:22 https://mp.weixin.qq.com/s/-bgMkSewxOsjhTiagUnfsw 作者一直以来主张区分数字孪生系统和数字孪生体&#xff0c;进而构建系统化的数字孪生概念和术语体…

时序预测 | MATLAB实现基于BiLSTM双向长短期记忆神经网络的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于BiLSTM双向长短期记忆神经网络的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于BiLSTM双向长短期记忆神经网络的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 Matlab实现BiLST…

个人对哈希数据结构学习总结 -- 理论篇

个人对哈希数据结构学习总结 -- 理论篇 引言哈希表设计思考哈希冲突Hash Functions冲突解决开放地址法(Open Addressing)分离链表法(Separate Chaining)Two-way Chaining Dynamic Hash Tableschained Hashingextendible hashinglinear hashing说明 spiral storage 使用场景小结…

深入了解 Postman Test 校验的使用方法

Postman 是一个广泛使用的 API 开发工具&#xff0c;它允许开发人员测试 API 的各个方面&#xff0c;包括请求、响应、身份验证等等&#xff0c;其中最常用的功能之一就是 Test 校验。那今天就一起来看看 Postman 的 Test 校验该如何使用。 Test 校验是什么&#xff1f; Test…

Java中的继承

目标&#xff1a; 1&#xff0c;认识继承 Java中提供了一个关键字extends&#xff0c;用这个关键字&#xff0c;可以让一个类和另一个类建立起父子关系。 继承的特点&#xff1a; 子类可以继承父类的非私有成员&#xff08;成员变量&#xff0c;成员方法&#xff09;&#x…

LeetCode_04Java_88. 合并两个有序数组

给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。 注意&#xff1a;最终&#xff0c;合并后数组…

第三章 图论 No.11二分图,匈牙利算法与点覆盖

文章目录 二分染色&#xff1a;257. 关押罪犯增广路径372. 棋盘覆盖 最小点覆盖376. 机器任务 最大独立集378. 骑士放置 最小路径点覆盖 二分染色&#xff1a;257. 关押罪犯 257. 关押罪犯 - AcWing题库 最大最小问题&#xff0c;一眼二分 答案的范围在 [ 1 , 1 e 9 ] [1, 1…

数据驱动与关键字驱动

初次接触自动化测试时&#xff0c;对数据驱动和关键字驱动不甚理解&#xff0c;觉得有点故弄玄须&#xff0c;不就是参数和函数其嘛&#xff01;其实其也体现了测试所不同与开发的一些特点&#xff08;主要指系统测试&#xff09;&#xff0c;以及和对技术发展的脉络的展现。 …