zookeeper中存储的kafka信息
- /brokers/ids,记录存在的服务器id
- /brokers/topics/test/partitions/0/state,记录leader和可用副本服务器
- /comsumers,0.9版本之前存储消费者的offset信息,但是会产生zookeeper和broker的跨节点通信
- /controller 辅助选举leader。每个broker上都会有一个controller模块,controller在zookeeper注册信息,先注册者控制leader的选举
可以使用prettyZoo工具查看zookeeper信息
broker工作流程
- kafka节点启动后都会向zk注册,在
/brokers/ids
中增加注册节点 - 注册controller,先注册先生效
- 先注册的controller开始将领brokers节点的变化
- controller决定了leader的选举。规则为:在isr(in-sync replica set,保持同步的leader follower集合)中存活为前提,按照ar(所有副本)中排在前面的优先
- 选举完成后,contoller将信息上传到zk
- 其他controller节点从zk同步信息
- 生产者向broker发送信息到topic,follower主动和leader同步信息。在底层使用log的方式存储(逻辑),实际以segment(1G大小)形式存储,具体又体现为.log和.index文件
- 如果此时leader挂了,则controller从zk中获取信息并重新选举leader
- controller更新leader和isr信息
broker的参数
- replica.lag.time.max.ms,ISR 中如果 Follower 长时间未向 Leader 发送通 信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值,默认 30s。
- auto.leader.rebalance.enable,默认是 true。 自动 Leader Partition 平衡。
- leader.imbalance.per.broker.percentage,默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。
- leader.imbalance.check.interval.seconds,默认值 300 秒。检查 leader 负载是否平衡的间隔时 间。
- log.segment.bytes,Kafka 中 log 日志是分成一块块存储的,此配置是 指 log 日志划分 成块的大小,默认值 1G。
- log.index.interval.bytes,默认 4kb,kafka 里面每当写入了 4kb 大小的日志 (.log),然后就往 index 文件里面记录一个索引
- log.retention.hours,Kafka 中数据保存的时间,默认 7 天。
- log.retention.minutes,Kafka 中数据保存的时间,分钟级别,默认关闭。
- log.retention.ms,Kafka 中数据保存的时间,毫秒级别,默认关闭。
- log.retention.check.interval.ms,检查数据是否保存超时的间隔,默认是 5 分钟。
- log.retention.bytes,默认等于-1,表示无穷大。超过设置的所有日志总 大小,删除最早的 segment。
- log.cleanup.policy ,默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策略。
- num.io.threads ,默认是 8。负责写磁盘的线程数。整个参数值要占 总核数的 50%。
- num.replica.fetchers ,副本拉取线程数,这个参数占总核数的 50%的 1/3 num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。
- log.flush.interval.messages ,强制页缓存刷写到磁盘的条数,默认是 long 的最 大值,9223372036854775807。一般不建议修改, 交给系统自己管理。
- log.flush.interval.ms, 每隔多久,刷数据到磁盘,默认是 null。一般不建 议修改,交给系统自己管理。
节点的服役和退役
手动查看日志过于繁琐,使用dozzle,确实好用
docker run --name dozzle -it --rm --volume=/var/run/docker.sock:/var/run/docker.sock:ro -p 8888:8080 amir20/dozzle:latest
新节点加入集群后,自动在zk中注册id
- 配置hostname和ip地址
- 配置kafka的配置文件brokerid
- 删除已经存在的kafka的data和log信息
先启动zk
docker run -d --name my-zookeeper -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes zookeeper:3.7
使用docker启动kafka节点
$ docker run -d --name my-kafka0 -p 9092:9092 --link my-zookeeper \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=my-zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://`ifconfig eth0 | grep 'inet ' | awk '{print $2}'`:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes wurstmeister/kafka
$ docker run -d --name my-kafka1 -p 9093:9092 --link my-zookeeper \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=my-zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://`ifconfig eth0 | grep 'inet ' | awk '{print $2}'`:9093 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes wurstmeister/kafka
$ docker run -d --name my-kafka2 -p 9094:9092 --link my-zookeeper \
-e KAFKA_BROKER_ID=2 \
-e KAFKA_ZOOKEEPER_CONNECT=my-zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://`ifconfig eth0 | grep 'inet ' | awk '{print $2}'`:9094 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes wurstmeister/kafka
创建test的topic,然后服役节点4,再次查看topic信息
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --create --partitions 3 --replication-factor 3
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test
Topic: test TopicId: D0vOlTsARxqmnkPfEt3vlA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# 发送数据
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
查看zk信息
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 1] get /controller
{"version":1,"brokerid":0,"timestamp":"1688285831004"}
[zk: localhost:2181(CONNECTED) 6] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2,1]}
增加节点3,查看topic的分布
$ docker run -d --name my-kafka3 -p 9095:9092 --link my-zookeeper \
-e KAFKA_BROKER_ID=3 \
-e KAFKA_ZOOKEEPER_CONNECT=my-zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://`ifconfig eth0 | grep 'inet ' | awk '{print $2}'`:9095 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes wurstmeister/kafka
[zk: localhost:2181(CONNECTED) 14] ls /brokers/ids
[0, 1, 2, 3]
但是发现原来的数据并未存储在新结点上,需要reassign操作
$ vim topics-to-move.json
{
"topics": [
{"topic": "test"}
],
"version": 1
}
生成负载均衡计划
$ kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
开始拓展
$ vim increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
$ kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --execute
$ kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test-0 is complete.
Reassignment of partition test-1 is complete.
Reassignment of partition test-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic test
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test
Topic: test TopicId: D0vOlTsARxqmnkPfEt3vlA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 3,0,1 Isr: 1,0,3
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
Topic: test Partition: 2 Leader: 2 Replicas: 1,2,3 Isr: 2,1,3
退役旧节点是一样的思路,首先将节点上的数据迁出,然后下线即可
kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
replicas副本
基本概念
Kafka 副本作用:
- 提高数据可靠性。
- Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会 增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。 AR = ISR + OSR
- ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader
- OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本
Leader选举流程
之前在解释broker的工作流程时已经提到了,需要注意选举的前提
- 启动节点后会主动在zk的ids中注册
- 节点先在controller中注册的为leader
- controller选择副本的leader,要求在isr中存货,ar中排在前面的优先
follower和leader故障处理
相关概念
- (副本级别)LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
- (分区级别)HW(High Watermark):所有副本中最小的LEO
follower故障处理
如果follower故障则:
- follower发生故障后会被临时踢出ISR
- 期间leader和其他follower继续接受数据
- 待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始 向Leader进行同步
- 等该Follower的LEO大于等于该Partition的HW,即 Follower追上进度之后,可以重新加入ISR
leader故障处理
-
Leader发生故障之后,会从ISR中选出一个新的Leader
-
为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据
-
无法保证数据丢失和重复,只能保证副本数据之间的一致性
生产调优策略
分区副本的分配
当分区数超过机器数,副本如何存储?
bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic second
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
分配原则,尽可能保证负载均衡
以上的副本分配方式的问题在于
- 生产环境中broker节点可能是异构的,因此配置和性能不一致。如果按照默认的副本分配策略进行放置,会导致数据倾斜,个别服务器的存储压力过大
解决方式为手动干预副本的放置
# 创建4个分区,每个分区2副本,全部放置在broker0和broker1上
vim increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}
]
}
执行副本存储计划
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file
increase-replication-factor.json --execute
验证副本存储计划
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file
increase-replication-factor.json --verify
leader分区的自动平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。leader分区负责生产者和消费者的请求
- 如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高
- 宕机的 broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡
和leader自动平衡的相关参数
-
auto.leader.rebalance.enable
,默认是true。 自动Leader Partition平衡 -
leader.imbalance.per.broker.percentage
, 默认是10%。每个broker允许的不平衡 的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡 -
leader.imbalance.check.interval.seconds
, 默认值300秒。检查leader负载是否平衡 的间隔时间
不平衡率的计算,1/ar
增加副本因子
无法通过命令行--alter
的方式直接增加副本数
vim increase-replication-factor.json
{
"version": 1,
"partitions": [
{
"topic": "four",
"partition": 0,
"replicas": [0, 1, 2]
},
{
"topic": "four",
"partition": 1,
"replicas": [0, 1, 2]
},
{
"topic": "four",
"partition": 2,
"replicas": [0, 1, 2]
}
]
}
执行副本计划
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file
increase-replication-factor.json --execute
文件存储
Topic是逻辑上的概念,而partition是物理上的概念
- 每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据
- Producer生产的数据会被不断追加到该log文件末端
- 为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:
- index,索引文件
- log,数据文件
- timeindex,时间戳索引文件
- 其他文件
- 这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0
index和log文件以当前segment的第一条消息的offset命名
时间索引文件用于记录文件过期时间
data存储于kafka的data路径下,查看index文件
$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 3 position: 152
查看log文件
$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping datas/first-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
0 CreateTime: 1636338440962 size: 75 magic: 2 compresscodec: none crc: 2745337109 isvalid:
true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
75 CreateTime: 1636351749089 size: 77 magic: 2 compresscodec: none crc: 273943004 isvalid:
true
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
152 CreateTime: 1636351749119 size: 77 magic: 2 compresscodec: none crc: 106207379 isvalid:
true
baseOffset: 4 lastOffset: 8 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
229 CreateTime: 1636353061435 size: 141 magic: 2 compresscodec: none crc: 157376877 isvalid:
true
baseOffset: 9 lastOffset: 13 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
370 CreateTime: 1636353204051 size: 146 magic: 2 compresscodec: none crc: 4058582827 isvalid:
true
index文件和log文件
- log文件存储数据,index文件存储索引
- index为稀疏索引,每向log写入4kb数据记录一条索引。由参数
log.index.interval.bytes
控制,默认为4kb - index文件中保存的offset为相对offset,确保不会占用较大空间
日志存储的相关参数
log.segment.bytes
,默认值 1G,Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小log.index.interval.bytes
,默认4kb,每当写入了 4kb 大小的日志(.log)就往 index 文件里面记录一个稀疏索引
文件清理
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认 7 天
- log.retention.minutes,分钟
- log.retention.ms,最高优先级毫秒
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟
当日志保存时间小于x毫秒时,检查周期应当小于x毫秒
当日志超过时间限制,由两种日志清理策略 delete 和 compact
(1)delete 日志删除
log.cleanup.policy = delete
所有数据启用删除策略,删除过期数据
- 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳(如果log中有新数据,则旧数据即使超过时间限制也不会删除)
- 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。由参数log.retention.bytes控制,默认等于-1,表示无穷大
(2)compact日志压缩
对于相同key的不同value值,只保留最后一个版本
log.cleanup.policy = compact
所有数据启用压缩策略
压缩后的offset可能是不连续的,实际消费的offset可能变大。这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料
kafka高效读写的原因
-
Kafka 本身是分布式集群,可以采用分区技术,并行度高
-
读数据采用稀疏索引,可以快速定位要消费的数据
-
顺序写磁盘
-
页缓存 + 零拷贝技术
- PageCache页缓存,Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存来使用
- Kafka Broker应用层不关心存储的数据,不进行数据处理只进行数据传输,直接从页缓存交付网卡
相关参数
log.flush.interval.messages
,强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807。一般不建议修改,交给系统自己管理log.flush.interval.ms
每隔多久刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理