RabbitMQ消息的应答

news2025/2/25 16:32:15

消息的应答机制

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了, RabbitMQ 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵,批量应答的批量范围是channel 上未应答的消息。比如说 channel 上有传送 tag 的消息5,6,7,8 当前 tag 是8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答。但是实际上还是不建议开启批量应答的。

image-20220530100237882

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

Channel.basicNack(用于否定确认)

不处理该消息了直接拒绝,可以将其丢弃了。

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数。

不处理该消息了直接拒绝,可以将其丢弃了。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

代码实现

生产者
public class MyProducer {

    @Test
    public void test() throws Exception {
        // 队列名称
        String queue = "xw_queue";
        String message = "Hello World -> ";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 20; i++) {
            // 发布消息
            channel.basicPublish("xw_exchange", queue, null, (message + i).getBytes());
        }
    }
}
消费者1

开启手动确认后,消费者1如果在处理消息的回调中不确认消息,那么队列中的消息会处于unacked的状态,如果消费者1突然挂掉,那么这些未确认的消费会重新发送给其他消费者。

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 队列名称
        String queue = "xw_queue";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind("", "xw_exchange", queue);
        // 配置开启手动应答
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消费完成后手动应答
                // channel.basicAck(envelope.getDeliveryTag(), false);
                Thread.sleep(5000);
                System.out.println("消费者1:接收到消息: " + new String(body));
            }
        });
    }
}
消费者2

消费者2正常消费消息,收到消息后立刻确认。

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 队列名称
        String queue = "xw_queue";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind("", "xw_exchange", queue);
        // 配置开启手动应答
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消费完成后手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println("消费者2:接收到消息: " + new String(body));
            }
        });
    }
}

效果展示

image-20220530102415347

image-20220530102445700

此时把消费者1停掉,那么上面这10条为确认的消费会重新入队,发送给另外的消费者。

image-20220530102642150

image-20220530102708467

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1268576.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp打包ios有时间 uniapp打包次数

我们经常用的解决方案有,分包,将图片上传到服务器上,减少插件引入。但是还有一个方案好多刚入门uniapp的人都给忽略了,就是在源码视图中配置,开启分包优化。 1.分包 目前微信小程序可以分8个包,每个包的最大存储是2M,也就是说你文件总体的大小不能超过16M,每个包的大…

前端开发_HTML

简介 CSS用于美化内容 HTML用于摆放内容 可以理解为HTML是基础&#xff0c;CSS是工具 HTML定义 HTML 超文本标记语言——HyperText Markup Language 超文本——链接 标记——标签&#xff0c;即带尖括号的文本 标签语法 双标签 开始标签&#xff1a; <xxx> 即尖…

模糊C均值聚类(Fuzzy C-means clustering,FCM)的基本概念,详细流程以及广泛应用!

文章目录 1.基本概念2. FCM的详细流程3.FCM的应用 1.基本概念 模糊C均值聚类&#xff08;Fuzzy C-means clustering&#xff0c;FCM&#xff09;是一种软聚类方法&#xff0c;它允许数据点属于多个聚类中心&#xff0c;每个聚类中心都有一个权重。与传统的硬聚类方法&#xff…

hyper-V操作虚拟机ubuntu 22.03

安装hyper-V 点击卸载程序 都勾选上即可 新建虚拟机&#xff0c;选择镜像文件 选择第一代即可 设置内存 配置网络 双击 启动安装虚拟机 输入用户名 zenglg 密码&#xff1a;LuoShuwen123456 按照enter键选中openssh安装 安装中 安装完成 选择重启 输入用户名、密码

回归预测 | MATLAB实现基于LightGBM算法的数据回归预测(多指标,多图)

回归预测 | MATLAB实现基于LightGBM算法的数据回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现基于LightGBM算法的数据回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍程序设计参考资料 效果一览 基本介绍 MATLA…

《数据结构、算法与应用C++语言描述》-优先级队列-大根堆的C++实现

优先级队列 完整可编译运行代码见&#xff1a;Github::Data-Structures-Algorithms-and-Applications/_25Priority queue 定义 优先级队列&#xff08;priority queue&#xff09;是0个或多个元素的集合&#xff0c;每个元素都有一个优先权或值&#xff0c;对优先级队列执行…

使用Redis实现接口防抖

说明&#xff1a;实际开发中&#xff0c;我们在前端页面上点击了一个按钮&#xff0c;访问了一个接口&#xff0c;这时因为网络波动或者其他原因&#xff0c;页面上没有反应&#xff0c;用户可能会在短时间内再次点击一次或者用户以为没有点到&#xff0c;很快的又点了一次。导…

Django路由分发

首先明白一点&#xff0c;Django的每一个应用下都可以有自己的templates文件夹&#xff0c;urls.py文件夹&#xff0c;static文件夹&#xff0c;基于这个特点&#xff0c;Django能够很好的做到分组开发&#xff08;每个人只写自己的app&#xff09;&#xff0c;作为老大&#x…

基于springboot+vue的在线考试系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

20. Matplotlib 数据可视化

目录 1. 简介2. Matplotlib 开发环境2.1 画图2.2 画图接口2.4 线形图2.5 散点图2.6 等高线图2.7 直方图 1. 简介 Matplotlib网址&#xff1a;https://matplotlib.org/ 数据可视化是数据分析中最重要的工作之一。Matploblib是建立在Numpy数组基础上的多平台数据可视化程序库&a…

UI自动化测试工具工作原理是怎样的?

随着软件开发的不断演进&#xff0c;保障软件质量成为了至关重要的一环。在这个过程中&#xff0c;UI自动化测试工具崭露头角&#xff0c;为开发团队提供了一种强有力的方式来确保应用程序的稳定性、功能性和兼容性。本文将深入探讨UI自动化测试工具的定义、工作原理以及其在提…

FOC系列(三)----AS5600磁编码器

一、 关于AS5600 1.1 芯片内部框图和引脚功能介绍 具体的内容大家可以查看数据手册&#xff1a;AS5600数据手册&#xff0c;在这里只是对一下重要的地方进行说明。    系统框图如下&#xff1a;    电源设计选项&#xff0c;我在设计时选择的是第二种电源方案&#xff0c…

基于Django+Tensorflow卷积神经网络鸟类识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介系统概述系统功能核心技术系统架构系统优势 二、功能三、系统四. 总结  总结 一项目简介 介绍一个基于DjangoTensorflow卷积神经网络鸟类识别系统是一个非…

NoSQL大数据存储技术思考题及参考答案

思考题及参考答案 第1章 绪论 1. NoSQL和关系型数据库在设计目标上有何主要区别&#xff1f; (1)关系数据库 优势&#xff1a;以完善的关系代数理论作为基础&#xff0c;具有数据模型、完整性约束和事务的强一致性等特点&#xff0c;借助索引机制可以实现高效的查询&#xf…

前五年—中国十大科技进展新闻(2012年—2017年)

前五年—中国十大科技进展新闻&#xff08;2012-2017&#xff09; 2017年中国十大科技进展新闻1. 我国科学家利用化学物质合成完整活性染色体2. 国产水下滑翔机下潜6329米刷新世界纪录3. 世界首台超越早期经典计算机的光量子计算机诞生4. 国产大型客机C919首飞5. 我国首次海域天…

SQL HAVING 子句详解:在 GROUP BY 中更灵活的条件筛选

SQL HAVING子句 HAVING子句被添加到SQL中&#xff0c;因为WHERE关键字不能与聚合函数一起使用。 HAVING语法 SELECT column_name(s) FROM table_name WHERE condition GROUP BY column_name(s) HAVING condition ORDER BY column_name(s);演示数据库 以下是Northwind示例数…

【BUG合集】(一)①数据库存1/0,请求结果返回true和false;②sql查数据库能查,但mybatis查为空;③data64图片存储为异常;

前言 最近&#xff0c;在工作上接手的任务中&#xff0c;各种 bug 问题出现&#xff0c;在解决的同时也可以记录一下。因此&#xff0c;觉得可以出个记录 bug 合集。方便后来者碰到类似情况&#xff0c;可以作为一个参考进行解决。 文章题目就包含当前文章内容中所遇到的三个 b…

立刻解决缺少msvcp140_1.dll解决方法,msvcp140_1.dll修复指南

在日常使用电脑的过程中&#xff0c;我们有时会遇到由于某些重要的DLL文件缺失而导致的程序无法正常启动的问题。很多用户可能都经历过由于缺少msvcp140_1.dll导致应用程序无法运行的情况。本文将为你提供解决msvcp140_1.dll缺失问题的详尽方法&#xff0c;附带对每种方法优点和…

配置自动化部署Jenkins和Gitea

配置自动化部署 这里使用的是JenkinsGitea 如果不知道怎么安装Jenkins和Gitea可以参考下面文章 https://blog.csdn.net/weixin_46533577/article/details/134644144 我的另一篇文章 介绍 前端 先说下自己的情况&#xff0c;因为自己服务器原因&#xff0c;使用的服务器内…

滴滴打车崩了!全过程

滴滴发布致歉10元补偿券&#xff0c;文末可领取 。 事情发生于 2023年11月27日晚~28日中午&#xff0c;滴滴打车服务出现大面积故障&#xff0c;登上微博热搜。 许多用户在使用滴滴出行时遇到了无法叫车、订单异常等问题&#xff0c;导致大量用户滞留在外&#xff0c;出行受阻…