一、Kafka消费者基本特性
消费者与消费者组的关系
消费者用一个消费者组名标记自己
一个发布在Topic上消息被分发给此消费者组中的一个消费者
- 假如所有的消费者都在一个组中,那么这就变成了队列模型,即这些消费者只有一个消费者会收到消息
- 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。每个消费者都会收到消息
上述特性总结为:Kafka的消息,只允许同一个消费者组里一个消费者消费。但是不同的消费者组之间是隔离的,互不影响的
消费者组是什么?
它是一个组,所以内部可以有多个消费者,这些消费者共用一个ID(叫做Group ID),一个组内的所有消费者共同协作,完成对订阅的topic的所有分区进行消费,其中一个topic中的一个分区只能由一个消费者消费
消费者组的特性
- 一个消费者组可以有多个消费者。
- Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。
- 每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费,消费者组之间不影响。
消费者分配
Kafka是如何保证一条消息在同一个组内只会被一个消费者消费的?
Kafka是基于分区来分配给组内的消费者的,也就是说,基于分区到消费者有一个映射,这个分区内的消息都会被这个消费者收到,而其他消费者没有映射关系就不会被收到了。
这是组里只有一个消费者的情况,那么所有分区的消息都会与这个消费者建立映射关系
这是组里有多个消费者的情况,但是消费者数量会小于分区数量,那么一个消费者会接收来自多个分区的消息
这是组里有多个消费者的情况,但是消费者数量大于分区数量,那么会有消费空闲,收不到分区的消息
所以最好消费者数量小于等于分区的数量,不然会导致有些消费者永远收不到消息
分区消息分配策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费,Kafka提供了3种消费者分区分配策略:RangeAssignor、RoundRobinAssignor、StickyAssignor。
RangeAssignor
对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。可以理解为平均分
计算规则
比如上述7个分区,3个comsumer,则7/3=2,余1,这个表明如果3个消费线程均分7个分区还会多出1个分区,那么这个多出的额外分区就会给前面的消费线程处理,所以它会把第一个分区先给到consumer-1消费线程消费
配置方式
prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RangeAssignor");
RoundRobinAssignor
RoundRobinAssignor 采用轮询的方式分配分区。如果consuemrs订阅Topics都是相同的,那么partitions将会被均匀分配给每个consumer,最理想的状态是partitions数是consumers数的整数倍,这样每个consumer都有相同数量的partitions数。
计算规则
类似于斗地主发牌,第一个分区给了第一个消费者,第二个分区就给第二个消费者,一次进行下去
配置方式
prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
StickyAssignor
StickyAssignor 是 Kafka 2.4.0 版本引入的一种新的分区分配策略。它的目标是在重新分配时尽可能保持现有的分配不变,以减少重新分配带来的影响。
计算规则
StickyAssignor
会在重新分配时尽量保持现有的分区分配不变。- 如果需要重新分配,它会尽量将分区分配给已经在消费该分区的消费者,或者分配给负载较轻的消费者。
- 分配时,尽量使每个消费者的分区数量大致相等。
这里再举个例子
配置方式
prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
二、Kafka的消费安全问题
消费者线程安全问题
首先,kafka 的Java consumer是单线程的设计,准确来说是双线程,kafka新版本中KafkaConsumer变成了用户主线程和心跳线程的双线程设计
所谓用户主线程,就是你启动Consumer应用程序的main方法的那个线程,而心跳线程只负责定期发送心跳给对应的Broker,以标识消费者应用的存活性,引入心跳线程的目的还有一个:解耦真实的消息处理逻辑与消费者组成员存活性管理。
尽管多了一个心跳线程,但是实际的消息处理还是由主线程完成,所以我们还是可以认为KafkaConsumer是单线程设计的。
那为什么要采用单线程设计的思路呢?
- 新版本Consumer设计了单线程+轮询的机制,这种设计能够较好的实现非阻塞式的消息获取。(因为一旦是多线程,必然会发送阻塞等待,所以这样读取消息确保是非阻塞的)
- 单线程的设计能够简化Consumer端的设计,将处理消息的逻辑是否使用多线程的选择,由你来决定。(这里说的多线程是指,单线程依然是获取消息,这个消息要存下来,真正处理这个消息的handler可以提交给线程池去处理)
- 不论使用那种编程语言,单线程的设计都比较容易实现,并且,单线程设计的Consumer更容易移植到其它语言上。
死信队列和重试队列
重试队列
与此对应的还有一个“回退队列"的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障,实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。
重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到Broker与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。
死信队列
当一条消息初次消费失败,消息队列 MQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 MQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,这种正常情况下无法被消费的消息称为死信消息,存储死信消息的特殊队列称为死信队列。
消息丢失和消息重复
在消费者端,消费了消息要提交一个东西叫做offset,就是消息偏移量(也叫位移),代表我现在已经消费到哪个位置的消息了。如果我们对于位移提交控制不好可能出现消息丢失以及消息消息重复的情况
重复消费
这种情况发生在消费了数据但没有及时提交offset
比如开启了自动offset提交,consumer默认5s提交一次offset,提交offset 2s之后consumer挂了,此时已经消费了2s的消息,但是因为没有触发5s时间间隔没有告诉kafka已经消费信息,此时再启动consumer broker还是记录的5s自动提交之前的offset 此时会造成消息的重复消费
消息丢失
这种情况发生消息还没真正消费完,就提交offset了
如果将offset设置为手动提交,当offset被提交时,数据还在内存中未处理,刚好消费者宕机,offset已经提交,数据未处理,此时就算再启动consumer也消费不到之前的数据了,导致了数据漏消费
如果想要consumer精准一次消费,需要kafka消息的消费过程和提交offset变成原子操作,此时需要我们将kafka的offset持久化到其他支持事务的中间件(比如MySOL)
消息堆积
1、如果是kafka消费能力不足,考虑增加topic分区数,并且同时增加消费者组的消费者数量,因为一个partition只能被CG(消费者组)中的一个consumer消费,所以partition和consumer必须同时增加
2、如果是下游数据处理不及时,可以提高每次拉取的数量。因为批次拉取数据过少,会使得处理数据小于生产的数据
配置
fetch.max.bytes | 消费者获取服务器端一批消息最大的字节数,默认50M |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条 |
三、消费者offset(位移)管理
消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息,在kafka中,这个位置信息有个专门的术语:位移(offset)
位移类型
有两种位移
1、分区位移
生产者向分区写入消息,每条消息在分区中的位置信息由一个叫offset的数据来表示,假设一个生产者向一个空分区写入了10 条消息,那么这 10 条消息的位移依次是 0、1、…、9;
2、消费位移
注意,这和上面所说的消息在分区上的位移完全不是一个概念,上面的“位移“表示的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了,而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器。
假设一个分区中有 10条消息,位移分别是0到9,某个 Consumer 应用已消费了5条消息,这就说明该 Consumer 消费了位移为0到4的5条消息,此时 Consumer 的位移是5,指向了下一条消息的位移。
至于为什么要有消费位移,很好理解,当Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍,就好像书签一样,需要书签你才可以快速找到你上次读书的位置。
位移信息存放在哪?
Kafka0.9之后kafka将offset维护在了系统topic __consumer_offsets 中,该主题有50个partition,采用K-V方式存储数据,key=groupId+topic+partition号,value即使当前的ofset值,每隔一段时间,kafka内部会对这个topic进行压缩compact操作,保留最新的offset。
位移提交方式
自动提交
Kafka 消费者在后台定期自动提交偏移量。所以有两个配置
enable.auto.commit
:设置为true
以启用自动提交,默认值为true
。auto.commit.interval.ms
:指定自动提交的间隔时间,默认值为 5000 毫秒(5 秒)。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "true"); // 启用自动提交
props.put("auto.commit.interval.ms", "5000"); // 设置自动提交间隔时间为 5 秒
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
手动提交
消费者在处理完消息后显式地提交偏移量。需要enable.auto.commit设置为 false 以禁用自动提交。
消费者在处理完消息后,显式调用 commitSync() 或 commitAsync() 方法来提交偏移量。
commitSync() 是同步提交,会阻塞直到提交成功或抛出异常。
commitAsync() 是异步提交,不会阻塞,可以提供回调函数来处理提交结果。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 禁用自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交偏移量
consumer.commitSync();
}
} catch (CommitFailedException e) {
// 处理提交失败的情况
e.printStackTrace();
} finally {
consumer.close();
}
这个代码示例中,commitSync()是在consumer.poll()得到了大量的records之后,只要进行了xommit,那就是对这poll下来的所有records进行提交位移
考虑一种情况,如果在循环执行处理单条record中,发生了死循环或者出现了异常,但是之前的record又被处理过了,就会导致前面的这些record没被提交位移
所以有没有对单条record的commit呢?当然有,这种做法称为逐条提交
consumer.commitSync(
Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)//当前record已处理,所以提交的offset应该是下一条record,需要+1
)
);
四、分区再均衡
什么是分区再均衡?
一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息,当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取,在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡,再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为,在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用,另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
再均衡的过程
只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息,消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳,如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
总结:什么时候会触发分区再分配
- Topic中添加一个新的分区,消费者将重新分配
- 消费者关闭或者崩溃,消费者读取的分区将会分配给其他消费者
- 消费者群组中添加新的消费者,将分区重新分配
五、Kafka存储结构
Kafka存储数据,是以分区为单位的,每个分区都有自己的log文件夹,下面的文件会分段(segment)存储
为什么分区太多的时候,Kafka性能会下降?
Kafka是在硬盘上顺序存取数据的,但是分区太多,造成写数据会东一个分区西一个分区的找,演变成随机存取了,所以导致kafka性能下降