ApacheKafka中的设计

news2024/11/5 18:33:24

文章目录

  • 1、介绍
    • 1_Kafka&MQ场景
    • 2_Kafka 架构剖析
    • 3_分区&日志
    • 4_生产者&消费者组
    • 5_核心概念总结
    • 6_顺写&mmap
    • 7_Kafka的数据存储形式
  • 2、Kafka的数据同步机制
    • 1_高水位(High Watermark)
    • 2_LEO
    • 3_高水位更新机制
    • 4_副本同步机制解析
    • 5_消息丢失问题
    • 6_数据不一致问题
    • 6_Leader Epoch 机制
  • 3、总结


1、介绍

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka是一种高吞吐量、低延迟和高可扩展的分布式发布订阅消息系统,它可以收集并处理用户在网站中的所有动作流数据以及物联网设备的采样信息。


1_Kafka&MQ场景

Apache Kafka提供了消息的订阅与发布的消息队列,一般用作系统间解耦、异步通信、削峰填谷等作用。同时Kafka又提供了Kafka streaming插件包实现了实时在线流处理。相比较一些专业的流处理框架不同,Kafka Streaming计算是运行在应用端,具有简单、入门要求低、部署方便等优点。

  • 消息队列 Message Queue:
    在这里插入图片描述

  • Kafka Streaming 流处理(插件,其它运行在服务端的流计算框架——storm、flink、spark stream):

    在这里插入图片描述

2_Kafka 架构剖析

消息队列作为一种在分布式和大数据开发中不可或缺的中间件。在分布式开发或者大数据开发中通常使用消息队列进行缓冲、系统间解耦和削峰填谷等业务场景,常见的消息队列工作模式大致会分为两大类:

  • 至多一次:消息生产者将数据写入消息系统,然后由消费者负责去拉去消息服务器中的消息,一旦消息被确认消费之后 ,由消息服务器主动删除队列中的数据,这种消费方式一般只允许被一个消费者消费,并且消息队列中的数据不允许被重复消费传统MQ。

    在这里插入图片描述

  • 没有限制:同上诉消费形式不同,生产者发不完数据以后,该消息可以被多个消费者同时消费,并且同一个消费者可以多次消费消息服务器中的同一个记录。主要是因为消息服务器一般可以长时间存储海量消息常见大数据领域——Kafka是这种。

    在这里插入图片描述


Kafka集群以Topic形式负责分类集群中的Record,每一个Record属于一个Topic

在这里插入图片描述

每个Topic底层都会对应一组分区的日志用于持久化Topic中的Record。同时在Kafka集群中,Topic的每一个日志的分区都一定会有1个Borker担当该分区的Leader其他的Broker担当该分区的follower(如下图 —— 一个Broker既是当前分区的 Leader,也是其他分区的 follower )。Leader负责分区数据的读写操作,follower负责同步改分区的数据(备份)

在这里插入图片描述

这样如果分区的Leader宕机,改分区的其他follower会选取出新的leader继续负责该分区数据的读写。如下图—— 当broker-0下线后,broker-2身兼 partition0partition1的 Leader。

在这里插入图片描述

其中集群的Leader的监控和Topic的部分元数据是存储在Zookeeper中。


3_分区&日志

Kafka中所有消息通过Topic为单位进行管理,每个Kafka中的Topic通常会有多个订阅者,负责订阅发送到改Topic中的数据。Kafka负责管理集群中每个Topic的一组日志分区数据

生产者将数据发布到相应的Topic。负责选择将哪个记录分发送到Topic中的哪个Partition。例如可以round-robin方式完成此操作,然而这种仅是为了平衡负载。也可以根据某些语义分区功能(例如基于记录中的Key)进行此操作——比如,使用订单号作为 key,这个 key 对应的消息都会发送到同一个 partition 中。

在这里插入图片描述

每组日志分区是一个有序的不可变的日志序列(只能保证分区内部先进先出——局部FIFO,不同分区之间无法保证————无法保证全局的有序,因此Kafka不是严格意义上的有序),分区中的每一个Record都被分配了唯一的序列编号称为是 offset,Kafka 集群会持久化所有发布到Topic中的Record信息,改Record的持久化时间是通过配置文件指定,默认是168小时(7天)

log.retention.hours=168

Kafka底层会定期的检查日志文件,然后将过期的数据从log中移除,由于Kafka使用硬盘存储日志文件,因此使用Kafka长时间缓存一些日志文件是不存在问题的。

Kafka中对Topic实现日志分区的有以下目的:

  • 首先,它们允许日志扩展到超出单个服务器所能容纳的大小。每个单独的分区都必须适合托管它的服务器,但是一个Topic可能有很多分区,因此它可以处理任意数量的数据。

  • 其次每个服务器充当其某些分区的Leader,也可能充当其他分区的Follwer,因此群集中的负载得到了很好的平衡。

在消费者消费Topic中数据的时候,每个消费者会维护本次消费对应分区的偏移量——offset,消费者会在消费完一个批次的数据之后,会将本次消费的偏移量提交给Kafka集群(其实Kafka消费者真正提交的是下一次读取的偏移量位置),因此对于每个消费者而言可以随意的控制改消费者的偏移量。

在这里插入图片描述

因此在Kafka中,消费者可以从一个topic分区中的任意位置读取队列数据,由于每个消费者控制了自己的消费的偏移量,因此多个消费者之间彼此相互独立。


4_生产者&消费者组

消费者使用Consumer Group名称标记自己,并且发布到Topic的每条记录都会传递到每个订阅Consumer Group中的一个消费者实例。

如果所有Consumer实例都具有相同的Consumer Group,那么Topic中的记录会在改ConsumerGroup中的Consumer实例进行均分消费;如果所有Consumer实例具有不同的ConsumerGroup,则每条记录将广播到所有Consumer Group进程。

更常见的是,我们发现Topic具有少量的 Consumer Group,每个Consumer Group可以理解为一个 “逻辑的订阅者”

每个Consumer Group均由许多Consumer实例组成,以实现可伸缩性和容错能力。这无非就是发布—订阅模型,其中订阅者是消费者的集群而不是单个进程。

这种消费方式Kafka会将Topic 按照分区的方式均分 给一个Consumer Group下的实例,如果ConsumerGroup下有新的成员介入,则新介入的Consumer实例会去接管ConsumerGroup内其他消费者负责的某些分区,同样如果一下ConsumerGroup下的有其他Consumer实例宕机,则改由 ConsumerGroup 其他实例接管。如果消费组中的消费者数量大于分区数量,多余的消费者将处于空闲状态——如果其他实例宕机,将会接管工作。

在这里插入图片描述

由于Kafka的Topic的分区策略,因此Kafka仅提供分区中记录的有序性,也就意味着相同Topic的不同分区记录之间无顺序。

因为针对于绝大多数的大数据应用和使用场景, 使用分区内部有序或者使用key进行分区策略已经足够满足绝大多数应用场景。

但是,如果您需要记录全局有序,则可以通过只有一个分区Topic来实现,尽管这将意味着每个ConsumerGroup只有一个Consumer进程。


5_核心概念总结

Broker(代理):

  • Kafka 集群由多个 Broker 组成。每个 Broker 是 Kafka 的一个实例,负责存储数据和处理读写请求。Kafka 的高可用性和可扩展性来自于多个 Broker 的协同工作。

Topic(主题):

  • Topic 是 Kafka 中的数据分类方式。数据在 Kafka 中是以 Topic 为单位进行组织的。生产者将数据发送到 Topic,消费者则从 Topic 中读取数据。Topic 可以被划分为多个分区(Partition),每个分区是一个有序的、不可变的消息日志。

Partition(分区):

  • Topic 可以分为多个分区,每个分区都是一个独立的日志文件。分区使得 Kafka 可以在多个 Broker 之间分散数据,提高吞吐量和并发处理能力。每个分区的消息是有序的,但不同分区之间没有全局顺序 ——可以只使用一个分区(不分区)来保证全局 FIFO。

Producer(生产者):

  • 生产者是向 Kafka 发送数据的客户端。生产者将消息写入到特定的 Topic,Kafka 会根据 Topic 的分区策略将消息分发到对应的分区中。

Consumer(消费者):

  • 消费者从 Kafka 中读取消息。消费者可以是单独的客户端,也可以是多个消费者组成的消费者组。Kafka 保证消费者组内的每个消费者从 Topic 的不同分区读取数据,从而实现负载均衡。

Zookeeper(扎克斯):

  • Zookeeper 是 Kafka 用于集群管理和协调的工具。它负责 Broker 的元数据管理、选举和状态监控等任务。Kafka 集群的元数据和配置信息都保存在 Zookeeper 中。

Offset(偏移量):

  • 偏移量是 Kafka 中消息的唯一标识,用于追踪消费者的读取进度。每个分区中的消息都有一个唯一的偏移量,消费者可以根据偏移量来获取消息

6_顺写&mmap

Kafka的特性之一就是高吞吐率,但是Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,但是Kafka即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。Kafka会把收到的消息都写入到硬盘中,防止丢失数据。为了优化写入速度Kafka采用了两个技术——顺序写入和MMFile

因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。这样省去了大量的内存开销以及节省了IO寻址的时间。但是单纯的使用顺序写入,Kafka的写入性能也不可能和内存进行对比,因此Kafka的数据并不是实时的写入硬盘中 。

Kafka充分利用了现代操作系统分页存储来利用内存提高I/O效率。Memory Mapped Files(后面简称mmap)也称为内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page实现文件到物理内存的直接映射。完成MMP映射后,用户对内存的所有操作会被操作系统自动的刷新到磁盘上,极大地降低了IO使用率。

在这里插入图片描述

既然生产者写入数据得到了优化,那么数据的读取呢?

Kafka服务器在响应客户端读取的时候,底层使用ZeroCopy技术,直接将磁盘无需拷贝到用户空间,而是直接将数据通过内核空间传递输出,数据并没有抵达用户空间 —— 以操作系统为界限。

传统IO操作

  1. 用户进程调用read等系统调用向操作系统发出IO请求,请求读取数据到自己的内存缓冲区中。自己进入阻塞状态。
  2. 操作系统收到请求后,进一步将IO请求发送磁盘。
  3. 磁盘驱动器收到内核的IO请求,把数据从磁盘读取到驱动器的缓冲中。此时不占用CPU。当驱动器的缓冲区被读满后,向内核发起中断信号告知自己缓冲区已满。
  4. 内核收到中断,使用CPU时间将磁盘驱动器的缓存中的数据拷贝到内核缓冲区中。
  5. 如果内核缓冲区的数据少于用户申请的读的数据,重复步骤3、步骤4,直到内核缓冲区的数据足够多为止。
  6. 将数据从内核缓冲区拷贝到用户缓冲区,同时从系统调用中返回。完成任务

在这里插入图片描述

DMA读取(直接存储器访问——内存读取的协处理器)

  1. 用户进程调用read等系统调用向操作系统发出IO请求,请求读取数据到自己的内存缓冲区中。自己进入阻塞状态。
  2. 操作系统收到请求后,进一步将IO请求发送DMA。然后让CPU干别的活去。
  3. DMA进一步将IO请求发送给磁盘。
  4. 磁盘驱动器收到DMA的IO请求,把数据从磁盘读取到驱动器的缓冲中。当驱动器的缓冲区被读满后,向DMA发起中断信号告知自己缓冲区已满。
  5. DMA收到磁盘驱动器的信号,将磁盘驱动器的缓存中的数据拷贝到内核缓冲区中。此时不占用CPU。这个时候只要内核缓冲区的数据少于用户申请的读的数据,内核就会一直重复步骤3跟步骤4,直到内核缓冲区的数据足够多为止。
  6. 当DMA读取了足够多的数据,就会发送中断信号给CPU。
  7. CPU收到DMA的信号,知道数据已经准备好,于是将数据从内核拷贝到用户空间,系统调用返回。

在这里插入图片描述

跟IO中断模式相比,DMA模式下,DMA就是CPU的一个代理,它负责了一部分的拷贝工作,从而减轻了CPU的负担。DMA的优点就是:中断少,CPU负担低。

常规IO与 ZeroCopy 在网络环境下的工作:

常规IO:

在这里插入图片描述

ZeroCopy:

在这里插入图片描述

总结:

一般方案:

  1. 文件在磁盘中数据被copy到内核缓冲区
  2. 从内核缓冲区copy到用户缓冲区
  3. 用户缓冲区copy到内核与socket相关的缓冲区。
  4. 数据从socket缓冲区copy到相关协议引擎发送出去

Zero拷贝:

  1. 文件在磁盘中数据被copy到内核缓冲区
  2. 从内核缓冲区copy到内核与socket相关的缓冲区。
  3. 数据从socket缓冲区copy到相关协议引擎发送出去

7_Kafka的数据存储形式

我们上面说过 Kafka 分区中的数据内部有序,且是以日志的形式存储的,但是具体是怎么样的呢?

在这里插入图片描述

  • 一个topic由多个分区组成
  • 一个分区(partition)由多个segment(段)组成
  • 一个segment(段)由多个文件组成(log、index、timeindex)

综上,其实一个分区对应多个.log日志文件,而且是按照一个个文件块进行存储的——每个块、块和块之间都是有序的。

接下来,我们来看一下Kafka中的数据到底是如何在磁盘中存储的。

  • Kafka中的数据是保存在 log.dirs配置参数指定的数据目录中
  • 消息是保存在以:「主题名-分区ID」的文件夹中的
  • 数据文件夹中包含以下内容:

在这里插入图片描述

这些分别对应:

文件名说明
00000000000000000000.index索引文件,根据offset查找数据就是通过该索引文件来操作的
00000000000000000000.log日志数据文件
00000000000000000000.timeindex时间索引
leader-epoch-checkpoint持久化每个partition leader对应的LEO (log end offset、日志文件中下一条待写入消息的offset)
  • 每个日志文件的文件名为起始偏移量,因为每个分区的起始偏移量是0,所以,分区的日志文件都以0000000000000000000.log开始
  • 默认的每个日志文件最大为「log.segment.bytes =102410241024」1G
  • 为了简化根据offset查找消息,Kafka日志文件名设计为开始的偏移量

写入消息:

  • 新的消息总是写入到最后的一个日志文件中
  • 该文件如果到达指定的大小(默认为:1GB)时,将滚动到一个新的文件中

读取消息:

  • 根据「offset」首先需要找到存储数据的 segment 段(注意:offset指定分区的全局偏移量)
  • 然后根据这个「全局分区offset」找到相对于文件的「segment段offset」
  • 最后再根据 「segment段offset」读取消息
  • 为了提高查询效率,每个文件都会维护对应的范围内存,查找的时候就是使用简单的二分查找

删除消息:

  • 在Kafka中,消息是会被定期清理的。一次删除一个segment段的日志文件
  • Kafka的日志管理器,会根据Kafka的配置,来决定哪些文件可以被删除

2、Kafka的数据同步机制

Kafka的Topic被分为多个分区,分区是是按照Segments存储文件。分区日志是存储在磁盘上的日志序列,Kafka可以保证分区里的事件是有序的。

其中Leader负责对应分区的读写、Follower负责同步分区的数据,0.11 版本之前Kafka使用highwatermarker机制保证数据的同步,但是基于highwatermarker的同步数据可能会导致数据的不一致或者是乱序。在Kafka数据同步有以下概念。

  • LEO:log end offset 标识的是每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO.

  • HW: high watermarker称为高水位线,所有HW之前的的数据都理解是已经备份的,当所有节点都备份成功,Leader会更新水位线。

  • ISR:In-sync-replicas,kafka的leader会维护一份处于同步的副本集和,如果在replica.lag.time.max.ms时间内系统没有发送fetch请求,或者已然在发送请求,但是在该限定时间内没有赶上Leader的数据就被剔除ISR列表。在Kafka-0.9.0版本剔除replica.lag.max.messages消息个数限定,因为这个会导致其他的Broker节点频繁的加入和退出ISR。

1_高水位(High Watermark)

水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的:

在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。

具体如下图所示:

在这里插入图片描述

图中标注“Completed”的蓝色部分代表已完成的工作,标注“In-Flight”的红色部分代表正在进行中的工作,两者的边界就是水位线。

在 Kafka 的世界中,水位的概念有一点不同,它是用消息位移来表征的

作用:

在 Kafka 中,高水位的作用主要有 2 个。

  • 1)定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
  • 2)帮助 Kafka 完成副本同步。

在这里插入图片描述

我们假设这是某个分区 Leader 副本的高水位图。首先,请你注意图中的“已提交消息”和“未提交消息”。

在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 8 的所有消息。

注意,这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。

2_LEO

图中还有一个日志末端位移的概念,即 Log End Offset,简写是 LEO。它表示副本写入下一条消息的位移值。

注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15

高水位和 LEO 是副本对象的两个重要属性。 Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位

3_高水位更新机制

实际上,除了保存一组高水位值和 LEO 值=之外,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本(也称为远程副本)的 LEO 值。

在这里插入图片描述

在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。

为什么要在 Broker 0 上保存这些远程副本呢?

其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。

更新机制如下表:

更新对象更新时机
Broker 1 上的 Follow 副本 LEOFollower 副本从 Leader 副本拉取消息,写入到本地磁盘后,会更新其 LEO 值。
Broker 0 上Leader 副本 LEOLeader副本接收到生产者发送的消息,写入到本地磁盘后,会更新其LEO值。
Broker 0 上远程副本 LEOFollower副本从eader副本拉取消息时,会告诉L eader副本从哪个位移处开始拉取。L eader副本会使用这个位移值来更新远程副本的L EO。
Broker 1 上Follower副本高水位Follower副本成功更新完LEO之后,会比较其LEO值与Leader副本发来的高水位值,并用两者的较小值去更新它自己的高水位。
Broker 0上Leader副本高水位主要有两个更新时机: 一个是Leader副本更新其LEO之后;另一个是更新完远程副本LEO之后。具体的算法是:取 Leader副本和所有与Leader同步的远程副本LEO中的最小值

Leader 副本

处理生产者请求的逻辑如下:

  • 1)写入消息到本地磁盘。
  • 2)更新分区高水位值。
    • 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值(LEO-1,LEO-2,……,LEO-n)。
    • 获取 Leader 副本高水位值:currentHW。
    • 更新 currentHW = max{currentHW, min(LEO-1, LEO-2, ……,LEO-n)}。

处理 Follower 副本拉取消息的逻辑如下:

  • 1)读取磁盘(或页缓存)中的消息数据。
  • 2)使用 Follower 副本发送请求中的位移值更新远程副本 LEO 值。
  • 3)更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。

Follower 副本

从 Leader 拉取消息的处理逻辑如下:

  • 1)写入消息到本地磁盘。
  • 2)更新 LEO 值。
  • 3)更新高水位值。
    • 获取 Leader 发送的高水位值:currentHW。
    • 获取步骤 2 中更新过的 LEO 值:currentLEO。
    • 更新高水位为 min(currentHW, currentLEO)。

4_副本同步机制解析

首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。

在这里插入图片描述

当生产者给主题分区发送一条消息后,状态变更为:

在这里插入图片描述

此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。

Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:

在这里插入图片描述

这时,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:

在这里插入图片描述

在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步

5_消息丢失问题

从刚才的分析中,我们知道,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。

在这里插入图片描述

开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。

现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新——这是可能出现的。还记得吧,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。

当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。

6_数据不一致问题

依赖HW的概念实现数据同步,还存在数据不一致的问题。如下图:

在这里插入图片描述

我们还是使用与上面相似的开始情况进行举例。开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。

我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位(HW)还未更新。副本A的HW在 1 的位置,B 在 0 的位置。

就在此时,A、B同时宕机,并且副本 B先启动成功,执行日志截断操作并成为新的 Leader——副本B的HW为0。

副本B成为新的 Leader 后接收到了一条新的请求写入 3,并更新水位线完成,此时 B的HW变为1,并且副本A还在故障中。

副本A故障恢复后发现自己与 Leader (副本额B)高水位线一致,数据同步完成。最终,数据不一致问题产生。

6_Leader Epoch 机制

社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。

  • 1)Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  • 2)起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

在这里插入图片描述

任意一个Leader持有一个LeaderEpoch。该LeaderEpoch这是一个由Controller管理的32位数字,存储在Zookeeper的分区状态信息中,并作为LeaderAndIsrRequest的一部分传递给每个新的Leader。Leader接受Producer请求数据上使用LeaderEpoch标记每个Message。然后,该LeaderEpoch编号将通过复制协议传播,并用于替换HW标记,作为消息截断的参考点。

在这里插入图片描述

场景和之前大致是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。

现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。

这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。

数据不一致的问题也类似,当副本B成功启动时,经过 Leader Epoch 地判断不再进行数据的截断。

3、总结

Apache Kafka 是一个强大的分布式流处理平台,凭借其高吞吐量、低延迟和高可扩展性,在大数据处理和实时数据流领域中发挥了重要作用。无论是日志收集、数据集成还是实时流处理,Kafka 都提供了可靠、高效的解决方案。了解 Kafka 的核心概念和工作原理,对于构建现代化的数据处理架构和实现高效的实时数据流管理至关重要。

参考:https://www.lixueduan.com/posts/kafka/12-hw-leader-epoch/。

多种消息队列对比:

特性ActiveMQRabbitMQKafkaRocketMQ
所属社区/公司ApacheMozilla Public LicenseApacheApache/Ali
成熟度成熟成熟成熟比较成熟
生产者-消费者模式支持支持支持支持
发布-订阅支持支持支持支持
REQUEST-REPLY支持支持-支持
API完备性低(静态配置)
多语言支持支持JAVA优先语言无关支持,JAVA优先支持
单机呑吐量万级(最差)万级十万级 十万级(最高)
消息延迟-微秒级毫秒级 -
可用性高(主从)高(主从)非常高(分布式)
消息丢失-理论上不会丢失 -
消息重复-可控制理论上会有重复-
事务支持不支持支持支持
文档的完备性
提供快速入门
首次部署难度-

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2114787.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis典型应用 - 分布式锁

文章目录 目录 文章目录 1. 什么是分布式锁 2. 分布式锁的基本实现 3. 引入过期时间 4. 引入校验Id 5. 引入 watch dog(看门狗) 6. 引入redlock算法 工作原理 Redlock的优点: 总结 1. 什么是分布式锁 在一个分布式系统中,也可能会出现多个节点访问一个共…

QT 编译报错:C3861: ‘tr‘ identifier not found

问题: QT 编译报错:C3861: ‘tr’ identifier not found 原因 使用tr的地方所在的类没有继承自 QObject 类 或者在不在某一类中, 解决方案 就直接用类名引用 :QObject::tr( )

关于易优cms自定义字段不显示的问题

今天在该易优cms自定义字段&#xff0c;折腾了大半天没显示出来&#xff0c;原来是改错对方了。 主要引用的时候 要放在list标签内&#xff0c;不要看文档&#xff0c;把addfields 放在list标签上 例如 {eyou:list loop8} <li><a href"{$field.arcurl}">…

基于yolov8的电动车佩戴头盔检测系统python源码+onnx模型+评估指标曲线+精美GUI界面

【算法介绍】 基于YOLOv8的电动车佩戴头盔检测系统利用了YOLOv8这一先进的目标检测模型&#xff0c;旨在提高电动车骑行者的安全意识&#xff0c;减少因未佩戴头盔而导致的交通事故风险。YOLOv8作为YOLO系列的最新版本&#xff0c;在检测速度和精度上均进行了优化&#xff0c;…

✨机器学习笔记(一)—— 监督学习和无监督学习

1️⃣ 监督学习&#xff08;supervised learning&#xff09; ✨ 两种主要类型的监督学习问题&#xff1a; 回归&#xff08;regression&#xff09;&#xff1a;predict a number in infinitely many possible outputs. 分类&#xff08;classification&#xff09;&#xff1…

C#串口助手初级入门

1.创建项目 修改项目名称与位置&#xff0c;点击创建 2.进入界面 在视图中打开工具箱&#xff0c;鼠标拖动&#xff0c;便可以在窗口添加控件&#xff0c;右边可以查看与修改属性 3.解决方案资源管理器 发布之前&#xff0c;需要修改相关的信息&#xff0c;比如版本号&#x…

Lombok jar包引入和用法

大家好&#xff0c;今天分享一个在编写代码时的快捷方法。 当我们在封装实体类时&#xff0c;会使用set、get等一些方法。如下图&#xff0c;不但费事还影响代码的美观。 那么如何才能减少代码的冗余呢&#xff0c;首先lib中导入lombok的jar包并添加库。 此处我已导入&#xf…

插件:清理maven错误缓存.bat

插件&#xff1a;https://pan.baidu.com/s/1nHIxHoo1C4MvFlW7QbZe5Q?pwd7zenhttps://pan.baidu.com/s/1nHIxHoo1C4MvFlW7QbZe5Q?pwd7zen没错误缓存时&#xff1a; 有错误缓存时&#xff1a;

真实案例分享:零售企业如何避免销售数据的无效分析?

在零售业务的数据分析中&#xff0c;无效分析不仅浪费时间和资源&#xff0c;还可能导致错误的决策。为了避免这种情况&#xff0c;企业必须采取策略来确保他们的数据分析工作能够产生实际的商业价值。本文将通过行业内真实的案例&#xff0c;探讨零售企业如何通过精心设计的数…

springboot数据库连接由localhost改成IP以后访问报错500(2024/9/7

步骤很详细&#xff0c;直接上教程 情景复现 一.没改为IP之前正常 二.改完之后报错 问题分析 SQL没开启远程连接权限 解决方法 命令行登入数据库 mysql -u root -p切换到对应数据库 use mysql;设置root用户的连接权限允许其他IP连接数据库 update user set host % whe…

jmeter执行python脚本,python脚本的Faker库

jmeter安装 jython的插件jar包 通过如下地址下载jython-standalone-XXX.jar包并放到jmeter的XXX\lib\ext目录下面 Downloads | JythonThe Python runtime on the JVMhttps://www.jython.org/download.html 重启jmeter在JSR223中找到jython可以编写python代码执行 python造数据…

一种快速生成CSV的方法

事情是这个样子的 在QQ群在聊把如何100万数据导出成CSV文件&#xff1f;会不会很慢&#xff1f; 俺回了一句“现在的机器性能好&#xff0c;没啥问题”。 然后大家开始谈论机器的配置了。哎&#xff0c;俺的机器配置有点差。 然后俺就进行了一个测试。 测试数据 数据定义…

【C++二分查找】2439. 最小化数组中的最大值

本文涉及的基础知识点 C二分查找 LeetCode2439. 最小化数组中的最大值 给你一个下标从 0 开始的数组 nums &#xff0c;它含有 n 个非负整数。 每一步操作中&#xff0c;你需要&#xff1a; 选择一个满足 1 < i < n 的整数 i &#xff0c;且 nums[i] > 0 。 将 num…

C++ | Leetcode C++题解之第392题判断子序列

题目&#xff1a; 题解&#xff1a; class Solution { public:bool isSubsequence(string s, string t) {int n s.size(), m t.size();vector<vector<int> > f(m 1, vector<int>(26, 0));for (int i 0; i < 26; i) {f[m][i] m;}for (int i m - 1; …

.Net6/.Net8(.Net Core) IIS中部署 使用 IFormFile 上传大文件报错解决方案

描述 最近使用.Net6 WebAPI IFormFile对象接收上传文件时大于30MB(兆)的文件就会报错 原因分析 IIS上传文件有大小默认限制大约28.6MB 解决办法 .无论是Net6还是.Net8写法都一样 方法一&#xff1a;IIS可视化操作 1.打开Internet Information Services (llS)管理器&…

Banana Pi BPI-SM9 AI 计算模组采用算能科技BM1688芯片方案设计

产品概述 香蕉派 Banana Pi BPI-SM9 16-ENC-A3 深度学习计算模组搭载算能科技高集成度处理器 BM1688&#xff0c;功耗低、算力强、接口丰富、兼容性好。支持INT4/INT8/FP16/BF16/FP32混合精度计算&#xff0c;可支持 16 路高清视频实时分析&#xff0c;灵活应对图像、语音、自…

LeetCode --- 413周赛

题目列表 3274. 检查棋盘方格颜色是否相同 3275. 第 K 近障碍物查询 3276. 选择矩阵中单元格的最大得分 3277. 查询子数组最大异或值 一、检查棋盘方格颜色是否相同 题目给定两个字符串来表示两个方格的坐标&#xff0c;让我们判断这两个方格的颜色是否相同&#xff0c;这…

C++——关联式容器(2):AVL树(平衡二叉树)

2.AVL树 2.1 AVL树的概念 在学习了二叉搜索树后&#xff0c;我们发现了二叉搜索树可以根据大小比较来进行类似于折半查找的操作&#xff0c;使得搜索时间复杂度达到logn的水准。但是在面对极端情况下&#xff0c;如近似有序的序列&#xff0c;那么整棵树的时间复杂度就有可能退…

【Godot4.3】多边形的斜线填充效果基础实现

概述 图案&#xff08;Pattern&#xff09;填充是一个非常常见的效果。其中又以斜线填充最为简单。本篇就探讨在Godot4.3中如何使用Geometry2D和CanvasItem的绘图函数实现斜线填充效果。 基础思路 Geometry2D类提供了多边形和多边形以及多边形与折线的布尔运算。按照自然的思…

Spring-@Bean的处理流程

Bean前置知识 1 需要再Configuration Class中才能被解析 2 静态Bean也就是标注在static方法上的 实例Bean标注在普通方法上的 所有的Bean在创建之前都会变成BeanDefinition,其中有这样两个属性&#xff1a; setFactoryMethodName&#xff1a;静态方法 setFactoryBeanName&…