学习笔记
- 五、Kafka Broker
- 5.1、在zookeeper的服务端存储的Kafka相关信息
- 5.2、Kafka Broker 总体工作流程
- 5.3、Kafka Broker 节点服役和退役
- 5.3.1、节点服役
- 5.3.2、节点退役
- 5.4、Kafka Broker 副本
- 5.4.1、副本信息
- 5.4.3、Leader 选举流程
- 5.4.3、 Leader 和 Follower 故障处理细节
- 5.4.4、Leader Partition 负载平衡
- 5.4.5、增加副本因子
- 5.5、文件存储
- 5.6、文件清理策略
- 5.6.1、delete 日志删除
- 5.6.2、compact 日志压缩
- 5.7、高效读写数据
- 六、Kafka 消费者
- 6.1 Kafka 消费方式
- 6.2、Kafka 消费者工作流程
- 6.2.1、消费者总体工作流程
- 6.2.2、消费者组原理
- 6.2.2.1、消费者组
- 6.2.2.2、消费者组初始化流程
- 6.2.2.3、消费者组详细消费流程
- 6.3、Kafka 消费者 Api
- 6.3.1、独立消费者案例(订阅主题)
- 6.3.2、 独立消费者案例(订阅分区)
- 6.4、分区的分配以及再平衡
- 6.4.1、分配Range
- 6.4.2、RoundRobin 以及再平衡
- 6.4.3、Sticky 以及再平衡
- 6.5、 offset 位移
- 6.5.1 offset 的默认维护位置
- 6.5.2、自动提交 offset
- 6.5.3、手动提交 offset
- 6.5.4、指定 Offset 消费
- 6.5.5、指定时间消费
- 6.6、 数据积压
五、Kafka Broker
5.1、在zookeeper的服务端存储的Kafka相关信息
- /kafka/brokers/ids
[0,1,2] 记录有哪些服务器 - /kafka/brokers/topics/first/partitions/0/state
{“leader”:1 ,“isr”:[1,0,2] } 记录谁是Leader,有哪些服务器可用 - /kafka/controller
{“brokerid”:0} 辅助选举Leader
5.2、Kafka Broker 总体工作流程
5.3、Kafka Broker 节点服役和退役
5.3.1、节点服役
- 新增结点
192.168.3.37 broker.id 为 3 - 创建负载均衡计划
创建一个要均衡的主题topics-to-move.json
{
"topics": [
{"topic": "first"}
],
"version": 1
}
- 执行负载均衡计划【–broker-list】
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.34:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
- 创建副本存储计划
increase-replication-factor.json
把负载均衡输出的Proposed partition reassignment configuration复制到这里
- 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.34:9092 --reassignment-json-file increase-replication-factor.json --execute
- 验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.34:9092 --reassignment-json-file increase-replication-factor.json --verify
5.3.2、节点退役
- 创建一个要均衡的主题(和服役的一样)
- 创建执行计划【broker-list】
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.34:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
- 创建并执行副本存储计划(和服役的一样)
5.4、Kafka Broker 副本
5.4.1、副本信息
- Kafka 副本作用:提高数据可靠性。
- Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会
增加磁盘存储空间,增加网络上数据传输,降低效率。 - Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,
然后 Follower 找 Leader 进行同步数据。 - Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
AR = ISR + OS
ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
5.4.3、Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。
5.4.3、 Leader 和 Follower 故障处理细节
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
- Follower故障
(1) Follower发生故障后会被临时踢出ISR
(2)这个期间Leader和Follower继续接收数据
(3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
(4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了 - Leader故障
(1) Leader发生故障之后,会从ISR中选出一个新的Leader
(2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
5.4.4、Leader Partition 负载平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
- auto.leader.rebalance.enable,默认是true。
自动Leader Partition 平衡 - leader.imbalance.per.broker.percentage,默认是10%。
每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 - leader.imbalance.check.interval.seconds,默认值300秒。
检查leader负载是否平衡的间隔时间。
5.4.5、增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
手动增加副本存储:
- 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)
vim increase-replication-factor.json
- 输入如下内容:
{"version":1,
"partitions":[
{"topic":"four","partition":0,"replicas":[0,1,2]},
{"topic":"four","partition":1,"replicas":[0,1,2]},
{"topic":"four","partition":2,"replicas":[0,1,2]}
]
}
- 执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.34:9092 --reassignment-json-file increase-replication-factor.json --execute
5.5、文件存储
Topic是逻辑上的概念,而partition是物理上的概念
每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。
Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。
每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。
这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
.index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认4kb。
Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,
因此能将offset的值控制在固定大小
5.6、文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认 7 天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
5.6.1、delete 日志删除
将过期数据删除
- log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment,log.retention.bytes,默认等于-1,表示无穷大
。
5.6.2、compact 日志压缩
compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
压缩后的offset可能是不连续的,比如合并后没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息
集里就保存了所有用户最新的资料。
- log.cleanup.policy = compact 所有数据启用压缩策略
5.7、高效读写数据
- Kafka 本身是分布式集群,可以采用分区技术,并行度高
- 读数据采用稀疏索引,可以快速定位要消费的数据
- 顺序写磁盘
- 页缓存 + 零拷贝技术
六、Kafka 消费者
6.1 Kafka 消费方式
Pull(拉)模 式:consumer采用从broker中主动拉取数据。Kafka采用这种方式
push(推)模式:Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的
消费速率。
pull模式不足之处是,如 果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据
6.2、Kafka 消费者工作流程
6.2.1、消费者总体工作流程
一个消费者可以消费多个分区数据
每个分区的数据只能由消费者组中一个消费者消费
每个消费者的offset由消费者提交到Kafka系统主题保存
6.2.2、消费者组原理
6.2.2.1、消费者组
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
6.2.2.2、消费者组初始化流程
- coordinator:辅助实现消费者组的初始化和分区的分配。
coordinator节点选择 = groupid的hashcode值 % 50( 50是__consumer_offsets的默认分区数量)
6.2.2.3、消费者组详细消费流程
6.3、Kafka 消费者 Api
6.3.1、独立消费者案例(订阅主题)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
// 1.创建消费者的配置对象
Properties properties = new Properties();
// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ceshi");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
6.3.2、 独立消费者案例(订阅分区)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerPartition {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(必须),名字可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
6.4、分区的分配以及再平衡
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range +CooperativeSticky。Kafka可以同时使用多个分区分配策略。
6.4.1、分配Range
- 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
- 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
- 注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。
- 容易产生数据倾斜
0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
6.4.2、RoundRobin 以及再平衡
RoundRobin 针对集群中所有Topic而言。
- RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
分别由 1 号消费者或者 2 号消费者消费。
6.4.3、Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
Sticky类似于Range 但分区随机 例如C1消费1,2,4或者0,4,7 不一定就是顺序的0,1,2
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
0 号消费者挂掉后,0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
6.5、 offset 位移
6.5.1 offset 的默认维护位置
从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets
6.5.2、自动提交 offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
5s
自动提交offset的相关参数:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
6.5.3、手动提交 offset
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 同步提交 offset
consumer.commitSync();
// 异步提交 offset
consumer.commitAsync();
6.5.4、指定 Offset 消费
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment= new HashSet<>();
//保证分区分配方案已经制定完华
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 100 的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 100);
}
每次测试完,需要修改消费者组名;
6.5.5、指定时间消费
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition,
offsetAndTimestamp.offset());
}
}
6.6、 数据积压
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。