【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

news2025/1/10 16:36:58

这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

目录

一、消息队列

1.1、发布确认模式

1.2、案例代码

(1)引入依赖

(2)编写生产者【消息确认--单条确认】

(3)编写生产者【消息确认--批量确认】

(4)编写生产者【消息确认--异步确认】


一、消息队列

1.1、发布确认模式

RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???

RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。

1.2、案例代码

(1)引入依赖

<!-- 引入 RabbitMQ 依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

(2)编写生产者【消息确认--单条确认】

  • 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
  • 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
  • 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class Producer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // TODO 开启消息确认机制
            channel.confirmSelect();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_direct_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // 6、发送消息
            for (int i = 0; i < 10; i++) {
                // 路由键唯一标识
                String routingKey = "error";
                if (i % 3 == 0) {
                    routingKey = "info";
                } else if (i % 3 == 1) {
                    routingKey = "warn";
                }
                String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                // 等待RabbitMQ返回ACK标识
                boolean wait = channel.waitForConfirms();
                System.out.println("RabbitMQ是否接收成功: " + wait);
                if (!wait) {
                    // 消息发送失败,则可以重新发送
                    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

(3)编写生产者【消息确认--批量确认】

  • 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
  • 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
  • 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
  • 所以,可以通过捕获IOException异常来判断消息是否发送成功。
  • 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class ProducerBatch {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // TODO 开启消息确认机制
            channel.confirmSelect();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_direct_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // 6、发送消息
            int batchSize = 3;
            int count = 0;
            for (int i = 0; i < 10; i++) {
                // 路由键唯一标识
                String routingKey = "error";
                if (i % 3 == 0) {
                    routingKey = "info";
                } else if (i % 3 == 1) {
                    routingKey = "warn";
                }
                String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                // 批量确认
                if (count == batchSize) {
                    // 等待RabbitMQ返回ACK标识
                    channel.waitForConfirmsOrDie();
                    count = 0;
                }
                count++;
            }
        } catch (IOException e) {
            System.out.println("消息发送失败啦");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

(4)编写生产者【消息确认--异步确认】

  • 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
  • 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class ProducerAsync {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // TODO 开启消息确认机制
            channel.confirmSelect();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_confirm_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // TODO 一定要先调用监听接口,在发送消息
            channel.addConfirmListener(new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag 
                            + ",批量发送多条消息multiple=" + multiple);
                }
            }, new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("RabbitMQ接收失败啦.....");
                }
            });
            for (int i = 0; i < 10; i++) {
                // 6、发送消息
                String message = "这是发布确认模式,发送的消息数据";
                channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());
            }
        } catch (IOException e) {
            System.out.println("消息发送失败啦");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。

综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/373260.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Log4j2基本使用

文章目录1. Log4j2入门2. Log4j2配置3. Log4j2异步日志4. Log4j2的性能Apache Log4j 2是对Log4j的升级版&#xff0c;参考了logback的一些优秀的设计&#xff0c;并且修复了一些问题&#xff0c;因此带 来了一些重大的提升&#xff0c;主要有&#xff1a; 异常处理&#xff0c…

ubuntu/linux系统知识(36)linux网卡命名规则

文章目录背景命名规范系统默认命名规则优势背景 很久以前Linux 操作系统的网卡设备的传统命名方式是 eth0、eth1、eth2等&#xff0c;属于biosdevname 命名规范。 服务器通常有多块网卡&#xff0c;有板载集成的&#xff0c;同时也有插在PCIe插槽的。Linux系统的命名原来是et…

基于SpringCloud的可靠消息最终一致性01:定理、解决方案和框架

在互联网发展的早期,单体架构是主流的开发模式。因为访问的用户不多,所以整个系统的结构比较简单,就像一口竖井,从上到下,一通到底,如下图所示: 图一:单体应用 随着业务复杂度的不断提升,以及用户需求的不断增加,原来单个的业务系统已经不堪重负了。就好像一个窗口前…

redis数据结构的底层实现

文章目录一.引言二.redis的特点三.Redis的数据结构a.字符串b.hashc.listd.sete.zset(有序集合)一.引言 redis是一个开源的使用C语言编写、支持网络、可基于内存亦可持久化的日志型、key-value的NoSQL数据库。 通常使用redis作为缓存中间件来降低数据库的压力&#xff0c;除此…

CV学习笔记-MobileNet

MobileNet 文章目录MobileNet1. MobileNet概述2. 深度可分离卷积&#xff08;depthwise separable convolution&#xff09;2.1 深度可分离卷积通俗理解2.2 深度可分离卷积对于参数的优化3. MobileNet网络结构4. 代码实现4.1 卷积块4.2 深度可分离卷积块4.3 MobileNet定义4.4 完…

linux下使用vscode和cmake高效管理c++项目简明教程

安装vscode及c环境配置可以参见&#xff1a;https://blog.csdn.net/fangshuo_light/article/details/123635576   首先&#xff0c;创建工程目录&#xff0c;并在vscode中打开该文件夹&#xff0c;在里面创建如下文件夹&#xff1a; include&#xff1a;用于存放.h文件src&a…

HBase 一文读懂

本文基于《尚硅谷大数据技术之HBase》编写。HBase 简介HBase定义HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。HBase数据模型HBase的数据模型同关系型数据库&#xff08;RDMS&#xff09;很类似&#xff0c;数据存储在一张表中&#xff0c;有行有列。但从HBase的…

c语言tips-大端小端存储介绍和使用union判断大小端

1. 大小端介绍 大端&#xff08;Big Endian&#xff09;和小端&#xff08;Little Endian&#xff09;是两种CPU或者计算机系统存储数据的方式。 在大端系统中&#xff0c;数据的高位字节&#xff08;MSB&#xff09;存储在内存地址的低位&#xff0c;低位字节&#xff08;LSB…

Linux系统下搭建maven环境

文章目录前述从官网下载安装包安装 maven修改maven配置修改环境变量测试前述 安装 maven 环境前&#xff0c;需要先安装 java 环境&#xff0c;如果没有安装 java 环境&#xff0c;可以参考&#xff1a;https://blog.csdn.net/weixin_45583303/article/details/118631855 从官…

maven的仓库配置、指定jdk编译版本、相关编译命令简介、scope依赖的范围以及依赖的传递性

目录 1、配置阿里云提供的镜像仓库 2、指定jdk编译版本 3、执行 Maven 的构建命令 3.1、清理操作 3.2、编译操作 3.3、测试操作 3.4、打包操作 3.5、安装操作 4、scope依赖的范围 5、依赖的传递性 5.1、概念 5.2、传递的原则 1、配置阿里云提供的镜像仓库 将下面 m…

C++STL之list的模拟实现

目录 一.list准备 二. iterator迭代器 1._list_iterator 2.begin()、end() 3.const_begin()、const_end() 4.!&& 5. && -- 6.operator* 7.operator-> 三.Modify(修改) 1.insert() 2.erase() 3.push_back() && push_front() 4.pop_bac…

MySql触发器学习

文章目录1 触发器1.1介绍1.2 创建触发器1.2 删除触发器1.3查看触发器1 触发器 1.1介绍 触发器是与表有关的数据库对象&#xff0c;指在 insert/update/delete 之前或之后&#xff0c;触发并执行触发器中定义的SQL语句集合。触发器的这种特性可以协助应用在数据库端确保数据的…

(十八)操作系统-进程互斥的软件实现方法

文章目录一、知识总览二、单标志法三、双标志先检查法四、双标志后检查法五、Peterson算法六、总结一、知识总览 二、单标志法 算法思想&#xff1a;两个进程在访问临界区后&#xff0c;会把使用临界区的权限转交给另一个进程。也就是说每个进程进入临界区的权限只能被另一个进…

Guna Charts WinForm 1.0.8 Crack

Guna Charts 16 图表 在 16 种不同的图表类型中可视化您的数据。 Guna Charts 反应灵敏 轻松响应屏幕尺寸的变化。 Guna Charts 实时图表 创建实时数据仪表板现在非常容易。 Guna Charts 混合图表类型 混合多种图表类型&#xff0c;例如条形图和折线图/面积图。 Guna Charts…

MS9122是一款USB单芯片投屏器,内部集成了USB2 0 控制器和数据收发模块、HDMI 数据接口和音视频处理模块。MS9122可以通过USB接口显示

MS9122是一款USB单芯片投屏器&#xff0c;内部集成了USB2.0 控制器和数据收发模块、HDMI 数据接口和音视频处理模块。MS9122可以通过USB接口显示或者扩展PC、智能手机、平板电脑的显示信息到更大尺寸的显示设备&#xff0c;支持HDMI视频接口。 主要功能特征 HDMI v1.4兼容 最大…

【java基础】包装类,自动装箱和自动拆箱

文章目录基本介绍包装类自动装箱自动拆箱包装类注意事项包装类比较包装器内容不可变基本介绍 有时&#xff0c;需要将int这样的基本类型转换为对象。所有的基本类型都有一个与之对应的类。 例如&#xff0c;Integer类对应基本类型int。通常&#xff0c;这些类称为包装器&#…

网上这么多IT的培训机构,我们该怎么选?

说实话&#xff0c;千万不要把这个答案放在网上来找&#xff0c;因为你只能得到别人觉得合适的或者机构的广告&#xff1b;当然个人的培训经历可以听一听的&#xff0c;毕竟不靠谱的机构也有&#xff0c;比如让你交一两万去上线上课程或者一百号来人坐一起看视频&#xff0c;这…

【django】django-simpleui配置后,后台显示空白页解决方法

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 django后台显示空白页解决方法 1. 正文 添加完simpleui以后&#xff0c;后台显示一片空白&#xff0c;一脸问号&#xff1f;&#xff1f;&#xff1f; …

MacBook Pro 休眠后五国,自动重启报错

看了网上很多情况&#xff0c;都说是系统的原因&#xff0c;2018年款的Mac已经过了保修期去过天才吧说花4k 换主板解决&#xff0c;花个几千块去解决这个问题不如折旧加钱再买个新的总之先用应急的办法1.电池偏好设置&#xff0c;接通电源时勾选“ 当显示器关闭时&#xff0c;防…

类的加载过程(生命周期)

类的加载过程(生命周期) 一、装载&#xff1a;通过一个类的全限定名获取定义此类的二进制字节流将这个字节流所代表的静态存储结构转化为方法区的运行时数据结构在内存中生成一个代表这个类的java.lang.Class对象&#xff08;将字节码加载到内存中&#xff09;&#xff0c;作为…