RabbitMQ入门(三)消息应答与发布确认

news2025/1/11 17:53:00

前言:

消息应答与发布确认都是保证消息不丢失。而重复消费问题则是消息幂等性。(之后会说幂等性)

消息应答:

应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

消息应答有自动应答、手动应答。mq默认为自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

//在消费时候,设置 false;
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

 在消费者成功消费的回调方法 DeliverCallback 中应答:

         //声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
            /**
             * 手动确认应答
             * 1.消息的标记Tag
             * 2.是否批量应答 false表示不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

除了手动应答外,还有:

/**
             * 否定确认应答
             * 1.拒绝 deliveryTag 对应的消息
             * 2.是否 requeue:true 则重新入队列,false 则丢弃或者进入死信队列。
             * 该方法 reject 后,该消费者还是会消费到该条被 reject 的消息。
             */
          //  channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            /**
             * 用于否定确认,表示己拒绝处理该消息,可以将其丢弃了
             * 1.拒绝 deliveryTag 对应的消息
             * 2.是否 应用于多消息
             *      Multiple 的解释:手动应答的好处是可以批量应答并且减少网络拥堵
             *      true 代表批量应答 channel 上未应答的消息
             *      比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 ,
             *      那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
             *      false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
             * 3.是否 requeue,与 basicReject 区别就是同时支持多个消息,
             *   可以 拒绝签收 该消费者先前接收未 ack 的所有消息。拒绝签收后的消息也会被自己消费到。
             */
          //  channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);
            /**
             * 是否恢复消息到队列
             * 1.是否 requeue,true 则重新入队列,并且尽可能的将之前 recover 的消息投递给其他消费者消费,
             *   而不是自己再次消费。false 则消息会重新被投递给自己。
             * 消息自动重新入队:
             *   如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),
             *   导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。
             *   如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。
             *   这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
             */
           // channel.basicRecover(false);

特别注意:消息在手动应答是不丢失的,它会放回队列中重新消费

发布确认:

发布功能属于生产者,生产消息到 RabbitMQ,RabbitMQ 需要告诉生产者已经收到消息。

发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

 发布确认有:单个确认批量确认异步确认

发布确认命令:channel.waitForConfirms();

单个确认:发布一个消息之后只有它被确认发布,后续的消息才能继续发布

package com.example.mqtest.mqtest;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ConfirmMessage {

    //单个发消息的个数
    public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写

    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        publishMessageIndividually();//发布1000个单独确认消息,耗时:599ms
    }
    //单个确认
    public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,true,false,null);

        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < 1000; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息就马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息发送成功");
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");

    }
}

批量确认:先发布一批消息然后一起确认可以极大地提高吞吐量,

缺点:当发生故障导致发布出现问题时,不知道是哪个消息出问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

public class ConfirmMessage2 {

    //批量发消息的个数
    public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写

    public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        publishMessageBatch(); //发布1000个批量确认消息,耗时:111ms
    }

    //批量发布确认
    public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, true, false, null);

        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize =100;

        //批量发送消息,批量发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message=i+"";
            channel.basicPublish("",queueName,null,message.getBytes());

            //判断达到100条消息的时候,批量确认一次
            if((i+1)%batchSize==0){
                //发布确认
                channel.waitForConfirms();
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
    }
}

异步发布:利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

public class ConfirmMessage3 {

    public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
    
    public static void main(String[] args) throws Exception {
        publishMessageAsync(); //发布1000个异步发布确认消息,耗时:43ms
    }

    //异步发布确认
    public static void publishMessageAsync() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, true, false, null);

        //开启发布确认
        channel.confirmSelect();

        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目 只要给到序号
         * 3.支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms=
                new ConcurrentSkipListMap<>();

        //消息确认回调的函数
        ConfirmCallback ackCallback = (deliveryTag,multiple) ->{
            if(multiple) {
                //2.删除掉已经确认的消息 剩下的就是未确认的消息
                ConcurrentNavigableMap<Long, String> confirmed =
                        outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息:" + deliveryTag);
        };
        /**
         * 1.消息的标记
         * 2.是否为批量确认
         */
        //消息确认失败回调函数
        ConfirmCallback nackCallback= (deliveryTag,multiple) ->{
            //3.打印一下未确认的消息都有哪些
            String message = outstandingConfirms.remove(deliveryTag);
            System.out.println("未确认的消息是:"+message+":::未确认的消息tag:"+deliveryTag);
        };

        //准备消息的监听器 监听那些消息成功了,哪些消息失败了
        /**
         * 1.监听哪些消息成功了
         * 2.监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback,nackCallback);//异步通知

        //开始时间
        long begin = System.currentTimeMillis();

        //批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message=i+"消息";
            channel.basicPublish("",queueName,null,message.getBytes());
            //1.此处记录下所有要发送的消息 消息的总和
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时:"+(end-begin)+"ms");
    }
}

将发布的消息存入 Map 里,方便获取。headMap 方法用于将已确认的消息存入新的 Map 缓存区里,然后清除该新缓存区的内容。因为 headMap 方法是浅拷贝,所以清除了缓存区,相当于清除了内容的地址,也就清除了队列的确认的消息。

处理异步未确认消息最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

以上 3 种发布确认速度对比:

  • 单独发布消息

    同步等待确认,简单,但吞吐量非常有限。

  • 批量发布消息

    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

  • 异步处理

    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/80962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习——残差网络(ResNet)笔记

残差网络&#xff1a;经常使用的网络之一 1.随着神经网络的不断加深能改进精度吗&#xff1f; 不一定 ①蓝色五角星表示最优值&#xff0c;Fi闭合区域表示函数&#xff0c;闭合区域的面积代表函数的复杂程度。在这个区域能够找到一个最优的模型&#xff08;区域中的一个点表…

「重学JS」带你一文吃透作用域与闭包

前言 学习了这么久前端&#xff0c;发现自己对于基础知识的掌握并没有那么通透&#xff0c;于是打算重新学一遍JS&#xff0c;借用经济学的一句话&#xff1a;JS基础决定能力高度&#x1f926;&#x1f3fb; 基础很重要&#xff0c;只有基础好才会很少出 bug&#xff0c;大多数…

二叉树的性质

由于二叉树的结构特殊&#xff0c;会有一系列的数学性质 性质一&#xff1a;对于一棵二叉树&#xff0c;第i层的最大结点数量为 个&#xff0c;比如二叉树的第一层只有一个根结点&#xff0c;而二叉树的第三层可以有 个结点。 性质二&#xff1a;对于一棵深度为k的二叉树&am…

【Python】函数

文章目录1. 函数介绍2. 函数的定义与调用3. 函数参数4. 函数返回值5. 变量作用域6. 函数执行过程7. 链式调用8. 嵌套调用9. 函数递归10. 参数默认值11关键字参数1. 函数介绍 编程中的函数不同于数学中的函数&#xff1a; 数学上的函数&#xff0c;比如 y sin x&#xff0c;x…

Vue快速上门|了解MVVM

1.1、先了解下MVVM VUE是基于MVVM思想实现的,❓那什么是MVVM呢?—— MVVM,是Model-View-ViewModel的缩写,是一种软件架构模式。其核心思想就是分离视图、数据、逻辑,VUE框架解决了数据Model到视图View的双向绑定,我们只关注业务逻辑ViewModel即可,极大的提高的编程效率…

BadUSB超详细制作, 实现CobaltStrike远控上线

前言 在2014年美国黑帽大会上&#xff0c;安全研究人员JakobLell和独立安全研究人员Karsten Nohl展示了他们称为“BadUSB”的攻击方法&#xff0c;这种攻击方法让USB安全和几乎所有和USB相关的设备(包括具有USB端口的电脑)都陷入相当危险的状态 现在的USB设备很多&#xff0c…

高级篇之ENC1当作采集卡使用方案推荐

高级篇之ENC1当作采集卡使用0 背景&#xff1a;1 准备工作2 连接示意图3 配置步骤&#xff1a;3.1 在笔记本电脑上安装NDI4工具3.2 ENC1设备连接3.3 配置电脑的USB网卡的IP地址3.4 配置ENC1设备3.5 打开NDI工具的虚拟输入功能0 背景&#xff1a; HDMI视频采集卡分为内嵌式采集…

【GCC编译优化系列】宏定义名称与函数同名是一种什么骚操作?

作者简介 *架构师李肯&#xff08;全网同名&#xff09;**&#xff0c;一个专注于嵌入式IoT领域的架构师。有着近10年的嵌入式一线开发经验&#xff0c;深耕IoT领域多年&#xff0c;熟知IoT领域的业务发展&#xff0c;深度掌握IoT领域的相关技术栈&#xff0c;包括但不限于主流…

​全网最牛的Fiddler系列文章(一):fiddler的介绍及安装​

Fiddler(1)&#xff1a;fiddler的介绍及安装 Fiddler简介 Fiddler是比较好用的web代理调试工具之一&#xff0c;它能记录并检查所有客户端与服务端的HTTP/HTTPS请求&#xff0c;能够设置断点&#xff0c;篡改及伪造Request/Response的数据&#xff0c;修改hosts&#xff0c;限…

【UEFI实战】Redfish的BIOS实现1

Redfish的BIOS实现 EDK2提供了Redfish框架&#xff0c;用来实现带外的BIOS配置&#xff0c;其基本框架如下&#xff1a; 通过RedfishPkg中提供的Driver&#xff0c;可以实现BIOS与BMC或者其它的软件进行通信。它主要分为两个部分&#xff0c;分别是Client和Foundation。Client…

[论文解析]DREAMFUSION: TEXT-TO-3D USING 2D DIFFUSION

code links&#xff1a;dreamfusion3d.github.io 文章目录OverviewWhat problem is addressed in the paper?What is the key to the solution?What is the main contribution?What can we learn from ablation studies&#xff1f;Potential fundamental flaws; how this w…

MATLB|基于粒子群算法的能源管理系统EMS(考虑光伏、储能 、柴油机系统)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清…

原子操作类之18罗汉增强

原子操作类之18罗汉增强 是什么 都是java.util.concurrent.atomic包下的 有红框圈起来的&#xff0c;也有蓝框圈起来的&#xff0c;为什么&#xff1f; 阿里巴巴Java开发手册 为什么说18罗汉增强&#xff0c;却只有16个 再分类 基本类型原子类 AtomicInteger AtomicBoolea…

wpa_supplicant工具移植到嵌入式设备

1、wpa_supplicant源码下载 (1)源码下载地址&#xff1a;http://w1.fi/releases/&#xff1b; (2)本文是以wpa_supplicant-2.6.tar.gz版本进行移植&#xff1b; 2、编译openssl 2.1、确定适配的openssl版本 Optional libraries for EAP-TLS, EAP-PEAP, and EAP-TTLS: - OpenS…

【LeetCode】1827. 最少操作使数组递增

题目描述 给你一个整数数组 nums &#xff08;下标从 0 开始&#xff09;。每一次操作中&#xff0c;你可以选择数组中一个元素&#xff0c;并将它增加 1 。 比方说&#xff0c;如果 nums [1,2,3] &#xff0c;你可以选择增加 nums[1] 得到 nums [1,3,3] 。 请你返回使 nums …

ESXi8.0中NVME硬盘不识别解决方法1,设置直通

目录 1.前言 2.直通设置 3.槽点 1.前言 ESXi8.0删除了很多老版本的硬件的驱动程序&#xff0c;导致NVME1.3及更低协议的固态硬盘均无法被ESXi直接识别正如我手头准备了尚好的服务器专用PM983A却无法识别。本着不折腾先熟悉ESXi8.0的思路另外找了一块盘装了ESXi的系统。本以为…

云原生之使用Docker部署webssh工具sshwifty

云原生之使用Docker部署webssh工具sshwifty一、sshwifty介绍1.sshwifty简介2.shwifty 特点二、检查本地docker环境1.检查docker版本2.检查docker状态三、下载sshwifty镜像四、服务器生成凭证文件五、创建sshwifty容器1.创建部署目录2.创建sshwifty容器3.查看sshwifty容器状态六…

uniapp 之 小程序线上版本一直处于加载状态

前言 最开始小程序都是体验版的&#xff0c;后来应老大需求&#xff0c;把体验版提交审核为正式版本&#xff08;线上版本&#xff09;&#xff0c; 原本以为版本审核得花费几天时间&#xff0c;没想到它这审核速度挺快的&#xff0c;不到3小时就审核通过了&#xff0c;审核…

[go]汇编语言

文章目录计算机结构常量与变量全局变量常量数组字符串函数参数与返回值goroutineGo汇编程序无法独立使用&#xff0c;必须以Go包的方式组织&#xff0c;同时包中至少要有一个Go语言文件用于指明当前包名等基本包信息。如果Go汇编代码中定义的变量和函数要被其它Go语言代码引用&…

Spark的架构与基本运行流程

Spark的架构与基本运行流程一、Spark中的核心概念二、Spark中的核心架构设计一、Spark中的核心概念 &#xff08;1&#xff09;RDD&#xff0c;Spark中最核心的概念就是RDD&#xff08;Resillient Distributed Dataset&#xff0c;弹性分布式数据集&#xff09;。换而言之&…