在使用 Kafka 进行消息处理时,消费者的位置提交是一个非常重要的环节。它决定了消费者在下次启动时从哪里开始读取消息。今天,我们就来深入探讨一下 Kafka 消费者位置提交方式有哪些,以及在什么场景下使用。
一、Kafka 消费者位置提交的重要性
在 Kafka 中,消费者会不断地从主题(Topic)的分区(Partition)中读取消息。为了保证在消费者崩溃或重新启动后能够继续从上次停止的位置读取消息,消费者需要定期提交自己的位置信息。如果不进行位置提交,消费者在重新启动后可能会从头开始读取消息,导致重复处理已经处理过的消息,或者错过一些新的消息。
二、Kafka 消费者位置提交方式
-
自动提交
- Kafka 消费者可以配置为自动提交位置信息。当消费者拉取一批消息后,经过一定的时间间隔或者消息数量达到一定阈值时,消费者会自动提交当前的位置信息。
- 自动提交的优点是简单方便,不需要开发者手动干预。但是,它也存在一些缺点。例如,如果在自动提交之前消费者崩溃了,那么可能会导致一些消息被重复处理。
-
手动提交
- 手动提交位置信息需要开发者在代码中显式地调用提交方法。手动提交可以分为同步提交和异步提交两种方式。
- 同步提交:消费者会等待提交操作完成后才继续处理下一批消息。这种方式可以确保位置信息被正确提交,但是可能会影响消费者的性能,特别是在提交操作比较耗时的情况下。
- 异步提交:消费者会在后台异步地提交位置信息,不会阻塞当前的消息处理。这种方式可以提高消费者的性能,但是如果在提交操作完成之前消费者崩溃了,那么可能会导致位置信息丢失。
三、不同提交方式的适用场景
-
自动提交
- 适用于对消息处理的准确性要求不高,但是对性能要求较高的场景。例如,一些实时数据分析系统,可能更关注处理的速度,而对消息的重复处理不太敏感。
- 自动提交也适用于一些简单的应用场景,开发者不想花费太多时间在位置提交的管理上。
-
手动提交(同步)
- 适用于对消息处理的准确性要求非常高的场景。例如,在金融交易系统中,每一笔交易都必须被准确处理,不能出现重复处理或漏处理的情况。
- 当消费者需要在提交位置信息之前进行一些额外的处理,如数据验证、事务处理等,同步提交可以确保这些处理完成后再提交位置信息。
-
手动提交(异步)
- 适用于对性能要求较高,同时又希望在一定程度上保证消息处理的准确性的场景。例如,一些高并发的 Web 应用,需要快速处理大量的用户请求,同时又要确保消息不会被重复处理。
- 异步提交可以在不影响消息处理性能的情况下,尽可能地保证位置信息的正确提交。
四、Java 代码示例
自动提交示例:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class AutoCommitConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
手动同步提交示例:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class ManualSyncCommitConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交
Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(consumer.assignment());
for (TopicPartition partition : consumer.assignment()) {
long lastProcessedOffset = records.endOffsets(Collections.singleton(partition)).get(partition) - 1;
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset)));
}
}
} finally {
consumer.close();
}
}
}
手动异步提交示例:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class ManualAsyncCommitConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 异步提交
Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(consumer.assignment());
for (TopicPartition partition : consumer.assignment()) {
long lastProcessedOffset = records.endOffsets(Collections.singleton(partition)).get(partition) - 1;
consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset)), (offsets1, exception) -> {
if (exception!= null) {
System.err.println("Error committing offsets: " + exception.getMessage());
}
});
}
}
} finally {
consumer.close();
}
}
}
五、总结
Kafka 消费者的位置提交方式有自动提交和手动提交两种,手动提交又分为同步提交和异步提交。不同的提交方式适用于不同的场景,开发者需要根据实际需求选择合适的提交方式。在选择提交方式时,需要考虑消息处理的准确性、性能要求以及应用场景的特点等因素。
文章(专栏)将持续更新,欢迎关注公众号:服务端技术精选。欢迎点赞、关注、转发。
个人小工具程序上线啦,通过公众号(服务端技术精选)菜单【个人工具】即可体验,欢迎大家体验后提出优化意见!500 个访问欢迎大家踊跃体验哦~