4. Kafka Broker
4.1 kafka Broker工作流程
4.1.1 Zoopkeeper存储的Kafka信息
- 启动Zookeeper客户端
- 通过ls命令查看kafka相关信息
在Zookeeper的服务端存储的Kafka相关信息
- /kafka/brokers/ids [0,1,2] 记录那些服务器
- /kafka/brokers/topics/first/partitions/0/state {“leader”:1,“isr”:[1,0,2]} 记录谁是leader,有哪些服务器可用
- /kafka/controller {“brokerid”:0} 辅助选择Leader
可借助prettyZoo可视化工具查看具体信息:
4.1.2 Kafka Broker总体工作流程
4.2 生产经验-节点服役与退役
4.2.1 服务新节点
- 新节点准备
- 关闭一台旧服务器,执行克隆操作
- 开启新服务器的kafka,并i需改IP地址
- 在新服务器上修改主机名。
- 重启新旧服务器上的ksfka
- 修改新服务器kafka的broker.id
- 删除新服务器上的logs和datas文件
- 启动旧服务器上kafka集群
- 单独启动新服务器的kafka
- 执行负载均衡操作
- 创建一个要均衡的主题
- 生成一个负载均衡的计划
- 创建副本存储计划(所有副本存储在broker0,broker1,broker2,broker3中)。
- 执行副本计划
- 验证
4.2.2 退役旧节点
- 执行负载均衡操作:先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
- 创建一个要均衡的主题
- 创建执行计划
- 创建副本存储计划(所有副本存储在broker0,broker1,broker2中)。
- 关闭新的kafka服务
4.3 Kafka副本
4.3.1 副本基本信息
- Kafka副本作用:提高数据可靠性
- Kafka默认副本1个,生成环境一般配置2个,保证数据可靠性;多副本会增加磁盘空间,网络速率,降低效率
- Kafka副本分为:Leader和Follower。生产者只会把数据发往Leader,然后同步给Follower
- Kafka分区中所有副本统称AR(Assigned Repllicas)
AR = ISR + OSR
ISR:表示和leader保持同步的follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,由replicas.lag.time.max.ms设置,默认30s。Leader发送故障,就会从ISR中选举新Leader。
OSR:表示Follower与Leader副本同步时,延迟过多的副本。
4.3.2 Leader选举流程
Kafka集群中有一个broker的Controller会被选举未Controller Leader,负责管理集群broker的上下限,所有topic的分区副本分配和Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。
4.3.3 Leader和Follower故障处理细节
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset+1
HW(High Watermark):所有副本中最小的LEO
- Follower故障
- Follower故障发生后会被临时踢出ISR
- 这个期间LeaderheFollower继续接收数据
- 待该Follower恢复,Follower会读取本地磁盘记录上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
- 等该Follower的LEO>=该Partition的HW,即Follower追上Leader之后,就可重新加入ISR了。
- Leader故障
- Leader发生故障后,会从ISR中选出新的Leader
- 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或不重复。
4.3.4 分区副本配置
如果kafka服务器只有4个节点,那么设置kafka的分区数>服务器数,在kafka中如何分配?
- 创建16个分区3个副本:/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second --partitions 16 -replication -factor 3
- 查看分区和副本:bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second
4.3.5 生成经验-手动调整分区副本
在生产环境中,每台服务器配置和性能不一样,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器压力大。所以需要手动调整分区副本存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。
- 创建新的topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --parititions4 --replication-factor 2 --topic three- 查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three- 创建副本存储计划(所有副本都指定存储在broker0,broker1中)
vim increase-replication-factor.json
{
“version”:1
“partitions”:[{“topic”:“three”,“partition”:0,“replicas”:[0,1]},
{“topic”:“three”,“partition”:1,“replicas”:[0,1]},
{“topic”:“three”,“partition”:2,“replicas”:[1,0]},
{“topic”:“three”,“partition”:3,“replicas”:[1,0]},]
}- 执行副本存储计划
bin/kafka-reassign-partitions,sh --bootstrap-server localhost:9092 --reassignment-json-fileincrease-replication-factor.json --execute- 验证副本存储计划
bin/kafka-reassign-partitions,sh --bootstrap-server localhost:9092 --reassignment-json-fileincrease-replication-factor.json --verify- 查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three
4.3.6 生成经验-Leader Partition 负载均衡
如果有broker宕机,会导致LP集中在几台broker上,其他宕机broker重启后都是follower partition,读写请求低,造成集群负载不平衡。
- auto.Leader.rebalance.enable,默认true。 自动LeaderPartition平衡。
- leader.imbalance.per.broker.percentage,默认10%。每个broker允许的不平衡的leader的比率。如果每个broker超值,控制器会触发leader的平衡。
- leader.imbalance.check.interval.seconds,默认300s,检查leader负载是否平衡的间隔时间。
如下,假设集群只有一个主题:
Topic:xuyu partition:0 Leader:0 Replicas:3,0,2,1 Isr:3,0,2,1
Topic:xuyu partition:1 Leader:1 Replicas:1,2,3,0 Isr:1,2,3,0
Topic:xuyu partition:2 Leader:2 Replicas:0,3,1,2 Isr:0,3,1,2
Topic:xuyu partition:3 Leader:3 Replicas:2,1,0,3 Isr:2,1,0,3
针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是leader,所以不平衡数+1,AR副本总数是4,所以broker0节点不平衡率为1/4>10%,需要再平衡。
broker1的不平衡为0,不需要再平衡。
4.3.7 生成经验-增加副本因子
生产环境中,由于某个主题重要等级需要提升,需要增加副本,副本数需要先指定计划,然后根据计划执行。
- 创建topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic four - 手动增加副本存储
创建副本存储计划
vim increase-replication-factor.json
{“version”:1,“partitions”:[{“topic”:“four”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“four”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“four”,“partition”:2,“replicas”:[0,1,2]}]}
执行:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file-increase-replication-factor.json --execute
4.4 Kafka文件存储
4.4.1 文件存储机制
- Topic数据的存储机制
topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,该文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低,kafka采用分片和索引机制,将每个partition分为多个segment。每个segment包括:“index文件、log文件、timeindex文件等”。这些文件在一个文件下,该文件命名规则:topic名+分区序号。
- 思考:topic数据到底存储在什么位置?
- 启动生产者发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first - 查看localhost的/opt/module/kafka/datas/first路径上的文件,windows看log文件
- 直接查看log日志,是乱码
- 通过工具查看index和log信息
kafka-run-classs.sh kafka.tools.DumpLogSegments --files ./000000.index
- index文件和log文件详解
注意:
- index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认4kb。
- index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。
4.4.2 文件清楚策略
kafka中日志默认保存7天,可通过一下参数调整:log.retenion.hours(保存时间),log.retention.check.interval.ms(检测时间)。如果超时,kafka会提供delete和compact两种删除方式。
- delete:
- log.cleanup.policy=delete所有数据启用
基于时间:默认打开,以segment中所有记录中最大时间戳作为该文件时间戳。
基于大小:默认关闭,超过设置大小,删除最早的segment,log.retention.bytes。默认-1,表示无穷大。
- compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
- log.cleanup.policy=compact所有数据启用压缩策略。
压缩后offset不连续,需要拿到大于一个当前offset的对应消息进行消费。
注意:此种只适合特殊场景,比如key为id,val为值,通过压缩,整个消息集群种保存了所有用户最新的资料。
4.5 Kafka高效读写数据
- kafka本身是分布式集群,可采用分区技术,并行度高
- 读数据采用稀疏索引,可快读定位要消费的数据
- 顺序写磁盘:kafka的producer生产数据,要写入到log文件中,写的过程是追加到文件末端,为顺序写,
- 页缓存+零拷贝
零拷贝:kafka的数据加工处理操作交由kafka生产者和kafka消费者处理。kafkaBroker应用层不关心存储的数据,所以不走应用层,效率高。
PageCache页缓存:kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读时,从PageCacha中查找,找不到再去磁盘。
5. Kafka消费者
5.1 Kafka消费方式
pull拉模式(Kafka采用):consumer采用从broker中主动拉取数据,因为每个消费者处理能力不同。
push推模式:由于由broker决定消息频率,消费者难适应,处理不足。
pull缺点:如果没有数,消费者进入循环空数据。
5.2 Kafka消费者工作流程
5.2.1 消费者总体工作流程
5.2.2 消费者组原理
消费者组Consumer Group(GC):消费者组由多个consumer组成。组中消费者groupid相同。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响。所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组初始化流程:
- coordinator:辅助实现消费者组的初始化和分区分配
coordinator节点选择=groupid的hashcode值%50(__consumer_offsets的分区数量)。eg:groupid的哈希为1,1%50=1,那么其主题1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有消费者提交offset的时候就往这个分区去提交offset。
消费者组详细消费流程:
5.3.1 独立消费者案例(订阅主题)
- 需求:创建独立消费者,消费first主题数据。
注意:在消费者API中必须配置消费者组id,命令行启动消费者不填写消费者组id会被自动填写随机消费者组id。 - 实现步骤
public class CustomConsumer {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//2 定义消费主题first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
//3 消费数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
5.3.1 独立消费者案例(订阅分区)
需求:创建一个独立消费者,消费first主题0号分区的数据。
实现步骤:
//2 订阅主题对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0)); //0号分区
kafkaConsumer.assign(topicPartitions);
5.3.1 消费者组案例
需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
不同消费者配置相同消费者组id即可,消费者底层会自动分区。
properties.put(ConsumerConfig.GROUP_ID_CONFIG,“test”);
5.4 生产经验-分区的分配以及再平衡
- 一个消费者组有消费者组成,一个topic有多个partition组成,到底由哪个消费者消费哪个partition的数据呢?
- kafka有4种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。可配置partition.assignment.strategy修改。默认Range+CooperativeSticky。可多选。
5.4.1 Range以及再平衡
- 分区分配策略之range
首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。通过partition数/consumer数来决定每个消费者应该消费几个分区。如果除不尽,那么全面几个消费者将会多消费1个分区。
注意:如果只是针对一个topic而言,c0消费者多消费1个分区影响不大。但是如果有N个topic,那么每个topic,消费者C0都将多消费1个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区。 容易产生数据倾斜 - Range分区分配策略案例
- 修改主题first为7个分区
- bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic first --partitions 7
注意: 分区数只能增加,不能减少。 - 复制CustomConsumer类,创建CustomConsumer2,3。这样可由三个消费者CustomConsumer1,2,3组成消费者组,组名都为"test",同时启动三个消费者。
- 启动CustomProducer,发送500个消息。
consumer0消费到0,1,2分区的随数据,consumer1消费到3,4分区的数据,consumer2消费到5,6分区数据 - 关闭CustomConsumer,在心跳(45s)内再次发送数据
consumer1先消费3,4分区数据,consumer2消费到5,6分区数据,在心跳断开后consumer1继续消费0,1,2分区数据 - 关闭CustomConsumer心跳(45s)后,再次发送数据
consumer1先消费0,1,2,3,4分区数据,consumer2消费到5,6分区数据
说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。
5.4.2 RoundRobin以及再平衡
- RoundRobin分区策略原理
RoundRobin针对集群种所有topic而言的。
RoundRobin轮询分区策略是把所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。
2.RoundRobin分区分配策略案例
- 同上,将CustomConsumer,1,2中消费者代码中修改策略为RoundRobin。
//properties.put(ConsumerConifg.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, “org.apache.kafka.clients.consumer.RoundRobinAssignor”); - 重启3个消费者,发消息
consumer0消费到0,3,6分区的随数据,consumer1消费到1,4分区的数据,consumer2消费到2,5分区数据 - 关闭CustomConsumer,在心跳(45s)内再次发送数据
consumer1先消费1,4分区数据,consumer2消费到2,5分区数据,在断开后consumer1继续消费0,6分区数据,consumer2消费到3分区数据 - 关闭CustomConsumer心跳(45s)后,再次发送数据
consumer1先消费1,3,5分区数据,consumer2消费到0,2,4,6分区数据
5.4.3 Sticky以及再平衡
粘性分区定义:即在执行一次新分配之前,考虑上一次分配,尽量少的调整分配变动。
粘性分区从0.11.x引入,首先会尽量均衡的放置分区到消费者上面,在出现统一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变。
需求:设置first主题,7个分区,3个消费者,采用粘性分区消费,再停掉一个观察。
- 修改分区策略
ArrayList<String> startegys = new ArrayList<>();
startegys。add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
- 同上
consumer0消费到0,1分区的随数据,consumer1消费到4,5,6分区的数据,consumer2消费到2,3分区数据,但是重启后会改变。
关闭CustomConsumer,在心跳(45s)内再次发送数据
consumer1先消费1,4分区数据,consumer2消费到2,5分区数据,在断开后consumer1继续消费0,6分区数据,consumer2消费到3分区数据
关闭CustomConsumer心跳(45s)后,再次发送数据
consumer1先消费1,0,4,6分区数据,consumer2消费到2,3,5分区数据
5.5 offset位移
5.5.1 offset的默认维护位置 **
0.9版本之后,consumer默认将offset保存在卡夫卡一个内置的topic中,该topic为__consumer_offsets
0.9之前,consumer默认将offset保存在Zookeeper
consumer_offsets主题采用k-v存储数据,key为group.id+topic+分区号,value为当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,即每个group.id+topic+分区号保留最新数据。
- 消费offset案例
- consumer_offsets为kafka的topic,那就可以通过消费者消费
- 在配置文件中config/consumer.properties中添加exclude.internal.topics=false,默认true,表示不能消费系统主题。为了查看,设为false。
- 创建新的topic:bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic offsetTest --partitions 2 --replcation-factor 2
- 启动生产者往offsetTest发数据。bin/kafka-console-producer.sh --topic offsetTest --bootstrap-server node1:9092
- 启动消费者消费offsetTest数据:bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic offsetTest --group test(指定消费者组名称,更好观察数据存储位置)
- 查看消费者消费主题consumer_offsets:bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --consumer.config config/consumer.properties --formatter “kafka.coordinatior.group.GroupMetadataManager$OffsetsMessageFromatter” --from-beginning
5.5.2 自动提交offset
- enable.auto.commit:是否开启自动提交offset功能,默认true
- auto.commit.interval.mx:自动提交offset的时间间隔。默认5s
//自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
5.5.3 手动提交offset
- commitSync(同步提交):必须等待offset提交完成,再去消费下一批数据
- commitAsync(异步提交):发送offset请求就开启下一批数据消费
相同点:都会将本次提交的一批数据最高的偏移量提交
不同点:同步阻塞,直到提交成功,有重试机制,异步没有失败重试,会提交失败。
//手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//手动提交offset
kafkaConsumer.commitSync();
kafkaConsumer.commitASync();
5.5.4 指定offset进行消费
auto.offset.reset=earliest | latest | none 默认latest
当kafka中没有初始偏移量(第一次)或不存在(被删除)时怎么办?
- earliest:自动将偏移量重置为最早的偏移量 --from-beginning
- latest:自动重置为最新值
- none:没找到先前偏移量,向消费者抛出异常
- 任意指定offset位置开始消费(注:每次执行完,要修改消费者组名)
//指定位置offset消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
//保证分区分配方案已经指定完成
while(assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition, 100);
}
5.5.5 指定时间进行消费
需求:在生产环境中,会遇到消费数据异常,想重新按照时间消费。
//指定位置offset消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
//保证分区分配方案已经指定完成
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
//时间转offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<TopicPartition, Long>();
for (TopicPartition topicPartition : assignment) {
topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); //一天前
}
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
5.5.6 漏消费和重复消费
重复消费:已经消费了数据,但是offset没提交(consumer挂了,再次重启会从上次offset消费)
漏消费:先提交offset后消费(手动消费后,当offset被提交,数据还没落库,消费者线程被kill,导致内存数据丢失)
5.6 生产经验-消费者事务
消费者事务需要将kafka消费端将消费过程和提交offset过程做原子绑定。此时需要将kafka的offset保存到支持事务的自定义介质(mysql)
5.7 生产经验-数据积压(消费者提高吞吐量)
- 如果消费能力不足,则增加topic分区数,并且提升消费者组的消费者数量,消费者数=分区数。(两者缺一不可)
- 如果下游数据处理不及时:提高每批次拉取数量(500+)。批次拉取数据少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据(50m),也会积压。
回顾总结
1.zk存储的信息:broker.ids、leaders、controller
2.工作理财
3.服役:
4.退役
5.副本:
副本好处:提高可靠性,生产环境一般2个默认1个,有ledaer和follower
isr,ar,controller选举(第一次随机),leader挂了(leo,hw多删少补),follow挂了
副本分配:负载均衡,保证数据分配
手动副本分配:指定计划、执行计划,验证计划
leader partition的负载均衡 10%
手动增加副本因子
6.存储机制
broker topic partitions log segment 稀疏索引(4KB) 时间戳
7.删除数据
默认7天 删除策略(删除,压缩)
8.高效读写
集群 分区 (提高生产和消费同步,海量数据打散) 稀疏索引 顺序读写 零拷贝和页缓存(Linux内核的缓存)
9.消费者
消费流程 消费者组 分区分配策略(range、轮询、粘性) 再平衡(45s) 订阅主题(可多个)
10.offset
存储在系统主题 自动提交5s 手动提交(同异步) 指定offset消费 按照时间消费 漏,重复消费
11.事务
生产-集群-消费-下游(mysql)
12.数据积压
增加分区,消费者 生产-集群(4个参数) 消费者(2个参数)