深入浅出消息队列----【如何保证消息不丢失?】

news2025/1/22 1:50:55

深入浅出消息队列----【如何保证消息不丢失?】

  • 消息流转链路
  • 生成且发送消息流程
  • 存储流程
  • 消费流程

本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】

消息流转链路

请添加图片描述

消息从生产者产生发送至 Broker,Broker 存储消息等待消费者来拉取,消费者从 Broker 拉取消息。

所以,如果要保证消息不丢失,那么从整个链路来看需要保证三大流程中消息都不丢失,缺一不可。

  1. 发送
  2. 存储
  3. 消费

生产者需要保证消息一定被完整的发送并存储至 Broker 中。

Broker 需要保证已经存储的消息不会丢失,比如 Broker 重启、宕机后还存在。

消费者需要保证拉取的消息一定被消费,比如消费一半重启了,需要确保还未消费的消息后续也能被消费。

生成且发送消息流程

消息从生产者产生,而且消息的生成往往伴随着某个业务,比如下单就加积分这个业务场景。

我们会在代码里先保存订单,然后发送消息让积分系统给对应的用户加积分:

public boolean addOrder(xx) {
	// do sth
	saveOrder();	// 保存订单
	sendMessage();	// 发送加积分消息
}

我们需要确保订单保存成功,积分消息一定要发送成功,不然消息就丢了。

这里就涉及了请求确认机制,即 ack,学过 TCP 协议应该都会知道 ack。

简单来说就是生产者与 Broker 交互通过 ack 来确认消息的成功接收。

请添加图片描述

当生产者发送消息给 Broker 后,如果 Broker 接收到这条消息,那么就返回 ack 给生产者,一旦生产者收到了 ack,那么就知道消息已经被 Broker 成功接收了,因此保证发送阶段消息不会丢失。

如果 Broker 没有返回 ack,那么咋办?

不管是网络原因还是什么原因,只要这时候生产者超时等待没收到 ack,那么就需要重试。

当然不可能无限重试,这样就会阻塞后续的业务流程,像 RocketMQ 默认重试 3 次失败后就会返回错误或直接抛出异常,这时需要人工介入处理。

因此,在使用的时候需要关注发送结果或者异常情况。

public boolean addOrder(xx) {
	// do sth
	saveOrder();	// 保存订单
	try {
		SendResult senResult = senMessage();	// 发送加积分消息
		if (sendResult...) {
			recordSend();	// 记录失败消息
		}
	} catch (Exception e) {
		log.error("send msg error");
		recordSend();	// 记录失败消息
	}
}

这种情况下消息发送异常,我们不能影响正常的下单流程,因此记录下错误日志,然后把发送积分的消息保存到数据库中,后续再通过定时任务来补偿这些没有加上的积分。

很多同学的处理可能就是直接抛错,但是抛错意味着 addOrder 这个方法报错了,那么用户下单就报错了,体验就很差,这里需要注意:不能让非主流程的功能影响主流程的功能,下单是主流程,加积分是非主流程。

同步发送可以通过 try-catch 来捕获异常,如果是异步发送,需要记得处理 onException 的逻辑:

// 异步发送消息,发送结果通过 callback 返回给客户端
producer.send(msg, new SendCallback() {
	@Override
	public void onSuccess(SendResult sendResult) {
		// do sth
	}
	@Override
	public void onException(Throwable e) {
		// 记得处理发送失败场景
		log.error("send msg error");	
		recordSend();
	}
});

小结

在生产阶段,利用请求确认机制,保证消息发送成功,如果没收到 ack 那么需要重试,如果重试失败,那么看场景分析。

可以直接将整个业务方法失败使得流程报错,这样业务没处理成功自然也不存在消息的丢失(好比下单没下成功,自然不需要发送积分)。

可以让业务的处理都正常,然后通过落库等其它手段保存这个消息,后续利用定时任务在尝试发送或其它手段补偿,来确保消息不丢失。

存储流程

当 Broker 返回 ack 给生产者之后,生产者认为消息已经被成功存储至 Broker,因此后续不会再发送这个条消息。

因此 Broker 返回 ack 给生产者之前需要确保消息真的已经被成功存储了

RocketMQ 的消息默认是异步刷盘,也就是消息先写入到 pageCache 中,即文件系统的缓存,然后等待操作系统或定时刷盘任务再将消息刷到磁盘上。

也就是默认情况下,当消息写入到缓存中,Broker 就给生产者返回 ack 了

如果 Broker 正常运行肯定是没问题的,但突然断电,那么 pageCache 里还未刷盘的消息就没了,此时消息就真的丢了!

生产者就懵逼了,明明说好的 ack,怎么消息说没就没了呢?

所以如果一定要确保消息不被丢失,那么需要将 Broker 的刷盘配置改成同步刷盘,即 flushDiskType 配置为 SYNC_FLUSH。

请添加图片描述

这样生产者接到 ack 的消息都是已经被刷到磁盘文件上的,断电了也不影响。

那么磁盘损坏了呢

默认情况下如果磁盘损坏了确实消息也就丢了,不过现在有很多磁盘阵列可以保证磁盘上内容的可靠性,简单理解就是备份,这里损坏了还能从另一个地方读。

还有其实生产上 Broker 可能是有很多台组成集群使用的。

在保证消息不丢失的场景,需要设置同步复制,这样当 master 挂了需要 salve 顶上的时候,才能保证 salve 的消息是全的。

小结

单台 Broker 需要保证同步刷盘,还有磁盘阵列来保证消息不会丢失。不过这里需要注意,同步刷盘的性能会差一些。

集群架构需要保证同步复制,这样当 salve 被消费的时候提供的消息才是全的。

消费流程

新手在实现消费流程的时候,很容易丢失消息。

消息是有点位提交的,consumer 需要上报给 Broker 已经消费到的设置,这样假设 consumer 重启后,也可以从 Broker 获取之前的消费位置,然后往后消费消息。

因此需要保证,上报给 Broker 的点位必须是已经被消费过的消息。

具体例子,消费者直接从 Broker 拉取了 50 条消息,分别是从 1-50,当消息到达消费者内存后,消费者直接将消息都提交到线程池中进行异步处理,然后直接返回消费成功(消费成功本质就是点位提交,RocketMQ pushConsumer 封装了这个功能)。

consumer.registerMessageListener(new MessageListenerConcurrently() {
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
		// 将消息提交到线程池
		executorPool.execute(xxxx);
		// 返回消息消费状态,为消费成功
		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	}
}); 

这样实现的话,消费者客户端会认为这些消息已经被消费了,然后给 Broker 提交了点位,已经消费到 50 了,下一次拉取从 51 开始。

此时,假设消费者程序宕机了,意外重启了,线程池的任务队列可是内存队列,那么线程池里面还未消费的消息是不是就没了?

但重启后消费者从 Broker 拿到的消费点位已经从 51 开始了,前面 50 条认为已经消费完了,所以之前那些在线程池任务队列排队的还未被消费的消息就丢失了。

所以需要确保:只有在对应消息的业务流程处理完毕后,再给 Broker 返回消费确认,提交点位

请添加图片描述

小结

消费者消费消息的时候,只有当对应的业务都处理完了,再返回消息的正确消费即消息点位的提交。

注意 MessageListener 里面的一部处理,容易丢失消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2062163.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

斯坦福大学研究人员,推荐的课题申报AI提示词分享

我是娜姐 迪娜学姐 ,一个SCI医学期刊编辑,探索用AI工具提效论文写作和发表。 斯坦福大学心血管医学部研究人员创建了一个 GitHub 仓库,以汇集和整理使用AI帮助科研人员撰写更具竞争力的基金申请书。 以下是该Github仓库分享的好用的基金文本撰…

碰撞检测 | 基于ROS Rviz插件的多边形碰撞检测仿真平台

目录 0 专栏介绍1 基于多边形的碰撞检测2 碰撞检测仿真平台搭建2.1 多边形实例2.2 外部服务接口2.3 Rviz插件化 3 案例演示3.1 功能介绍3.2 绘制多边形 0 专栏介绍 🔥课设、毕设、创新竞赛必备!🔥本专栏涉及更高阶的运动规划算法轨迹优化实战…

Ryzen 9000X3D还没来,先等来了R5 7600X3D

原文转载修改自(更多互联网新闻/搞机小知识): R5 7600X3D或于下月初推出,6核12线程102MB缓存 昨天我们刚刚聊过Ryzen 9000在欧洲部分地区开售即打折的“骚操作”,上周我们也曾分享过R9 9900X和R9 9950X首周在德不过50…

智慧水务平台主要帮助水司解决哪些问题,是如何解决的?

在快速发展的现代社会,水资源管理与服务面临着前所未有的挑战。传统水务管理模式已难以满足日益增长的需求与高标准的服务要求。正是基于此,我们隆重推出“智慧水务平台”,专为水司量身定制,一键式解决运营中的各类难题&#xff0…

easy click安卓版纯本地离线文字识别插件

目的 easy click是一款可以模拟鼠标和键盘操作的自动化工具。它可以帮助用户自动完成一些重复的、繁琐的任务,节省大量人工操作的时间。easy click也包含图色功能,识别屏幕上的图像,根据图像的变化自动执行相应的操作。本篇文章主要讲解下更优…

EasyExcel_通过模板导出(多sheet、列表、图片)

文章目录 前言一、EasyExcel是什么?二、模板样式调整三、使用步骤1.引入jar包2.方法示例2.1 Controller:2.2 Service:2.3 ServiceImpl:2.4 合并策略:2.5 对应DTO 总结 前言 产品今天提了个需求,大概是这样的&#xff…

中秋节月饼销售利用106短信群发平台业绩翻倍案例分析

在中秋节这一传统佳节,月饼作为节日的标志性食品,其销售市场竞争尤为激烈。为了在众多品牌中脱颖而出,不少月饼销售企业开始探索创新的营销方式。其中,利用106短信群发平台进行精准营销,成为众多企业实现业绩翻倍的有效…

C\C++ Sqlite3使用详解

C\C++ Sqlite3使用详解 一、源码下载二、sqlite3接口说明C++2.1 项目创建以及sqlite3使用2.1 连接数据库2.2 sqlite创建表2.2.1 示例代码2.2.2 注意事项2.3 sqlite插入数据2.3.1 示例代码2.3.2 注意事项2.4 sqlite数据删除2.5 sqlite数据查询一、源码下载 下载地址: https://…

思科设备静态路由实验

拓扑及需求 网络拓扑及 IP 编址如图所示;PC1 及 PC2 使用路由器模拟;在 R1、R2、R3 上配置静态路由,保证全网可达;在 R1、R3 上删掉上一步配置的静态路由,改用默认路由,仍然要求全网可达。 各设备具体配置…

UE5.4 - 内容浏览器

目录 一. 简介 二. 打开方式 1.顶部菜单栏打开 2.工具栏创建 3.底部工具栏按钮 三.界面详细介绍 1.导航栏 2.源面板 3.集合 4.筛选器 ​编辑 5.搜索栏 6.资产视图 7.设置按钮 四. 开发者内容 一. 简介 一种你可以用于查看、管理和处理项目中所有资产的工具。 二…

java 变量 基础类型及其转换

为什么需要变量 一个程序就是一个世界 变量是程序的基本组成单位 //不论是使用那种高级程序语言编写程序,变量都是其程序的基本组成单位,比如 //变量有三个基本要素(类型名称值) class Test{public static void main(String[] args) {//定义了一个变量,类型int整型,名称a,值…

哈工大 | 乐聚人形机器人 | 最新演讲

笔者是清华在读研究生,主要关注人形机器人、具身智能。将持续分享行业前沿动态、学者观点整理、论文阅读笔记、知识学习路线等。欢迎交流 最近听了乐聚的最新进展演讲,以下是学习整理。部分图截自直播,若模糊望见谅 基本信息: 【…

光影漫游者:创新球形设计,重新定义移动空间—轻空间

在现代城市中,空间的灵活性和视觉吸引力变得越来越重要。为满足多样化的需求,“光影漫游者”以其独特的球形设计和引人注目的视觉效果,成为了移动空间解决方案的新标杆。无论是商业活动、文化展览,还是沉浸式体验,“光…

文档翻译软件哪个好?这5款文档翻译器还不错

相信大家都有过看着大量外文文档资料而无从下手的经历,曾几何时,我也和大家一样深深困于这种烦恼之中。 好在后来,被我发现了5款趁手的文档翻译器,这才从根本上解决了不少麻烦~今天借此机会也将它们一并整理出来分享给大家&#…

仪器校准周期建议多长时间一次?仪器无校准后果怎么样?

自从国内建立计量以来,仪器计量校准就一直是企业定期进行的一种设备维护工作。定期进行校准已经是企业墨守成规的习惯,但是对于仪器校准周期是多久,具体多久校准一次,大家却不是很清楚,那么仪器校准周期建议多长时间一…

以简单的例子从头开始建spring boot web多模块项目(五)-thymeleaf引擎

继续向里面加,这次是引入thymeleaf渲染引擎。 使用这个引擎的很多,主要是以下几个优点: Thymeleaf是适用于Web和独立环境的现代服务器端Java模板引擎。Thymeleaf的主要目标是为您的开发工作流程带来优雅的自然模板 -HTML可以在浏览器中正确显…

华普微邀您共聚 2024 elexcon 深圳国际电子展!

elexcon2024深圳国际电子展将于2024年8月27日至29日在深圳会展中心(福田)开幕。汇聚全球优质品牌厂商齐聚现场,打造电子全产业链创新展示、一站式采购及技术交流平台。集中展示集成电路、嵌入式系统、电源管理/功率器件、电子元件与供应链、O…

气膜馆:亲子乐园中的新兴娱乐空间—轻空间

在亲子乐园中,气膜馆作为一种新兴的娱乐空间,凭借其独特的设计和灵活的功能,成为了孩子和家长们的理想去处。这个轻盈而充满趣味的空间,不仅让孩子们尽情玩耍,也为家长提供了舒适的陪伴环境。 全天候的舒适体验 气膜馆…

AOP+ 自定义注解 +SpringElExpress自研缓存组件

AOP 自定义注解 SpringElExpress自研缓存组件 背景前置知识改造代码 背景 思考下这段代码,想想项目中是不是到处存在 先查缓存,缓存里面有,直接返回;缓存没有,查数据库,并更新到缓存 思考:如何…

你遇到过哪些触发NPE的代码场景?

你遇到过哪些触发NPE的代码场景? NPE如何处理NPE 在Java编程实践中,空指针异常(NPE)是开发过程中常见的障碍,它不仅阻碍了代码的正常运行,还常常成为系统不稳定性的根源。那么如何识别那些潜藏于代码深处的…