文章目录
- 1. Kafka 消费者相关概念
- 消费者和消费者组
- (1)横向伸缩消费者
- (2)横向伸缩消费者组
- 分区再平衡
- 再均衡的类型
- (1)主动再均衡
- (2)协作再均衡(增量再均衡)
- 分配分区的过程
- 群组固定成员
- 2. 创建 Kafka 消费者
- 3. 订阅主题
- 4. 轮询
- 5. 线程安全
- 6. 配置消费者
- (1)fetch.min.bytes
- (2)fetch.max.wait.ms
- (3)fetch.max.bytes
- (4)max.poll.records √
- (5)max.partition.fetch.bytes
- (6)session.timeout.ms 和 heartbeat.interval.ms √
- (7)max.poll.interval.ms √
- (8)default.api.timeout.ms
- (9)request.timeout.ms
- (10)auto.offset.reset √
- (11)enable.auto.commit √
- (12) partition.assignment.strategy √
- (13)client.id
- (14)client.rack
- (15)group.instance.id √
- (16)receive.buffer.bytes和send.buffer.bytes
- (17) offsets.retention.minutes √
- 7. 提交和偏移量
- (1)基于时间间隔自动提交
- (2)手动提交当前偏移量
- 同步提交
- 异步提交
- 同步和异步组合提交 √
- (3)提交特定的偏移量
- 8.再均衡监听器 √
- 9.从特定偏移量位置读取记录 √
- 10.退出 √
- 11.订阅主题:使用不属于任何群组的消费者
1. Kafka 消费者相关概念
消费者和消费者组
- 假设有一个应用程序,它从一个 Kafka 主题读取消息,在对消息做一些验证后再保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息、验证消息和保存结果。但过了一阵子,生产者向主题写入消息的速度超过了应用程序验证数据的速度,这时候该怎么办呢?如果只使用单个消费者来处理消息,那么应用程序会远远跟不上消息生成的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题写入消息一样,也可以让多个消费者从同一个主题读取消息。
- Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者负责读取这个主题的部分消息。
(1)横向伸缩消费者
- 向群组里添加消费者是横向扩展数据处理能力的主要方式。Kafka 消费者经常需要执行一些高延迟的操作,比如把数据写到数据库或用数据做一些比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,因此可以增加更多的消费者来分担负载,让每个消费者只处理部分分区的消息,这是横向扩展消费者的主要方式。我们可以为主题创建大量的分区,当负载急剧增长时,可以加入更多的消费者。不要让消费者的数量超过主题分区的数量,因为多余的消费者只会被闲置。
- 只包含一个消费者的群组接收 4 个分区的消息:
- 包含两个消费者的群组接收 4 个分区的消息:
- 包含 4 个消费者的群组,每个消费者分配到一个分区:
- 消费者数量超过分区数量,有消费者空闲:
(2)横向伸缩消费者组
-
除了通过增加消费者数量来横向伸缩单个应用程序,我们还经常遇到多个应用程序从同一个主题读取数据的情况。实际上,Kafka 的一个主要设计目标是让 Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些应用场景中,我们希望每一个应用程序都能获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序都有自己的消费者群组就可以让它们获取到所有的消息。不同于传统的消息系统,横向伸缩消费者和消费者群组并不会导致 Kafka 性能下降。
-
新增一个消费者群组,每个群组都能收到所有消息:
分区再平衡
- 分区的所有权从一个消费者转移到另一个消费者的行为称为再均衡。再均衡为消费者群组带来了高可用性和伸缩性(你可以放心地添加或移除消费者)。不过,在正常情况下,我们并不希望发生再均衡。消费者群组里的消费者共享主题分区的所有权,以下情况会发生再均衡。
- 当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。
- 当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。
- 消费者正确关闭,调用 close() 方法。消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。
- 消费者没有正确调用 close(),并且在一段时间内没有发送心跳,会话超时(session.timeout.ms 默认 10s),群组协调器认为它已经“死亡”,进而触发再均衡。
- 消费者死锁导致长时间等待超过 poll 设定的时间间隔(max.poll.interval.ms 默认 5 分钟),后台线程向 broker 发送“离开群组”的请求,让 broker 知道这个消费者已经“死亡”,必须进行群组再均衡,然后停止发送心跳。
- 主题发生变化会导致分区重分配。
- 管理员添加了新分区。
- 在调用 subscribe() 方法时传入一个正则表达式。如果有人创建了新主题,并且主题的名字与正则表达式匹配,那么就会立即触发一次再均衡。
再均衡的类型
- 根据消费者群组所使用的分区分配策略的不同,再均衡可以分为两种类型。
(1)主动再均衡
- 主动再均衡包含两个不同的阶段:
- 第一个阶段,所有消费者都会停止读取消息,放弃分区所有权。
- 第二个阶段,消费者重新加入消费者群组,并获得重新分配到的分区,并继续读取消息。
- 会导致整个消费者群组在一个很短的时间窗口内不可用。这个时间窗口的长短取决于消费者群组的大小和几个配置参数。
- 区间(range)、轮询(roundRobin)、黏性(sticky)策略使用的是主动再均衡。
(2)协作再均衡(增量再均衡)
- 将一个消费者的部分分区重新分配给另一个消费者,其他消费者则继续读取没有被重新分配的分区。包含两个或多个阶段。
- 第一个阶段,消费者群组首领会通知所有消费者,它们将失去部分分区的所有权,然后消费者会停止读取这些分区,并放弃对它们的所有权。
- 第二个阶段,消费者群组首领会将这些没有所有权的分区分配给其他消费者。
- 需要进行几次迭代,直到达到稳定状态,但它避免了主动再均衡中出现的“停止世界”停顿。这对大型消费者群组来说尤为重要,因为它们的再均衡可能需要很长时间。
- 协作黏性(cooperative sticky)策略支持协作再均衡。将 partition.assignment.strategy 设置为 org.apache.kafka.clients.consumer.CooperativeStickyAssignor。
分配分区的过程
- 当一个消费者想要加入消费者群组时,它会向被指定为群组协调器的 broker(不同消费者群组的协调器可能不同)发送 JoinGroup 请求。
- 第一个加入群组的消费者将成为群组首领。首领从群组协调器那里获取群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为还“活着”),并负责为每一个消费者分配分区。它使用实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者。
- 分区分配完毕之后,首领会把分区分配信息发送给群组协调器,群组协调器再把这些信息发送给所有的消费者。每个消费者只能看到自己的分配信息,只有首领会持有所有消费者及其分区所有权的信息。每次再均衡都会经历这个过程。
- 消费者会向群组协调器发送心跳,以此来保持群组成员关系和对分区的所有权关系。心跳是由消费者的一个后台线程发送的,只要消费者能够以正常的时间间隔发送心跳,它就会被认为还“活着”。如果消费者没有正确调用 close(),并且在一段时间内没有发送心跳,那么它的会话就将超时(session.timeout.ms 默认 10s),群组协调器会认为它已经“死亡”,进而触发再均衡。
- 如果一个消费者遇到了死锁导致长时间等待超过了 poll 设定的时间间隔(max.poll.interval.ms 默认 5 分钟),后台线程将向 broker 发送一个“离开群组”的请求,让 broker 知道这个消费者已经“死亡”,必须进行群组再均衡,然后停止发送心跳。
- 消费者正确关闭,调用 close() 方法。消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。
群组固定成员
- 在默认情况下,消费者的群组成员身份标识是临时的。当一个消费者离开群组时,分配给它的分区所有权将被撤销;当该消费者重新加入时,将通过再均衡协议为其分配一个新的成员 ID 和新分区。
- 可以给消费者分配一个唯一的 group.instance.id,让它成为群组的固定成员。通常,当消费者第一次以固定成员身份加入群组时,群组协调器会按照分区分配策略给它分配一部分分区。当这个消费者被关闭时,它不会自动离开群组——它仍然是群组的成员,直到会话超时。当这个消费者重新加入群组时,它会继续持有之前的身份,并分配到之前所持有的分区。群组协调器缓存了每个成员的分区分配信息,只需要将缓存中的信息发送给重新加入的固定成员,不需要进行再均衡。
- 如果两个消费者使用相同的 group.instance.id 加入同一个群组,则第二个消费者会收到
错误,告诉它具有相同 ID 的消费者已存在。 - 如果应用程序需要维护与消费者分区所有权相关的本地状态或缓存,那么群组固定成员关系就非常有用。如果重建本地缓存非常耗时,那么你肯定不希望在每次重启消费者时都经历这个过程。更重要的是,在消费者重启时,消费者所拥有的分区不会被重新分配。在重启过程中,消费者不会读取这些分区,所以当消费者重启完毕时,读取进度会稍稍落后,但你要相信它们一定会赶上。
- 需注意的是,群组的固定成员在调用 close()关闭时不会主动离开群组,它们何时“真正消失”取决于 session.timeout.ms 参数。你可以将这个参数设置得足够大,避免在进行简单的应用程序重启时触发再均衡,但又要设置得足够小,以便在出现严重停机时自动重新分配分区,避免这些分区的读取进度出现较大的滞后。
2. 创建 Kafka 消费者
- 创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似——把想要传给消费者的属性放在 Properties 对象里。
- 第一个属性 bootstrap.servers 指定了连接 Kafka 集群的字符串。
- 另外两个属性 key.deserializer 和 value.deserializer 与生产者的 key.serializer 和 value.serializer 类似,只不过它们不是使用指定类把 Java 对象转成字节数组,而是把字节数组转成 Java 对象。
- 第 4 个属性 group.id 不是必需的,但会经常被用到。它指定了一个消费者属于哪一个消费者群组。也可以创建不属于任何一个群组的消费者,只是这种做法不太常见。
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
3. 订阅主题
- subscribe() 方法会接收一个主题列表作为参数。
- 使用 consumer.unsubscribe() 取消订阅主题。
consumer.subscribe(Collections.singletonList("customerCountries"));
- 也可以在调用 subscribe() 方法时传入一个正则表达式。正则表达式可以匹配多个主题,如果有人创建了新主题,并且主题的名字与正则表达式匹配,那么就会立即触发一次再均衡,然后消费者就可以读取新主题里的消息。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很有用。
- 如果 Kafka 集群包含了大量分区(30 000 个或更多),则需注意,主题的过滤是在客户端完成的。
- 当使用正则表达式而不是指定列表订阅主题时,消费者将定期向 broker 请求所有已订阅的主题及分区。然后,客户端会用这个列表来检查是否有新增的主题,如果有,就订阅它们。
- 如果主题很多,消费者也很多,那么通过正则表达式订阅主题就会给 broker、客户端和网络带来很大的开销。
- 在某些情况下,主题元数据使用的带宽会超过用于发送数据的带宽。另外,为了能够使用正则表达式订阅主题,需要授予客户端获取集群全部主题元数据的权限,即全面描述整个集群的权限。
consumer.subscribe(Pattern.compile("test.*"));
4. 轮询
- 消费者 API 最核心的东西是通过一个简单的轮询向服务器请求数据。
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList("customerCountries"));
try {
while (true) {
// 拉取数据
ConsumerRecords<String, String> records = consumer.poll(timeout);
// 拉取到的数据为空则跳过
if (records.isEmpty()) {
continue;
}
for (ConsumerRecord<String, String> record : records) {
try {
// 实际处理逻辑
System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
} finally {
// 每处理完一条数据就记录一下偏移量
offset = record.offset() + 1;
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(offset, "no metadata"));
}
}
// 一批数据处理完后提交偏移量
consumer.commitAsync(currentOffsets, null);
}
} finally {
try {
consumer.commitSync(currentOffsets);
} fianlly {
// 确保关闭消费者
consumer.close();
}
}
- 这是一个无限循环。消费者是一个长时间运行的应用程序,通过持续轮询来向 Kafka 请求数据。
- 消费者必须持续对 Kafka 进行轮询,否则会被认为已经“死亡”,分区将被移交给群组里其它的消费者。传给 poll() 的参数是一个超时时间间隔,用于控制 poll() 的阻塞时间。如果这个参数被设置为 0 或者有可用的数据,那么 poll() 就会立即返回,否则它会等待指定的毫秒数(当消费者缓冲区里没有可用数据时)。
- poll() 方法会返回一个记录列表。列表中的每一条记录都包含了主题和分区的信息、
记录在分区里的偏移量,以及记录的键 – 值对。我们一般会遍历这个列表,逐条处理记
录。 - 在第一次调用消费者的 poll() 方法时,它需要找到 GroupCoordinator,加入群组,并接收分配给它的分区。如果触发了再均衡,则整个再均衡过程也会在轮询里进行,包括执行相关的回调。所以,消费者或回调里可能出现的错误最后都会转化成 poll() 方法抛出的异常。
- 如果超过 max.poll.interval.ms 没有调用 poll(),则消费者将被认为已经“死亡”,并被逐出消费者群组。因此,要避免在轮询循环中做任何可能导致不可预知的阻塞的操作。
5. 线程安全
- 按照规则,一个消费者使用一个线程。如果要在应用程序的同一个消费者群组里运行多个消费者,则需要让每个消费者运行在自己的线程中。最好是把消费者的逻辑封装在自己的对象里,然后用 Java 的 ExecutorService 启动多个线程,让每个消费者运行在自己的线程中。
- 在旧版本 Kafka 中,轮询方法的完整签名是 poll(long)。现在,这个签名被弃用了,新 API 的签名是 poll(Duration)。除了参数类型发生变化,方法体里的阻塞语义也发生了细微的改变。原来的方法会一直阻塞,直到从 Kafka 获取所需的元数据,即使阻塞时间比指定的超时时间还长。
6. 配置消费者
(1)fetch.min.bytes
- 消费者从服务器获取记录的最小字节数,默认是 1 字节。broker 在收到消费者的获取数据请求时,如果可用数据量小于 fetch.min.bytes 指定的大小,那么它就会等到有足够可用数据时才将数据返回。这样可以降低消费者和 broker 的负载,因为它们在主题流量不是很大的时候(或者一天里的低流量时段)不需要来来回回地传输消息。
- 如果消费者在没有太多可用数据时 CPU 使用率很高,或者在有很多消费者时为了降低 broker的负载,那么可以把这个属性的值设置得比默认值大。
- 但需要注意的是,在低吞吐量的情况下,加大这个值会增加延迟。
(2)fetch.max.wait.ms
- 让 Kafka 等到有足够多的数据时才将它们返回给消费者,feth.max.wait.ms 则用于指定 broker 等待的时间,默认是 500 毫秒。如果没有足够多的数据流入 Kafka,那么消费者获取数据的请求就得不到满足,最多会导致 500 毫秒的延迟。
- 如果要降低潜在的延迟(为了满足 SLA),那么可以把这个属性的值设置得小一些。
- 如果 fetch.max.wait.ms 被设置为 100 毫秒,fetch.min.bytes 被设置为 1 MB,那么 Kafka 在收到消费者的请求后,如果有 1 MB 数据,就将其返回,如果没有,就在 100 毫秒后返回,就看哪个条件先得到满足。
(3)fetch.max.bytes
- 指定了 Kafka 返回的数据的最大字节数(默认为 50 MB)。
- 消费者会将服务器返回的数据放在内存中,所以这个属性被用于限制消费者用来存放数据的内存大小。
- 需要注意的是,记录是分批发送给客户端的,如果 broker 要发送的批次超过了这个属性指定的大小,那么这个限制将被忽略。这样可以保证消费者能够继续处理消息。
- 值得注意的是,broker 端也有一个与之对应的配置属性,Kafka 管理员可以用它来限制最大获取数量。broker 端的这个配置属性可能很有用,因为请求的数据量越大,需要从磁盘读取的数据量就越大,通过网络发送数据的时间就越长,这可能会导致资源争用并增加 broker的负载。
(4)max.poll.records √
- 控制单次调用 poll() 方法返回的记录条数。可以用它来控制应用程序在进行每一次轮询循环时需要处理的记录条数。
(5)max.partition.fetch.bytes
- 指定了服务器从每个分区里返回给消费者的最大字节数(默认值是 1 MB)。当KafkaConsumer.poll() 方法返回 ConsumerRecords 时,从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。
- 使用这个属性来控制消费者的内存使用量会让事情变得复杂,因为你无法控制 broker 返回的响应里包含多少个分区的数据。因此,对于这种情况,建议用 fetch.max.bytes 替代,除非有特殊的需求,比如要求从每个分区读取差不多的数据量。
(6)session.timeout.ms 和 heartbeat.interval.ms √
- session.timeout.ms 指定了消费者可以在多长时间内不与服务器发生交互而仍然被认为还“活着”,默认是 10 秒。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,则会被认为已“死亡”,协调器就会触发再均衡,把分区分配给群组里的其他消费者。
- heartbeat.interval.ms 指定了消费者向协调器发送心跳的频率,session.timeout.ms 指定了消费者可以多久不发送心跳。因此,我们一般会同时设置这两个属性 heartbeat.interval.ms 必须比 session.timeout.ms 小,通常前者是后者的 1/3。
- 把 session.timeout.ms 设置得比默认值小,可以更快地检测到崩溃,并从崩溃中恢复,但也会导致不必要的再均衡。把 session.timeout.ms设置得比默认值大,可以减少意外的再均衡,但需要更长的时间才能检测到崩溃。
(7)max.poll.interval.ms √
- 指定了消费者在被认为已经“死亡”之前可以在多长时间内不发起轮询。心跳和会话超时是 Kafka 检测已“死亡”的消费者并撤销其分区的主要机制。心跳是通过后台线程发送的,而后台线程有可能在消费者主线程发生死锁的情况下继续发送心跳,但这个消费者并没有在读取分区里的数据。要想知道消费者是否还在处理消息,最简单的方法是检查它是否还在请求数据。但是,请求之间的时间间隔是很难预测的,它不仅取决于可用的数据量、消费者处理数据的方式,有时还取决于其他服务的延迟。在需要耗费时间来处理每个记录的应用程序中,可以通过 max.poll.records 来限制返回的数据量,从而限制应用程序在再次调用 poll() 之前的等待时长。但是,即使设置了max.poll.records,调用 poll() 的时间间隔仍然很难预测。于是,设置 max.poll.interval.ms 就成了一种保险措施。它必须被设置得足够大,让正常的消费者尽量不触及这个阈值,但又要足够小,避免有问题的消费者给应用程序造成严重影响。这个属性的默认值为 5 分钟。当这个阈值被触及时,后台线程将向 broker 发送一个“离开群组”的请求,让 broker 知道这个消费者已经“死亡”,必须进行群组再均衡,然后停止发送心跳。
(8)default.api.timeout.ms
- 如果在调用消费者 API 时没有显式地指定超时时间,那么消费者就会在调用其他 API 时使用这个属性指定的值。默认值是 1 分钟,因为它比请求超时时间的默认值大,所以可以将重试时间包含在内。poll() 方法是一个例外,因为它需要显式地指定超时时间。
(9)request.timeout.ms
- 消费者在收到 broker 响应之前可以等待的最长时间。如果 broker 在指定时间内没有做出响应,那么客户端就会关闭连接并尝试重连。它的默认值是 30 秒。不建议把它设置得比默认值小。在放弃请求之前要给 broker 留有足够长的时间来处理其他请求,因为向已经过载的 broker 发送请求几乎没有什么好处,况且断开并重连只会造成更大的开销。
(10)auto.offset.reset √
- 消费者在读取一个没有偏移量或偏移量无效(因消费者长时间不在线,偏移量对应的记录已经过期并被删除)的分区时该做何处理。
- 它的默认值是 latest,意思是说,如果没有有效的偏移量,那么消费者将从最新的记录(在消费者启动之后写入 Kafka的记录)开始读取。
- 另一个值是 earliest,意思是说,如果没有有效的偏移量,那么消费者将从起始位置开始读取记录。如果将 auto.offset.reset 设置为 none,并试图用一个无效的偏移量来读取记录,则消费者将抛出异常。
(11)enable.auto.commit √
- 消费者是否自动提交偏移量,默认值是 true。你可以把它设置为 false,选择自己控制何时提交偏移量,以尽量避免出现数据重复和丢失。如果它被设置为 true,那么还有另外一个属性 auto.commit.interval.ms 可以用来控制偏移量的提交频率。
(12) partition.assignment.strategy √
- PartitionAssignor 根据给定的消费者和它们订阅的主题来决定哪些分区应该被分配给哪个消费者。 kafka 消费者的消费策略以及再平衡。
- Range 分区策略
- Range 是对每个 topic 而言的。把每一个主题的若干个连续分区分配给消费者。
- 首先对同一个 topic 里面的分区按序号进行排序,并对消费者按字母进行排序 。假如有 7 个分区, 3 个消费者,分区为0 1 2 3 4 5 6,消费者为 C0 C1 C2。通过 partitions 数量 /consumer 数量来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。7 / 3 = 2 余 1 ,除不尽,那么消费者 C0 便会多消费 1 个分区。8 / 3 = 2 余 2 ,除不尽,那么 C0 和 C1 分别多 消费一个。
- 如果只是针对 1 个 topic 而言, C0 消费者多消费 1 个分区影响不是很大。但是如果有 N 多个 topic ,那么针对每 个 topic ,消费者 C0 都将多消费 1 个分区, topic 越多, C0 消 费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜!
- RoundRobin 分区策略
- RoundRobin 针对集群中 所有 Topic 而言 。 RoundRobin 轮询分区策略,是把 所有的 partition 和所有的 consumer 都列出来 ,然后 按照 hashcode 进行排序 ,最后 通过 轮询算法 来分配 partition 给到各个消费者。
- 粘性分区策略
- 设计黏性分区分配器的目的有两个:一是尽可能均衡地分配分区,二是在进行再均衡时多地保留原先的分区所有权关系,减少将分区从一个消费者转移给另一个消费者所带来的开销。如果所有消费者都订阅了相同的主题,那么黏性分配器初始的分配比例将与轮询分配器一样均衡。后续的重新分配将同样保持均衡,但减少了需要移动的分区的数量。如果同一个群组里的消费者订阅了不同的主题,那么黏性分配器的分配比例将比轮询分配器更加均衡。
- 协作黏性分区策略
- 这个分配策略与黏性分配器一样,只是它支持协作(增量式)再均衡,在进行再均衡时消费者可以继续从没有被重新分配的分区读取消息。
- Range 分区策略
- 默认值是 org.apache.kafka.clients.consumer.RangeAssignor。也可以改成 org.apache.kafka.clients.consumer.RoundRobinAssignor / StickyAssignor / CooperativeStickyAssignor。
- 使用自定义分配策略,需要把 partition.assignment.strategy 设置成自定义类的名字。
(13)client.id
- 可以是任意字符串,broker 用它来标识从客户端发送过来的请求,比如获取请求。通常被用在日志、指标和配额中。
(14)client.rack
- 所有想要发布消息的生产者必须连接到首领,但消费者可以从首领或者跟随者那里读取消息。
- 如果集群跨越了多个数据中心或多个云区域,那么让消费者从位于同一区域的副本那里获取消息就会具有性能和成本方面的优势。
- Kafka 可以将新创建的分区分配给部署在不同机架上的 broker(机架感知),确保单个分区的副本不会都位于同一个机架。要做到这一点,必须正确配置每个 broker 的 broker.rack 参数。
- 要从最近的副本获取消息,需要设置 client.rack 这个参数,用于标识客户端所在的区域。broker 的 replica.selector.class 改为org.apache.kafka.common.replica.RackAwareReplicaSelector。
(15)group.instance.id √
- 可以是任意具有唯一性的字符串,被用于消费者群组的固定名称。
(16)receive.buffer.bytes和send.buffer.bytes
- socket 在读写数据时用到的 TCP 缓冲区大小。如果它们被设置为 –1,就使用操作系统的默认值。如果生产者或消费者与 broker 位于不同的数据中心,则可以适当加大它们的值,因为跨数据中心网络的延迟一般都比较高,而带宽又比较低。
(17) offsets.retention.minutes √
- 只要消费者群组里有活跃的成员(也就是说,有成员通过发送心跳来保持其身份),群组提交的每一个分区的最后一个偏移量就会被 Kafka 保留下来,在进行重分配或重启之后就可以获取到这些偏移量。但是,如果一个消费者群组失去了所有成员,则 Kafka 只会按照这个属性指定的时间(默认为 7 天)保留偏移量。一旦偏移量被删除,即使消费者群组又“活”了过来,它也会像一个全新的群组一样,没有了过去的消费记忆。
7. 提交和偏移量
- 每次调用 poll() 方法,它总是会返回还没有被消费者读取过的记录,这意味着我们有办法来追踪哪些记录是被群组里的消费者读取过的。Kafka 不像其他 JMS 队列系统那样需要收到来自消费者的确认,相反,消费者可以用 Kafka 来追踪已读取的消息在分区中的位置(偏移量)。
- 我们把更新分区当前读取位置的操作叫作偏移量提交。与传统的消息队列不同,Kafka 不会提交每一条记录。消费者会将已成功处理的最后一条消息提交给 Kafka,并假定该消息之前的每一条消息都已成功处理。
- 那么消费者是如何提交偏移量的呢?消费者会向一个叫作 __consumer_offset 的主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。
- 消息的重复消费:最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么在发生再均衡之后,处于两个偏移量之间的消息会被重复消费。
- 消息的丢失:最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么在发生再均衡之后,处于两个偏移量之间的消息就会丢失。
(1)基于时间间隔自动提交
- 如果使用自动提交或不指定提交的偏移量,那么将默认提交 poll() 返回的最后一个位置之后的偏移量。在进行手动提交或需要提交特定的偏移量时,一定要记住这一点。
- 最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设置为 true,那么每过 5 秒,消费者就会自动提交 poll() 返回的最大偏移量。提交时间间隔通过auto.commit.interval.ms 来设定,默认是 5 秒。与消费者中的其他处理过程一样,自动提交也是在轮询循环中进行的。消费者会在每次轮询时检查是否该提交偏移量了,如果是,就会提交最后一次轮询返回的偏移量。
- 假设使用默认的 5 秒提交时间间隔,并且消费者在最后一次提交偏移量之后 3 秒会发生崩溃。再均衡后,接管分区的消费者从最后一次提交的偏移量的位置开始读取消息。这个偏移量实际上落后了 3 秒,所以在这 3 秒内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,缩小可能导致重复消息的时间窗口,但无法完全避免。
- 在使用自动提交时,到了该提交偏移量的时候,轮询方法将提交上一次轮询返回的偏移量,但它并不知道具体哪些消息已经被处理过了,所以,在再次调用 poll() 之前,要确保上一次 poll() 返回的所有消息都已经处理完毕(调用 close() 方法也会自动提交偏移量)。通常情况下这不会有什么问题,但在处理异常或提前退出轮询循环时需要特别小心。
- 虽然自动提交很方便,但是没有为避免开发者重复处理消息留有余地。
(2)手动提交当前偏移量
同步提交
- 把 enable.auto.commit 设置为 false, 让 应 用 程 序 自 己 决 定 何 时 提 交 偏 移 量。 使 用commitSync() 提交偏移量是最简单可靠的方式。这个 API 会提交 poll() 返回的最新偏移量,提交成功后马上返回,如果由于某些原因提交失败就抛出异常。
- 需要注意的是,commitSync() 将会提交 poll() 返回的最新偏移量,所以,如果你在处理完所有记录之前就调用了 commitSync(),那么一旦应用程序发生崩溃,就会有丢失消息的风险(消息已被提交但未被处理)。如果应用程序在处理记录时发生崩溃,但 commitSync()还没有被调用,那么从最近批次的开始位置到发生再均衡时的所有消息都将被再次处理。
- 只要没有发生不可恢复的错误,commitSync() 方法就会一直尝试直至提交成功。如果提交失败,就把异常记录到错误日志里。
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country= %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
异步提交
- 手动提交有一个缺点,在 broker 对请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。
- 在提交成功或碰到无法恢复的错误之前,commitSync() 会一直重试,但 commitAsync() 不会。之所以不进行重试,是因为 commitAsync() 在收到服务器端的响应时,可能已经有一个更大的偏移量提交成功。假设我们发出一个提交偏移量 2000 的请求,这个时候出现了短暂的通信问题,服务器收不到请求,自然也不会做出响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果此时commitAsync() 重新尝试提交偏移量 2000,则有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会导致消息重复。
- commitAsync() 也支持回调,回调会在 broker 返回响应时执行。回调经常被用于记录偏移量提交错误或生成指标。
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country= %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
同步和异步组合提交 √
Duration timeout = Duration.ofMillis(100);
try {
while (!closing) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country= %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
consumer.commitSync(); // 如果直接关闭消费者,那么就没有所谓的“下一次提交”了。commitSync() 会一直重试,直到提交成功或发生无法恢复的错误。
} catch (Exception e) {
} finally {
consumer.close();
}
(3)提交特定的偏移量
- 如果 poll() 返回了一大批数据,那么为了避免可能因再均衡引起的重复消费,想要在批次处理过程中提交偏移量该怎么办?这个时候不能只是调用 commitSync() 或commitAsync(),因为它们只会提交消息批次里的最后一个偏移量。消费者 API 允许在调用 commitSync() 和 commitAsync() 时传给它们想要提交的分区和偏移量。因为一个消费者可能不止读取一个分区,你需要跟踪所有分区的偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}
8.再均衡监听器 √
- 消费者会在退出和进行分区再均衡之前做一些清理工作。
- 如果知道消费者即将失去对一个分区的所有权,那么你就会马上提交最后一个已处理的记录的偏移量。可能还需要关闭文件句柄、数据库连接等。
- 在重新分配分区之后以及消费者开始读取消息之前,你可以准备或加载与分区相关的状态信息、找到正确的偏移量等。
- 消费者 API 提供了一些方法,让你可以在消费者分配到新分区或旧分区被移除时执行一些代码逻辑。所做的就是在调用 subscribe() 方法时传入 ConsumerRebalanceListener 对象。
- ConsumerRebalanceListener 有 3 个需要实现的方法。
- public void onPartitionsAssigned(Collection partitions):
- 方法会在重新分配分区之后以及消费者开始读取消息之前被调用。你可以在这个方法中准备或加载与分区相关的状态信息、找到正确的偏移量等。
- public void onPartitionsRevoked(Collection partitions):
- 方法会在消费者放弃对分区的所有权时调用——可能是因为发生了再均衡或者消费者正在被关闭。
- public void onPartitionsLost(Collection partitions):
- 只在使用了协作再均衡算法并且之前不是通过再均衡获得的分区被重新分配给其他消费者时调用(之前通过再均衡获得的分区被重新分配时会调用 onPartitions Revoked())。如果没有实现这个方法,onPartitionsRevoked() 将被调用。
- public void onPartitionsAssigned(Collection partitions):
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
long lastOffset = 0;
long committedOffset = -1;
for (TopicPartition topicPartition : partitions) {
// 第一种:上次consumer提交的offset的值
// committedOffset = consumer.committed(topicPartition).offset();
// consumer.seek(topicPartition, committedOffset + 1);
// 第二种:下次consumer开始消费的offset的位置
lastOffset = consumer.position(topicPartition)
consumer.seek(topicPartition, lastOffset);
}
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("分区再均衡,提交当前偏移量:" + currentOffsets);
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
}
consumer.subscribe(Collections.singletonList("customerCountries"), new HandleRebalance());
try {
while (true) {
// 拉取数据
ConsumerRecords<String, String> records = consumer.poll(timeout);
// 拉取到的数据为空则跳过
if (records.isEmpty()) {
continue;
}
for (ConsumerRecord<String, String> record : records) {
try {
// 实际处理逻辑
System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
} finally {
// 每处理完一条数据就记录一下偏移量
offset = record.offset() + 1;
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(offset, "no metadata"));
}
}
// 一批数据处理完后提交偏移量
consumer.commitAsync(currentOffsets, null);
}
} finally {
try {
consumer.commitSync(currentOffsets);
} fianlly {
// 确保关闭消费者
consumer.close();
}
}
9.从特定偏移量位置读取记录 √
- 可以使用 poll() 从各个分区的最新偏移量位置读取消息,但有时候也需要从不同的偏移量位置读取消息。Kafka 提供了一些方法,可以让 poll() 从不同的位置读取消息。
- seekToBeginning(Collection tp):从分区的起始位置读取所有的消息
- seekToEnd(Collection tp):直接跳到分区的末尾读取新消息
- seek(TopicPartition tp,long offset):指定偏移量
- Kafka 还提供了用于查找特定偏移量的 API。
// 一小时前
Long oneHourEarlier = Instant.now().atZone(ZoneId.systemDefault()).minusHours(1).toEpochSecond();
// consumer.assignment(): 获取当前消费者topic、分区信息
// 构建一个 map,分区-时间
Map<TopicPartition, Long> partitionTimestampMap = consumer.assignment().stream().collect(Collectors.toMap(tp -> tp, tp -> oneHourEarlier));
// 通过时间戳查询对应的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(partitionTimestampMap);
// 重置每个分区的偏移量
for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry: offsetMap.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
10.退出 √
- 无须担心消费者在一个无限循环里轮询消息,因为我们可以让其优雅地退出。如果确定马上要关闭消费者(即使消费者还在等待一个 poll() 返回),那么可以在另一个线程中调用 consumer.wakeup()。如果轮询循环运行在主线程中,那么可以在 ShutdownHook 里调用这个方法。需要注意的是,consumer.wakeup() 是消费者唯一一个可以在其他线程中安全调用的方法。
- 调用 consumer.wakeup() 会导致 poll() 抛出 WakeupException,如果调用 consumer.wakeup() 时线程没有在轮询,那么异常将在下一次调用 poll() 时抛出。不一定要处理 WakeupException,但在退出线程之前必须调用 consumer.close()。消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup(); ➊
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
11.订阅主题:使用不属于任何群组的消费者
- 你可能只需要用一个消费者读取一个主题所有的分区或某个分区。这个时候就不需要使用消费者群组和再均衡了,只需要把主题或分区分配给这个消费者,然后开始读取消息,并时不时地提交偏移量。(尽管为了提交偏移量仍然需要配置 group.id,但只要不调用 subscribe(),消费者就不会加入任何群组)
- 使用 consumer.partitionsFor(“topic”) 获取特定主题的分区情况。
- 使用 consumer.assign(partitions) 订阅指定的分区。
- 使用 consumer.assgin(new ArrayList<TopicPartition>()) 取消某个分区的订阅。
Duration timeout = Duration.ofMillis(100);
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d,customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}