RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现

news2025/1/11 19:41:36

概念:虽然我们可以设置队列和队列中的消息持久化,但任然存在消息在持久化的过程中,即在写入磁盘的过程中,消息未完全写入,然后服务器宕机导致消息丢失的情况,发布确认就是为了解决这种情况的概念,在消息完全写入磁盘后才确认消息完全持久化了

1、发布确认模式:

(1)单个确认发布模式(简单,但吞吐量有限)

(2)批量确认发布模式(简单,吞吐量合理,但出现问题很难找出是那条消息出现的问题)

(3)异步确认发布模式(最佳性能和资源使用,在出现错误的情况下能很好的控制,推荐使用)

2、实现开启发布确认

在生产者的代码中在channel调用confirmSelect方法,即channel.confirmSelect()

注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

https://blog.csdn.net/m0_64284147/article/details/129465871

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Task02 {

    //队列名称(用于指定往哪个队列接收消息)
    public  static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();

        //开启发布确认
        channel.confirmSelect();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台读取要发送的信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 用信道对消息进行发布
             * 第一个参数:发送到哪个交换机
             * 第二个参数:路由的Key值是哪个,本次是队列名
             * 第三个参数:其他参数信息
             * 第四个参数:发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送成功:" + message);
        }
    }

}

3、新建包,用于装实现确认发布模式的代码

(1)新建一个名为confirm的包,用于装发布确认的代码

效果图:

(2)新建一个名为ConfirmMessage的类

4、单个确认发布模式

单个确认发布是一种同步确认发布的方式,在发布一个消息后并且该条消息被确认发布了,后续的消息才能继续发布,不过这种确认方式的最大缺点就是发布速度特别慢

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

/**
 * 发布确认模式:
 * 1、单个确认发布模式
 * 2、批量确认发布模式
 * 3、异步确认发布模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{

        //1、单个确认发布模式
        ConfirmMessage.publishMessageIndividually();

    }

    public static void publishMessageIndividually() throws  Exception{
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //单个消息发布确认
            channel.waitForConfirms();
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单个确认消息,耗时" + (end - begin) + "ms");
    }

}

运行代码,查看单个确认发布模式消耗的时间

5、批量确认发布模式

批量确认发布是一种能极大的提高吞吐量的发布模式,在发布一批消息后一起确认,不过这种确认方式的缺点是当发送故障导致发布出现问题时,不知道是哪个消息出现的问题

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

/**
 * 发布确认模式:
 * 1、单个确认模式
 * 2、批量确认模式
 * 3、异步确认模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{
        
        //2、批量确认模式
        ConfirmMessage.publishMessageBatch();

    }

    public static void publishMessageBatch() throws  Exception{
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //达到100条消息的时候,批量确认一次
            if(i % batchSize == 0) {
                //批量消息发布确认
                channel.waitForConfirms();
            }

        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
    }

}

运行代码,查看批量确认发布模式消耗的时间

6、异步确认发布模式

(1)代码实现

异步确认发布实现逻辑比上面两种要复杂,但性价比高,无论是可靠性还是效率都非常突出,异步确认发布通过回调函数来达到消息可靠性传递,消息的结构类似于map,都是key-value的结构,当相应的消息被消费了或消费失败了,都可以通过对应的key值来确认消费或消费失败的是哪一条消息,所以可靠性很高

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

/**
 * 发布确认模式:
 * 1、单个确认模式
 * 2、批量确认模式
 * 3、异步批量确认模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{

        //3、异步批量确认模式
        ConfirmMessage.publishMessageAsync();

    }

    //异步发布确认
    public static void  publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        //开始时间
        long begin = System.currentTimeMillis();

        /**
         * 消息确认成功回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback ackCallback = (deliveryTag,  multiple) -> {
            System.out.println("确认的消息:" + deliveryTag);
        };

        /**
         * 消息确认失败回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback nackCallback = (deliveryTag,  multiple) -> {
            System.out.println("未确认的消息:" + deliveryTag);
        };

        /**
         * 消息监听器,用于监听消息发送是否成功
         * 第一个参数:消息确认成功回调函数
         * 第二个参数:消息确认失败回调函数
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时" + (end - begin) + "ms");

    }

}

(2)运行代码,查看异步确认发布模式消耗的时间

这里因为是异步的,即代码执行完并输出耗时时间了,但消息监听器还在运行,所以还在时间输出后还在输出确认的消息

(3)处理异步未确认消息

已确认的消息没必要处理,而未确认的消息需要进行重新入队的处理,但由上述步骤(2)的效果图可看出程序在执行完后监听器还在监听消息是否确认成功,而要怎么做才能在程序执行完后再处理监听器监听出来未确认的消息呢?最好的解决方案便是把未确认的消息放在一个基于内存的能被发布线程访问到的队列里,例如就用ConcurrentSkipListMap这个集合在confirm(发布确认)、 callbacks(回调)与发布线程之间进行消息的传递;实现的思路是先用ConcurrentSkipListMap记录发送的所有消息,然后监听器监听消息,确认消息成功后会执行消息确认成功的回调函数,而回调函数执行删除ConcurrentSkipListMap集合里当前被确认的消息的操作,最后ConcurrentSkipListMap里剩下的就是未确认成功的消息

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布确认模式:
 * 1、单个确认模式
 * 2、批量确认模式
 * 3、异步批量确认模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{

        //3、异步批量确认模式
        ConfirmMessage.publishMessageAsync();

    }

    //异步发布确认
    public static void  publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下
         * 功能:
         * 1、轻松的把序号(key)与消息(value)进行关联
         * 2、只要给了序号(key)就能批量删除条目
         * 3、支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long,String> outStandingConfirms = new ConcurrentSkipListMap<>();

        //开始时间
        long begin = System.currentTimeMillis();

        /**
         * 消息确认成功回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback ackCallback = (deliveryTag,  multiple) -> {
            //删除队列里所有已经确认的消息,剩下的就是未确认的消息
            if(multiple) {
                //multiple为true时将一次性ack所有小于deliveryTag的消息,headMap是用于获取第一个key到传入key的所有的key
                ConcurrentNavigableMap<Long, String> confirmed = outStandingConfirms.headMap(deliveryTag);
                System.out.println("确认的消息:" + deliveryTag);
                confirmed.clear();
            }else {
                outStandingConfirms.remove(deliveryTag);
            }
        };

        /**
         * 消息确认失败回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback nackCallback = (deliveryTag,  multiple) -> {
            //打印未确认的消息
            String message = outStandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息:" + message + "未确认消息的tag:" + deliveryTag);
        };

        /**
         * 消息监听器,用于监听消息发送是否成功
         * 第一个参数:消息确认成功回调函数
         * 第二个参数:消息确认失败回调函数
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //收集所有发送的消息(channel.getNextPublishSeqNo()用于获取下一次发布的序号)
            outStandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时" + (end - begin) + "ms");

    }

}

效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/722591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端技术搭建五子棋游戏(内含源码)

The sand accumulates to form a pagoda ✨ 写在前面✨ 功能介绍✨ 页面搭建✨ 样式设置✨ 逻辑部分 ✨ 写在前面 上周我们实通过前端基础实现了拼图游戏&#xff0c;今天还是继续按照我们原定的节奏来带领大家完成一个五子棋游戏&#xff0c;功能也比较简单简单&#xff0c;也…

探究工业运营中的三大工具:根因分析、过程优化和预测性维护

在工业运营领域&#xff0c;根本原因分析、过程优化工具和预测性维护正在彻底改变维护实践的方式。这些工具的战略性组合使得制造工厂能够提升实践水平、提高生产力&#xff0c;并实现持续的成功。本文将以PreMaint为基础&#xff0c;探讨这些工具之间的差异&#xff0c;以及如…

添加数据维度并使用Python绘制5D散点图

大家好&#xff0c;散点图通常用于比较2个不同特征以确定它们之间的关系&#xff0c;散点图也可以添加更多的维度来反映数据&#xff0c;例如使用颜色、气泡大小等。在本文中&#xff0c;将介绍如何绘制一个五维的散点图。 数据集&#xff1a; https://github.com/checkming0…

物联网工控屏在ROV布放回收系统中的应用

一、背景 1. 深海作业装备制造行业 随着我国经济与科学技术的发展壮大&#xff0c;作为“蓝色粮仓”的海洋能源开采建设逐渐成为一项重要的事业。深海作业装备则成为海洋能源开采的必备和关键工具&#xff0c;其性能和可靠性须得以保障。也因此&#xff0c;开发性能表现更优、…

vs背景和主题设置(一看就会,简单实用)

VS背景设置 目录&#xff1a;一、背景插件下载二、主题切换三、调整成自己喜欢的界面 目录&#xff1a; 学习编程是个漫长的过程&#xff0c;设置一个自己喜欢的背景&#xff0c;可以使自己编写代码的时候更舒服。马上行动起来&#xff0c;设置一个自己喜欢的背景吧。 分享一下…

J2EE集合框架List

目录 一.UML ①集合类图 ②线下教育平台用例图 二.List集合特点 ①学集合框架就是了解容器的数据结构&#xff08;增删改查&#xff09; ②有序的 可重复的 三.遍历方式 ① foreach ② iterator 迭代器 ③ for 四.LinkedList ①对比ArrayList是数据结构 Linkedlist…

【回溯算法part01】| 理论基础、77.组合

&#x1f388;回溯算法理论基础 回溯算法的本质是穷举&#xff0c;并不是一个高效的算法&#xff0c;但是有的题必须要用回溯法&#xff0c;如&#xff1a; 组合问题&#xff1a;N个数里面按一定规则找出k个数的集合切割问题&#xff1a;一个字符串按一定规则有几种切割方式子集…

学习 | 药品GMP认证和药厂GMP认证是怎么回事?

可能本身从事药品生产经营的朋友会知道&#xff0c;药品的GMP认证是怎么一回事&#xff0c;但是对于一些想要进入药品生产行业的企业&#xff0c;例如化工产品想进入原料药生产、药用辅料生产&#xff0c;塑料等材料制作商进入药品包装材料生产&#xff0c;只是听这说听那说&am…

【STL】vector快速上手

目录 一&#xff0c;vector的模板特性 二&#xff0c;vector基本使用 1. 构造函数 2. operator 赋值 3. vector——增删 A, 尾插 && 尾删 B&#xff0c;insert C, erase 4. 访问vector 遍历vector中元素&#xff1a; 法一&#xff1a;数组[]法 || at法…

HashMap-JDK8源码讲解及常见面试题

数据结构 红黑树 在JDK8中&#xff0c;优化了HashMap的数据结构&#xff0c;引入了红黑树。即HashMap的数据结构&#xff1a;数组链表红黑树。HashMap变成了这样。 为什么要引入红黑树 1、主要是为了提高HashMap的性能&#xff0c;即解决发生hash冲突后&#xff0c;因为链…

一些总结-C++

1.spdlog 需要安装spdlog库&#xff0c;然后连接器增加-lspdlog 不需要复制头文件到目录&#xff0c;安装到机器上之后&#xff0c;从系统目录加载头文件即可。 部分用法&#xff1a; 2.redis 需要安装hiredis库&#xff0c;链接器-lhiredis 不需要复制头文件到目录&#…

Java Web HTTP 23.7.4

HTTP 1&#xff0c;Web概述 1.1 Web和JavaWeb的概念 Web是全球广域网&#xff0c;也称为万维网(www)&#xff0c;能够通过浏览器访问的网站。 在我们日常的生活中&#xff0c;经常会使用浏览器去访问百度、京东等这些网站&#xff0c;这些网站统称为Web网站。如下就是通过浏…

Linux学习之i节点(inode)和数据块操作

touch testfile创建一个空文件testfile。 stat testfile可以看一下文件的inode信息。 ls -li testfile看一下testfile相关信息。 上图中922208是inode号码&#xff0c;也称为inode编号&#xff0c;若是使用ls -i testfile就可以直接看到inode号码。 du -h testfile可以看…

【Unity3D 问题总结】☀️ | 解决LayoutGroup配合Content Size Fitter使用时发生子成员位置错乱问题

&#x1f3ac; 博客主页&#xff1a;https://xiaoy.blog.csdn.net &#x1f3a5; 本文由 呆呆敲代码的小Y 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f384; 学习专栏推荐&#xff1a;Unity系统学习专栏 &#x1f332; 游戏制作专栏推荐&#xff1a;游戏制作 &…

Android、iOS快速全球化工具

动机 在进行移动端全球化的时候&#xff0c;我们需要根据语言类型准备格式相同&#xff0c;文本不同的好多个文件&#xff0c;如果一个一个翻译显然很浪费时间&#xff0c;如果整篇复制到Google翻译通常翻译出来的文本是没办法直接用的&#xff0c;所以我通过有道云API实现了一…

AIGC:【LLM(三)】——JARVIS:连接ChatGPT和HuggingFace解决AI问题

文章目录 0.摘要1.引言2.相关工作3.HuggingGPT3.1 任务规划3.2 模型选择3.3 任务执行3.4 响应生成 4.限制5.结论6.参考资料 0.摘要 解决具有不同领域和模态的复杂人工智能任务是通往人工通用智能的关键骤。尽管存在丰富的适用于不同领域和模态的人工智能模型&#xff0c;但它们…

Linux系统之iostat命令的基本使用

Linux系统之iostat命令的基本使用 一、iostat命令介绍二、iostat命令帮助1.1 iostat的帮助信息1.2 iostat的选项解释 三、iostat命令的基本使用3.1 查看iostat工具版本3.2 直接使用iostat命令3.3 间隔5秒查看3次信息3.4 只查看磁盘状态3.5 以k或M为单位显示信息 四、查看磁盘I/…

JMM 规范

JMM是Java Memory Model&#xff08;Java 内存模型&#xff09;的缩写&#xff0c;是Java虚拟机规范中定义的一套规则&#xff0c;用来规范Java程序在多线程环境下的内存访问方式。其主要作用是保证多线程之间的数据可见性、有序性和原子性。JMM规范定义了一些程序员和JVM实现者…

机器学习(ML)策略

目录 1、正交化的概念 2、单一数字评估指标&#xff08;Single number evaluation metric&#xff09; 3、训练/开发/测试集划分 4、迁移学习 5、多任务学习 6、端到端深度学习 1、正交化的概念 正交化是机器学习中一种常用的数据预处理技术&#xff0c;用于减少特征之间…

IMX6ull SPI 协议

一 SPI 简介 1.1 SPI SPI 全称是 Serial Perripheral Interface&#xff0c;也就是串行外围设备接口。 SPI 是 Motorola 公司推出的一种同步串行接口 技术&#xff0c;是一种高速、全双工的同步通信总线&#xff0c; SPI 时钟频率相比 I2C 要高很多&#xff0c;最高可以工作 …