1. zk中存储的kafka信息
/kafka/brokers/ids存储了在线的broker id。
/kafka/brokers/topics/xxx/partitions/n/state存储了Leader是谁以及isr队列
/kafka/controller辅助Leader选举,每个broker都有一个controller,谁先在zk中注册上,谁就辅助Leader选举。
2. broker总体工作流程
1)每台broker启动后在zk中注册,即/kafka/borkers/ids
2)每台broker去抢占式注册controller,用于后面Leader选举
3)由注册的controller监听/kafka/borkers/ids节点变化
4)开始Leader选举,选举标准是以isr中存活为前提,以AR中排在前面的优先(AR是所有副本的集合,启动时会有一个固定的AR顺序,比如ar[1, 0, 2])
5)controller将选举出来的信息(Leader和isr信息)传到zk中,即/kafka/brokers/topics/xxx/partitions/n/state
6)其他broker的controller会从zk中同步相关信息
Kafka生产者发送数据到broker,数据在底层以Log方式(逻辑概念)存储,实际上是Segment(物理概念),一般1个Segment是1G,包含.log文件和.index文件,.index文件是索引,用于快速查询数据
7)如果Leader挂了,controller监听到节点变化,选举新的Leader,选举标准依然是以isr中存活为前提,以AR中排在前面的优先,最后更新Leader和isr队列信息
3. 新节点服役
新节点服役后,以前的topic所在的分区不会出现在新节点,即新节点不会分摊旧节点的存储压力。如果需要新节点参与进来,就需要进行一种类似于负载均衡的配置。先创建一个topic-to-move.json配置文件:
{
"topics": [
{"topic": "first"}
],
"version": 1
}
生成一个负载均衡的计划:
bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
上面一行是当前的分区分配,下面一行是建议的分区分配计划,创建副本存储计划increase-replication-factor.json,里面内容是上面得分建议计划。最后执行存储计划:
bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
还可以验证计划:
bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
查询这个topic的分区详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
4. 退役旧节点
退役旧节点与服役新节点有一些类似,先创建一个topic-to-move.json配置文件,与服役新节点时一样,然后生成一个计划,只不过--broker-list 改为"0,1,2",接着执行计划,验证计划,都与服役新节点一样。
最后在退役节点关闭kafka服务
bin/kafka-server-stop.sh
5. Leader选举验证
创建四个分区四个副本的topic并查看:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu2 --partitions 4 --replications-factor 4
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu2
把3号broker停掉,那么isr队列中没有3,并且4号分区的Leader变为2
再把2号干掉
再恢复3号,发现Leader未变,仅isr队列信息中新增了3号
再恢复2号
再干掉1号
这样就验证了第二节讲的选举标准: 以isr中存活为前提,以AR中排在前面的优先
6. Leader和Follower故障处理细节
LEO:Log End Offset,每个副本的最后一个offset+1
HW:high watermark,高水位线,所有副本中最小的LEO,消费者能够看到的最大的offset就是HW - 1
1)如果Follower挂了,该Follower会立即被踢出isr,isr中其他Leader和Follower正常接受/同步数据,待该Follower恢复后,会读取上次的HW,将自己高于HW的数据丢弃,从HW开始与Leader同步,等到该Follower的LEO大于等于该Partition的HW,则重新加入isr队列。
2)如果Leader挂了, Leader会立即被踢出isr,并且会选出一个新的Leader,其余的Follower会将高于HW的数据丢弃,然后与新的Leader进行同步。此时只能保证数据的一致性,不能保证数据不丢失。
7. 手动调整分区副本
如果服务器的存储能力不同,希望将数据更多的存储在空间大的服务器上,那么就不应该按照Kafka分区副本的默认均匀分配,而是需要手动调整。创建4个分区,两个副本,都存在0号和1号broker上面。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic three --partitions 4 --replications-factor 2
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
创建increase-replication-factor.json:
{
"partitions": [
{"topic": "three", "partitions": 0, "replicas": [0, 1]},
{"topic": "three", "partitions": 1, "replicas": [0, 1]},
{"topic": "three", "partitions": 2, "replicas": [1, 0]},
{"topic": "three", "partitions": 3, "replicas": [1, 0]}
],
"version": 1
}
执行存储计划:
bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
最后查看
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
以上是减少副本,增加副本也是类似,先创建一个3个分区,1个副本的topic:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic four --partitions 3 --replications-factor 1
创建increase-replication-factor.json:
{
"partitions": [
{"topic": "four", "partitions": 0, "replicas": [0, 1, 2]},
{"topic": "four", "partitions": 1, "replicas": [0, 1, 2]},
{"topic": "four", "partitions": 2, "replicas": [0, 1, 2]}
],
"version": 1
}
执行计划:
bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
8. Leader Partition自动平衡
在Leader选举验证小节中,如果2号和3号节点都挂了,然后又恢复,则Leader过于集中在0号和1号节点,而Kafka生产者和消费者都是只对Leader操作,所以0号和1号的压力会很大,造成负载不均衡。 未解决该问题,Kafka会自动再平衡,auto.leader.rebalance.enable默认设为true。
什么时机会触发再平衡呢?一个参考指标是broker的不平衡率,leader.imbalance.per.broker.percentage,默认是10%,另一个指标是负载检查的间隔时间,leader.imbalance.check.interval.seconds,默认是300秒。
不平衡率的计算:
实际生产环境中,不一定需要开启再平衡,因为上述例子中其实已经相对平衡了,但是根据规则,需要触发再平衡,因此会需要消耗大量资源。
9. 文件存储机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个log文件,该log文件存储的就是Kafka生产者的数据。生产的数据不断地追加到log文件中,为防止log文件过大导致检索数据慢,Kafka采取了分片和索引的机制:每个partition分为多个segment,每个segment包括.index文件(偏移量索引文件)、.log文件(日志文件)、.timeindex文件(时间戳索引文件)。这些文件位于一个文件夹中,文件夹命名规则:topic名称+分区号。index和log文件的命名是以当前segment的第一条数据的offset来命名。
log文件和index文件详解:
10. 文件清除策略
Kafka数据默认保存7天,7天后数据自动删除或者压缩。可通过如下参数修改保存时间(从上到下优先级依次增高):
log.retention.hours
log.retention.minutes
log.retention.ms
默认检查数据是否超期的间隔时间是5分钟,可通过参数log.retention.check.interval.ms进行修改。
如果是删除数据,log.cleanup.policy=delete,基于时间删除是默认打开的,以segment中最大的时间戳作为该文件的时间戳。而基于空间大小进行删除是默认关闭的(log.retention.bytes=-1),即数据超过阈值,删除最早的数据。
如果是压缩数据,log.cleanup.policy=compact,此时对于相同key的不同value值,只保留最新的。(与之前的snappy压缩概念不同)
注意,压缩后的offset可能不是连续的,比如上图没有 offset 6,如果从offset 6开始消费,则会从7开始消费。
11. 高效读写
1)Kafka本身是分布式集群,采用分区,并行度高
2)读数据采用稀疏索引,可以快读定位数据
3)顺序写磁盘,数据以追加的方式写到log文件,这比随机写的速度要快很多,因为省去了大量的磁头寻址时间
4)采用页缓存和零拷贝技术
零拷贝:Kafka的数据加工处理操作交由Kafka生产者和消费者处理。Broker应用层不关心存储的数据,因此就不用走应用层,传输效率高。(传统数据复制方式:从磁盘中读取文件到内核缓冲区,内核读取缓冲区数据复制到用户缓冲区,用户缓冲区的数据复制到socket缓冲区,socket缓冲区数据发送到网卡,再到消费者)
页缓存:Kafka重度依赖Linux提供的页缓存功能。当上层有写操作时,操作系统只是将数据写入页缓存。当读操作发生时,从页缓存中读,如果找不到,再从磁盘中读。页缓存是把尽可能多的空闲内存当做磁盘内存来用。