探秘Kafka背后的幕后机关,揭示消息不丢失或重复的原理与实践经验

news2024/11/15 8:12:39

背景

相信大家在工作中都用过消息队列,特别是 Kafka 使用得更是普遍,业务工程师在使用 Kafka 的时候除了担忧 kafka 服务端宕机外,其实最怕如下这样两件事。

  • 消息丢失。下游系统没收到上游系统发送的消息,造成系统间数据不一致。比如,订单系统没有把成功状态的订单消息成功发送到消息队列里,造成下游的统计系统没有收到下单成功订单的消息,于是造成系统间数据的不一致,从而引起用户查看个人订单列表时跟实际不相符的问题。
  • 消息重复。相同的消息重复发送会造成消费者消费两次同样的消息,同样会造成系统间数据的不一致。比如,订单支付成功后会通过消息队列给支付系统发送需要扣款的金额,如果发送两次一样的扣款消息,而订单只支付了一次,而且还造成给用户余额多扣款的问题。

总结来说,这两个问题直接影响到业务系统间的数据一致性。那到底该如何避免这两个问题的发生呢?Kafka 针对这两个问题有系统的解决方案,需要服务端、客户端做相应的配置以及采取一些补偿方案。

因此,我会从生产端、服务端、消费端三个角度讲解 Kafka 是如何做到消息不丢或消息不重复的。当然,在这个过程中,为了有利于你更好的理解,在介绍的过程中我也会简单介绍一些 Kafka 的工作原理。

三种消息语义及场景

首先我要介绍一下“消息语义”的概念,这是理论基础,会有利于你更好地抓住下面解决方案的要点。

消息语义有三种,分别是:消息最多传递一次、消息最少传递一次、消息有且仅有一次传递,这三种语义分别对应:消息不重复、消息不丢、消息既不丢失也不重复。

这里的“消息传递一次”是指生产者生产消息成功,Broker 接收和保存消息成功,消费者消费消息成功。对一个消息来说,这三个要同时满足才算是“消息传递一次”。上面所说的那三种消息语义可梳理为如下。

  • 最多一次(At most once):对应消息不重复。消息最多传递一次,消息有可能会丢,但不会重复。一般运用于高并发量、高吞吐,但是对于消息的丢失不是很敏感的场景。
  • 最少一次(At least once):对应消息不丢。消息最少传递一次,消息不会丢,但有可能重复。一般用于并发量一般,对于消息重复传递不敏感的场景。
  • 有且仅有一次(Exactly once):每条消息只会被传递一次,消息不会丢,也不会重复。 用于对消息可靠性要求高,且对吞吐量要求不高的场景。

为便于你更好地对比理解和记忆,我汇总了如下一张表格:

三种消息语义各项对比表

3778f386e418f4b2c5a7411d9316c41b.jpeg


到这里,三种消息语义的定义和相关特点就介绍完了,接下来我们正式开始分析 Kafka 是如何做到消息不丢或消息不重的。

Kafka 如何做到消息不丢失?

我们先来讨论一下 Kafka 是如何做到消息不丢失的,也就是:生产者不少生产消息,服务端不丢消息,消费者也不能少消费消息

那怎么实现这kafka不丢失消息呢?就需要生产端、服务端和消费端做好以下配置

生产端:不少生产消息

以下是为了保证消息不丢失,生产端需要配置的参数和相关使用方法。

第一个,要使用带回调方法的 API,具体 API 方法如下:

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)


使用带有回调方法的 API 时,我们可以根据回调函数得知消息是否发送成功,如果发送失败了,我们要进行异常处理,比如把失败消息存储到本地硬盘或远程数据库,等应用正常了再发送,这样才能保证消息不丢失。

第二个,设置参数 acks=-1。acks 这个参数是指有多少分区副本收到消息后,生产者才认为消息发送成功了,可选的参数值是 0、1 和 -1。

  • acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。
  • acks=1,表示生产者收到 leader 分区的响应就认为发送成功。
  • acks=-1 ,表示只有当 ISR(ISR的含义文章下面会给大家介绍) 中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果leader 副本挂了,当follower副本被选为leader副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息才能再次发送消息。


第三个,设置参数 retries=3。参数 retries 表示生产者生产消息的重试次数。 retries=3是一个建议值,一般情况下能满足足够的重试次数以至于能重试成功。但是如果重试失败了,对异常处理时就可以把消息保存到其他可靠的地方,如磁盘、数据库、远程缓存等,然后等到服务正常了再继续发送消息。


第四个,设置参数 retry.backoff.ms=300。retry.backoff.ms 指消息生产超时或失败后重试的间隔时间,单位是毫秒。如果重试时间太短,会出现系统还没恢复就开始重试的情况,进而导致再次失败,如果时间太长会造成。结合我个人经验来说,300 毫秒还是比较合适的。


只要上面四个要点配置对了,就基本可以保证生产端的生产者不少生产消息了


服务端:不丢消息

以下是为了保证服务端不丢消息,服务端需要配置的参数。

第一个,设置 replication.factor >1。replication.factor 这个参数表示分区副本的个数,这里我们要将其设置为大于 1 的数,这样当 leader 副本挂了,follower 副本还能被选为 leader 副本继续接收消息。


第二个,设置 min.insync.replicas >1

min.insync.replicas指ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量保证消息不丢。

这里我们简单介绍下 ISR。ISR 集合一个分区的副本的集合,每个分区都有自己的一个 ISR 集合。但不是所有的副本都会在这个集合里。首先 leader 副本是在 ISR 集合里的,以及如果一个follower副本的消息没落后leader副本太长时间,这个follower副本也在ISR集合里,同时如果有一个follower副本落后leader副本太长时间就从 ISR 集合里淘汰出去。也就是说ISR里的副本数量是小于等于分区的副本数量的。


第三个,设置 unclean.leader.election.enable = false。unclean.leader.election.enable 指是否能把非 ISR 集合中的副本选举为 leader 副本。unclean.leader.election.enable = true,也就是说允许非 ISR 集合中的 follower 副本成为 leader 副本。如果设置成这样会有什么问题呢?下面我结合几个示意图来为你详细分析下这个问题。


假设 ISR 集合内的 follower1 副本和 ISR 集合外的 follower2 副本向 leader 副本拉取消息(如下图 1),也就是说这时 ISR 集合中就有两个副本,一个是 leader 副本,另一个是 follower1 副本,而follower2 副本由于网络或自身机器的原因已经落后 leader 副本很长时间,已经被踢出 ISR 集合。


332488d45cc1299c57aa732f70c668ae.jpeg

图 1:ISR 集合内外两个follower 副本拉取消息


突然 leader 和 follower1 这两个节点挂了(如图 2),会导致什么样的结果出现呢?


7d6580d0a60afa555db897471f6130f1.jpeg

图 2:ISR 集合内两个副本挂掉了


由于 unclean.leader.election.enable = true,而现在分区的副本能正常工作的仅仅剩下 follower2 副本,所以 follower2 最终会被选为新的 leader 副本并继续接收生产者发送的消息,如下图我们可以看到它接收了一个新的消息 5。


22fc851fcaaa1131fcbb18566ac8ac58.jpeg

图 3:follower2 副本被选为新的 leader 副本


如果这时 follower1 副本的服务恢复,又会发生什么情况呢?由于 follower 副本要拉取 leader 副本同步数据,首先要获取 leader 副本的信息,并感知到现在的 leader 副本的 LEO 比自己的还小,于是做了截断操作,这时 4 这个消息就丢了,这就造成了消息的丢失。


aa38fa4c17e41ccff38a69ea6a199e46.jpeg

图 4:follower1 副本服务恢复,消息 4 丢失


因此,我们一定要把 unclean.leader.election.enable 设置为 false,只有这样非 ISR 集合的副本才不会被选为分区的 leader 副本。但是这样做也降低了可用性,因为这个分区的副本没有 leader,就无法收发消息了,但是消息会发送到别的分区 leader 副本,也就是说分区的数量实际上减少了。


消费端:不能少消费消息

为了保证不丢失消息,消费者就不能少消费消息,该如何去实现呢?消费端需要做好如下的配置。


第一个,设置 enable.auto.commit=false。enable.auto.commit 这个参数表示是否自动提交,如果是自动提交会导致什么问题出现呢?


消费者消费消息是有两个步骤的,首先拉取消息,然后再处理消息。向服务端提交消息偏移量可以手动提交也可以自动提交。如果把参数enable.auto.commit设置为true指消息偏移量是由消费端的自动提交,由异步线程去完成的,业务线程无法控制。如果刚拉取了消息之后,业务处理还没进行完,这时提交了消息偏移量但是这时消费者挂了,这就造成还没进行完业务处理的消息的位移被提交了,下次再消费就消费不到这些消息,造成消息的丢失。因此,一定要设置 enable.auto.commit=false。 也就是手动提交消息偏移量。

第二步,手动提交偏移量的正确步骤。但是enable.auto.commit=false还不能完全满足消费端消息不丢的条件,还要有正确的手动提交偏移量的过程。具体如何操作呢?这里我们同样结合一个示意图来讲解,如下所示:


a6210ed9b7a1b0079135cd8b4b3ac38f.jpeg

避免少消费消息的偏移量提交方案


表示业务逻辑先对消息进行处理,再提交 offset。这种模式如果消费者在处理完消息后,提交 offset 前出现宕机,待消费者再上线时,还会处理未提交的那部分消息(这里是 2~7 这部分消息),但是这部分已经被消费者处理过了,也就是说这样做虽然避免了丢消息,但是会有重复消费的情况。

具体代码需要这么写:


List<String> messages = consumer.poll();
processMsg(messages);
consumer.commitOffset();


Kafka 如何做到消息不重复?

接下来我们讨论 Kafka 又是如何做到消息不重复的,同样也是生产端重复生产消息,服务端重复存储消息,消费者也不能重复消费消息


生产端:

生产端发送消息后,假如遇到网络问题,无法获得响应,生产端就无法判断该消息是否成功提交到了 Kafka,而我们一般会配置重试次数,但这样会引发生产端重新发送同一条消息,从而造成消息重复的发送。


对于这个问题,Kafka 0.11.0 的版本之前并没有什么解决方案,不过从 0.11.0 的版本开始,Kafka 给每个 producer 一个唯一 ID,并且在每条消息中生成一个 sequence num,sequence num是递增且唯一的这样就能对消息去重,达到 一个生产端不重复发送一条消息。但是这个方法是有局限性的,只在一个 生产端 内生产的消息有效,如果一个消息分别在两个 producer 发送就不行了,还是会造成消息的重复发送。但是这种可能性比较小,因为消息的重试一般会在一个生产端内进行。当然,对应一个消息分别在两个 producer 发送的请求我们也有方案,只是多做一些补偿的工作,我们可以为每一个消息分配一个全局id,并把全局id存放在远程缓存或关系型数据库里。这样在发送前判断一下是否已经发送过。


服务端:

服务端不会重复的存储消息,如果有重复消息也应该是生产端重复发送造成的,所以无需特别的配置。


消费端:

第一步,enable.auto.commit=false:

同样要避免自动提交偏移量,大家可以想象一种情况,消费端拉取消息和处理消息都完成了,但是自动提交偏移量还没提交这时消费端挂了,这时候kakfa消费组开始重平衡并把分区分给了另一个消费者,由于偏移量没提交新的消费者会重复拉取消息,最终造成重复消费消息。

第二步,单纯配成手动提交同样不能避免重复消费,需要消费端用正确姿势消费。先看下图这种情况:


587173d04a5f0941f9a735e3783d34c8.jpeg


避免重复消费的偏移量提交方案


消费者拉取消息后,先提交 offset 后再处理消息。在提交 offset 之后,业务逻辑处理消息之前出现宕机,待消费者重新上线时,就无法读到刚刚已经提交而未处理的这部分消息(这里对应图中 5~8 这部分消息),这就对应了不重复消费消息但是会有丢失消息的情况。


具体代码如下:

List<String> messages = consumer.poll();consumer.commitOffset();processMsg(messages);


总结

最后这里我也简单总结下这一讲分享的主要内容。首先我们介绍了消息的三个语义及其场景,接下来我们从 Kafka 生产端、服务端和消费端三个方面具体讲解了我们到底该如何配置才能实现消息不丢失以及消息不重复。在这个过程中,我们也同步解释了一些 Kafka 的原理知识,这样能你才能知其然并知其所以然。

Kafka 中消息不丢失、不重复很重要,就我个人经验来讲,我是公司专门负责消息队列的架构师,业务的同学除了消息队列服务端宕机外,对消息的丢失和重复消息非常敏感,因为直接影响到了业务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1143809.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于springboot实现校园志愿者管理系统项目【项目源码+论文说明】

基于springboot实现校园志愿者管理系统演示 摘要 随着信息化时代的到来&#xff0c;管理系统都趋向于智能化、系统化&#xff0c;校园志愿者管理系统也不例外&#xff0c;但目前国内仍都使用人工管理&#xff0c;市场规模越来越大&#xff0c;同时信息量也越来越庞大&#xff…

VMware 16开启虚拟机电脑就蓝屏W11解决方法

问题现象 解决方法 控制面板->程序->启用或关闭windows功能->勾选虚拟机平台->重启

VTKQT 3D交互---[3]降低抬高功能PushAndPull

前言&#xff1a;在医疗的应用中&#xff0c;有时会需要手动去修改模型&#xff0c;比如3-matic软件中的LocalSmoothing和PushAndPull功能。该博文主要记录降低抬高PushAndPull功能。

4 H3C网络设备模拟器

如果大家没有硬件&#xff0c;但是也想做一下组网&#xff0c;可以使用H3C提供的网络模拟器。因为我使用的是mac&#xff0c;所以将软件安装到虚拟机里。安装好之后可以看到如下结构&#xff1a; 交换机工作原理 在使用模拟器前&#xff0c;我们先学习一下交换机的工作原理。 …

深入剖析SQL与NoSQL的优劣势,帮你决定最佳数据存储方案

你是否在为系统的数据库来一波大流量就几乎打满 CPU&#xff0c;日常 CPU 居高不下烦恼?你是否在各种 NoSQL 间纠结不定&#xff0c;到底该选用哪种最好?今天的你就是昨天的我&#xff0c;这也是我写这篇文章的初衷。 作为互联网从业人员&#xff0c;我们要知道关系型数据库…

VMware16,运行虚机后E盘下就会产生一个奇怪的文件夹

问题现象&#xff1a; VMware16&#xff0c;运行虚机后E盘下就会产生一个奇怪的文件夹&#xff0c;是乱码的 问题原因&#xff1a; 虚机安装路径存在中文 解决方法&#xff1a; 删除乱码文件夹 一&#xff1a;是否有中文路径&#xff0c;有的话改为英文路径 二&#xff1…

HED边缘检测

HED边缘检测 HED边缘检测&#xff0c;得到模型后&#xff0c;可以用OPENCV调用&#xff0c;不需要其他依赖&#xff0c;C/PYTHON/ANDROID都可以实现&#xff0c;效果如下&#xff1a;

Ubuntu安装docker,并换镜像源详细教程,建议收藏

文章目录 添加docker官方的GPG密钥将docker仓库添加到apt源安装docker检查docker换源 添加docker官方的GPG密钥 sudo apt-get updatesudo apt-get install ca-certificates curl gnupgsudo install -m 0755 -d /etc/apt/keyringscurl -fsSL https://download.docker.com/linux…

GB/T28181流媒体相关协议详解

GB/T28181流媒体相关协议详解 文章目录 GB/T28181流媒体相关协议详解1 GB/T28181协议中使用的应用层协议介绍2 实时视频点播协议交互流程2.1 设备注册2.2 设备保活2.3 视频播放 总结 本文主要主要针对28181协议中视频流的部分&#xff0c;来阐述视频流通过28181协议如何进行视频…

解密RocketMq的运行机制,带你玩转分布式消息通信

一、 MQ背景&选型 消息队列作为高并发系统的核心组件之一&#xff0c;能够帮助业务系统解耦提升开发效率和系统稳定性。主要具有以下优势&#xff1a; 削峰填谷&#xff08;主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题&#xff09;系统解耦&#xf…

Faster R-CNN(2016.1)

文章目录 摘要引言过去计算proposals的算法我们提出的 相关工作Object ProposalsDeep Networks for Object Detection Faster R-CNNRegion Proposal NetworksAnchorsTranslation-Invariant AnchorsMulti-Scale Anchors as Regression References多尺度预测有两种流行的方法我们…

云计算未来展望:边缘计算、量子计算与AI

文章目录 边缘计算&#xff1a;数据处理的新时代应用领域挑战与机遇 量子计算&#xff1a;超越传统计算的新范式量子比特应用前景挑战与机遇 人工智能&#xff1a;云计算的动力云中的AI应用领域挑战与机遇 结语 &#x1f389;欢迎来到云计算技术应用专栏~云计算未来展望&#x…

Vue学习之样式汇总

Vue学习之样式汇总 一 二者左右排版 案例 说明&#xff1a;头部一左一右排版&#xff0c;内容一左一右两个排版&#xff0c;公告栏文字超过点点点显示 代码实现 说明&#xff1a; &#xff08;1&#xff09;头部实现一左一右排版需要使用一下两个样式 display: flex;justify-…

4、QtCharts 做心电图

文章目录 ui界面核心代码全部代码 ui界面 核心代码 void Dialog::slot_timer() {qreal xOffset0.f;//x的偏移量,推进的距离qreal dIncrease10;//增加量//数据for(int i0;i<10;i){m_xdIncrease;xOffsetdIncrease;m_splineSerise->append(m_x,qrand()%10);//根据实际情况删…

IDEA新建maven项目,使用mybatis操作数据库完整过程

IDEA新建maven项目&#xff0c;使用mybatis操作数据库完整过程 一、IDEA新建maven项目二、配置mybatis三、创建表对应实体类四、创建mapper接口五、使用mybatis操作数据库 前提&#xff1a; 这个教程是在maven项目中使用mybatis进行数据库操作&#xff0c;不是在spring boot项目…

51单片机复位电容计算与分析(附带Proteus电路图)

因为iC x (dU/dt).在上电瞬间&#xff0c;U从0变化到U,所以这一瞬间就是通的&#xff0c;然后这就是一个直流回路&#xff0c;因为电容C直流中是断路的&#xff0c;所以就不通了。 然后来分析一下这个电容的电压到底是能不能达到单片机需要的复位电压。 这是一个线性电容&…

听力检测为什么要在标准化的隔声屏蔽系统中进行?

作者兰明&#xff0c;医学硕士&#xff0c;听力学博士&#xff0c;听觉健康门诊主任 美国国家研究委员会;;行为、认知和感官科学委员会联合出版的听力损失确定社会保障福利的资格一书中关于测试环境的要求如下&#xff1a; 行动建议4-4 测试环境 听力学评估是在受控的声学环境中…

基于springboot实现休闲娱乐代理售票平台系统项目【项目源码+论文说明】

基于springboot实现休闲娱乐代理售票系统演示 摘要 网络的广泛应用给生活带来了十分的便利。所以把休闲娱乐代理售票管理与现在网络相结合&#xff0c;利用java技术建设休闲娱乐代理售票系统&#xff0c;实现休闲娱乐代理售票的信息化。则对于进一步提高休闲娱乐代理售票管理发…

随写 - GPT使用时机

感慨科技的进步&#xff0c;还记得15年的时候初中&#xff0c;需要写一篇什么读后感&#xff0c;东抄西凑一篇500字的语句不通顺的文章交上去&#xff0c;那时候什么文库啥的都不需要会员&#xff0c;直接复制就行了。 现在问一问GPT什么都出来了。 1.EDA设计 由于课程结束&am…

基于springboot实现校园疫情防控系统项目【项目源码+论文说明】

基于springboot实现校园疫情防控系统演示 摘要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&am…