RabbitMq 消息可靠性问题(一) --- publisher发送时丢失

news2024/12/24 11:41:49

前言

消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢?

  • 发送时丢失:
    • 生产者发送的消息未送达 exchange
    • 消息到达 exchange 后未到达 queue
  • MQ 宕机,queue将消息丢失
  • consumer 接收到消息后未消费就宕机
    在这里插入图片描述
    消息可靠性问题及其对应的解决方案:
场景publisher发送时丢失MQ消息丢失consumer消费问题
解决方案生产者确认机制消息持久化消费者消息确认&&失败重试机制

下面我们先说一下publisher 发送时丢失的问题应该如何处理

生产者确认机制的理论说明

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publish-confirm, 发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publish-return, 发送回执
    • 消息投递到交换机,但是没有路由到队列,返回ACK, 及路由失败原因

注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息,避免ack 冲突

在这里插入图片描述

代码实现

下面基于SpringAMQP 实现的生产者确认机制

  1. 在 publisher 服务的 application,yml 中添加以下配置:
spring:
	rabbitmq:
		publisher-confirm-type: correlated # 开启异步回调
		publisher-returns: true
		template:
			mandatory: true

配置说明:

  • publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型:
    • simple: 同步等待 confirm 结果, 直到超时
    • correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback
  • publish-returns: 开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallbcak
  • template.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息

ConfirmCallBack是基于每条消息设置的,所以需要一个全局唯一id 进行区分。
ReturenCallbcak 则是基于每个RabbitTemplate操作实例,是一种全局性的回调。

  1. 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置:
    (这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候,进行一些全局性回调的操作)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		// 获取 RabbitTemplate对象
		RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
		// 配置 ReturnCallback
		rabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) -> {
			// 记录日志
			log.error("消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},",
			          replayCode, replayText, exchange, routingKey, message.toString());
			// 如果有需要的话,重发消息

		});
	}}
}	
  1. 为每条发送的消息,指定消息 ID, 并编写对应的 ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {
	// 1. 准备消息
	String message = "hello, spring amqp!";
	// 2. 准备CorrelationData
	// 2.1 消息id
	CorrelationData correlationData = new
	CorrelationData(UUID.randomUUID().toString());
	// 2.2 准备 ConfirmCallback
	correlationData.getFuture().addCallback(confirm -> {
		// 判断结果
		if(confirm.isAck()){
			// ACK
			log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
		}else {
			// NACK
			log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
			// 重发消息
		 }
		}, throwable -> {
			// 记录日志
			log.error("消息发送失败", throwable);
			// 重发消息
			
	});
	// 3.发送消息
	rabbitTemplate.convertAndSend("amq.topic", "asimple.test", message, correlationData);
}

总结

SpringAMQP 中处理消息确认的几种情况:

  • publisher-confirm:
    • 消息发送到 exchange, 返回 ack
    • 消息发送失败,没有到达交换机,返回 nack
    • 消息发送过程中出现异常,没有收到回执
  • 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431470.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

聊聊如何运用JAVA注解处理器(APT)

什么是APT APT(Annotation Processing Tool)它是Java编译期注解处理器,它可以让开发人员在编译期对注解进行处理,通过APT可以获取到注解和被注解对象的相关信息,并根据这些信息在编译期按我们的需求生成java代码模板或…

基于DistFlow的含分布式电源配电网优化模型【IEEE39节点】(Python代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

SpringBoot【基础篇】---- SSMP整合综合案例

SpringBoot【基础篇】---- SSMP整合综合案例1. 模块创建2. 实体类开发3. 数据层开发----基于CRUD查看MP运行日志查看 MP 的运行日志4. 数据层开发----分页功能制作5. 数据层开发----条件查询功能制作6. 业务层开发业务层快速开发7. 表现层开发8. 表现层消息一致性处理9. 前后端…

STC32G单片机内置ADC及应用编程

一 STC32G单片机内置ADC模块简介 STC32G单片机内部集成了一个12位高速ADC转换器,ADC的最高时钟频率为系统频率的1/2。其输入通道多达15个(第15通道为专门测量内部1.19V参考信号源的通道),可分时切换使用。 STC15系列单片机内置AD…

AES加密

来源:链接: b站up主可厉害的土豆 (据评论区说图片中有计算错误,但是过程是对的。只是了解过程问题不大,专门研究这一块的话自己可以看视频算一下) 准备 首先,明文是固定长度 16字节 128位。 密钥长度可以…

C++语法(18)---- set和map

C语法(17)---- 二叉搜索树_哈里沃克的博客-CSDN博客https://blog.csdn.net/m0_63488627/article/details/130174864 目录 1.set的介绍 1.set使用 1.基本结构 2.insert 3.erase 4.find 5.count 2.multiset 1.count 2.find 2.map的介绍 1.map …

zookeeper + kafka集群搭建详解

目录 1.消息队列介绍 1.为什么需要消息队列 (MO) 2.使用消息队列的好处 3.消息队列的两种模式 2.Kafka相关介绍 1.Kafka定义 2.Kafka简介 3. Kafka的特性 3.Kafka系统架构 1. Broker(服务器) 2. Topic(一个队…

GaussDB数据库存储过程介绍

文章目录一、前言二、GaussDB中的定义三、存储过程的使用场景四、存储过程的使用优缺点五、存储过程的示例及示例解析1、GaussDB存储过程语法格式2、GaussDB存储过程语法示例3、存储过程的调用方法七、总结一、前言 华为云数据库GaussDB是一款高性能、高安全性的云原生数据库&…

链表基础知识

1.链表必知必会 什么是链表? 链表是一种通过指针串联在一起的线性结构,每一个节点由两部分组成,一个是数据域一个是指针域(存放指向下一个节点的指针),最后一个节点的指针域指向null(空指针的意思&#…

23北京邮电大学备考经验

目录【写在前面】本科成绩择校历程英语复习数学复习政治复习专业课复习其它建议笔记复盘压力处理恋爱关系【写在最后】【写在前面】 初试成绩: 本科成绩 总体:浙江某双非学校的软件工程专业、综合测评成绩班级前两名、浙江省省级优秀毕业生、发表过论…

【Node】Node.js 资源汇总推荐

【导读】:Node.js 是一个开源、跨平台的,用于编写服务器和命令行的 JavaScript 运行时工具。awesome-nodejs 是sindresorhus发起维护的 Node.js 资源列表,内容包括:命令行工具、日志、调试、HTTP、构建工具、文件系统、模板、Web …

Elasticjob(2.1.4) failover 、misfire及执行线程池分析

Failover 当设置failover为true时候,elasticjob 集群通过zookeeper 的event watcher 监听是否有instance 丢失,然后对丢失instance 对应的分片进行立即执行。重复一下,failover是立即执行,不是按crontab时间来触发,这…

基于RDF本体模型和图数据库实现知识查询与推理

基于RDF本体模型和图数据库实现知识查询与推理 基于RDF本体模型和图数据库实现知识查询与推理一、案例本体模型解释二、数据构建与查询 Here’s the table of contents: 基于RDF本体模型和图数据库实现知识查询与推理 本文主要使用ONgDB图数据库和Neosemantics组件,…

自建个人音乐播放器Navidrome - 内网穿透实现在外随时访问

文章目录 1. 前言2. Navidrome网站搭建2.1 Navidrome下载和安装2.1.1 安装并添加ffmpeg2.1.2下载并配置Navidrome2.1.3 添加Navidrome到系统服务 2.2. Navidrome网页测试 3. 本地网页发布3.1 cpolar的安装和注册3.2 Cpolar云端设置3.3 Cpolar本地设置 4. 公网访问测试5. 结语 转…

【Android实战开发】flutter实现网络请求的方法示例

Flutter网络请求使用的是Dio。Dio是一个强大易用的dart http请求库,支持Restful API、FormData、拦截器、请求取消、Cookie管理、文件上传/下载……. Flutter json数据解析是使用了json_serializable package包。它是一个自动化源代码生成器,可以为我们…

C++快速幂详解例题

基本概念 什么是快速幂呢?个人理解,就是更快速的计算幂运算。 比如计算a^b 刚学这个算法的时候我也很疑惑,幂运算不是有现成的公式么,直接pow(a,b)不就好了吗? 后来才明白,pow(a,b)的…

三分钟了解什么是时序数据库

在介绍时序数据库之前,我们先来看看什么是时序数据。时序数据就是基于时间排序的数据,再通过时间坐标将这些数据连接起来,形成一个折线图,直观地展示一个指标在过去一段时间内的走势和规律,帮助定位数据异常点。 时序…

Oracle中Archived redolog的生成

目录 一、问题预览 二、问题解答 一、问题预览 大家都知道 Oracle 中 online redolog切换后会生成 archived redolog,心里默认的就是 online redolog 切换后 archived redolog 已经生成。切换示意图,如下图所示。 但事实真的是这样吗? 二、…

C++ 23 实用工具(一)

C 23 实用工具(一) 工具函数是非常有价值的工具。它们不仅可以用于特定的领域,还可以应用于任意值和函数,甚至可以创建新的函数并将它们绑定到变量上。 常用函数 你可以使用各种变体的 min、max 和 minmax 函数来对值和初始化列…

【使用ChatGPT自动化】批量转换.xls文件为.xlsx文件

第1次提问: 我:我想使用Python批量转换.xls文件为.xlsx文件,请你提供代码 它: 当涉及到批量处理文件时,我们通常需要使用Python中的os模块和glob模块。os模块用于管理文件和目录,glob模块用于匹配文件路径名…