如何确保消息不会丢失

news2024/12/28 19:00:25
本篇文章大家还可以通过浏览我的博客阅读。如何确保消息不会丢失 - 胤凯 (oyto.github.io)


很多人刚开始接触消息队列的时候,最经常遇到的一个问题就是丢消息了。<!--more-->对于大部分业务来说,丢消息意味着丢数据,是完全无法接受的。

现在很多主流的消息队列都实现了完善的消息可靠性保证机制,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

所以,绝大部分丢消息的原因都是开发者不熟悉消息队列,没有正确地使用和配置造成的。下面我们一起来了解下消息队列是如何保证消息可靠传递的,只要熟知原理,就能很快知道如何配置消息队列,写出可靠的代码,避免消息丢失。

检查消息队列的方法

用消息队列最尴尬的情况不是丢消息,而是丢了消息还不知道。对于一个刚刚上线的系统,各方面肯定都不是很稳定,这个时候就特别许需要监控系统中是否有消息丢失的情况。

如果是对于一些基础设施比较完善的公司,可以使用分布式链路追踪系统,很方便地追踪每一条消息。如果没有的话,下面提供一种简答的方法,来检查是否有消息丢失的情况。

我们可以使用消息队列的有序性来验证是否有消息丢失。原理很简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序列号,在 Consumer 端来检查这个序列号的连续性。

如果没有消息队列,Consumer 端收到消息的序列号必然是递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 + 1.如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号确定丢失的是哪条消息,方便进一步排查原因和补救。

大多数消息队列的客户端都支持拦截器,可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性。这样实现的好处是:消息检测的代码不会入侵到业务代码中,待系统稳定后,也方便将这部分代码检测的逻辑关闭或者删除。

如果在一个分布式系统中实现了这个检测机制,有以下几个问题需要注意:

  1. 像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的 ,只能保证队列或分区上的消息是有序的,所以我们在发送消息的时候要指定分区,并且每个分区单独检测消息序号的连续性。

  2. 如果系统中 Producer 是多实例的,由于多个 Producer 并不好协调彼此之间的发送顺序,所以每个 Producer 分别生成各自的消息序号,并且需要附加 Producer 标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

  3. Consumer 实例的数量最好是和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便在 Consumer 内检测消息序号的连续性。

确保信息可靠传递

上面讲述了如何检测消息丢失,下面再来看看什么时候会发生消息丢失,该如何避免。

消息从生产到消费完成整个过程,可以分为下面三个阶段:

  • 生产阶段:在这个阶段,消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段:在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

1、生产阶段

在生产阶段,消息队列通过最常用的请求确认机制来保证消息的可靠传递:代码中调用发送消息的方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到。客户端收到响应后,一次正常的消息发送就完成了。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到确认响应后,会自动重试,如果还是失败,就会以返回值或者异常的方式告知调用方。

在编写发送消息代码时,通过正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失了。以 Kafka 为例,我们看一下如何可靠地发送消息:

同步发送时,只要注意捕获异常即可。

partition, offset, err := producer.SendMessage(message)
if err != nil {
    fmt.Println("消息发送失败:", err)
} else {
    fmt.Printf("消息发送成功,分区:%d, 偏移:%d\n", partition, offset)
}

异步发送时,需要去异步检查返回值,并进行处理:

producer.Input() <- message
​
select {
case <-producer.Successes():
    fmt.Println("消息发送成功")
case err := <-producer.Errors():
    fmt.Println("消息发送失败:", err.Err)
}
2、存储阶段

在存储阶段,只要 Borker 不出现故障,比如进程死掉了或者服务器宕机了,就不会出现丢失消息的问题。但如果出现了的话, 还是可能会丢失消息的。

如果对于消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢失消息

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后将消息写入磁盘,再给 Producer 返回确认响应。这样即使发生宕机,由于消息已经写入磁盘,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果 Borker 是由多个节点组成的集群,需要将 Borker 集群配置成:至少消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会造成消息的丢失。

3、消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递。客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消息的消费确认响应,下次拉消息的时候还是会返回同一条消息,以此来确保消息不会在网络传输过程中丢失,也不会因为客户端执行消费逻辑中出错导致丢失。

在编写代码的过程中,需要注意,不要在收到消息后立马返回消息确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

同样,我们使用 golang 语言消费 RabbitMQ 消息为例,看看如何实现一段可靠的消费代码:

forever := make(chan bool)
​
go func() {
    for d := range msgs {
        body := d.Body
        fmt.Printf(" [x] 收到消息 %s\n", body)
​
        // 在这里处理收到的消息
        // 你可以在这里调用 database.save(body) 来保存消息
​
        fmt.Println(" [x] 消费完成")
​
        // 完成消费业务逻辑后发送消费确认响应
        d.Ack(false)
    }
}()
​
log.Printf("等待消息。要退出,请按 CTRL+C")
<-forever

正确的顺序时,先把消息保存到数据库中,然后再发送消费确认。这样如果保存消息失败了,就不会执行消费代码,下次拉取的还是这条消息,直到消费成功。

小结

这一篇文章,先讲述了在系统中,如果检查消息队列消息丢失的情况,然后分析了一条消息从发送到消费成功的整个过程,以及消息队列是如何确保消息的可靠性,不会丢失的。这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息队列的可靠性机制,确保消息不会丢失。

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。

  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。

  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

知道这几个阶段的原理后,如果再出现丢消息的情况,可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步分析,快速找到问题的原因。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1221341.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

骨传导耳机的优缺点是什么?有什么值得入手的骨传导耳机吗?

骨传导耳机的优点还是挺多的&#xff0c;比如说&#xff1a;佩戴舒适、避免听力损伤、使用更安全灯&#xff0c;在详细了解骨传导耳机有什么优点和缺点之前&#xff0c;先来认识一下什么是骨传导耳机。 骨传导耳机是一种通过人体骨骼来传递声音的耳机&#xff0c;与传统的耳机相…

23111710[含文档+PPT+源码等]计算机毕业设计基于SpringBoot的体育馆场地预约赛事管理系统的设计

文章目录 **软件开发环境及开发工具&#xff1a;****功能介绍&#xff1a;****论文截图&#xff1a;****数据库&#xff1a;****实现&#xff1a;****代码片段&#xff1a;** 编程技术交流、源码分享、模板分享、网课教程 &#x1f427;裙&#xff1a;776871563 软件开发环境及…

嵌入式酒精壁炉:时尚生活的新宠

在这个注重品质与生活方式的时代&#xff0c;我们对于家居生活的要求早已不仅仅停留在实用性上。越来越多的人希望能够在家中营造一种时尚、温馨的氛围&#xff0c;而酒精壁炉恰好成为了这个潮流生活的代表。 如今&#xff0c;品质生活已经成为时尚的代名词。酒精壁炉以其精湛的…

图像分类系列(二) VGGNet学习详细记录

经典神经网络论文超详细解读&#xff08;二&#xff09;——VGGNet学习笔记&#xff08;翻译&#xff0b;精读&#xff09; 前言 上一篇我们介绍了经典神经网络的开山力作——AlexNet&#xff1a;经典神经网络论文超详细解读&#xff08;一&#xff09;——AlexNet学习笔记&a…

解密.locked1勒索病毒:专家级策略保护您的数据免受勒索攻击

导言&#xff1a; 在当今数字化的世界中&#xff0c;勒索病毒的威胁日益严峻。.locked1 勒索病毒作为其中的一种&#xff0c;采用高级的加密算法对用户文件进行加密&#xff0c;要求支付赎金以获取解密密钥。本文91数据恢复将介绍如何面对.locked1 勒索病毒&#xff0c;有效恢…

Python 3.12 版本有什么变化?

在前不久&#xff0c;python 3.12 正式发布了&#xff0c;那到底更新了哪些内容呢&#xff1f;一起来看看。 # 改善报错信息 来自官方标准库的模块现在可以在报NameError时提示问题原因&#xff0c;比如 >>> sys.version_info Traceback (most recent call last):Fi…

SpringBoot2—基础篇

目录 快速上手SpringBoot • SpringBoot入门程序开发 基于Idea创建SpringBoot工程&#xff08;一&#xff09; 基于官网创建SpringBoot工程&#xff08;二&#xff09; 基于阿里云创建SpringBoot工程&#xff08;三&#xff09; 手工创建Maven工程修改为SpringBoot工程&…

线程状态及线程之间通信

线程状态概述 当线程被创建并启动以后&#xff0c;它既不是一启动就进入了执行状态&#xff0c;也不是一直处于执行状态。在线程的生命周期中&#xff0c; 有几种状态呢&#xff1f;在 java.lang.Thread.State 这个枚举中给出了六种线程状态&#xff1a; 线程状态 导致状态发生…

Shopee活动名称怎么填写好?Shopee活动名称设置注意事项——站斧浏览器

虾皮活动名称的设定不仅是一个技巧性的问题&#xff0c;更是一门艺术。通过合理的活动名称设计&#xff0c;可以吸引更多的消费者参与活动&#xff0c;增加活动的曝光度和影响力。 shopee活动名称怎么填写好 简洁明了&#xff1a;活动名称应该尽量简洁明了&#xff0c;能够一…

北邮22级信通院数电:Verilog-FPGA(10)第十周实验 实现移位寄存器74LS595

北邮22信通一枚~ 跟随课程进度更新北邮信通院数字系统设计的笔记、代码和文章 持续关注作者 迎接数电实验学习~ 获取更多文章&#xff0c;请访问专栏&#xff1a; 北邮22级信通院数电实验_青山如墨雨如画的博客-CSDN博客 目录 一.代码部分 二.管脚分配 三.实现过程讲解及效…

上机练习 8: DataFrame 综合练习

记录一下做的练习题 目录 1)自定义一个 Series 并命名为 s1&#xff0c;自定义索引值&#xff0c;采用随机数作为其中数据尝试使用 s1.sum(计算其中所有数据的和,使用 s.mean(计算其中所有数据的平均值。 2)创建一个形状为4*6的 DataFrame 并命名为 df1,并指定行索引为[“a”…

多媒体领域顶会ACM MM 2023 获奖论文一览

ACM 国际多媒体会议是计算机科学领域中多媒体领域的顶级会议&#xff0c;属于CCF A类。今年的ACM MM 2023 已于2023年10月29日至11月2日在加拿大渥太华举行。 ACM MM会议专注于推动多媒体研究和应用&#xff0c;其研究领域广泛涉及触觉、视频、VR/AR、音频、语音、音乐、传感器…

23届计科,想找Java开发之类,真的是很难吗?

23届计科&#xff0c;想找Java开发之类&#xff0c;真的是很难吗&#xff1f; 你的投递信息(投递多少家&#xff0c;如何跟hr打招呼&#xff0c;已读不回如何应对等)都亮- -下才能知道问题出在 哪。最近很多小伙伴找我&#xff0c;说想要一些Java的资料&#xff0c;然后我根据…

asp.net在线考试系统+sqlserver数据库

asp.net在线考试系统sqlserver数据库主要技术&#xff1a; 基于asp.net架构和sql server数据库 功能模块&#xff1a; 首页 登陆 用户角色 管理员&#xff08;对老师和学生用户的增删改查&#xff09;&#xff0c;老师&#xff08;题库管理 选择题添加 选择题查询 判断题添加…

电商野路子:非标品中转仓项目

相信很多人之前都做过拼多多电商&#xff0c;抖音直播电商&#xff0c;淘宝虚拟电商&#xff0c;也做过淘宝传统电商&#xff0c;在童话看来&#xff0c;这些平台都已严重内卷&#xff0c;已经不再适合普通人进场了。凭你一没经验&#xff0c;二没背景&#xff0c;三没资源&…

成功解决:文档根元素 “mapper“ 必须匹配 DOCTYPE 根 “null“

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 文章目录 前言错误信息解决方法 前言 错误…

vscode 配置 lua

https://luabinaries.sourceforge.net/ 官网链接 主要分为4个步骤 下载压缩包&#xff0c;然后解压配置系统环境变量配置vscode的插件测试 这里你可以选择用户变量或者系统环境变量都行。 不推荐空格的原因是 再配置插件的时候含空格的路径 会出错&#xff0c;原因是空格会断…

零代码编程:用ChatGPT批量转换多个视频文件夹到音频并自动移动文件夹

有很多个视频文件夹&#xff1a; 要全部转成音频&#xff0c;然后复制到另一个文件夹。 在ChatGPT中输入如下提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个批量将Mp4视频转为Mp3音频的任务&#xff0c;具体步骤如下&#xff1a; 打开文件夹&#xff1a;…

基于JavaWeb+SpringBoot+掌上社区疫苗微信小程序系统的设计和实现

基于JavaWebSpringBoot掌上社区疫苗微信小程序系统的设计和实现 源码获取入口前言主要技术系统设计功能截图Lun文目录订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种…

PaddleClas学习2——使用PPLCNet模型对车辆朝向进行识别(python)

使用PPLCNet模型对车辆朝向进行识别 1. 配置PaddlePaddle,PaddleClas环境2. 准备数据2.1 标注数据格式2.2 标注数据3. 模型训练3.1 修改配置文件3.2 训练、评估4 模型预测1. 配置PaddlePaddle,PaddleClas环境 安装:请先参考文档 环境准备 配置 PaddleClas 运行环境。 2. 准…