RabbitMQ学习04

news2024/12/31 6:48:49

文章目录

    • 发布确认
      • 1. 发布确认的原理
      • 2. 发布确认的策略
        • 2.1.开启发布确认的方法
        • 2.2.单个确认
        • 2.3.批量确认发布
        • 2.4.异步确认发布
        • 2.5.如何处理异步未确认消息
        • 2.6 总结:

发布确认

1. 发布确认的原理

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
    confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

2. 发布确认的策略

2.1.开启发布确认的方法

    发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

Channel channel  = connection.createChannel();
channel.confirmSelect();
2.2.单个确认

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
    这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

    /**
     * 1.单个发布确认
     * @throws Exception
     */
    public static void publishMessageIndividually() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开启时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("消息发送成功");
            }
        }

        //结束时间
        long end = System.currentTimeMillis();

        System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
    }

测试结果:
发布1000条消息,用时:455ms

2.3.批量确认发布

    上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

/**
     * 批量发布确认
     */
    public static void publishMessageBatch() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开启时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;


        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            if (i % batchSize == 99){
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("消息发送成功");
                }
            }
        }
        
        //结束时间
        long end = System.currentTimeMillis();

        System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
    }

测试结果:
发布1000条消息,用时:62ms

2.4.异步确认发布

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。
在这里插入图片描述

/**
     * 异步发布确认
     */

    public static void publishMessageAsync() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开启时间
        long begin = System.currentTimeMillis();

        ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
            System.out.println("确认消息" + deliveryTag);
        };
        /**
         * 1.消息的表示
         * 2.是否为批量
         */
        ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
            System.out.println("未确认消息:" + deliveryTag);
        };
        //准备消息监听器
        /**
         * 1.成功的消息
         * 2.失败的消息
         */
        channel.addConfirmListener(ackCallback,nackCallback);


        //批量发布消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());

        }

        //结束时间
        long end = System.currentTimeMillis();

        System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
    }

测试结果:
发布1000条消息,用时:28ms

2.5.如何处理异步未确认消息

    最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

    /**
    * 异步发布确认
    */

   public static void publishMessageAsync() throws Exception{

       Channel channel = RabbitMQUtils.getChannel();
       //队列的声明
       String queueName = UUID.randomUUID().toString();
       channel.queueDeclare(queueName,true,false,false,null);
       //开启发布确认
       channel.confirmSelect();
       //开启时间
       long begin = System.currentTimeMillis();
       /**
        * 线程安全有序的一个哈希表  适用于高并发场景
        * 1.他能轻松的将序号与消息关联
        * 2.批量删除条数  只要给到序号
        * 3.支持高并发(多线程)
        *
        */
       ConcurrentSkipListMap<Long,String> outStandingConfirms =
               new ConcurrentSkipListMap<>();
       
       ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
           if (multiple){
               //2.删除到已经确认的消息,剩下的就是未确认的消息
               ConcurrentNavigableMap<Long, String> confirmed =
                       outStandingConfirms.headMap(deliveryTag);
               confirmed.clear();
           }else {
               outStandingConfirms.remove(deliveryTag);
           }
           System.out.println("确认消息" + deliveryTag);
       };
       /**
        * 1.消息的表示
        * 2.是否为批量
        */
       ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
           //3.打印未确认的消息
           String message = outStandingConfirms.get(deliveryTag);
           System.out.println("未确认消息:" + message);
       };
       //准备消息监听器
       /**
        * 1.成功的消息
        * 2.失败的消息
        */
       channel.addConfirmListener(ackCallback,nackCallback);


       //批量发布消息
       for (int i = 0; i < MESSAGE_COUNT; i++) {
           String message = i + "";
           channel.basicPublish("",queueName,null,message.getBytes());
           // TODO: 1.记录下所有要发送的消息,消息的总和
           outStandingConfirms.put(channel.getNextPublishSeqNo(),message);

       }

       //结束时间
       long end = System.currentTimeMillis();

       System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
   }
2.6 总结:

单独发布消息:
同步等待确认,简单,但吞吐量非常有限。
批量发布消息:
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1147498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Cloud Alibaba 教程 Fegin 篇

Spring Cloud Alibaba 教程 | Feign 篇 写在前面的话&#xff1a; 本笔记在参考网上视频以及博客的基础上&#xff0c;只作为个人学习笔记&#xff0c;如有侵权联系删除&#xff0c;谢谢&#xff01; 1、Feign替代RestTemplate ​ 1.1 引入依赖 <!-- Feign 客户端依赖 --&…

StringBuffer类提供针对字符的操作方法

StringBuffer类是Java中用于操作字符串的一个类&#xff0c;提供了许多针对字符的操作方法&#xff0c;例如&#xff1a; append()&#xff1a;用于在字符串末尾添加字符或字符串。 insert()&#xff1a;用于在字符串的指定位置插入字符或字符串。 delete()&#xff1a;用于删…

Spring中简单的获取Bean对象(对象装配)

获取Bean对象也叫做对象装配&#xff0c;是把对象取出来放到某个类中&#xff0c;有时候也叫对象注入&#xff01; 对象装配&#xff08;对象注入&#xff09;更加简单的读取Bean&#xff08;是从Spring容器中读取某个对象放到当前类里面&#xff09;的实现方法有以下3种&…

06 MIT线性代数-线性无关,基和维数Independence, basis, and dimension

1. 线性无关 Independence Suppose A is m by n with m<n (more unknowns than equations) Then there are nonzero solutions to Ax0 Reason: there will be free variables! A中具有至少一个自由变量&#xff0c;那么Ax0一定具有非零解。A的列向量可以线性组合得到零向…

【AD9361 数字接口CMOS LVDSSPI】C 并行数据之LVDS

接上一部分&#xff0c;AD9361 数字接口CMOS &LVDS&SPI 目录 一、LVDS模式数据路径和时钟信号LVDS模式数据通路信号[1] DATA_CLK[2] FB_CLK[3] Rx_FRAME[4] Rx_D[5&#xff1a;0][5] Tx_FRAME[6]Tx_D[5&#xff1a;0][7] ENABLE[8] TXNRX系列 二、LVDS最大时钟速率和信…

附录B 其他第三方软件移植(FTP、OpenSSH、GDB)

目录 开发板 FTP 服务器移植与搭建vsftpd 源码下载vsftpd 移植vsftpd 服务器测试配置vsftpd添加新用户Filezilla 连接测试 开发板 OpenSSH 移植与使用OpenSSH 简介OpenSSH 移植OpenSSH 源码获取移植zlib 库移植openssl 库移植openssh 库 openssh 设置openssh 使用ssh 登录scp 命…

opencv4.x通过cmake编译带cuda

首选确定目标&#xff0c;需要编译的是opencv4.5.5带第三方库&#xff0c;带cuda的版本&#xff0c;使用vs编译器&#xff0c;编译releasedebug版本。 需要先安装好cmake cuda cudnn等基础依赖&#xff0c;并且确保安装好vs的编译器&#xff0c;并且大小版本都符合实际要求。 …

【C++的OpenCV】第十四课-OpenCV基础强化(三):Mat元素的访问之data和step属性

&#x1f389;&#x1f389;&#x1f389; 欢迎来到小白 p i a o 的学习空间&#xff01; \color{red}{欢迎来到小白piao的学习空间&#xff01;} 欢迎来到小白piao的学习空间&#xff01;&#x1f389;&#x1f389;&#x1f389; &#x1f496; C\Python所有的入门技术皆在 我…

信息系统项目管理师教程 第四版【第4章-信息系统管理-思维导图】

信息系统项目管理师教程 第四版【第4章-信息系统管理-思维导图】

由一个单例模式引发的思考-holder类方式

前言&#xff1a; 最近在看《Java并发编程实践》&#xff0c;里面提到了一种实现单例模式的方式&#xff0c;并大致说明了机制&#xff0c;但仍不是很清晰&#xff0c;今日有空&#xff0c;查阅相关书籍&#xff0c;尝试解释其中道理。 单例模式&#xff1a; 单例模式是一种常…

队列(8.6)

目录 2.队列 2.1队列的概念及结构 2.2队列的实现 2.2.1初始化队列 2.2.2队尾入队列 2.2.3队头出队列 2.2.4获取队列头部元素 2.2.5 销毁队列 3.栈和队列面试题 225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09; 232. 用栈实现队列 - 力扣&#xff08;LeetC…

【送书福利-第二十一期】《ChatGPT进阶:提示工程入门》

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主、前后端开发、人工智能研究生。公粽号&#xff1a;程序员洲洲。 &#x1f388; 本文专栏&#xff1a;本文…

正点原子嵌入式linux驱动开发——Linux 多点电容触摸屏

随着智能手机的发展&#xff0c;电容触摸屏也得到了飞速的发展。相比电阻触摸屏&#xff0c;电容触摸屏有很多的优势&#xff0c;比如支持多点触控、不需要按压&#xff0c;只需要轻轻触摸就有反应。ALIENTEK的三款RGB LCD屏幕都支持多点电容触摸&#xff0c;本章就以ATK7016这…

Spring Cloud Alibaba Seata 实现 SAGA 事物

Seata 是一款开源的分布式事务解决方案&#xff0c;致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式&#xff0c;为用户打造一站式的分布式解决方案 Seata 官网&#xff1a;https://seata.io/zh-cn/ Spring Cloud Alibaba 官…

【Java网络原理】 六

本文主要介绍了网络层的IP协议/NAT机制/IPv6的由来以及在数据链路层涉及到的以太网协议和DNS域名解析系统 一.网络层 1.IP协议 各个字段所表示的含义 >4位版本号 用来表示IP协议的版本&#xff0c;现在只有两个版本IPv4 &#xff0c;IPv6 >4位首部长度 IP报头可变&…

【自然语言处理】【长文本处理】RMT:能处理长度超过一百万token的Transformer

相关博客 【自然语言处理】【长文本处理】RMT&#xff1a;能处理长度超过一百万token的Transformer 【自然语言处理】【大模型】MPT模型结构源码解析(单机版) 【自然语言处理】【大模型】ChatGLM-6B模型结构代码解析(单机版) 【自然语言处理】【大模型】BLOOM模型结构源码解析(…

AI直播换脸——DeepFaceLab 3.0模型训练与微调

前言 DeepFaceLab是一种基于深度学习的人脸合成和转换工具。它使用了深度神经网络来分析和修改图像中的人脸部分&#xff0c;可以实现将一个人的脸部特征应用到另一个人的照片上&#xff0c;或者进行面部表情、年龄、性别等特征的变换。 DeepFaceLab具备一系列核心功能&#x…

Matplotlib详解(plt 和ax分别是什么)

Matplotlib中的plt 和 ax 分别是什么&#xff1f; 概念引入两种绘图方式的区别subplot 绘制Matplotlib 常见组件设置整理4.1 设置显示中文字体4.2 设置标题4.3 边框的显示问题4.4 图例设置&#xff08;legend&#xff09;4.5 图形与边框之间的留白控制4.6 设置双坐标轴4.7 坐标…

Java毕业设计 SpringBoot 新能源充电桩管理系统

Java毕业设计 SpringBoot 新能源充电桩管理系统 SpringBoot 新能源充电桩管理系统 功能介绍 管理员 登录 验证码 注册 系统用户管理 普通用户管理 通知公告管理 留言管理 充电站管理 充电桩管理 充电桩预约 充电管理 订单管理 修改密码 普通用户 登录 修改个人资料 通知公告…

椭圆曲线在SM2中的应用(三)

一、SM2加密运算 1.1加密原始数据 SM2加密运算首先是用户A对数据加密,用户A拥有原始数据 椭圆曲线系统参数长度为klen比特的消息M公钥Pb椭圆曲线系统参数,已经在 椭圆曲线参数(二)中详细介绍;M就是需要加密消息,长度为klen; 1.1.1 公钥Pb的计算方式 公钥Pb=dBG,其中…