RabbitMQ发布确认

news2024/9/29 13:31:52

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式, 所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后, broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出, broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

发布确认的策略

开启发布确认的方法

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法 。

channel.confirmSelect();

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢, 因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

/**
 * 单个分发确认
 */
public static void publicMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtil.getChannel();
    String queue_name = UUID.randomUUID().toString();
    channel.queueDeclare(queue_name,false,false,false,null);
    //开启分发确认
    channel.confirmSelect();
    long start = System.currentTimeMillis();
    for(int i=0;i<100;i++){
        String message=i+"";
        channel.basicPublish("",queue_name,null,message.getBytes(StandardCharsets.UTF_8));
        boolean flag = channel.waitForConfirms();
        if(flag){
            System.out.println("消息分发成功:"+message);
        }
    }
    long end = System.currentTimeMillis();
    System.out.println("耗时:"+(end-start)+"ms");
}

批量确认发布

    /**
     * 批量分发确认
     */
    public static void publicMessageBatch() throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        String queue_name = UUID.randomUUID().toString();
        channel.queueDeclare(queue_name,false,false,false,null);
        //开启分发确认
        channel.confirmSelect();
        long start = System.currentTimeMillis();
        int outstandingMessageCount = 0;
        for(int i=0;i<100;i++){
            String message=i+"";
            channel.basicPublish("",queue_name,null,message.getBytes(StandardCharsets.UTF_8));
            outstandingMessageCount++;
            if(i%10==0) {
                channel.waitForConfirms();
                outstandingMessageCount=0;
            }
        }
        if(outstandingMessageCount>0) channel.waitForConfirms();
        long end = System.currentTimeMillis();
        System.out.println("耗时:"+(end-start)+"ms");
    }

缺点:不知道哪一个消息出现错误。
相比于单个确认发布耗时短

异步分发确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
image.png

    /**
     * 异步分发确认
     * @throws Exception
     */
    public static void publicMessageAsync() throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        String queue_name = UUID.randomUUID().toString();
        channel.queueDeclare(queue_name,false,false,false,null);
        //开启分发确认
        channel.confirmSelect();
        //消息分发成功回调 第一个参数 消息序列号,第二个参数 是否是批量分发
        ConfirmCallback askCallback=(deliveryTag,multiple)->{
            System.out.println("消息成功分发:"+deliveryTag);
        };
        //消息分发失败回调 第一个参数 消息序列号,第二个参数 是否是批量分发
        ConfirmCallback nackCallback=(deliveryTag,multiple)->{
            System.out.println("消息分发失败:"+deliveryTag);
        };
        //添加监听器 第一个参数成功时的回调,第二个参数失败时的回调
        channel.addConfirmListener(askCallback,nackCallback);
        long start = System.currentTimeMillis();
        int outstandingMessageCount = 0;
        for(int i=0;i<100;i++){
            String message=i+"";
            channel.basicPublish("",queue_name,null,message.getBytes(StandardCharsets.UTF_8));
            outstandingMessageCount++;
        }
        long end = System.currentTimeMillis();
        System.out.println("耗时:"+(end-start)+"ms");
    }

处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列

    public static void publicMessageAsync() throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        String queue_name = UUID.randomUUID().toString();
        channel.queueDeclare(queue_name,false,false,false,null);
        //开启分发确认
        channel.confirmSelect();
        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目 只要给到序列号
         * 3.支持并发访问
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
        //消息分发成功回调
        ConfirmCallback askCallback=(deliveryTag,multiple)->{
            if (multiple) {
                //返回的是小于等于当前序列号的未确认消息 是一个 map
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
                //清除该部分未确认消息
                confirmed.clear();
            }else{
                //只清除当前序列号的消息
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("消息成功分发:"+deliveryTag);
        };
        //消息分发失败回调
        ConfirmCallback nackCallback=(deliveryTag,multiple)->{
            System.out.println("消息分发失败:"+deliveryTag);
        };
        //添加监听器
        channel.addConfirmListener(askCallback,nackCallback);
        long start = System.currentTimeMillis();
        for(int i=0;i<100;i++){
            String message=i+"";
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
            channel.basicPublish("",queue_name,null,message.getBytes(StandardCharsets.UTF_8));
        }
        long end = System.currentTimeMillis();
        System.out.println("耗时:"+(end-start)+"ms");
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1407549.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【每日一题】最长交替子数组

文章目录 Tag题目来源解题思路方法一&#xff1a;双层循环方法二&#xff1a;单层循环 写在最后 Tag 【双层循环】【单层循环】【数组】【2024-01-23】 题目来源 2765. 最长交替子数组 解题思路 两个方法&#xff0c;一个是双层循环&#xff0c;一个是单层循环。 方法一&am…

Likeshop单商户SaaS商城源码系统-商家用过都说太香啦!

在互联网快速发展的时代&#xff0c;拥有一个个性化、功能丰富的在线商城是企业拓展市场、提高用户粘性的重要手段。 我是一名电商从业者&#xff0c;同时也是一个热衷于DIY的人&#xff0c;我总喜欢在自己的店铺中加入自己的一些想法和创意。然而&#xff0c;一般的电商平台无…

【思路合集】talking head generation+stable diffusion

1 以DiffusionVideoEditing为baseline&#xff1a; 改进方向 针对于自回归训练方式可能导致的漂移问题&#xff1a; 训练时&#xff0c;在前一帧上引入小量的面部扭曲&#xff0c;模拟在生成过程中自然发生的扭曲。促使模型查看身份帧以进行修正。在像VoxCeleb或LRS这样的具…

EasyX的安装与使用(VisualStudio C++免费绘图库)

EasyX Graphics Library 是针对 Visual C 的免费绘图库 安装教程 安装到Visual C 2010 EasyX 安装完毕。 在VC2010中建立控制台工程 工程建好后&#xff0c;鼠标右键点击工程名&#xff0c;并选择属性 安装到Visual C 2010 EasyX 安装完毕。 安装示例程序 easyxdemo.cpp 在VC…

Vulnhub-dc4

靶场下载 https://download.vulnhub.com/dc/DC-4.zip 信息收集 判断目标靶机的存活地址: # nmap -sT --min-rate 10000 -p- 192.168.1.91 -oN port.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2024-01-21 16:36 CST Stats: 0:00:03 elapsed; 0 hosts completed (1 up…

机器学习 | 掌握Matplotlib的可视化图表操作

Matplotlib是python的一个数据可视化库&#xff0c;用于创建静态、动态和交互式图表。它可以制作多种类型的图表&#xff0c;如折线图、散点图、柱状图、饼图、直方图、3D 图形等。以渐进、交互式方式实现数据可视化。当然博主也不能面面俱到的讲解到所有内容&#xff0c;详情请…

装完32G内存条 电脑飞跃提升!

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 大家…

如何使用阿里云CDN服务?

如何使用阿里云CDN服务 一、开通阿里云CDN服务 注册自己阿里云账号&#xff0c;找到CDN服务&#xff0c;进行加速即可 二、配置域名信息 1、各配置参数的含义 添加加速域名&#xff1a; 如果需要使用CDN加速指定网站上的业务&#xff0c;则需要将该网站作为源站&#xff0…

ubuntu 20.04 使用 webrtc-streamer自动退出,报错GLIBC 问题解决方法

文章目录 前言Ubuntu 20.4中使用webrtc-streamer报错总结 前言 前端vue2 项目需要播放海康的视频流&#xff0c;本地启动起来了&#xff0c;现在需要的服务器上部署&#xff0c;服务器是Ubuntu 20.04&#xff0c;下面是部署时遇到的问题及解决方法&#xff0c;总耗时2天。 不知…

<蓝桥杯软件赛>零基础备赛20周--第16周--GCD和LCM

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周。 在QQ群上交流答疑&am…

dolphinscheduler节点二次开发需要改动的部分

dolphinscheduler节点二次开发需要改动的部分 前端 在dolphinscheduler-ui/public/images/task-icons/目录下新增两个节点的logo图片&#xff0c;一个为激活状态的一个为非激活状态的&#xff0c;如下。 修改文件dolphinscheduler-ui/src/views/projects/task/constants/task…

git bash右键菜单失效解决方法

git bash右键菜单失效解决方法 这几天重新更新了git&#xff0c;直接安装新版本后&#xff0c;右键菜单失效找不到了。找了好几个博客&#xff0c;发现都不全面&#xff0c;最后总结一下解决方法&#xff1a; &#xff08;1&#xff09;按winr&#xff0c;输入regedit打开注册…

【学网攻】 第(3)节 -- 交换机配置聚合端口

文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用 前言 网络已经成为了我们生活中不可或缺的一部分&#xff0c;它连接了世界各地的人们&#xff0c;让信息和资源得以自由流动。随着互联网的发展&#xff0c;我们可以通过网络学习、工作、娱乐…

idea创建公用依赖包项目

创建parent项目 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/…

C++读取txt文件中的逐个字符

为了增加读取的灵活性&#xff0c;所以separator和filename都设置为在主函数中获取输入或者在函数中传参的视线方法 举个例子&#xff0c;txt文件如下&#xff1a; household;2;true; 首先声明一个读取数据的文件 void read_data_file(const string& filename,char se…

使用Fiddler进行弱网测试

测试APP、web经常需要用到弱网测试&#xff0c;也就是在信号差、网络慢的情况下进行测试。我们自己平常在使用手机APP时&#xff0c;在地铁、电梯、车库等场景经常会遇到会话中断、超时等情况&#xff0c;这种就属于弱网。 普通的弱网测试可以选择第三方工具对带宽、丢包、延时…

代码随想录算法训练营第29天(回溯算法05 | * 491.递增子序列 * 46.全排列 * 47.全排列 II

回溯算法part05 491.递增子序列解题思路与 90.子集II 不同的点回溯三部曲 46.全排列解题思路遇到的难点思考 47.全排列 II解题思路注意点拓展需要加深理解的点&#xff08;需复习 小总结 491.递增子序列 本题和大家刚做过的90.子集II非常像&#xff0c;但又很不一样&#xff0c…

简单实现网络编程

1. 前置知识 在学习网络编程前&#xff0c;我们需要先了解一些前置知识 1.1 客户端和服务器 在网络编程中&#xff0c;客户端和服务器是两个关键的角色。 客户端是发起连接并向服务器发送请求的一方。客户端通常是一个应用程序或设备&#xff0c;通过与服务器建立连接&…

线性表--链表--带头双向循环链表

目录 1.什么是带头双向循环链表&#xff1f; 2.实现增删查改功能&#xff1a; 2.1使用链表前必须对头节点初始化 2.2尾插 2.3尾删 2.4头插 2.5头删 2.8查找 2.7指定位置插入 2.8指定位置删除 2.9改变数据 ​编辑 2.10打印 2.11销毁 3.代码 1.什么是带头双向循环链表&…

docker 基础手册

文章目录 docker 基础手册docker 容器技术镜像与容器容器与虚拟机docker 引擎docker 架构docker 底层技术docker 二进制安装docker 镜像加速docker 相关链接docker 生态 docker 基础手册 docker 容器技术 开源的容器项目&#xff0c;使用 Go 语言开发原意“码头工人”&#x…