消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

news2025/1/12 6:17:53

九、发布确认

在这里插入图片描述

1、发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。(批量)

(异步)confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

2、发布确认的策略

开启发布确认的方法发布确认默认是没有开启的,如果要开启,需要调用方法 confirmSelect每当你要想使用发布确认,都需要在 channel 上调用该方法(反复确认)。

1)单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

/**
 * 单个发送
 */
public static void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();

    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //服务端返回 false 或超时时间内未返回,生产者可以消息重发
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发送成功");
        }
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");

}

测试效果:

在这里插入图片描述

在这里插入图片描述

2)批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

在这里插入图片描述

在这里插入图片描述

/**
 * 批量
 */
public static void publishMessageBatch() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();
    //批量确认消息大小
    int batchSize = 100;
    //未确认消息个数
    int outstandingMessageCount = 0;
    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        outstandingMessageCount++;
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }
    }
    //为了确保还有剩余没有确认消息 再次确认
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }
    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}

测试效果:

在这里插入图片描述

在这里插入图片描述

3)异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。

异步确认发布

1、RabbitMQ使用map来记录消息序号和内容

2、代码实现异步确认

(1)选择有两个参数的addConfirmListener函数

在这里插入图片描述

(2)查看源码:知道需要配置的参数
在这里插入图片描述

都是需要实例化接口函数的

在这里插入图片描述

(3)使用匿名函数的方式

在这里插入图片描述

(4)打印消息序号

在这里插入图片描述

在这里插入图片描述

(5)测试效果:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

如何处理异步未确认消息?

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

(1)代码实现:

在这里插入图片描述

①记录下所有要发送的消息,消息的总和

在这里插入图片描述

②删除到已经确认的消息,剩下的就是未确认的消息

在这里插入图片描述

③打印一下未确认的消息都有哪些

在这里插入图片描述

//    异步确认发布
    private static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();


        /**线程安全有序的一个哈希表,适用于高并发的情况下
          1.轻松的将序号与消息进行关联
          2.轻松批量删除条目,只要给到序号
          3.支持高并发(多线程)
         **/
        //消息确认成功,回调函数
        ConcurrentSkipListMap<Long,String> outstandingConfirms =
                new ConcurrentSkipListMap<>();
        ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个
            if (multiple){  //批量的
                //2.轻松批量删除条目,只要给到序号
                ConcurrentNavigableMap<Long,String> confirmed =
                        outstandingConfirms.headMap(deliveryTag); //消息的序号
                confirmed.clear();
            }else {  //单个的
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };

        //消息确认失败,回调函数
        /**
         * 1.消息的标记
         * 2.是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag , multiple)->{
            //3、打印一下未确认的消息都有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);
        };


        //准备消息的监听器 ,监听哪些消息成功了, 哪些失败了
        channel.addConfirmListener( ackCallback, nackCallback);

        //开始时间
        long begin = System.currentTimeMillis();

        //发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            //1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }

测试效果:

在这里插入图片描述

以上 3 种发布确认速度对比 :

  • 单独发布消息:同步等待确认,简单,但吞吐量非常有限。

  • 批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。

  • 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。

消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459613.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Input Output模型

一、I/O介绍 I/O在计算机中指Input/Output&#xff0c; IOPS (Input/Output Per Second)即每秒的输入输出量(或读写次数)&#xff0c;是衡量磁盘性能的主要指标之一。IOPS是指单位时间内系统能处理的I/O请求数量&#xff0c;一般以每秒处理的I/O请求数量为单位&#xff0c;I/O…

ETL、ELT区别以及如何正确运用

一、 浅谈ETL、ELT ETL与ELT的概念 ETL (Extract, Transform, Load) 是一种数据集成过程&#xff0c;通常用于将数据从一个或多个源系统抽取出来&#xff0c;经过清洗、转换等处理后&#xff0c;加载到目标数据存储中。这种方法适用于需要对数据进行加工和整合后再加载到目标…

react实现转盘抽奖功能

看这个文章不错&#xff0c;借鉴 这个博主 的内容 样式是背景图片直接&#xff0c;没有设置。需要的话应该是 #bg { width: 650px; height: 600px; margin: 0 auto; background: url(turntable-bg.jpg) no-repeat; position: relative; } img[src^"pointer"] {positi…

redis的搭建 RabbitMq搭建

官网 Download | Redis wget https://github.com/redis/redis/archive/7.2.4.tar.gz 编译安装 yum install gcc g tar -zxvf redis-7.2.4.tar.gz -C /usr/localcd /usr/local/redis make && make install 常见报错 zmalloc.h:50:10: fatal error: jemalloc/jemal…

[office] excel图表怎么发挥IF函数的威力 #微信#媒体

excel图表怎么发挥IF函数的威力 IF函数应该是最常用的Excel函数之一了&#xff0c;在公式中经常能够看到她的“身影”。IF函数的基本使用如图1所示。 图1 IF函数之美 IF函数是一个逻辑函数&#xff0c;通过判断提供相应操作&#xff0c;让Excel更具智能。 然而&#xff0c;…

js设计模式:装饰者模式

作用: 可以给原有对象的身上添加新的属性方法 可以让对象或者组件进行扩展 示例: class Person{constructor(name,selfSkill){this.name namethis.selfSkill selfSkill}run 会走路}//所有人类都有的共同特性和技能let wjt new Person(王惊涛,写代码)let mashi new Pers…

2024.02.20作业

1. 使用多进程完成两个文件的拷贝&#xff0c;父进程拷贝前一半&#xff0c;子进程拷贝后一半&#xff0c;父进程回收子进程的资源 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <time.h> #includ…

Sora的原理,中国小学生游戏在践行

大家龙年好呀&#xff0c;春节假期和家人出去浪了&#xff0c;旅行期间&#xff0c;几乎没刷社交媒体信息。等我17号回到家仔细看手机&#xff0c;Sora的消息铺面而来&#xff0c;什么“新革命”、“划时代”、“新纪元”说的挺神呼。 任何新事物出现&#xff0c;讨论热烈是好…

AS-V1000 视频监控平台产品介绍:客户端功能介绍(四)

目 录 一、引言 1.1 AS-V1000视频监控平台介绍 1.2平台服务器配置说明 二、软件概述 2.1 客户端软件用途 2.2 客户端功能 三、客户端功能说明 3.1告警管理 3.1.1告警联动 &#xff08;1&#xff09;告警联动显示 &#xff08;2&#xff09;告警联动处理 3…

unity学习(31)——跳转到角色选择界面(打勾?手滑挂错脚本)

There are 2 audio listeners in the scene. Please ensure there is always exactly one audio listener in the scene. 是因为后来创建了一个camera&#xff0c;因为camera中自带一个组件Audio Listener。所以有两个camera就有两个audio listener导致报错。 一个简单的解决…

C++(18)——适配器概念以及stack、queue、优先队列的模拟实现

上篇文章中&#xff0c;给出了对于模拟实现中功能的补全&#xff0c;本篇文章将优先介绍一个新的容器之后引入什么是适配器&#xff0c;以及适配器的使用方法&#xff0c;再通过适配器的思想来完成对于&#xff0c;、优先级队列_的实现。 目录 1. deque: 1.1 什么是deque&…

【嵌入式-Keil】keil代码提示快捷键

CTRL空格 如果没有提示&#xff0c;可能跟输入法的快捷键冲突&#xff0c; 右键->设置->按键->勾掉第一个就行了 再按CTRL空格就有提示了 参考&#xff1a;串口发送&串口发送接收

SAP PP学习笔记02 - PP中配置品目Master时的顺序

配置品目Master的时候&#xff0c;最佳实践是要遵循什么顺序呢&#xff1f; 一般而言是如下顺序 - 新规物料类型&#xff08;或利用现有类型也可以&#xff09; - 设定料号范围 - 设定物料状态&#xff08;比如准备好之前&#xff0c;要先锁住&#xff0c;等准备好了之后再…

CTFshow web(SQL注入176-179)

web176 没啥好说的&#xff0c;直接上万能密码&#xff1a; 1 or usernameflag 当然了还有别的方法&#xff1a; 1 union Select 1,2,group_concat(password) from ctfshow_user where username flag -- web177 没啥好说的&#xff0c;直接上万能密码&#xff1a; 1 or user…

《VitePress 简易速速上手小册》第1章:VitePress 入门(2024 最新版)

文章目录 1.1 VitePress 简介与架构1.1.1 基础知识点解析1.1.2 重点案例&#xff1a;企业文档站点1.1.3 拓展案例 1&#xff1a;个人博客1.1.4 拓展案例 2&#xff1a;产品展示网站 1.2 安装与初次运行1.2.1 基础知识点解析1.2.2 重点案例&#xff1a;公司内部知识分享平台1.2.…

关于VIT(Vision Transformer)的架构记录

在VIT模型设计中&#xff0c;尽可能地紧密遵循原始的Transformer模型&#xff08;Vaswani等人&#xff0c;2017年&#xff09;。这种刻意简化的设置的一个优势是&#xff0c;可扩展的NLP Transformer架构及其高效的实现几乎可以即插即用。 图&#xff1a;模型概述。我们将图像分…

MySQL在OpenEuler中的安装及数据库的备份

MySQL在OpenEuler中的安装 MySQL以二进制形式进行安装 1.获取软件包 &#xff08;在进行获取时&#xff0c;检查网络是否通畅&#xff09; wget -c https://mirrors.aliyun.com/mysql/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz2.创建用户组和用户 groupadd -g…

图片怎么变成透明背景?分享这些变透明的方法

很多从事编辑和图片设计的同行在日常工作中经常需要处理图片的背景色。为了更好地进行设计和编辑&#xff0c;将图片的背景色替换成透明是非常必要的。然而&#xff0c;对于一些新手来说&#xff0c;使用专业的图像处理软件可能有些困难。不过&#xff0c;现在有很多在线的图像…

大厂的数据质量中心系统设计

日常工作中&#xff0c;数据开发上线完一个任务后并不是就可以高枕无忧&#xff0c;时常因上游链路数据异常或者自身处理逻辑的 BUG 导致产出的数据结果不可信。而问题发现可经历较长周期&#xff08;尤其离线场景&#xff09;&#xff0c;往往是业务方通过上层数据报表发现数据…

网页布局之浮动

一&#xff0c;传统网页布局的三种方式 普通流&#xff08;标准流&#xff09;、浮动、定位。 二&#xff0c;标准流&#xff08;普通流/文档流&#xff09; 即为标签按照规定好的默认方式排列。 1.块级元素会独占一行&#xff0c;从上向下顺序排列。 常用元素&#xff1a;…