RabbitMQ之消息应答和持久化

news2025/1/24 2:23:39

文章目录

  • 前言
  • 一、消息应答
    • 1.概念
    • 2.自动应答
    • 3.消息应答方法
    • 4.Multiple 的解释
    • 5.消息自动重新入队
    • 6.消息手动应答代码
    • 7.手动应答效果演示
  • 二、RabbitMQ持久化
    • 1.概念
    • 2.队列如何实现持久化
    • 3.消息实现持久化
    • 4.不公平分发
    • 5.预取值
  • 总结


前言

在RabbitMQ中,我们的消费者在应答方面有需要注意的地方,接下来就跟我博主一起去了解一下吧!


一、消息应答

1.概念

  • 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
  • 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

2.自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

3.消息应答方法

  • A.Channel.basicAck(用于肯定确认),RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • B.Channel.basicNack(用于否定确认)
  • C.Channel.basicReject(用于否定确认),与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

4.Multiple 的解释

手动应答的好处是可以批量应答并且减少网络拥堵
在这里插入图片描述

  • multiple 的 true 和 false 代表不同意思
    • true 代表批量应答 channel 上未应答的消息
      • 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答
    • false 同上面相比
      • 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

在这里插入图片描述
在这里插入图片描述

5.消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息
未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者
可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。
在这里插入图片描述

6.消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。
在这里插入图片描述

消息生产者

public class Task02 {
 	private static final String TASK_QUEUE_NAME = "ack_queue";
 	public static void main(String[] argv) throws Exception {
 		try (Channel channel = RabbitMqUtils.getChannel()) {
 			channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
 			Scanner sc = new Scanner(System.in);
 			System.out.println("请输入信息");
 			while (sc.hasNext()) {
 				String message = sc.nextLine();
 				channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
 				System.out.println("生产者发出消息" + message);
 			}
 		}
 	}
}

消费者01

public class Work03 {
 	private static final String ACK_QUEUE_NAME="ack_queue";
 	public static void main(String[] args) throws Exception {
 		Channel channel = RabbitMqUtils.getChannel();
 		System.out.println("C1 等待接收消息处理时间较短");
 		//消息消费的时候如何处理消息
 		DeliverCallback deliverCallback=(consumerTag,delivery)->{
 			String message= new String(delivery.getBody());
 			SleepUtils.sleep(1);
 			System.out.println("接收到消息:"+message);
 			/**
 			* 1.消息标记 tag
 			* 2.是否批量应答未应答消息
 			*/
 			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 		};
 		//采用手动应答
 		boolean autoAck=false;
 		channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 			System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
 		});
 	}
}

消费者02

public class Work04 {
 	private static final String ACK_QUEUE_NAME="ack_queue";
 	public static void main(String[] args) throws Exception {
 		Channel channel = RabbitMqUtils.getChannel();
 		System.out.println("C2 等待接收消息处理时间较长");
 		//消息消费的时候如何处理消息
 		DeliverCallback deliverCallback=(consumerTag,delivery)->{
 			String message= new String(delivery.getBody());
 			SleepUtils.sleep(30);
 			System.out.println("接收到消息:"+message);
 			/**
 			* 1.消息标记 tag
 			* 2.是否批量应答未应答消息
 			*/
 			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 		};
 		//采用手动应答
 		boolean autoAck=false;
 		channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 			System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
 		});
 	}
}

睡眠工具类

public class SleepUtils {
 	public static void sleep(int second){
 		try {
 			Thread.sleep(1000*second);
 		} catch (InterruptedException _ignored) {
 			Thread.currentThread().interrupt();
 		}
 	}
}

7.手动应答效果演示

正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理
在这里插入图片描述
在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了。
在这里插入图片描述

二、RabbitMQ持久化

1.概念

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

2.队列如何实现持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
在这里插入图片描述但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
在这里插入图片描述
以下为控制台中持久化与非持久化队列的 UI 显示区
在这里插入图片描述
这个时候即使重启 rabbitmq 队列也依然存在。

3.消息实现持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
在这里插入图片描述
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

4.不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1);

在这里插入图片描述
在这里插入图片描述

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

5.预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
在这里插入图片描述

总结

以上就是RabbitMQ之消息应答和持久化的相关知识,希望对你有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1209772.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

image J 对Western blot 条带进行灰度分析 量化分析

用ImageJ对条带进行定量分析 | Public Library of Bioinformatics (plob.org) 3分钟Get!大牛教你用 image J 对Western blot 条带进行灰度分析! - 哔哩哔哩 (bilibili.com) 科研人员做的western blot实验一般需要对其结果扫描后进行灰度分析&#xff0…

在qt的设计师界面没有QVTKOpenGLWidget这个类,只有QOpenGLWidget,那么我们如何得到QVTKOpenGLWidget呢?

文章目录 前言不过,时过境迁,QVTKOpenGLWidget用的越来越少,官方推荐使用qvtkopengnativewidget代替QVTKOpenGLWidget 前言 在qt的设计师界面没有QVTKOpenGLWidget这个类,只有QOpenGLWidget,我们要使用QVTKOpenGLWidget,那么我们如何得到QVTKOpenGLWidget呢? 不过,时过境迁,Q…

Docker-minio部署

1.创建目录 创建文件目录,用来存放配置和上传文件目录 (1)Minio 外部挂载的配置文件(/mydata/minio/config) (2)存储上传文件的目录(/mydata/minio/data) mkdir -p /home/minio/config mkdir -p /home/minio/data2.拉…

大文件分片上传、断点续传、秒传

小文件上传 后端&#xff1a;SpringBootJDK17 前端&#xff1a;JavaScriptsparkmd5.min.js 一、依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.2</ve…

demo(一)eureka----服务注册与提供

下面写一个简单的demo验证下eureka&#xff0c;实现服务注册、服务发现。 一、单节点&#xff1a; 1、api&#xff1a; 封装其他组件需要共用的dto 2、eureka-service服务注册中心&#xff1a; &#xff08;1&#xff09;pom: <?xml version"1.0" encoding&q…

[论文分享] Never Mind the Malware, Here’s the Stegomalware

Never Mind the Malware, Here’s the Stegomalware [IEEE Security & Privacy 2022] Luca Caviglione | National Research Council of Italy Wojciech Mazurczyk | Warsaw University of Technology and FernUniversitt in Hagen 近年来&#xff0c;隐写技术已逐渐被观…

生产环境中的面试问题,实时链路中的Kafka数据发现某字段值错误,怎么办?...

大家好呀&#xff0c;今天分享的是一个生产环境中遇到的问题。也是群友遇到的一个面试问题。 原问题是&#xff1a; 早晨8点之后发现kafka的record中某个字段的值出现了错误&#xff0c;现在已经10点了&#xff0c;需要对kafka进行数据订正&#xff0c;怎么样定位和解决这个问题…

【星海出品】SDN neutron (四) 流分析

Neutron框架之流分析 1.控制端neutron-server通过wsgi接收北向REST API请求&#xff0c;neutron-plugin通过rpc与设备端进行南向通信。 2.设备端agent则向上通过rpc与控制端进行通信&#xff0c;向下则直接在本地对网络设备进行配置。 3.Neutron-agent的实现很多&#xff0c;彼…

神经网络中的量化与蒸馏

本文将深入研究深度学习中精简模型的技术&#xff1a;量化和蒸馏 深度学习模型&#xff0c;特别是那些具有大量参数的模型&#xff0c;在资源受限环境中的部署几乎是不可能的。所以就出现了两种流行的技术&#xff0c;量化和蒸馏&#xff0c;它们都是可以使模型更加轻量级&…

Kubernetes介绍以及Kubernetes快速部署

Kubernetes介绍以及Kubernetes快速部署 文章目录 Kubernetes介绍以及Kubernetes快速部署1.Kubernetes介绍&#xff1a;1.1.Kubernetes简介1.2. Kubernetes应用部署方式演变1.3.Kubernetes功能1.4.Kubernetes工作原理1.5.工作流程1.6.优缺点 2.Kubernetes环境部署2.1.环境说明2.…

Leetcode179. 最大数

Every day a Leetcode 题目来源&#xff1a;179. 最大数 解法1&#xff1a;贪心 对于数组 nums 中的任意两个数 a 和 b&#xff0c;我们将其转换为字符串后&#xff0c;如果 to_string(a) to_string(b) > to_string(b) to_string(a)&#xff0c;说明 a 应该放在 b 的前…

arcgis--填充面域空洞

方法一&#xff1a;使用【编辑器】-【合并工具】进行填充。首选需要在相同图层中构造一个填充空洞的面域&#xff0c;然后利用【合并】工具进行最后填充。 打开一幅含有空洞的矢量数据&#xff0c;如下&#xff1a; 打开【开始编辑】-【构造工具】-【面】进行覆盖空洞的面域的…

k8s_base

应用程序在服务器上部署方式的演变,互联网发展到现在为止 应用程序在服务器上部署方式 历经了3个时代1. 传统部署 优点简单 缺点就是操作系统的资源是有限制的&#xff0c;比如说操作系统的磁盘&#xff0c;内存 比如说我8G&#xff0c;部署了3个应用程序&#xff0c;当有一天…

客服易中招的3大常见职业病及缓解方法分享

1️⃣无论和谁聊天都可以“亲”~“亲亲”我也很无奈&#xff0c;但真的习惯了&#xff08;其实也不是什么病啦&#xff0c;还能让朋友觉得我性格变好了呢哈哈哈&#xff09; 2️⃣鼠标手&#xff08;腕管综合征&#xff09;其实很多上班族都有这个职业病。但由于我们客服工作属…

中睿天下加入中关村华安关键信息基础设施安全保护联盟

近日&#xff0c;中睿天下正式加入中关村华安关键信息基础设施安全保护联盟&#xff0c;成为其会员单位。 中关村华安关键信息基础设施安全保护联盟是由北京市科学技术委员会、中关村科技园区管理委员会指导支持&#xff0c;经北京市民政局批准&#xff0c;于2023年8月正式注册…

【JavaEE】Servlet API 详解(HttpServlet类)

一、HttpServlet 写 Servlet 代码的时候, 首先第一步就是先创建类, 继承自HttpServlet, 并重写其中的某些方法 1.1 HttpServlet核心方法 1.2 Servlet生命周期 这些方法的调用时机, 就称为 “Servlet 生命周期”. (也就是描述了一个 Servlet 实例从生到死的过程) 1.3 处理G…

pycharm/vscode 配置black和isort

Pycharm blackd Pycharm中有插件可以实现后台服务运行black&#xff1a;BlackConnect 安装 配置 Pycharm isort pycharm中&#xff0c;isort没有插件&#xff0c;暂使用外部工具实现&#xff0c;外部工具也可添加快捷键实现快捷对文件、文件夹进行format import&#xff1…

向量矩阵范数pytorch

向量矩阵范数pytorch 矩阵按照某个维度求和&#xff08;dim就是shape数组的下标&#xff09;1. torch1.1 Tensors一些常用函数 一些安装问题cd进不去不去目录PyTorch里面_表示重写内容 在默认情况下&#xff0c;PyTorch会累积梯度&#xff0c;我们需要清除之前的值 范数是向量或…

学【Java多态】-- 写高质量代码

多态的实现条件 在java中要实现&#xff0c;必须要满足如下几个条件&#xff0c;缺一不可。 1.必须在继承体系下2.子类必须要对父类中的方法进行重写3.通过父类的引用调用冲写的方法。 想要真正的学好多态需要去学习一些前置知识&#xff0c;那我们直接开始吧&#xff01; …

Learning reliable modal weight with transformer for robust RGBT tracking

论文&#xff1a;《Learning reliable modal weight with transformer for robust RGBT tracking》 针对问题&#xff1a;局部线性匹配容易丢失语义信息 解决方法&#xff1a;为了增强特征表示和深化语义特征&#xff0c;分别设计了一种基于改进的Resnet-50的模态权值分配策略…