kafka集群工作机制

news2025/1/11 4:03:38

一、kafka在zookeeper上的元数据解释

kafka中的broker要选举Controller角色来管理整个kafka集群中的分区和副本状态。一个Topic下多个partition要选举Leader角色和客户端进行交互数据

Zookeeper客户端工具: prettyZoo。 下载地址:https://github.com/vran-dev/PrettyZoo/releases

对于kafka还有个问题,就是kafka集群的每个broker都会注册在zookeeper的临时节点/broker/ids/{BrokerId} ,如果集群节点服务器是非正常关机,zookeeper上面对应broker的节点不会删除,再次启动broker往zookeeper会报错

二、Controller Broker选举机制

通过在zookeeper上创建一个/controller的临时节点,写入当前启动的broker信息,其它的服务器无法写入了,写入成功的作为Controller,写入的内容如下:

{"version":1,"brokerid":0,"timestamp":"1661492503848"}

Controller会与zookeeper保持一个长连接,如果属于Controller角色的broker宕机,zookeeper长时间检测不到心跳就会删除/controller节点,其它broker就会监听到并重新竞争/controller

​ 选举产生的Controller节点,就会负责监听Zookeeper中的其他一些关键节点,触发集群的相关管理工作。例如:

  • 监听Zookeeper中的/brokers/ids节点,感知Broker增减变化。
  • 监听/brokers/topics,感知topic以及对应的partition的增减变化。
  • 监听/admin/delete_topic节点,处理删除topic的动作。

​ 另外,Controller还需要负责将元数据推送给其他Broker。

三、Leader Partition选举机制

  • AR: Assigned Repllicas。 表示Kafka分区中的所有副本(存活的和不存活的)
  • ISR: 表示在所有AR中,服务正常,保持与Leader同步的Follower集合。如果Follower长时间没有向Leader发送通信请求(超时时间由replica.lag.time.max.ms参数设定,默认30S),那么这个Follower就会被提出ISR中。(在老版本的Kafka中,还会考虑Partition与Leader Partition之间同步的消息差值,大于参数replica.lag.max.messages条就会被移除ISR。现在版本已经移除了这个参数。)
  • OSR:表示从ISR中踢出的节点。记录的是那些服务有问题,延迟过多的副本。

其中,AR和ISR比较关键,可以通过kafka-topics.sh的--describe指令查看。

[oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic disTopic
Topic: disTopic TopicId: vX4ohhIER6aDpDZgTy10tQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: disTopic Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: disTopic Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: disTopic Partition: 2    Leader: 2       Replicas: 0,2   Isr: 2,0
        Topic: disTopic Partition: 3    Leader: 2       Replicas: 2,0   Isr: 2,0

这个结果中,AR就是Replicas列中的Broker集合。而这个指令中的所有信息,其实都是被记录在Zookeeper中的。

 接下来,Kafka设计了一套非常简单高效的Leader Partition选举机制。在选举Leader Partition时,会按照AR中的排名顺序,靠前的优先选举。只要当前Partition在ISR列表中,也就是是存活的,那么这个节点就会被选举成为Leader Partition。

例如,我们可以设计一个实验来验证一下LeaderPartiton的选举过程。

#1、创建一个备份因子为3的Topic,每个Partition有3个备份。
[oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 3 --partitions 4 --topic secondTopic 
Created topic secondTopic.
#2、查看Topic的Partition情况 可以注意到,默认的Leader就是ISR的第一个节点。
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic                                       
Topic: secondTopic      TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4       ReplicationFactor: 3    Configs: 
        Topic: secondTopic      Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: secondTopic      Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: secondTopic      Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 1,0,2
        Topic: secondTopic      Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,0,2
#3、在worker3上停掉brokerid=2的kafka服务。
[oper@worker3 kafka_2.13-3.2.0]$ bin/kafka-server-stop.sh 
#4、再次查看SecondTopic上的Partiton分区情况
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic                                       
Topic: secondTopic      TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4       ReplicationFactor: 3    Configs: 
        Topic: secondTopic      Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0
        Topic: secondTopic      Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,1
        Topic: secondTopic      Partition: 2    Leader: 1       Replicas: 2,1,0 Isr: 1,0
        Topic: secondTopic      Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,0

从实验中可以看到,当BrokerId=2的kafka服务停止后,2号BrokerId就从所有Partiton的ISR列表中剔除了。然后,Partition2的Leader节点原本是Broker2,当Broker2的Kafka服务停止后,都重新进行了Leader选举。Parition2预先评估的是Replicas列表中Broker2后面的Broker1,Broker1在ISR列表中,所以他被最终选举成为Leader。

​ 当Partiton选举完成后,Zookeeper中的信息也被及时更新了。

/brokers/topics/secondTopic: {"partitions":{"0":[1,0,2],"1":[0,2,1],"2":[2,1,0],"3":[1,2,0]},"topic_id":"W3mXDtj1RsWmsEhQrZjN5g","adding_replicas":{},"removing_replicas":{},"version":3}
/brokers/topics/secondTopic/partitions/0/state: {"controller_epoch":20,"leader":1,"version":1,"leader_epoch":2,"isr":[1,0]}

Leader Partitoin选举机制能够保证每一个Partition同一时刻有且仅有一个Leader Partition,这样分配Leader Partition是有问题的,Leader Partition用于接收客户端请求,这样分配显然是分配不均,导致Broker1过于繁忙

四、Leader Partition自动平衡机制

Kafka会尽量将Leader Partition分配到不同的Broker节点上

Kafka在进行Leader Partition自平衡时的逻辑是这样的:他会认为AR当中的第一个节点就应该是Leader节点。这种选举结果成为preferred election 理想选举结果。Controller会定期检测集群的Partition平衡情况,在开始检测时,Controller会依次检查所有的Broker。当发现这个Broker上的不平衡的Partition比例高于leader.imbalance.per.broker.percentage阈值时,就会触发一次Leader Partiton的自平衡。

这是官方文档的部分截图。

image.png

这个机制涉及到Broker中server.properties配置文件中的几个重要参数:

#1 自平衡开关。默认true
auto.leader.rebalance.enable
Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered.
Type:	boolean
Default:	true
Valid Values:	
Importance:	high
Update Mode:	read-only
#2 自平衡扫描间隔
leader.imbalance.check.interval.seconds
The frequency with which the partition rebalance check is triggered by the controller
Type:	long
Default:	300
Valid Values:	[1,...]
Importance:	high
Update Mode:	read-only
#3 自平衡触发比例
leader.imbalance.per.broker.percentage
The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.

Type:	int
Default:	10
Valid Values:	
Importance:	high
Update Mode:	read-only

这几个参数可以到broker的server.properties文件中修改。但是注意要修改集群中所有broker的文件,并且要重启Kafka服务才能生效。

​ 另外,你也可以通过手动调用kafka-leader-election.sh脚本,触发一次自平衡。例如:

# 启动worker3上的Kafka服务,Broker2上线。
# secondTopic的partion2不是理想状态。理想的leader应该是2
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic
Topic: secondTopic      TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4       ReplicationFactor: 3    Configs: 
        Topic: secondTopic      Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: secondTopic      Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: secondTopic      Partition: 2    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2
        Topic: secondTopic      Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,0,2
# 手动触发所有Topic的Leader Partition自平衡        
[oper@worker1 bin]$ ./kafka-leader-election.sh --bootstrap-server worker1:9092 --election-type preferred  --topic secondTopic --partition 2
Successfully completed leader election (PREFERRED) for partitions secondTopic-2
# 自平衡后secondTopic的partition2就变成理想状态了。
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic                                       
Topic: secondTopic      TopicId: W3mXDtj1RsWmsEhQrZjN5g PartitionCount: 4       ReplicationFactor: 3    Configs: 
        Topic: secondTopic      Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: secondTopic      Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: secondTopic      Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 1,0,2
        Topic: secondTopic      Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,0,2

​ 但是要注意,这样Leader Partition自平衡的过程是一个非常重的操作,因为要涉及到大量消息的转移与同步。并且,在这个过程中,会有丢消息的可能。所以在很多对性能要求比较高的线上环境,会选择将参数auto.leader.rebalance.enable设置为false,关闭Kafka的Leader Partition自平衡操作,而用其他运维的方式,在业务不繁忙的时间段,手动进行Leader Partiton自平衡,尽量减少自平衡过程对业务的影响。

五、Partition故障恢复机制

当Leader Partition对应broker宕机了如何选举新的Leader Partition接收客户端请求

先理解如下两个参数

  • LEO(Log End Offset): 每个Partition的最后一个Offset

​ 这个参数比较好理解,每个Partition都会记录自己保存的消息偏移量。leader partition收到并记录了生产者发送的一条消息,就将LEO加1。而接下来,follower partition需要从leader partition同步消息,每同步到一个消息,自己的LEO就加1。通过LEO值,就知道各个follower partition与leader partition之间的消息差距。

  • HW(High Watermark): 一组Partiton中最小的LEO。

​ follower partition每次往leader partition同步消息时,都会同步自己的LEO给leader partition。这样leader partition就可以计算出这个HW值,并最终会同步给各个follower partition。leader partition认为这个HW值以前的消息,都是在所有follower partition之间完成了同步的,是安全的。这些安全的消息就可以被消费者拉取过去了。而HW值之后的消息,就是不安全的,是可能丢失的。这些消息如果被消费者拉取过去消费了,就有可能造成数据不一致。

image.png

​ 也就是说,在所有服务都正常的情况下,当一个消息写入到Leader Partition后,并不会立即让消费者感知。而是会等待其他Follower Partition同步。这个过程中就会推进HW。当HW超过当前消息时,才会让消费者感知。比如在上图中,4号往后的消息,虽然写入了Leader Partition,但是消费者是消费不到的。

这跟生产者的acks应答参数是不一样的

​ 当服务出现故障时,如果是Follower发生故障,这不会影响消息写入,只不过是少了一个备份而已。处理相对简单一点。Kafka会做如下处理:

  1. 将故障的Follower节点临时提出ISR集合。而其他Leader和Follower继续正常接收消息。
  2. 出现故障的Follower节点恢复后,不会立即加入ISR集合。该Follower节点会读取本地记录的上一次的HW,将自己的日志中高于HW的部分信息全部删除掉,然后从HW开始,向Leader进行消息同步。
  3. 等到该Follower的LEO大于等于整个Partiton的HW后,就重新加入到ISR集合中。这也就是说这个Follower的消息进度追上了Leader。

image.png

​ 如果是Leader节点出现故障,Kafka为了保证消息的一致性,处理就会相对复杂一点。

  1. Leader发生故障,会从ISR中进行选举,将一个原本是Follower的Partition提升为新的Leader。这时,消息有可能没有完成同步,所以新的Leader的LEO会低于之前Leader的LEO。
  2. Kafka中的消息都只能以Leader中的备份为准。其他Follower会将各自的Log文件中高于HW的部分全部清理掉,然后从新的Leader中同步数据。
  3. 旧的Leader恢复后,将作为Follower节点,进行数据恢复。

image.png

​ 在这个过程当中,Kafka注重的是保护多个副本之间的数据一致性。但是这样,消息的安全性就得不到保障。例如在上述示例中,原本Partition0中的4,5,6,7号消息就被丢失掉了。

六、HW一致性保障-Epoch更新机制

有了HW机制后,各个Partiton的数据都能够比较好的保持统一。但是,实际上,HW值在一组Partition里并不是总是一致的。

​ Leader Partition需要计算出HW值,就需要保留所有Follower Partition的LEO值。

​ 但是,对于Follower Partition,他需要先将消息从Leader Partition拉取到本地,才能向Leader Partition上报LEO值。所有Follower Partition上报后,Leader Partition才能更新HW的值,然后Follower Partition在下次拉取消息时,才能更新HW值。所以,Leader Partiton的LEO更新和Follower Partition的LEO更新,在时间上是有延迟的。这也导致了Leader Partition上更新HW值的时刻与Follower Partition上跟新HW值的时刻,是会出现延迟的。这样,如果有多个Follower Partition,这些Partition保存的HW的值是不统一的。当然,如果服务一切正常,最终Leader Partition还是会正常推进HW,能够保证HW的最终一致性。但是,当Leader Partition出现切换,所有的Follower Partition都按照自己的HW进行数据恢复,就会出现数据不一致的情况

image.png

​ 因此,Kafka还设计了Epoch机制,来保证HW的一致性。

  1. Epoch是一个单调递增的版本号,每当Leader Partition发生变更时,该版本号就会更新。所以,当有多个Epoch时,只有最新的Epoch才是有效的,而其他Epoch对应的Leader Partition就是过期的,无用的Leader。
  2. 每个Leader Partition在上任之初,都会新增一个新的Epoch记录。这个记录包含更新后端的epoch版本号,以及当前Leader Partition写入的第一个消息的偏移量。例如(1,100)。表示epoch版本号是1,当前Leader Partition写入的第一条消息是100. Broker会将这个epoch数据保存到内存中,并且会持久化到本地一个leader-epoch-checkpoint文件当中。
  3. 这个leader-epoch-checkpoint会在所有Follower Partition中同步。当Leader Partition有变更时,新的Leader Partition就会读取这个Epoch记录,更新后添加自己的Epoch记录。
  4. 接下来其他Follower Partition要更新数据时,就可以不再依靠自己记录的HW值判断拉取消息的起点。而可以根据这个最新的epoch条目来判断。

image.png

​ 这个关键的leader-epoch-checkpoint文件保存在Broker上每个partition对应的本地目录中。这是一个文本文件,可以直接查看。他的内容大概是这样样子的:

[oper@worker1 disTopic-0]$ cat leader-epoch-checkpoint 
0
1
29 2485991681

其中

第一行版本号

第二行表示下面的记录数。这两行数据没有太多的实际意义。

从第三行开始,可以看到两个数字。这两个数字就是epoch 和 offset。epoch就是表示leader的epoch版本。从0开始,当leader变更一次epoch就会+1。offset则对应该epoch版本的leader写入第一条消息的offset。可以理解为用户可以消费到的最早的消息offset。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1062402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单调队列---数据结构与算法

简介 队列也是一种受限制的线性表和栈相类似,栈是先进后出,而队列是先进先出,就好像一没有底的桶,往里面放东西,如图 在这里也是用数组来实现队列,用数组实现的叫做顺序队列 队列的数组模拟 const int N…

网络安全的发展方向是什么?网络安全学什么内容

前言 不少小伙伴开始学习网络安全技术,但却不知道学习网络安全能找什么工作?网络安全是现下较为火热的职业岗位,吸引了许多企业和个人对网络安全技术的青睐。学习网络安全的人越来越多,网络安全也有很多发展方向。那么如何选择网…

mysql日期月份相关函数

从给定日期提取最后一天: 要知道2017年12月的最后日期,可以按以下方式执行LAST_DAY()函数:用法:输出: 2017-12-31 从给定的日期时间中提取最后一天: 要使用日期时间格式了解月份的最后日期,可以按以下方式…

目标检测算法改进系列之Backbone替换为NextViT

NextViT介绍 由于复杂的注意力机制和模型设计,大多数现有的视觉Transformer(ViTs)在现实的工业部署场景中不能像卷积神经网络(CNNs)那样高效地执行,例如TensorRT 和 CoreML。这带来了一个明显的挑战&#…

Html+Css+Js计算时间差,返回相差的天/时/分/秒(从未来的一个日期时间到当前日期时间的差)。

Html部分 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title></title><link rel"stylesheet" type"text/css" href"css/index.css" /><script src"js/index.js" t…

库函数的模拟实现

前言&#xff1a; 在上一篇文章中我们了解到了一些库函数的使用&#xff0c;为了加深我们对库函数的理解&#xff0c;我们来模拟实现一下这些库函数的用法。 这是上一篇文章的链接&#xff1a; http://t.csdnimg.cn/r7SKNhttp://t.csdnimg.cn/r7SKN 1.模拟实现strlen 模拟…

【C++】运算符重载 ⑥ ( 一元运算符重载 | 后置运算符重载 | 前置运算符重载 与 后置运算符重载 的区别 | 后置运算符重载添加 int 占位参数 )

文章目录 一、后置运算符重载1、前置运算符重载 与 后置运算符重载 的区别2、后置运算符重载添加 int 占位参数 上 2 2 2 篇博客 【C】运算符重载 ④ ( 一元运算符重载 | 使用 全局函数 实现 前置 自增运算符重载 | 使用 全局函数 实现 前置 - - 自减运算符重载 )【C】运算符…

算法题:盛最多水的容器

这个题目乍一看就是双指针&#xff0c;没想到官方解答也是双指针&#xff0c;我在官方的基础上优化了一下下&#xff0c;左右两边各一个指针&#xff0c;每次移动短的那一头的时候&#xff0c;不是移动一格&#xff0c;而是找到比短的那一头要长一点的&#xff0c;再进行比较。…

文心一言 VS 讯飞星火 VS chatgpt (106)-- 算法导论10.1 4题

四、用go语言&#xff0c;重写 ENQUEUE 和 DEQUEUE 的代码&#xff0c;使之能处理队列的下溢和上溢。 文心一言&#xff1a; 在 Go 语言中&#xff0c;我们可以使用切片来实现队列。下面是如何实现 ENQUEUE 和 DEQUEUE 操作&#xff0c;同时处理队列的下溢和上溢的情况&#…

坦克世界WOT知识图谱三部曲之爬虫篇

文章目录 关于坦克世界1. 爬虫任务2. 获取坦克列表3. 获取坦克具体信息结束语 关于坦克世界 《坦克世界》(World of Tanks, WOT)是我在本科期间玩过的一款战争网游&#xff0c;由Wargaming公司研发。2010年10月30日在俄罗斯首发&#xff0c;2011年4月12日在北美和欧洲推出&…

目标检测算法改进系列之Backbone替换为RepViT

RepViT简介 轻量级模型研究一直是计算机视觉任务中的一个焦点&#xff0c;其目标是在降低计算成本的同时达到优秀的性能。轻量级模型与资源受限的移动设备尤其相关&#xff0c;使得视觉模型的边缘部署成为可能。在过去十年中&#xff0c;研究人员主要关注轻量级卷积神经网络&a…

整体网络架构p22

1. 两次卷积&#xff0c;一次池化。得到一个三维特征图&#xff0c;然后让三维的特征图&#xff0c;三个值进行相乘拉成特征向量&#xff0c;把得到的结果需要靠全连接层。 带参数计算才算一层 算conv的个数FC全连接层就得到卷积神经网络的层数 FC:全连接层 2. 3.reset网络&a…

连接查询-多表联合查

一、连接查询的分类 根据表的连接方式&#xff0c;连接查询分为内连接、外连接和全连接。 内连接&#xff1a; 等值连接非等值连接自连接外连接&#xff1a; 左外连接&#xff08;左连接&#xff09;右外连接&#xff08;右连接&#xff09; 全连接 二、笛卡尔积 交叉连接也…

番外--Task1:

""" 重置root管理员密码添加yum源测试软件包安装成功 """ step1: 在shell界面输入重启命令&#xff1b; step2: 重启过程中出现此界面&#xff0c;快速按键盘‘e’, 进入系统内核程序&#xff1b; step3: 在系统内核程序内&#xff0c…

【MATLAB源码-第42期】基于matlab的人民币面额识别系统(GUI)。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 基于 MATLAB 的人民币面额识别系统设计可以分为以下步骤&#xff1a; 1. 数据收集与预处理 数据收集&#xff1a; 收集不同面额的人民币照片&#xff0c;如 1 元、5 元、10 元、20 元、50 元和 100 元。确保在不同环境、不…

【算法学习】-【双指针】-【盛水最多的容器】

LeetCode原题链接&#xff1a;盛水最多的容器 下面是题目描述&#xff1a; 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。…

Net相关的各类开源项目

Net相关的各类开源项目 WPFHandyControlLive-ChartsWPFDeveloperswpf-uidesignStylet WebScheduleMasterYiShaAdminBlog.CoreNebula.AdminNewLife.CubeOpenAuth UnityuGUIUnityCsReferenceEpitomeMyUnityFrameWorkKSFrameworkTowerDefense-GameFramework-Demo 通用ClientServer…

目标检测算法改进系列之Backbone替换为InceptionNeXt

InceptionNeXt 受 Vision Transformer 长距离依赖关系建模能力的启发&#xff0c;最近一些视觉模型开始上大 Kernel 的 Depth-Wise 卷积&#xff0c;比如一篇出色的工作 ConvNeXt。虽然这种 Depth-Wise 的算子只消耗少量的 FLOPs&#xff0c;但由于高昂的内存访问成本 (memory…

机器学习必修课 - 编码分类变量 encoding categorical variables

1. 数据预处理和数据集分割 import pandas as pd from sklearn.model_selection import train_test_split导入所需的Python库 !git clone https://github.com/JeffereyWu/Housing-prices-data.git下载数据集 # Read the data X pd.read_csv(/content/Housing-prices-data/t…

CTFshow Web入门 文件上传

目录 web151 web152 web153 web154 web155 web156 web157 web158、web159 web160 web161 web162 web163 web164 web165 web166 web167 web168 web169 web170 web151 1. 写马改后缀为png上传&#xff0c;抓包修改文件信息 回显路径&#xff0c;蚁剑连接 2. …