Kafka-服务端-副本机制

news2025/1/10 11:25:48

Kafka从0.8版本开始引入副本(Replica)的机制,其目的是为了增加Kafka集群的高可用性。

Kafka实现副本机制之后,每个分区可以有多个副本,并且会从其副本集合(Assigned Replica,AR)中选出一个副本作为Leader副本,所有的读写请求都由选举出的Leader副本处理。

剩余的其他副本都作为Follower副本,Follower副本会从Leader副本处获取消息并更新到自己的Log中。

我们可以认为Follower副本是Leader副本的热备份。

一般情况下,同一分区的多个副本会被均匀地分配到集群中的不同Broker上,当Leader副本的所在的Broker出现故障后,可以重新选举新的Leader副本继续对外提供服务。

通过这种方式提高了Kafka集群的可用性。

副本

在一个分区的Leader副本中会维护自身以及所有Follower副本的相关状态,而Follower副本只维护自己的状态。

此外,还有“本地副本”和“远程副本”两个概念需要读者注意,“本地副本”是指副本对应的Log分配在当前的Broker上,“远程副本”则是指副本对应的Log分配在其他的Broker上,在当前Broker上仅仅维护了副本的LEO等信息。

一个副本是“本地副本”还是“远程副本”与它是Leader副本还是Follower副本没有直接联系,如图所示。

在这里插入图片描述

分区

Kafka服务端使用Replica表示副本以及Replica中维护的信息,其中的partition字段指向了副本所属的分区。

服务端使用Partition表示分区,Partition负责管理每个副本对应的Replica对象,进行Leader副本的切换,ISR集合的管理以及调用日志存储子系统完成写入消息,它还提供了一些其他的辅助方法。

Partition中的核心字段的含义如下所述。

  • topic和partitionld:此Partition对象代表的Topic名称和分区编号。
  • localBrokerld:当前Broker的id,可以与replicald比较,从而判断指定的Replica对是否表示本地副本。
  • logManager:当前Broker上的LogManager对象。
  • zkUtils:操作ZooKeeper的辅助类。
  • leaderEpoch:Leader副本的年代信息。
  • leaderReplicaldOpt:该分区的Leader副本的id。
  • inSyncReplicas:Set[Replica]类型,该集合维护了该分区的ISR集合,ISR集合是AR集合的子集。
  • assignedReplicaMap:Pool[Int,Replica]类型,维护了该分区的全部副本的集合(AR集合)的信息。

Partition中的方法按照功能可以划分为下列五类。

  • 获取(或创建)Replica:getOrCreateReplica方法。
  • 副本的Leader/Follower角色切换:makeLeader方法和makeFollower方法。
  • ISR集合管理:maybeExpandIsr方法和maybeShrinkIsr()方法。
  • 调用日志存储子系统完成消息写入:appendMessagesToLeader()方法。
  • 检测HW的位置:checkEnoughReplicasReachOffset方法

创建副本

getOrCreateReplica()方法主要负责在AR集合(assignedReplicaMap)中查找指定副本的Replica对象,如果查找不到则创建Replica对象并添加到AR集合中管理。

如果创建的是Local Replica,还会创建(或恢复)对应的Log并初始化(或恢复)HW。

HW与Log.recoveryPoint类似,也会需要记录到文件中保存,在每个lig目录下都有一个replication-offset-checkpoint文件记录了此目录下每个分区的HW。

在ReplicaManager启动时会读取此文件到highWatermarkCheckpoints这个Map中,之后会定时更新replication-offset-checkpoint文件。

ISR集合管理

Partition除了对副本的Leader/Follower角色进行管理,还需要管理ISR集合。

随着Follower副本不断与Leader副本进行消息同步,Follower副本的LEO会逐渐后移,并最终追赶上Leader副本的LEO,此时该Follower副本就有资格进入ISR集合。

Partition.maybeExpandIsr()方法实现了扩张ISR集合的功能,其调用栈如图所示,它是在updateFollowerLogReadResults()方法中被调用的,在前面介绍DelayedFetch的处理流程时提到过此方法的功能,该方法用于处理来自Follower的FetchRequest。

追加消息

在分区中,只有Leader副本能够处理读写请求。

Partition.appendMessagesToLeader方法提供了向Leader副本对应的Log中追加消息的功能。

在前面介绍的DelayedProduce处理流程中,ReplicaManager.appendToLocalLog()方法就是基于此方法实现的。

ReplicaManager

在一个Broker上可能分布着多个Partition的副本信息,ReplicaManager的主要功能是管理一个Broker范围内的Partition信息。

ReplicaManager的实现依赖于前面介绍的日志存储子系统、DelayedOperationPurgatory、KafkaScheduler等组件,底层依赖于Partition和Replica。

在这里插入图片描述

ReplicaManager中各个字段的含义和功能如下所述。

  • logManager:LogManager对象,对分区的读写操作都委托给底层的日志存储子系统。
  • scheduler:KafkaSchedule对象,用于执行ReplicaManager中的周期性定时任务。在ReplicaManager中总共有三个周期性任务,它们分别是highwatermark-checkpoint任务、isr-expiration任务、isr-change-propagation任务。
  • controllerEpoch:记录KafkaController的年代信息,当重新选举Controller Leader时该字段值会递增。之后,在ReplicaManager处理来自KafkaController的请求时,会先检测请求中携带的年代信息是否等于controllerEpoch字段的值,这就避免接收旧Controller Leader发送的请求。这种设计方式在分布式系统中比较常见。
  • localBrokerld:当前Broker的id,主要用于查找Local Replica。
  • allPartitions:Pool[(String,Int),Partition]类型,其中保存了当前Broker上分配的所有Partition信息。这里需要注意Pool的valueFactory,当从Pool查找不到指定key时,则使用valueFactory创建一个默认value值放入Pool并返回。
  • replicaFetcherManager:在ReplicaFetcherManager中管理了多个ReplicaFetcherThread线程,ReplicaFetcherThread线程会向Leader副本发送FetchRequest请求来获取消息,实现Follower副本与Leader副本同步。ReplicaFetcherManager对象在ReplicaManager初始化时被创建,后面会详细介绍ReplicaFetcherManager与ReplicaFetcherThread的功能。
  • highWatermarkCheckpoints:Map[String,OffsetCheckpoint]类型,用于缓存每个log目录与OffsetCheckpoint之间的对应关系,OffsetCheckpoint记录了对应log目录下的replication-offset-checkpoint文件,该文件中记录了data目录下每个Partition的HW。ReplicaManager中的highwatermark-checkpoint任务会定时更新replication-offset-checkpoint文件的内容。
  • isrChangeSet:Set[TopicAndPartition]类型,用于记录ISR集合发生变化的分区信息。
  • delayedProducePurgatory、delayedFetchPurgatory:用于管理DelayedProduce和DelayedFetch的DelayedOperationPurgatory对象。
  • zkUtils:操作ZooKeeper的辅助类。

副本角色切换

在Kafka集群中会选举一个Broker成为KafkaController的Leader,它负责管理整个Kafka集群。

Controller Leader根据Partition的Leader副本和Follower副本的状态向对应的Broker节点发送LeaderAndIsrRequest,这个请求主要用于副本的角色切换,即指导Broker将其上的哪些分区的副本切换成Leader角色,哪些分区的副本切换成Follower介绍。

LeaderAndIsrRequest首先由KafkaAPis.handleLeaderAndIsrRequest()方法进行处理,其核心逻辑是通过ReplicaManager提供的becomeLeaderOrFollower方法实现的,而becomeLeaderOrFollower又依赖于上一小节介绍的Partition.makeLeader方法和makeFollower方法,上述调用关系如图所示。

在这里插入图片描述

在开始分析becomeLeaderOrFollower方法前,先来介绍一下LeaderAndIsrRequest和LeaderAndIsrResponse的格式,如图所示。在LeaderAndIsrRequest中比较重要的是partition_states集合这个字段,其中包含了每个分区的Leader副本所在的Brokerld、ISR集合、AR集合以及zk_version等信息。在LeaderAndIsrResponse的partitions集合字段中记录了每个分区的副本在当前Broker上的切换结果。

在这里插入图片描述

ReplicaManager.becomeLeaderOrFollower方法的主要逻辑是:获取(或创建)指定的Partition对象,根据partitionStates的信息对其切换成Leader/Follower的副本进行分类,并分别调用makeLeader和makeFollowers方法完成切换。

之后会启动highwatermark-checkpoint任务,然后关闭空闲的Fetcher线程,调用onLeadershipChange回调函数。

追加/读取消息

当Local Replica切换为Leader副本之后,就可以处理生产者发送的ProducerRequest,将消息写入到Log中。

在前面分析DelayedProduce的处理流程时,简单介绍了ReplicaManager.appendMessages方法,当时着重关注了与DelayedProduce相关的处理以及sendResponseCallback回调方法的实现。

这里详细分析appendToLocalLog方法的实现,它首先会检测消息要写入的Topic是否为Kafka的内部Topic(目前Kafka只有OffsetsTopic一个内部Topic),如果是内部Topic则需要检测是否允许对内部Topic进行追加,最终调用Partition.appendMessagesToLeader()方法完成消息追加。

appendToLocalLog方法的第二个参数记录了每个分区需要追加的消息集合。

消息同步

Follower副本与Leader副本同步的功能由ReplicaFetcherManager组件实现。

ReplicaFetcherManager继承了AbstractFetcherManager。

AbstractFetcherManager的继承和依赖关系如图所示。

在这里插入图片描述
在AbstractFetchManager中使用fetcherThreadMap字段(HashMap[BrokerAndFetcherld,AbstractFetcherThread]类型)管理AbstractFetcherThread,该Map的key值是BrokerAndFetcherld类型对象,其中封装了Broker的网络位置信息(brokerld、host、port等)以及对应的Fetcher线程的id。

AbstractFetcherManager中还提供了addFetcherForPartitions方法、removeFetcherForPartitions方法和shutdownldleFetcherThreads方法对fetcherThreadMap集合进行管理。

AbstractFetcherManager.addFetcherForPartitions()方法会让Follower副本从指定的offset开始与Leader副本进行同步。

该方法的参数涉及BrokerAndInitialOffset类,它其中封装了Broker的网络位置信息以及同步的起始offset。

具体的同步逻辑交由ReplicaFetcherThread线程处理。

在Follower发送ListOffsetRequest期间,新Leader可能不断追加消息,新Leader的LEO落后于Follower的LEO的场景得到改变,此时就不再需要进行截断操作了,Follower可以继续从其LEO与Leader进行同步。

这样新Leader与Follower的消息可能存在不一致的情况,如图所示。
在这里插入图片描述

关闭副本

当Broker接收到来自KafkaController的StopReplicaRequest请求时,会关闭其指定的副本,并根据StopReplicaRequest中的字段决定是否删除副本对应的Log。

在分区的副本进行重新分配、关闭Broker等过程中都会使用到此请求,但是需要注意的是,StopReplicaRequest并不代表一定会删除副本对应的Log,例如shutdown的场景下就没有必要删除Log。

而在重新分配Partition副本的场景下,就需要将旧副本及其Log删除。

先来介绍StopReplicaRequest、StopReplicaResponse的格式,如图所示。
在这里插入图片描述

StopReplicaRequest中的delete_partitions字段是一个boolean类型的值,表示是否要删除副本及其Log,partitions集合字段中记录待关闭的分区信息。

StopReplicaResponse的partitions集合记录了每个分区对应的处理结果。

API层接收到StopReplicaRequest后直接调用了ReplicaManager.stopReplicas()方法进行处理。

stopReplicas方法首先检查请求中的controllerEpoch值,之后停止指定分区的同步操作,最后遍历partitions集合根据delete_partitions的值决定是否对Log进行删除。

ReplicaManager中的定时任务

在ReplicaManager中总 共 有highwatermark-checkpoint、isr-expiration、isr-change-propagation三个定时任务。

highwatermark-checkpoint任务会周期性地记录每个Replica的HW并保存到其log目录中的replication-offset-checkpoint文件中。

isr-expiration任务会周期性地调用maybeShrinkIsr()方法检测每个分区是否需要缩减其ISR集合。

isr-change-propagation任务会周期性地将ISR集合发生变化的分区记录到ZooKeeper中。

如果检测到highwatermarkcheckpoint任务未启动,会调用startHighWaterMarksCheckPointThread方法启动highwatermark-checkpoint任务。

MetadataCache

MetadataCache是Broker用来缓存整个集群中全部分区状态的组件。

KafkaController通过向集群中的Broker发送UpdateMetadataRequest来更新其MetadataCache中缓存的数据,每个Broker在收到该请求后会异步更新MetadataCache中的数据。

MetadataCache中各字段的含义和功能如下所述。

  • cache:Map[String,Map[Int,PartitionStateInfo]]类型,记录了每个分区的状态,其中使用PartitionStatelnfo记录Partition的状态。
  • aliveBrokers:Map[Int,Broker]类型,记录了当前可用的Broker信息,其中使用Broker类记录每个存活Broker的网络位置信息(host、ip、port等)。
  • aliveNodes:Map[Int,Map[SecurityProtocol,Node]]类型,记录了可用节点的信息。

在开始分析Kafka处理UpdateMetadataRequest请求更新MetadataCache的流程之前,先来看一下UpdateMetadataRequest和UpdateMetadataResponse的格式,如图所示。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1407054.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

快速上手的AI工具-文心一言辅助学习

前言 大家好晚上好,现在AI技术的发展,它已经渗透到我们生活的各个层面。对于普通人来说,理解并有效利用AI技术不仅能增强个人竞争力,还能在日常生活中带来便利。无论是提高工作效率,还是优化日常任务,AI工…

数据管理平台Splunk Enterprise本地部署并结合内网穿透实现远程访问

文章目录 前言1. 搭建Splunk Enterprise2. windows 安装 cpolar3. 创建Splunk Enterprise公网访问地址4. 远程访问Splunk Enterprise服务5. 固定远程地址 前言 本文主要介绍如何简单几步,结合cpolar内网穿透工具实现随时随地在任意浏览器,远程访问在本地…

vue3项目中使用vite-plugin-mock

1、安装插件 npm i mockjs vite-plugin-mock --save-dev 2、修改配置文件 vite.config.js import { viteMockServe } from vite-plugin-mock plugins: [vue(), viteMockServe({ supportTs:false, logger: false, mockPath: "/mock/" // 注…

宝宝洗衣机好吗?高性价比的婴儿洗衣机推荐

随着大家生活水平的提高,越来越多人追求品质化生活。从洗衣服这件基础小事中就能看出,从比较早的解放双手,到追求衣物的洗护,再到近些年来,大人小孩衣服分区洗衣的精致生活理念。如今,洗衣机市场根据消费者…

Windows服务启动类型:自动(延迟启动)、自动、手动、禁用介绍

文章目录 Windows服务启动类型解析一、Windows服务简介1.1 Windows服务的特点无需用户交互启动时间可配置运行账户可配置 二、Windows服务启动类型详解2.1 自动(延迟启动)2.2 自动2.3 手动2.4 禁用 三、Windows服务启动类型的配置和管理3.1 使用“服务”…

ubuntu1604安装及问题解决

虚拟机安装vmbox7 虚拟机操作: 安装增强功能 sudo mkdir /mnt/share sudo mount -t vboxsf sharefolder /mnt/share第一次使用sudo提示is not in the sudoers file. This incident will be reported 你的root需要设置好密码 sudo passwd root 输入如下指令&#x…

CC工具箱使用指南:【处理面要素空洞】

一、简介 面要素在经过一系列的处理后,可能会存在空洞。 有些小空洞面积过小,人工检查很容易遗漏,于是就做了这个工具。 目的就是获取面要素的空洞,或者去除空洞获取要素的边界。 二、工具参数介绍 右键点击面要素图层&#xf…

c#算法(10)——求点到直线的距离

前言 在上位机软件开发领域,特别是机器视觉领域,经常会遇到尺寸测量的场景,比如让我们求一个点到一条直线的距离,我们已知了直线上的两个点的坐标,然后又已知了直线外的一个点的坐标,那么如何求出该直线外的一点到直线的距离呢?本文就是来讲解如何求点到直线的距离的,…

『论文阅读|2024 WACV 多目标跟踪Deep-EloU|纯中文版』

论文题目: Iterative Scale-Up ExpansionIoU and Deep Features Association for Multi-Object Tracking in Sports 论文特点: 作者提出了一种迭代扩展的 ExpansionIoU 和深度特征关联方法Deep-EIoU,用于体育场景中的多目标跟踪,旨…

【GitHub项目推荐--不错的 Java 开源项目】【转载】

1 基于 Java 的沙盒塔防游戏 Mindustry 是一款用 Java 编写的沙盒塔防游戏。玩家需要建造精密的传送带供应链,提供炮塔弹药,生产建筑材料,保护建筑并抵御敌人。也可以在跨平台多人合作游戏中与朋友一起战斗,或组队进行 PVP 比赛。…

SpringBoot项目整合MybatisPlus并使用SQLite作为数据库

文章目录 SQLite介绍搭建项目创建项目修改pom.xml SQLite查看SQLite是否安装创建数据库创建数据表IDEA连接SQLitenavicat连接SQLite数据库 后端增删改查接口实现MybatisX生成代码不会生成看这个UserUserMapperUserMapper.xml controller创建配置文件application.yaml启动类Incr…

集简云新增邮件发送功能,适用多种创意场景并提升邮件发送效率

在数字营销中,电子邮件依旧是连接企业与客户的重要桥梁。集简云深知这一点,本周推出为企业通讯打造的内置应用——集简云邮件发送,帮助用户创建充满个性化的交易电子邮件,还能通过HTML自定义代码来实现用户的创意场景。可与近千款…

springboot导出数据到excel模板,使用hutool导出数据到指定excel,java写入数据到excel模板

最近遇到一个需求,需要从数据库查询数据,写入到对应的excel导入模板中。再把导出的数据进行修改,上传。 我们项目用的是easyExcel,一顿百度搜索,不得其法。 主要是要把数据填充到指定单元格中,跟平时用到的…

【操作工具】IDEA的properties文件变为灰色的解决办法

背景 赋值了一份properties文件放到项目下面,但是里面的key都是灰色的 解决方案 去掉下面3后面对应的勾 去掉之后

《Linux C编程实战》笔记:信号的发送

信号的发送主要由函数kill、raise、sigqueue、alarm、setitimer以及abort来完成 kill函数 kill函数用来发送信号给指定的进程。 #include<sys/types.h> #include<signal.h> int kill(pid_t pid,int sig); 该函数的行为与第一个参数pid有关&#xff0c;第二个参…

幻兽帕鲁安装和开服教学

《幻兽帕鲁》游戏热度异常火爆&#xff0c;很多玩家想下载《幻兽帕鲁》和朋友玩&#xff0c;但不知道在哪里能够下载到&#xff0c;下面请看《幻兽帕鲁》下载安装教学&#xff0c;希望能够帮助大家。 幻兽帕鲁》目前仅在PC上的Steam平台发售&#xff0c;可以登录Steam搜索“幻…

Unity 解决异步分发方案

很多程序&#xff0c;包括游戏、小程序、一些AR、VR的程序&#xff0c;因为客户端体量太大&#xff0c;更新频繁都涉及到远程热更新的问题&#xff0c;解决这类问题的思路基本上是客户端解决主要功能&#xff0c;资源类放置在服务器。 下面记录下&#xff1a; 1.CDN或者云轻量…

AI嵌入式K210项目(18)-卷积人工神经网络硬件加速器 KPU

文章目录 前言一、K210的KPU二、实验过程总结 前言 K210内置了丰富的加速器&#xff0c;包括神经网络处理器 (KPU)&#xff0c;AES(高级加密加速器)&#xff0c;APU 麦克风阵列语音数据加速计算处理器&#xff0c;现场可编程 IO 阵列 (FPIOA)&#xff0c;数字摄像头接口 (DVP)…

源码篇--Redis 五种数据类型

文章目录 前言一、 字符串类型&#xff1a;1.1 字符串的编码格式&#xff1a;1.1.1 raw 编码格式:1.1.2 empstr编码格式:1.1.3 int 编码格式:1.1.4 字符串存储结构展示: 二、 list类型&#xff1a;2.1 List 底层数据支持&#xff1a;2.2 List 源码实现&#xff1a;2.3 List 结构…

水经微图系列产品新功能盘点!

水经微图&#xff0c;简称“微图”。 我们曾在直播中分享过微图APP苹果版的功能&#xff0c;本周四晚19:30我们将在另一个视频号分享盘点微图APP安卓版的详细功能&#xff0c;以及Web版近期上线的新功能功能。 微图APP安卓版 我们在《水经微图安卓版APP正式上线》一文中&…