应用程序使用KafkaConsumer向Kafka订阅主题,并从订阅的Topic上接收消息。
要想知道如何从Kafka读取消息,需要先了解消费者和消费者组的概念。
1、消费者和消费者组
原因:假设我们有一个应用程序需要从一个Kafka Topic中读取消息并验证,然后再把它们保存起来。应用程序需要创建一个消费者对象,订阅Topic 并开始接收消息,然后验证消息并保存结果。过了一阵,生产者往Topic写入消息的速度超过了应用程序验证数据的速度,这时候怎么办?如果只使用单个消费者处理消息,应用程序会远远跟不上消息生成的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的分区写入消息一样,我们也可以使用多个消费者从一个主题读取消息,对消息进行分流。
Kafka消费者从属于消费者组,一个群组里的消费者订阅的时同一个主题,每个消费者接收主题一部分分区的消息。
假设主题T1 有4个Partition,我们创建了消费者C1,它是群组G1里唯一的消费者,我们用它订阅主题T1。消费者C1将收到主题T1全部四个分区的消息。
如果在群组G1里新增一个消费者C2,那么每个消费者将分别从两个分区接收消息。假设消费者C1接收分区0和2的消息,消费者C2接收分区1和3的消息。
如果群组G1有4个消费者,那么每个消费者可以分配到一个分区。
如果群组G1中,再添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会闲置,不会接收到任何消息。
往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka消费者会经常做一些高延迟的操作,比如把数据写到数据库或者HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。
2、消费者群组和分区再均衡
我们上一节了解到,群组里的消费者共同读取主题的分区。一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或者发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用和伸缩性,我们可以放心的添加或者删除消费者。不过在正常的情况下,我们并不希望发生这样的行为。因为在再均衡期间,消费者无法读取消息,造成整个消费者组一小段时间不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为群组协调器的Broker(不同的群组可以有不同的协调器)发送心跳,来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器就会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,挂掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
心跳行为在最近版本中的变化
在0.10.1版本里,kafka社区引入了一个独立的心跳线程,可以在轮询消息的空档发送心跳。这样,发送心跳的频率(也就是消费者群组用于检测发生崩溃的消费者或不再发送心跳的消费者的时间)与消息轮询的频率(由处理消息所花费的时间来确定)之间就是相互独立的。
在新版本的Kafka中,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行轮询,这样可以避免出现活锁,比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配置与session.timeout.ms是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。
分配分区是怎样的一个过程
当消费者要加入群组时,它会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为"群主"。群主从协调器哪里获得群组的成员列表(列表里包含最近所有发过心跳的消费者),并负责给每一个消费者分配分区。它使用了一个实现了PartitionAssignor接口的类来决定那些分区应该被分配给那个消费者。
后面会介绍Kafka的分配策略。