RabbitMQ 高级特性——消息确认

news2024/9/20 18:43:00

在这里插入图片描述

文章目录

  • 前言
  • 消息确认机制
  • SpringBoot 实现消息确认
    • NONE
    • AUTO
    • MANUAL

前言

前面我们学习了 SpringBoot 整合 RabbitMQ,并且使用 RabbitMQ 实现了几种工作模式,接下来我们将学习关于 RabbitMQ 的高级特性——消息确认机制,持久化和发送方确认。

消息确认机制

大家应该学习过了计算机网络吧,那么 TCP 连接大家也一定不陌生吧,TCP 三次握手的时候,当服务器接收到建立连接的请求的时候,服务器就会返回一个 ACK 数据包,来告诉客户端我收到了你发送的请求。前面我们讲了一种确认机制,publisher/confirm 发布确认模式,那么这种模式是针对 RabbitMQ Broker 响应生产者发送的消息,而消费者消费消息的时候,也是需要告诉我们的队列是否接收到了这个消息的,以便队列能够选择删除这条消息还是重发这条消息。

对于消费者的消息确认有两种方式——自动确认和手动确认。

  1. 自动确认:当 autoAck 等于 true 的时候,RabbitMQ 会自动把发送出去的消息置为确认,让后从内存(或者磁盘)中删除,而不会管消费者是否正确的处理完成这条消息,自动确认适用于对于消息可靠性要求不高的场景。
  2. 手动确认:当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式的调用 Basic.Ack 命令,回复确认后才从内存(或者磁盘)中移去消息。这种模式适用于对消息可靠性要求比较高的场景。

自动确认:

public class Constants {
    public static final String CONFIRM_QUEUE= "confirm.queue";
}

生产者代码:

public class ConfirmProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("123.57.1.114");
        factory.setPort(5672);
        factory.setVirtualHost("test");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constants.CONFIRM_QUEUE,true,false,false,null);
        //发送消息
        String msg = "rabbitmq confirm";
        channel.basicPublish("",Constants.CONFIRM_QUEUE,null,msg.getBytes());
        System.out.println("消息发送成功");
        channel.close();
        connection.close();
    }
}

消费者代码:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("123.57.1.114");
        factory.setPort(5672);
        factory.setVirtualHost("test");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body));
            }
        };
        //true参数表示自动确认
        channel.basicConsume(Constants.CONFIRM_QUEUE,true,consumer);
        channel.close();
        connection.close();
    }
}

运行生产者代码,并且观察管理页面:

在这里插入图片描述
然后运行消费者代码:

在这里插入图片描述

消费者处理了这条消息并且自动的进行了确认。

如果我们在消费者消费完成消息之前创建一个异常,看看结果会怎样:

Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, 	AMQP.BasicProperties properties, byte[] body) throws IOException {
        int ret = 3/0;
        System.out.println("接收到消息:" + new String(body));
    }
};

在这里插入图片描述

可以看到,就算我们的消费者没有正确的处理完消息,队列也会将这条消息从队列中删除掉,那么如果我们的消费者需要重新获取到这条消息的话,就无法获得了。

手动确认:

//消费消息
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:" + new String(body));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
channel.basicConsume(Constants.CONFIRM_QUEUE,false,consumer);
  • deliveryTag:这是一个由RabbitMQ服务器分配给每条消息的唯一标识符。客户端使用这个标识符来确认它已经成功处理(或消费)了消息。
  • mutiple:是否批量确认,这是一个布尔值,用于指示是否确认单个消息(当为false时)还是确认deliveryTag之前的所有消息(当为true时)。

如果在消费者手动确认之前出现了异常,那么队列中的消息就不会被删除:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:" + new String(body));
        int ret = 3/0;
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};

在这里插入图片描述
在这里插入图片描述

当 autoAck 被设置为 false,对于 RabbitMQ 而言,队列中的消息被分成了两个部分:

一是等待投递给消费者的消息,二是已经投递给消费者,但是还没有收到消费者确认信号的消息。

如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也可能还是原来的那个消费者。

在这里插入图片描述

否定确认:

手动确认可以分为上面的肯定确认,也可以否定确认:

Channel.basicReject(long deliveryTag, boolean requeue)

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:" + new String(body));
        channel.basicReject(envelope.getDeliveryTag(), false);
    }
};

在这里插入图片描述

不管是肯定确认还是否定确认,都是确认消息,所以队列中 Unacked 消息的个数就是 0。

basicReject 命令一次只能拒绝一次消息,如果想要批量拒绝消息,则可以使用 basicNack() 方法:

hannel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • long deliveryTag:这是一个唯一标识符,用于标识从RabbitMQ服务器接收到的消息。每个消息在被发送到消费者时都会被分配一个唯一的deliveryTag。通过这个标识符,消费者可以明确告诉RabbitMQ服务器它正在拒绝哪个具体的消息。
  • boolean multiple:这个参数指定了是否拒绝deliveryTag之前(包括deliveryTag本身)的所有未确认的消息。如果设置为true,则RabbitMQ会拒绝从当前channel上接收到的、且尚未被确认的、所有小于或等于该deliveryTag的消息。如果设置为false,则仅拒绝具有该特定deliveryTag的消息。
  • boolean requeue:这个参数决定了被拒绝的消息是否应该被重新放回队列中,以便可以被其他消费者重新处理。如果设置为true,消息将被重新放回队列的末尾(或根据队列的设置可能放到其他队列中,如果使用了死信队列等高级特性)。如果设置为false,消息将被丢弃(或根据服务器的配置可能进入死信队列)。

我们先将上面的手动确认代码的这一行注释掉,然后启动生产者和消费者,制造几个 Unacked 的消息,然后使用批量确认的方法,看前面的消息是否会被确认:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:" + new String(body));
        //这里设置被拒绝的消息会重新入列,fasle 表示被拒绝的消息不会被重新放入队列
        channel.basicNack(envelope.getDeliveryTag(), true,true);
    }
};

在这里插入图片描述
如果我们设置的被拒绝的消息会被重新放入队列的,并且我们的消费者没有及时关闭资源的话,就可能会导致我们消费者死循环的消费,因为当队列中存在 Ready 消息的时候,我们的消费者拒绝了这些消息然后将这些拒绝的消息重新放回队列,因为消费者的资源没有关闭,所以会继续监听这个队列,那么此时被拒绝的消息又加入到队列当中了,所以就又会重新执行上面的操作,最终导致一直消费-放入。

SpringBoot 实现消息确认

Spring AMQP 对消息确认机制提供了三种策略:

public enum AcknowledgeMode {
	NONE,
	MANUAL,
	AUTO;
}
  • AcknowledgeMode.NONE
    • 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 会自动确认消息,从 RabbitMQ 队列中删除该消息,如果消费者处理消息失败,消息可能会丢失
  • AcknowledgeMode.AUTO(默认)
    • 消费者在消息处理成功时会自动确认消息,如果处理过程中抛出了异常,则不会确认消息
  • AcknowledgeMode.MANUAL
    • 手动确认模式,消费者必须在处理完成消息之后显式的调用 basicAck 方法来确认消息,如果消息未被确认,RabbitMQ 会认为该消息未被成功处理,并且在消费者可用的时候重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理

那么在 SpringBoot 中如何配置消息确认的策略呢,同样还是在 application 配置文件中:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none
        //acknowledge-mode: auto
        //acknowledge-mode: manual

NONE

我们先来看看 none 策略的消息确认:

public static final String ACK_QUEUE = "ack.queue";
@Bean("ackQueue")
public Queue ackQueue() {
    return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
@RequestMapping("/ack")
public String confirm() {
    rabbitTemplate.convertAndSend("", Constants.ACK_QUEUE,"rabbit ack");
    return "消息发送成功";
}
@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void queueListener(String message, Channel channel) {
        System.out.println("接收到消息:" + message + channel);
    }
}

在这里插入图片描述
如果消费者在处理的过程中抛出了异常:

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void queueListener(String message, Channel channel) {
        System.out.println("接收到消息:" + message + channel);
        int ret = 3/0;
    }
}

在这里插入图片描述

通过 RabbitMQ 管理页面观察队列情况:

在这里插入图片描述

可以发现消息被自动确认了,但是消费者并没有正确处理这个消息。

AUTO

AUTO 策略当消费者消费完成之后会自动确认,如果处理过程中抛出了异常则不会自动确认:

消费者正确处理消息的代码我就不演示了,我们直接来看处理异常的情况

在这里插入图片描述

而且运行代码我们还可以发现,代码会陷入死循环,这是因为,当我们的消费过程中抛出了异常之后,那么这个消息就会被设置为 Unacked,因为我们的消费者与 RabbitMQ Server 并没有断开连接,所以消费者会继续监听队列,读取队列中 Ready 和 Unacked 部分的消息,那么当这个消费者读取消息的时候,这个消息又会被设置为 Unacked,所以就会一直重复这个过程。

在这里插入图片描述
而当我们手动关闭这个程序的时候,这条消息又会被设置为 Ready:

在这里插入图片描述

MANUAL

MANUAL 策略需要我们手动确认消息,在这种模式下如果消费未被正确处理,那么这个消息会被重新投递给消费者,保证了可靠性:

我们还是看消息未被正确处理的情况:

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void queueListener(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息:" + message + channel);
            int ret = 3/0;
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            try {
                //第二个参数表示是否批量确认,第三个参数表示当拒绝消息的时候是否将该消息重新进入队列
                channel.basicNack(deliveryTag,false,true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

在这里插入图片描述

启动项目之后,程序还是会重复执行,跟 auto 的原因是一样的,结束程序之后也是:

在这里插入图片描述

那么这三种策略就是我们的 Spring 提供的三种消息确认策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2119295.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

99.WEB渗透测试-信息收集-网络空间搜索引擎shodan(1)

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于: 易锦网校会员专享课 上一个内容:98.WEB渗透测试-信息收集-Google语法(12) 信息收集方向-网络空间…

探索全球实时云渲染与XR技术的多平台兼容性:谁是行业引领者?

在扩展现实(XR)技术与实时云渲染技术的飞速发展中,多平台兼容性已经成为行业技术竞争的关键要素。能够在不同的平台和设备上高效运行的解决方案,不仅关系到开发效率和场景多样性,还直接影响到用户体验和市场占有率。Pa…

续航和性能好的随身WiFi怎么选?一篇文章告诉你哪个随身WiFi值得买,格行vs华为vs中兴vs飞猫vs闪鱼

各大购物平台的大促已经开始,还在纠结入手哪个随身WiFi的小伙伴,小编今天用一篇文章告诉你哪款随身WiFi值得买 一、格行:成立于2009年,有15年的行业经验,是随身WiFi、物联网行业的巨头,销量持续保持领先&am…

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

码上进阶_刷题模块测试_用例设计

码上进阶_刷题模块测试_用例设计 系统概述: 码上进阶是为程序员专门打造的交流平台,采用主流的微服务框架和C端技术栈作为技术基础。在这个平台上,程序员 可以通过刷题、练习和模拟面试来提升自己的面试能力。 功能测试: 登录…

机器人领域超重量奖项TRO傅京孙最佳论文奖汇总【上】

更多优质内容,请关注公众号:智驾机器人技术前线 简介 IEEE Transactions on Robotics King-Sun Fu Memorial Best Paper Award这个奖项是为了表彰每年在《IEEE Transactions on Robotics》上发表的最佳论文。 以下是该奖项的一些信息: 奖项…

2025通信硕士找工作纪实

通信算法工程师秋招基本情况 读博难 国内读博难:华五以上,信息与通信工程专业,基本无普博hc,都是直博。偶尔有一些招普博的老师,是许久不科研,或者来了都去做横向。唯一可能的普博机会,是找刚入…

1.1 半导体基础知识

文章目录 半导体的特点本征半导体本征激发 杂质半导体N型半导体(电子型)电离施主杂质:多子少子 P型半导体(空穴型)电离受主杂质:多子少子 杂质半导体的示意图 PN结1、 PN 结中载流子的运动(1&am…

苏茵茵:以时尚之名,诠释品质生活

在女性追求个性化与自我表达的今天,时尚早已超越了简单的穿着打扮,它成为女性展现自我风格、彰显独特魅力的重要方式。从广泛的兴趣爱好到精心雕琢的个人风格,每一处细节都闪耀着女性对个性独特与自我表达的深切渴望。正是这股不可阻挡的潮流…

《深度学习》OpenCV 模版匹配多个对象、图片旋转 综合应用

目录 一、模板匹配 1、什么是模版匹配 2、原理 3、应用领域 4、案例实现 1)模版图片和输入图片信息 2)代码实现 运行结果: 二、图像旋转 1、使用numpy方法 运行结果: (图片来源网络,如有侵权敬…

ORBSLAM2三维重建后上下颠倒

调整 Z 坐标的符号,将d改为-d 引用文献 https://blog.csdn.net/qq_42450767/article/details/114144439

【技术调研】三维(1)-ThreeJs-基础常识及第一个程序

前言 ​ 公司有网页三维以及客户端、vr三维相关项目机会,需要对相关技术进行调研,进行项目可行性评估和大致成本评估。基于此对三维一些内容进行调研。 什么是three.js ​ Three.js是一款基于原生WebGL封装通用Web 3D引擎。由于前文已经了解过webGL,知道通过webGL的API我…

人工智能技术导论——基于产生式规则的机器推理

在引出本章的内容之前先介绍一个概念 知识 知识的概念 知识(Knowledge)是人们在改造客观世界的实践中形成的对客观事物(包括自然的和人造的)及其规律的认识,包括对事物的现象、本质、状态、关系、联系和运动等的认识…

大数据技术体系架构

数据源 社交媒体平台 云平台 网站资源 物联网(IOT) 数据库 特点 分布式 数据源一般分布在不同的设备上,这些设备通常由网络连接在一起,网络空间的安全及其重要; 异构性 数据的来源广泛,比如社交媒…

Qt常用控件——QRadioButton和QCheckBox

文章目录 QRadioButtonQAbstractButton信号实现简单的点餐页面QCheckBox QRadioButton QRadioButton是单选按钮,可以让我们在多个选项当中选择一个 作为QAbstractButton和QWidget的子类,它们的属性和语法,对于QRadioButton同样适用 QAbstrac…

springboot+vue+mybatis计算机毕业设计医护系统的设计与实现+PPT+论文+讲解+售后

近些年来,随着科技的飞速发展,互联网的普及逐渐延伸到各行各业中,给人们生活带来了十分的便利,医护系统的设计与实现利用计算机网络实现信息化管理,使整个医护系统的发展和服务水平有显著提升。 本文拟采用Eclipse开发…

【尚跑】2024陕西淳化天然氧吧半程马拉松赛149顺利完赛

1、赛事背景 奔跑美丽淳化,畅游天然氧吧。9月8日上午,2024淳化天然氧吧半程马拉松赛在淳化县润镇东街鸣枪开跑,4000名马拉松爱好者相聚美丽淳化,赏荞麦花海、闻硕果飘香,共同开启这场挑战自我、超越极限的奔跑之旅 本次…

EasyRecovery破解版下载无需注册,easyrecovery数据恢复软件免费版激活码密钥

EasyRecovery易恢复是一款功能强大的数据恢复软件,为无数人群解决了数据丢失的烦恼,为工作生活带去了便捷。无数使用者在使用过后,都肯定了其强大的数据恢复功能。具体来说,EasyRecovery易恢复可以恢复多方面的数据,Ea…

如何规避SQL注入漏洞

1 引言 对于很多初学者而言,SQL注入攻击是一种很容易被忽略的安全漏洞,其原理很简单,在日常编码中需要注意规避,养成良好的系统安全意识。 2 原理 SQL注入漏洞产生的根本原因,就是在编码过程中手动拼接sql参数造成的…

os模块函数

1、常用命令 os.getcwd() 返回当前工作目录 os.listdir() 返回指定文件路径下的文件夹列表或者文件列表 os.mkdir 创建文件夹,不能创建递归文件夹,也就是上一层文件夹必须存在,不存在时会报错,同时在指定目录下有相同的文件夹名称,再创建会报错 os.makedirs 可以创建…