RabbitMQ - 发布确认

news2025/1/22 21:52:50

RabbitMQ - 发布确认

  • 发布确认逻辑
  • 发布确认的策略
    • 单个确认发布
    • 批量确认发布
    • 异步确认发布

发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

发布确认的策略

开启发布确认的方法:

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

//开启发布确认
channel.confirmSelect();

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

/**
 * 单个发送
 */
public static void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();

    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //服务端返回 false 或超时时间内未返回,生产者可以消息重发
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发送成功");
        }
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");

}

批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

/**
 * 批量
 */
public static void publishMessageBatch() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();
    //批量确认消息大小
    int batchSize = 100;
    //未确认消息个数
    int outstandingMessageCount = 0;
    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        outstandingMessageCount++;
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }
    }
    //为了确保还有剩余没有确认消息 再次确认
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }
    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}

异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。

在这里插入图片描述

如何处理异步未确认消息?

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

以上 3 种发布确认速度对比 :

  • 单独发布消息
    同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息
    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
  • 异步处理
    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/618488.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么时候 MySQL 查询会变慢?

前面几篇文章和小伙伴们聊的基本上都是从索引的角度去优化 MySQL 查询&#xff0c;然而&#xff0c;索引创建的好&#xff0c;并不意味着查询就一定快&#xff0c;影响查询效率的因素特别多&#xff0c;今天我们就来聊一聊这些可能影响到查询的因素。 1. 查询流程 开始今天的…

欢迎来到新世界

&#xff08;1&#xff09; 我去年对技术的发展是比较灰心的&#xff1a; 云原生&#xff1a;技术一直动荡&#xff0c;SOA->Servless、Docker->WASM、GitOpsCICDDevOps云计算&#xff1a;在中国从公有云走向了私有云&#xff0c;乃至金融云、国资云、政务云等等N种云Saa…

圆满收官!飞桨黑客松第四期高手云集,四大赛道开源贡献持续升级

2023年2月20日PaddlePaddle Hackathon 飞桨黑客马拉松&#xff08;以下简称为“飞桨黑客松”&#xff09;第四期活动发布后&#xff0c;开发者们反响热烈&#xff0c;围绕四大赛道展开了激烈角逐&#xff0c;超过2000位社区开发者参与到飞桨黑客松中&#xff0c;完成800余次任务…

直播教学签到功能(互动功能接收端JS-SDK)

功能概述 本模块主要用于接收和处理讲师、助教和管理员等用户发起的签到操作。 初始化及销毁 在实例化该模块并进行使用之前&#xff0c;需要对SDK进行初始化配置&#xff0c;详细见参考文档。 在线文件引入方式 // script 标签引入&#xff0c;根据版本号引入JS版本。 <…

ChatGPT 和 Bing Chat两者之间的比较,看完你就懂了

目录 一、ChatGPT 1.1 介绍 1.2 特点 1.3 使用场景 二、 Bing Chat 2.1 介绍 2.2 功能特点 2.3 使用场景 三、对比 一、ChatGPT 1.1 介绍 ChatGPT是一款基于人工智能技术的语言模型应用&#xff0c;由美国人工智能研究实验室OpenAI在2022年11月30日推出。该模型是一种…

【深度学习】跌倒识别 Yolov5(带数据集和源码)从0到1,内含很多数据处理的坑点和技巧,收获满满

文章目录 前言1. 数据集1.1 数据初探1.2 数据处理1.3 训练前验证图片1.4 翻车教训和进阶知识 2. 训练3.效果展示 前言 又要到做跌倒识别了。 主流方案有两种&#xff1a; 1.基于关键点的识别&#xff0c;然后做业务判断&#xff0c;判断跌倒&#xff0c;用openpose可以做到。…

干货分享 | CloudQuery 数据保护能力之动态数据脱敏!

在企业数字化转型的过程中&#xff0c;尤其随着互联网、云计算、大数据等信息技术与通信技术的迅猛发展&#xff0c;海量数据在各种信息系统上被存储和处理&#xff0c;其中包含大量有价值的敏感数据&#xff0c;这意味着数据泄露的风险也不断增加。 数据泄露可能由各种因素引…

【项目】实现web服务器

目录 1.需要实现的项目需求&#xff08;web服务器的工作原理&#xff09; 2.实现过程&#xff1a; 1.编写套接字 2.多线程的代码和任务类 3.文件描述符的处理方法的框架 4.读取请求 4.1.读取请求行 4.2.读取请求报头 4.3.分析请求行和报头 请求行的方法、URI、版本…

桌面图标删不掉?试试这几个解决办法!

案例&#xff1a;我想对电脑桌面上的应用进行删除&#xff0c;但是我怎么删也删不掉应用的图标&#xff1f;有人知道这是怎么回事吗&#xff1f;怎样才能成功删除桌面图标&#xff1f;求一个解决办法&#xff01; 有时候我们可能会遇到桌面图标无法删除的困扰&#xff0c;桌面…

【已解决】Macbook pro/Macbook air 电脑过热问题(附软件下载地址)

问题&#xff1a; 今天早上一上班打开我的macbook air&#xff0c;刚开机了十来分钟&#xff0c;就觉得左上角位置特别的热&#xff0c;耳朵凑近风扇处&#xff0c;基本听不到风扇的声音&#xff0c;风扇的转速太慢&#xff0c;导致cpu温度堆积造成温度升高。 解决办法&#…

【JS】1705- 重学 JavaScript API - Fullscreen API

❝ 前期回顾&#xff1a; 1. Page Visibility API 2. Broadcast Channel API 3. Beacon API 4. Resize Observer API 5. Clipboard API 6. Fetch API 7. Performance API 8. WebStorage API 9. WebSockets API ❞ 本文中&#xff0c;我们将探索 Fullscreen API 的概念、使用方法…

SpringBoot+Vue 的简历招聘系统

文章目录 1、效果演示2、 前言介绍3、主要技术4 **系统设计**4.1 系统体系结构4.2开发流程设计4.3 数据库设计原则4.4 数据表 5 **系统详细设计**5.1管理员功能模块5.2用户功能模块5.3前台首页功能模块 6、源码获取 1、效果演示 2、 前言介绍 随着科学技术的飞速发展&#xff…

加速5G部署,到底该怎么做?

今天&#xff0c;第31届中国国际信息通信展&#xff08;PT展&#xff09;在北京国家会议中心圆满落幕。 这次通信展&#xff0c;在举办日期上有着特殊的意义。因为&#xff0c;今年的6月6日&#xff0c;正好是国内5G牌照正式发放的四周年纪念日。而且&#xff0c;去年大概这个时…

DETR模型转RKNN

目录 1.前言 2.准备工作 3.开始转模型 4.测试代码 5.不想转&#xff0c;直接用也可以&#xff0c;转好的给你&#xff0c;请关注评论一下 1.前言 RKNN出最新版本了&#xff0c;测试了一下&#xff0c;rk在transformer方面做了很多的工作&#xff0c;至少之前不能转的模型&am…

ReadProcessMemory可不是一个进程间通信的好方法

有时候我看到有人会使用 ReadProcessMemory 这个 API 来实现进程间通信&#xff0c;老实说吧&#xff0c;我觉得这不是一个明智的选择&#xff0c;原因有如下几条。 首先&#xff0c;你不能使用 ReadProcessMemory 来跨越安全上下文 (Security Contexts)&#xff0c;至少你需要…

SpringCloud入门实战(八)- Gateway服务网关集成

&#x1f4dd; 学技术、更要掌握学习的方法&#xff0c;一起学习&#xff0c;让进步发生 &#x1f469;&#x1f3fb; 作者&#xff1a;一只IT攻城狮 &#xff0c;关注我&#xff0c;不迷路 。 &#x1f490;学习建议&#xff1a;1、养成习惯&#xff0c;学习java的任何一个技术…

macOS Sonoma 14.0 (23A5257q) Beta1 带 OC 引导双分区黑苹果镜像

6月6日&#xff0c;在WWDC2023开发者大会上&#xff0c;苹果带来了全新Mac系统&#xff0c;命名为macOS Sonoma。该系统最大的亮点是带来了小组件&#xff0c;macOS Sonoma可以添加手机上的所有小组件&#xff0c;包括车辆小组件。 镜像下载&#xff1a; 微信公众号&#xff1…

聚观早报 | 苹果发XR头显Vision Pro;英特尔将出售部分Mobileye股票

今日要闻&#xff1a;苹果发XR头显Vision Pro&#xff1b;英特尔将出售部分Mobileye股票&#xff1b;华为已申请注册两枚NETGPT&#xff1b;瑞幸咖啡全国门店数量突破1万家&#xff1b;iPhone15系列本月将在郑州富士康量产 苹果发XR头显Vision Pro 6 月 6 日&#xff0c;苹果 …

从零手写操作系统之RVOS环境搭建-01

从零手写操作系统之RVOS环境搭建-01 背景介绍操作系统的定义操作系统的分类典型的 RTOS 介绍课程系统RVOS简介 Hello WorldQEMU介绍QEMU-virt 地址映射 系统引导引导程序要做哪些事情如何判断当前hart是不是第一个hart?如何初始化栈? 如何在屏幕输出Hello World通过串口输出U…

基于java SpringBoot框架和Vue的智能停车场管理系统

近年来&#xff0c;中国不仅综合国力大幅提升&#xff0c;国民经济也快速增长&#xff0c;推动了中国汽车工业的发展。技术的飞速发展逐渐降低了汽车的制造成本&#xff0c;越来越受欢迎。今天&#xff0c;大多数家庭都有能力购买汽车&#xff0c;因此&#xff0c;中国城市的汽…