【RabbitMQ | 第五篇】RabbitMQ实现消息的可靠抵达

news2025/2/27 21:12:57

在这里插入图片描述

文章目录

  • 5.RabbitMQ实现消息的可靠抵达
    • 5.1引入背景
    • 5.2确认机制分类
      • 5.2.1ConfirmCallback (确认模式:消息生产者确认)
        • (1)开启确认配置
        • (2)实现ConfirmCallback回调接口
      • 5.2.2ReturnCallback(回退模式:交换机确认)
        • (1)开启回退配置
        • (2)实现ReturnCallback回调接口
      • 5.2.3ACK机制(确认模式:消费者确认)
        • (1)消息接收确认模式类型
        • (2)手动确认回复方法
        • (3)basicAck方法
        • (4)basicNack方法
        • (5)basicReject方法
        • (6)开启手动ack机制
        • (7)消费者消费消息并手动确认
    • 5.3总结

5.RabbitMQ实现消息的可靠抵达

5.1引入背景

为保证消息不丢失,可靠抵达,可以使用事务消息,但是性能下降250倍,为此引入确认机制,来实现消息的可靠抵达

5.2确认机制分类

RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认

其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认。如下图

image-20240319095647841

  • confirmCallback 确认模式:确认消息是否到达交换机
  • returnCallback退回模式:若消息没有传递给指定队列,就触发这个失败回调
  • ack机制:消费者确认模式
    • CK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
    • 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中

5.2.1ConfirmCallback (确认模式:消息生产者确认)

(1)开启确认配置
#老版本
spring:
	rabbitmq:
		publisher-confirms: true
		
#新版本
spring:
	rabbitmq:
		publisher-confirms-type: correlated
(2)实现ConfirmCallback回调接口
  • ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
@Configuration
public class MyRabbitConfig {
    
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    //消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
    //确认消息送到交换机(Exchange)回调
    @PostConstruct		//创建MyBabbiConfig对象后,执行该方法
    public void initRabbitTemplate(){
        //确认消息送到队列(Queue)回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
        {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage)
            {
                System.out.println("\n确认消息送到队列(Queue)结果:");
                System.out.println("发生消息:" + returnedMessage.getMessage());
                System.out.println("回应码:" + returnedMessage.getReplyCode());
                System.out.println("回应信息:" + returnedMessage.getReplyText());
                System.out.println("交换机:" + returnedMessage.getExchange());
                System.out.println("路由键:" + returnedMessage.getRoutingKey());
            }
        });
	}
}		

被 broker 接收到只能表示 message 已经到达交换机,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

5.2.2ReturnCallback(回退模式:交换机确认)

  • 过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调
  • 该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。
(1)开启回退配置
spring:
	rabbit:
		#开启发送端消息抵达Queue确认
		publisher-returns: true
		#只要消息不能抵达queue时,该消息不会被丢弃,而是会被返回给生产者:可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
		template:
			mandatory: true
(2)实现ReturnCallback回调接口
@Configuration
public class MyRabbitConfig {
    
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    @PostConstruct		//创建MyBabbiConfig对象后,执行该方法
    public void initRabbitTemplate(){
        
        //确认消息送到队列(Queue)失败回调:注意是失败
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
        {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage)
            {
                System.out.println("\n确认消息送到队列(Queue)结果:");
                System.out.println("发生消息:" + returnedMessage.getMessage());
                System.out.println("回应码:" + returnedMessage.getReplyCode());
                System.out.println("回应信息:" + returnedMessage.getReplyText());
                System.out.println("交换机:" + returnedMessage.getExchange());
                System.out.println("路由键:" + returnedMessage.getRoutingKey());
            }
        });
        
        //确认消息送到队列(Queue)回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
        {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage)
            {
                System.out.println("\n确认消息送到队列(Queue)结果:");
                System.out.println("发生消息:" + returnedMessage.getMessage());
                System.out.println("回应码:" + returnedMessage.getReplyCode());
                System.out.println("回应信息:" + returnedMessage.getReplyText());
                System.out.println("交换机:" + returnedMessage.getExchange());
                System.out.println("路由键:" + returnedMessage.getRoutingKey());
            }
        });
	}
}

5.2.3ACK机制(确认模式:消费者确认)

  1. 消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理消息,比如重新发送或者丢弃
  2. RabbitMQ 消息确认机制 (ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
(1)消息接收确认模式类型
  1. AcknowledgeMode.NONE:自动确认

    • 默认自动ack,消息被消费者收到(注意:只是收到),就会从broker的queue中移除
    • 存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了
  2. AcknowledgeMode.AUTO:根据情况确认。

  3. AcknowledgeMode.MANUAL:手动确认

    确认过程:就算消费者已经拿到了消息,但是没有确认,队列中的消息仍然不能移除,只不过状态由ready变为unacked,消息处理分为以下三种情况:

    1. 消息处理成功ack(),接受下一个消息,此消息broker就会移除
    2. 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
    3. 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

    img

(2)手动确认回复方法
  • 消费者获取到消息,成功处理,可以回复Ack给Broker
    • basic.ack:用于肯定确认broker将移除此消息
    • basic.nack:用于否定确认可以指定broker是否丢弃此消息,可以批量
    • basic.reject:用于否定确认当前消息;同上,但不能批量
(3)basicAck方法

basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

参数说明:

  1. long deliveryTag唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
  2. boolean multiple 是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
(4)basicNack方法
  1. basicNack 方法用于否定当前消息
  2. basicReject 方法一次只能拒绝一条消息
  3. 如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

参数说明:

  1. long deliveryTag: 唯一标识 ID。
  2. boolean multiple: 上面已经解释。
  3. boolean requeue: 如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
(5)basicReject方法

basicReject 方法用于明确拒绝当前的消息而不是确认。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

参数说明:

  1. long deliveryTag: 唯一标识 ID。
  2. boolean requeue: 上面已经解释。

测试场景:

  • 发送五个消息测试,
  • 此时关闭服务服务,消息的状态由unacked变为ready,下次客户端服务启动又会接收到消息ready变为unacked
  • 除非手动确认
(6)开启手动ack机制
spring:
	listener:
		simple:
			acknowledge-mode: manual
(7)消费者消费消息并手动确认
package com.pjb.receiver;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
 
/**
 * 接收者
 * @author pan_junbiao
 **/
@Component
public class Receiver implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
            {
                System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                System.out.println("执行queue_name中的消息的业务处理流程......");
            }
 
            if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
            {
                System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                System.out.println("执行fanout.A中的消息的业务处理流程......");
            }
 
            /**
             * 确认消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             */
            channel.basicAck(deliveryTag, true);
 
            /**
             * 否定消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            //channel.basicNack(deliveryTag, true, false);
        }
        catch (Exception e)
        {
            e.printStackTrace();
 
            /**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag, true);
        }
    }
}

5.3总结

RabbitMQ系列第五篇介绍了实现消息的可靠抵达的两大模式:发送者确认、消费者确认;其中发送确认又可以分为消息生产者到交换机的确认(confirmcallback接口:消息到达交换机回调)、交换机到队列的确认(returncallback接口:消息到达不了队列回调);而消费者回调ACK机制可分为自动确认、手动确认、根据情况确认三种类型;自动确认可能会出现消息丢失问题(消息到达消费者后,队列立刻删除该消息,但是此时消费者次此时出现异常或者宕机),手动确认的三个方法(basicAck、basicNack、basicReject)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1528435.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3.x 使用jsplumb进行多列拖拽连线

前言&#xff1a; 最近很多小伙伴问到使用jsplumb进行多列拖拽连线怎么实现&#xff1f; 下面介绍vue3.x 使用jsplumb进行多列拖拽连线示例&#xff0c;以三列举例&#xff1a; 安装 npm install --save jsplumb引入 <script lang"ts" setup>import {ref, r…

#Linux(VIM编辑器使用)

&#xff08;一&#xff09;发行版&#xff1a;Ubuntu16.04.7 &#xff08;二&#xff09;记录&#xff1a; &#xff08;1&#xff09;打开一个文本文件输入vi文件名&#xff08;如果存在该文件则直接打开&#xff0c;如果没有则创建一个文件&#xff09; &#xff08;2&…

【早鸟优惠|高录用|EI稳定检索】2024年虚拟现实、图像和信号处理国际学术会议(ICVISP 2024)诚邀投稿/参会!

【早鸟优惠|高录用|EI稳定检索】 2024年虚拟现实、图像和信号处理国际学术会议&#xff08;ICVISP 2024&#xff09;诚邀投稿/参会&#xff01; # 早鸟优惠 # 先投稿先送审 # #投稿免费参会、口头汇报及海报展示# 2024年虚拟现实、图像和信号处理国际学术会议&#xff08;I…

递推与递归

92. 递归实现指数型枚举 - AcWing题库 #include <bits/stdc.h> using namespace std; const int N17; int n; bool vis[N];//记录某一个数是否出现过 void dfs(int dep){// if(vis[dep])continue;//没有这一句 因为一定不会有已经选过的数if(depn1){//对于每个数都做完了…

C++ QT串口通信(2)-串口通信入门实例

本文通过实例讲解C++ QT串口通信。 入门实例设计一个串口助手,能够很好的涵盖串口要点的使用。 成品图 如下; 实现代码如下: 首先在pro文件中添加串口模块 UI界面如下 <?xml version="1.0" encoding="UTF-8"?> <ui version="4.0&q…

漏洞挖掘 | 记一个奇怪的万能密码

前言 打的站点打多了&#xff0c;什么奇怪的问题都会发生 打点 开局一个登录框 用户枚举到账号爆破 测了一下&#xff0c;没发现admin的弱口令&#xff0c;但是发现存在用户枚举漏洞&#xff0c;因此准备跑一下账号 输入密码为123456 进行账号爆破 成功爆破出账号 是的…

综合知识篇10-计算机网络考点(2024年软考高级系统架构设计师冲刺知识点总结系列文章)

专栏系列文章: 2024高级系统架构设计师备考资料(高频考点&真题&经验)https://blog.csdn.net/seeker1994/category_12593400.html案例分析篇00-【历年案例分析真题考点汇总】与【专栏文章案例分析高频考点目录】(2024年软考高级系统架构设计师冲刺知识点总结-案例…

【读书笔记】 40本脑科学书籍总结出的方法论——《涂鸦启示录》我庆幸没有错过

文章目录 《费曼学习法》《SMART法则》 概述原书内容脉络第一部分 目标管理第二部分 习惯养成第三部分 时间管理第四部分 思维方式第五部分 学习方法第六部分 解决问题 拓展 阅读体验感受评价补充&#xff1a; 总结 《涂鸦启示录》是诸多认知&#xff0c;思维书籍、演讲的大合集…

angularjs 指令实现自定义滚动条

场景&#xff1a;横向商品栏&#xff0c;把原有的滚动条改成自定义的样式&#xff0c;并且给两边加上箭头可以调整&#xff0c;可以拖动商品和滚轮实现滚动条效果。 js appService.directive(customScrollbar, function() {return {restrict: A,transclude: true,scope: {ena…

MATLAB环境下基于离散小波变换和主成分平均的医学图像融合方法

随着计算机技术和生物影像工程的日趋成熟&#xff0c;医学图像为医疗诊断提供的信息越来越丰富。目前&#xff0c;由于医学成像的设备种类繁多&#xff0c;导致医生获得的图像信息差异较大。如何把这些信息进行整合供医生使用成为当务之急。基于此&#xff0c;医学图像融合技术…

php 对接Mintegral汇量海外广告平台收益接口Reporting API

今天对接的是Mintegral广告reporting api接口&#xff0c;拉取广告收益回来自己做统计。记录分享给大家 首先是文档地址,进入到Mintegral后台就能看到文档地址以及参数&#xff1a; 文档地址&#xff1a;https://cdn-adn-https.rayjump.com/cdn-adn/reporting_api/MintegralRA.…

2024你值得拥有,Go语言入门学习线路推荐

“小众”的编程语言的Go语言在今年2月成功挤进TOIBE排行榜前10&#xff0c;3月稳居第8名。从最低时的第122名&#xff0c;到现在第8名&#xff0c;Go 的身影越来越清晰。 其实它早已被广泛应用于云计算、大数据、区块链、微服务、游戏开发等领域&#xff0c;因而也有越来越多的…

相比于 HTTP 协议,WebSocket协议的必要性体现在哪里?

HTTP 协议的一个缺点 从 HTTP 协议的角度来看&#xff0c;就是点一下网页上的某个按钮&#xff0c;前端发一次 HTTP请 求&#xff0c;网站返回一次 HTTP 响应。这种由客户端主动请求&#xff0c;服务器响应的方式也满足大部分网页的功能场景。但是有没有发现&#xff0c;在HTTP…

WiFi7为什么需要6G频谱

从5925MHz到7125MHz&#xff0c;整整1200MHz的频谱&#xff0c;都被分配给了WiFi7。非常得豪&#xff01; 只是国内还没有这个东西。 为什么要这么宽的频谱呢&#xff1f; Intel作过实验&#xff0c;发现在日常的场合下 一定是3个320MHz宽的不重叠信道&#xff0c;方能达到AV/…

小游戏实战-Python实现石头剪刀布+扫雷小游戏

小游戏实战-Python实现石头剪刀布扫雷小游戏 我想说废话止于此石头剪刀布-入门必学游戏游戏规则实现思路示例代码知识要点运行效果 扫雷-内网摸鱼必备游戏游戏规则实现思路示例代码知识要点运行效果 进阶练习-走迷宫&#xff08;预留&#xff09;游戏规则预期效果 总结 我想说 …

算法第三十天-矩阵中移动的最大次数

矩阵中移动的最大次数 题目要求 解题思路 网格图 DFS 从第一列的任一单元格 ( i , 0 ) (i,0) (i,0) 开始递归。枚举往右上/右/右下三个方向走&#xff0c;如果走一步后&#xff0c;没有出界&#xff0c;且格子值大于 g r i d [ i ] [ j ] grid[i][j] grid[i][j]&#xff0c;则…

Java使用itextpdf往pdf中插入图片

引入maven依赖 <dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.9</version> </dependency>java代码 import cn.hutool.extra.qrcode.QrCodeUtil; import com.itextpdf.text.*; i…

华为携手8家企业打造“AI大模型+行业”生态网络 | 百能云芯

据媒体报道&#xff0c;在“大模型行业创新合作计划”签约仪式上&#xff0c;华为云携手循环智能、迪安诊断、零浩网络、云译科技、蓝青教育、航天天目、标普云、乐聚机器人等8家企业&#xff0c;共同开启了一段全新的合作旅程。 这次合作将聚焦于“AI大模型行业”的应用开发&a…

IP代理的认证方式和协议介绍

“IP代理是指使用固定的IP地址作为代理服务器进行代理访问的方式。在网络应用中&#xff0c;IP代理可以为用户提供更加稳定的代理服务&#xff0c;同时也提高了访问网站的安全性。IP代理的认证方式和协议是实现代理服务的重要组成部分。” 一、认证方式 1.用户名和密码认证&am…

RK平台第一次开机速度优化 “Large app, accepted running with swap.“

RK平台第一次开机速度优化 "Large app, accepted running with swap." 问题描述解决方法 郑重声明:本人原创博文&#xff0c;都是实战&#xff0c;均经过实际项目验证出货的 转载请标明出处:攻城狮2015 Platform: Rockchip OS:Android 6.0.1 CPU:3368 Kernel: 3.10 问…