RabbitMQ的发布确认

news2025/1/19 7:07:47

文章目录

  • 前言:为什么要用发布确认
  • 一、发布确认原理
  • 二、发布确认的策略
    • 2.1 开启发布确认的方法
    • 2.2 单个确认发布
    • 2.3 批量确认发布
    • 2.4 异步确认发布
    • 如何处理异步未确认消息
    • 以上 3 种发布确认速度对比


前言:为什么要用发布确认

答:持久化章节中,会将队列和消息都进行持久化,但是会存在当队列或者消息往磁盘中写入时,RabbitMQ出现了宕机,此时队列和消息还未写入磁盘中,未进行持久化操作。

一、发布确认原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。


提示:以下是本篇文章正文内容,下面案例可供参考

二、发布确认的策略

2.1 开启发布确认的方法

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

Channel channel = RabbitMqUtils.getChannel();
//开启发布确认
channel.confirmSelect();

2.2 单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了

代码如下(示例):

package com.example.publishConfirm;

import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布确认
 *
 */
public class ConfirmMessage {

    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //1.单个确认
        publishMessageIndividually();//发布1000条单独确认消息,耗时334ms
    }
    
    //单个确认
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息就马上进行发布确认
            boolean b = channel.waitForConfirms();
            if (b){
                System.out.println("消息发送成功");
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条单独确认消息,耗时" + (end - begin) + "ms");
    }
}

2.3 批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

代码如下(示例):

package com.example.publishConfirm;

import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布确认
 *
 */
public class ConfirmMessage {

    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //2.批量确认
        publishMessageBatch();//发布1000条批量确认消息,耗时56ms
    }

    //批量发布确认
    public static void publishMessageBatch() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小 可以100条100条确认
        int batchSize = 100;

        //批量发送消息 批量发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {

            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());

            //判断达到100条消息的时候 批量确认一次
            if (i%batchSize == 0){
                //发布确认
                channel.waitForConfirms();
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条批量确认消息,耗时" + (end - begin) + "ms");
    }
    
}

2.4 异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,
他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

在这里插入图片描述

代码如下:

package com.example.publishConfirm;

import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布确认
 *
 */
public class ConfirmMessage {

    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //3.异步批量确认
        publishMessageAsync();//发布1000条异步发布确认消息,耗时44ms
    }

    //异步发布确认
    public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel.confirmSelect();

        /**
         * 线程安全有序的一个哈希表  适用于高并发的情况下
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目  只要给到序号
         * 3.支持高并发(多线程)
         */

        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
        
        

        //消息确认成功 回调函数
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple){
                //【2】:删除到已经确认的消息 剩下的就是未确认的消息
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }

            System.out.println("确认的消息:" + deliveryTag);
        };

        //消息确认失败 回调函数
        /**
         * 第一个参数:消息的标记
         * 第二个参数:是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            //【3】:打印以下未确认的消息都有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:" + message + "::::未确认的消息tag:" + deliveryTag);
        };

        //准备消息的监听器 监听哪些消息成功了 哪些消息失败了
        /**
         * 第一个参数:监听哪些消息成功了
         * 第二个参数:监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback,nackCallback);//异步通知

        //开始时间
        long begin = System.currentTimeMillis();

        //批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息 " + i;
            channel.basicPublish("",queueName,null,message.getBytes());

            //【1】:此处记录下所有要发送的消息  消息的总和
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条异步发布确认消息,耗时" + (end - begin) + "ms");
    }
}

如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传
递。

以上 3 种发布确认速度对比

  • 单独发布消息
    • 同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息
    • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
  • 异步处理
    • 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/84890.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在word文档表中插入图片不变形

在word文档表中插入图片不变形 目录 在word文档表中插入图片不变形 1、点击左上角【全选图标 】选中表格&#xff0c;鼠标右键点击【表格属性】 2、点击【选项】点击 取消勾选【自动重调尺寸以适应内容】&#xff0c;最后点击【确定】 ​3、依次点击【插入】【图片】点击图片…

【TSP问题】基于蜜蜂算法求解旅行商问题附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

安全数据资产统一管理

安全数据资产 统一管理DataOps&#xff0c;即 Data 和 Operations 的集成&#xff0c;于 2014 年首次提出。Gartner 将 DataOps 定义为“一种协作性的数据管理 实践&#xff0c;专注于改进组织内数据管道的通信、集成和自动化”[7]。DataOps 是一种面向流程的自动化方法&#x…

U盘被写保护怎么解除?解决方案只需这几个

U盘写保护是一种物理开关保护功能&#xff0c;用于防止存储介质上的数据被错误删除或者写入。如果你想去掉“写保护”&#xff0c;u盘被写保护怎么解除&#xff1f;看看下面的解决方案是怎么说的&#xff0c;赶紧跟随下面去掉U盘写保护的步骤来操作吧&#xff01; 方案一&#…

基于opencv传统数字图像处理实现车道线检测详细过程(附源码)

车道线检测 &#xff08;Lane Detection&#xff09; 1、实验内容 本实验使用数字图像处理的基本方法&#xff0c;构建了一个车道线检测模型。该模型可以识别图像中所有的车道线&#xff0c;并得到完整的车道线信息。模型在tuSimple Lane Dataset大小为100的数据子集进行了测…

餐饮门店数字化转型|餐厅管理系统小程序

餐饮行业规模非常庞大&#xff0c;每年都有大量公司或个体户入局&#xff0c;国内各类美食非常多&#xff0c;不同品类菜品都有大量需求&#xff0c;以前几乎在业的餐饮门店&#xff0c;只要运营得当&#xff0c;挣多挣少总归是有利的&#xff0c;也能很好的生存下去&#xff0…

nodejs+vue大学生交流互动论坛网站系统

目 录 1 概述 1 1.1课题背景及意义 1 1.2 国内外研究现状 1 1.3 本课题主要工作 2 2 系统开发环境 3 前端技术&#xff1a;nodejsvueelementui 前端&#xff1a;HTML5,CSS3、JavaScript、VUE 系统分为不同的层次&#xff1a;视图层&#xff08;vue页面&…

【云计算与大数据技术】Bloom Filter、LSM树、Merkle哈希树、Cuckoo哈希等数据结构的讲解(图文解释 超详细)

一、重要数据结构与算法 分布式存储系统中存储大量的数据,同时需要支持大量的上层读/写操作&#xff0c;为了实现高吞吐量&#xff0c;设计和实现一个良好的数据结构能起到相当大的作用 这是以下三个数据库使用的数据结构&#xff0c;一个良好的数据结构对于分布式系统来说有…

Swift 周报 第十九期 |技术汇总

前言 本期是 Swift 编辑组自主整理周报的第十期&#xff0c;每个模块已初步成型。各位读者如果有好的提议&#xff0c;欢迎在文末留言。 欢迎投稿或推荐内容。目前计划每两周周一发布&#xff0c;欢迎志同道合的朋友一起加入周报整理。 十期磨一剑&#xff0c;废铁亦有形&am…

使用TensorFlow Probability实现最大似然估计

TensorFlow Probability是一个构建在TensorFlow之上的Python库。它将我们的概率模型与现代硬件(例如GPU)上的深度学习结合起来。 极大似然估计 最大似然估计是深度学习模型中常用的训练过程。目标是在给定一些数据的情况下&#xff0c;估计概率分布的参数。简单来说&#xff0…

开源依赖项管理指南

就像人际关系中人与人之间的关系一样&#xff0c;软件生态系统中包含一个庞大的关系网络。其中一些联系非常深入&#xff0c;而有一些关系则更为表面。但实际上&#xff0c;现代基于开源的软件开发涉及一个极其庞大的依赖关系树&#xff0c;依赖关系层层叠加&#xff0c;同时涉…

喜讯丨创新微MinewSemi的MS11SF1系列荣获2022中国IoT创新奖—产品金狮奖

北京时间2022年12月8日&#xff0c;由知名电子科技媒体“电子发烧友”举办的2022第九届中国IoT大会在深圳圆满落幕&#xff0c;创新微MinewSemi凭借高性能、低功耗的WiFiBLE Combo 模块—MS11SF1系列&#xff0c;在众多参会嘉宾和行业主流媒体的共同见证下&#xff0c;荣获2022…

卷积神经网络中卷积的作用与原理

目录 前言 卷积的作用 卷积的参数 卷积核大小&#xff08;kernel_size&#xff09; 填充&#xff08;padding&#xff09; same valid full 卷积核算子&#xff08;operator&#xff09; Robert 算子 Prewitt算子 Sobel 算子 Laplance 算子 卷积核深度与个数&…

【C++进阶】哈希(万字详解)—— 运用篇(下)

&#x1f387;C学习历程&#xff1a;入门 博客主页&#xff1a;一起去看日落吗持续分享博主的C学习历程博主的能力有限&#xff0c;出现错误希望大家不吝赐教分享给大家一句我很喜欢的话&#xff1a; 也许你现在做的事情&#xff0c;暂时看不到成果&#xff0c;但不要忘记&…

[附源码]Python计算机毕业设计电子工厂进销存管理系统Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

为什么要把测试环境的告警当成生产环境的告警处理?是一个哲学问题,还是一个技术问题?...

开发不愿意了一个后端服务通常有三个环境&#xff1a;测试环境&#xff0c;预发布环境&#xff0c;生产环境。运维在给测试环境增加告警规则和告警路由时&#xff0c;开发人员反对。这很容易理解&#xff0c;如果真把告警规则配置到测试环境&#xff0c;他们可能无时不刻地收到…

Web GIS开发教程

Web GIS开发教程 非程序员的基本 Web GIS 开发 课程英文名&#xff1a;Web GIS development course 此视频教程共4.0小时&#xff0c;中英双语字幕&#xff0c;画质清晰无水印&#xff0c;源码附件全 下载地址 课程编号&#xff1a;355 百度网盘地址&#xff1a;https://p…

杭州联合银行 x 袋鼠云:打造智能标签体系,助力银行大零售业务转型

“智能标签平台上线后&#xff0c;支行及业务部门已创建多个客群用于营销&#xff0c;为我行客户精细化管理打下了良好基础。” 2021 年&#xff0c;联合银行就已搭建了大数据基础平台&#xff0c;围绕平台搭建了数据研发平台、大数据调度平台及大数据服务平台&#xff0c;提高…

(附源码)Python飞机票销售系统 毕业设计 141432

摘 要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&#xff0c;科学化的管理&#xff0c;使信息存…

Vue组件的嵌套关系,父组件传递子组件 ,事件总线,Provide,inject,作用域插槽,具名插槽非props的attribute ,子组件传递父组件

组件化 – 组件间通信 认识组件的嵌套 ◼ 前面我们是将所有的逻辑放到一个App.vue中:  在之前的案例中,我们只是创建了一个组件App;  如果我们一个应用程序将所有的逻辑都放在一个组件中,那么这个组件就会变成非常的臃 肿和难以维护;  所以组件化的核心思想应该是对…