基于springboot实现的rabbitmq消息确认

news2025/1/19 11:14:53

概述

RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

详细

一、运行效果

image.png

二、实现过程

①、引入rabbitmq包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

②、修改application.properties配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

③、定义exchange和queue,并将queue绑定在exchange上

package com.mm.springbootrabbitmqconfirmdemo.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean(name = "confirmQueue")
    public Queue confirmQueue(){
        return  new Queue("confirmQueue",true,false,false);
    }
 
    @Bean(name = "confirmExchange")
    public FanoutExchange confirmExchange(){
        return new FanoutExchange("confirmExchange");
    }
 
    @Bean
    public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange,
                                                 @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange);
    }
 
}

④、消息发送确认

发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。

消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。

我们可以利用这两个Callback来确保消息的100%送达。

1、 ConfirmCallback确认模式

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

package com.mm.springbootrabbitmqconfirmdemo.service;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){
        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
 
    }
 
 
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。

  • ack:消息投递到broker 的状态,true表示成功。

  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

com.mm.springbootrabbitmqconfirmdemo.service;

lombok.extern.slf4j.;
org.springframework.amqp.core.Message;
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.stereotype.;

ReturnCallbackService  RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 Confirm 和 Return 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id 为10000000000

⑤、消息发送确认

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
     
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
  
        try {
            log.info("小富收到消息:{}", msg);
  
            //TODO 具体业务
             
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  
        }  catch (Exception e) {
             
            if (message.getMessageProperties().getRedelivered()) {
                 
                log.error("消息已重复处理失败,拒绝再次接收...");
                 
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                 
                log.error("消息即将再次返回队列处理...");
                 
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3、basicReject

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

三、项目结构图

image.png

四、补充

1、别忘确认消息

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。

2、消息无限投递

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
  
        try {
            log.info("消费者 2 号收到:{}", msg);
  
            int a = 1 / 0;
  
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  
        } catch (Exception e) {
  
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

3、重复消费

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis将消息持久化,通过再消息中的唯一性属性校验。

可以看到使用了 RabbitMQ 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:

  • 消息生产者 - > rabbitmq服务器(消息发送失败)

  • rabbitmq服务器自身故障导致消息丢失

  • 消息消费者 - > rabbitmq服务(消费消息失败)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【APUE】标准I/O库

目录 1、简介 2、FILE对象 3、打开和关闭文件 3.1 fopen 3.2 fclose 4、输入输出流 4.1 fgetc 4.2 fputc 4.3 fgets 4.4 fputs 4.5 fread 4.6 fwrite 4.7 printf 族函数 4.8 scanf 族函数 5、文件指针操作 5.1 fseek 5.2 ftell 5.3 rewind 6、缓冲相关 6.…

安装samba服务器

1.实验目的 &#xff08;1&#xff09;了解SMB和NETBIOS的基本原理 &#xff08;2&#xff09;掌握Windows和Linux之间&#xff0c;Linux系统之间文件共享的基本方法。 2.实验内容 &#xff08;1&#xff09;安装samba服务器。 &#xff08;2&#xff09;配置samba服务器的…

Visual Studio 线性表的链式存储节点输出引发异常:读取访问权限冲突

问题&#xff1a; 写了一个线性表的链式存储想要输出&#xff0c;能够输出&#xff0c;但是会报错&#xff1a;读取访问权限冲突 分析&#xff1a; 当我们输出到最后倒数第二个节点时&#xff0c;p指向倒数第二个节点并输出&#xff1b; 下一轮循环&#xff1a;p指向倒数第二…

Helm Kubernetes Offline Deploy Rancher v2.7.5 Demo (helm 离线部署 rancher 实践)

文章目录 1. 简介2. 预备条件3. 选择 SSL 配置4. 离线安装的 Helm Chart 选项5. 下载介质6. 生成证书7. 镜像入库8. 安装 rancher9. 配置 nodeport10. 配置 ingress11. 界面访问11.1 首页预览11.2 查看集群信息11.3 查看项目空间11.4 查看节点信息 1. 简介 Rancher 是一个开源…

17-数据结构-查找-(顺序、折半、分块)

简介&#xff1a;查找&#xff0c;顾名思义&#xff0c;是我们处理数据时常用的操作之一。大概就是我们从表格中去搜索我们想要的东西&#xff0c;这个表格&#xff0c;就是所谓的查找表&#xff08;存储数据的表&#xff09;。而我们怎么设计查找&#xff0c;才可以让计算机更…

lv4 嵌入式开发-4 标准IO的读写(二进制方式)

目录 1 标准I/O – 按对象读写 2 标准I/O – 小结 3 标准I/O – 思考和练习 文本文件和二进制的区别&#xff1a; 存储的格式不同&#xff1a;文本文件只能存储文本。除了文本都是二进制文件。 补充计算机内码概念&#xff1a;文本符号在计算机内部的编码&#xff08;计算…

2023/09/10

文章目录 1. 使用Vue单页面全局变量注意事项2. 伪元素和伪类3. Vue3中定义数组通常使用ref4. Vue Router的 $router 和 $route5. Vue路由中的query和params的区别6. vue3defineExpose({})属性不能重命名&#xff0c;方法可以重命名7. 显卡共享内存的原理8. deltaY9. 快速生成方…

电池2RC模型 + 开路电压法 + 安时积分 + 电池精度测试 + HPPC脉冲

电池2RC模型 电池2RC模型是一种等效电路模型&#xff0c;用于描述电池的动态特性。该模型将电池视为一个理想电容器和一个理想电阻的并联&#xff0c;其中理想电容器代表电池的化学反应&#xff0c;理想电阻代表电池的内阻。该模型适用于描述电池的充电和放电过程。 开路电压…

Java中如何判断字符串输入[hello,world]是不是字符串数组参数

Java中如何判断字符串输入[hello,world]是不是字符串数组参数&#xff1f; 在Java中&#xff0c;可以使用正则表达式来判断一个字符串是否符合字符串数组的参数格式。你可以使用matches()方法和对应的正则表达式进行判断。 以下是一个示例代码&#xff1a; public static bo…

SpringCloudGateway网关实战(二)

SpringCloudGateway网关实战&#xff08;二&#xff09; 本文我们在前文的基础上&#xff0c;开始讲gateway过滤器的部分内容。gateway的过滤器分为内置过滤器Filter和全局过滤器GlobalFilter。本章节先讲内置过滤器Filter。 需要先声明一下内置过滤器和全局过滤器之间的区别…

mysql文档--innodb中的重头戏--事务隔离级别!!!!--举例学习--现象演示

阿丹&#xff1a; 先要说明一点就是在网上现在查找的mysql中的事务隔离级别其实都是在innodb中的事务隔离级别。因为在mysql的5.5.5版本后myisam被innodb打败&#xff0c;从此innodb成为了mysql中的默认存储引擎。所以在网上查找的事务隔离级别基本上都是innodb的。并且支持事务…

JavaScript基础10——获取数据类型、类型转换

哈喽&#xff0c;大家好&#xff0c;我是雷工。 现如今知识大爆炸&#xff0c;到处都有海量的知识&#xff0c;常常见了就收藏&#xff0c;把网盘塞得满满的&#xff0c;却从来没有看过。 收藏起来装到网盘里并没有什么软用&#xff0c;要把知识装到脑袋里才行。 从几分钟开始&…

【LeetCode-中等题】26. 删除有序数组中的重复项

文章目录 题目方法一&#xff1a;快慢指针 题目 方法一&#xff1a;快慢指针 class Solution { //快慢指针public int removeDuplicates(int[] nums) {int fast 1;int slow 0;while(fast < nums.length){if(nums[fast] nums[fast-1]) fast;//若当前元素和之前元素相同 则…

华为OD机试 - 战场索敌 - 深度优先搜索dfs算法(Java 2023 B卷 100分)

目录 一、题目描述二、输入描述三、输出描述四、深度优先搜索dfs五、解题思路六、Java算法源码七、效果展示1、输入2、输出3、说明4、如果增加目标敌人数量K为55、来&#xff0c;上强度 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 一、题目描述 有一个大小是N*M…

ros_launch配置

创建launch文件批运行节点和包 将以下四条指令写入launch文件中 获取软件包的完整指令 rospackpkg&#xff1a;软件包名字 type&#xff1a;节点名字 四条指令转化成launch文件

无人机航线规划

无人机航线规划&#xff0c;对于无人机的任务执行有着至关重要的作用&#xff0c;无人机在从起点飞向目的点的过程中&#xff0c;如何规划出一条安全路径&#xff0c;并且保证该路径代价最优&#xff0c;是无人机航线规划的主要目的。其中路径最优的含义是&#xff0c;在无人机…

大数据-玩转数据-Flink 容错机制

一、概述 在分布式架构中&#xff0c;当某个节点出现故障&#xff0c;其他节点基本不受影响。在 Flink 中&#xff0c;有一套完整的容错机制&#xff0c;最重要就是检查点&#xff08;checkpoint&#xff09;。 二、检查点&#xff08;Checkpoint&#xff09; 在流处理中&am…

初识Nacos

前言 Nacos是一个用于微服务架构下的服务发现和配置管理以及服务管理的综合解决方案&#xff08;官网介绍&#xff09;&#xff0c;这里的服务发现其实就是注册中心&#xff0c;配置管理就是配置中心&#xff0c;而服务管理是二者的综合&#xff1b; Nacos特性 1.服务发现与…

《Go语言在微服务中的崛起:为什么Go是下一个后端之星?》

&#x1f337;&#x1f341; 博主猫头虎&#x1f405;&#x1f43e; 带您进入 Golang 语言的新世界✨✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文并茂&#x1f…

【JavaEE】_CSS常用属性值

目录 1. 字体属性 1.1 设置字体家族 font-family 1.2 设置字体大小 font-size 1.3 设置字体粗细 font-weight 1.4 设置字体倾斜 font-style 2. 文本属性 2.1 设置文本颜色 color 2.2 文本对齐 text-align 2.3 文本装饰 text-decoration 2.4 文本缩进 text-indent 2.…