本文介绍三种方法查看Kafka的偏移量offset。
1. API:ConsumerRecord的offset()方法查看offset。
2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。
3. 命令行:kafka-consumer-groups.sh命令查看offset。
前提条件
Kafka安装及基本操作,可参考:Kafka安装及基本操作
Kafka API操作,可参考:Kafka API操作
三种方法查看Kafka的偏移量offset
1. API:ConsumerRecord的offset()方法查看offset。
生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class MyProducer {
public static void main(String[] args) {
// 1.创建kafka生产者对象
Properties prop = new Properties();
prop.put("bootstrap.servers","node1:9092");
prop.put("acks","all");
prop.put("retries","0");
// 16k一个批量
prop.put("batch.size", 16384);
prop.put("linger.ms",5);
prop.put("buffer.memory", 33554432);
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Object, Object> producer = new KafkaProducer<>(prop);
// 2.使用send方法生产数据
for (int i = 0; i < 10; i++) {
// producer.send(new ProducerRecord<>("Hello-Kafka", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("bigdata12", Integer.toString(i), Integer.toString(i)));
}
// 3.关闭生产者
producer.close();
}
}
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
//1.创建消费者对象
Properties prop = new Properties();
prop.put("bootstrap.servers","node1:9092");
prop.put("group.id","test");
prop.put("enable.auto.commit","true");
prop.put("auto.commit.interval.ms","1000");
prop.put("session.timeout.ms","30000");
prop.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");//注意不是StringSerializer
prop.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(prop);
//2.消费者订阅主题
consumer.subscribe(Arrays.asList("bigdata12"));// 将数组转为List集合
//3.使用poll方法消费数据
while (true){
// ConsumerRecords<Object,Object> records = consumer.poll(Duration.ofSeconds(5));
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<Object, Object> record : records) {
System.out.printf("offset=%d, key=%s, value=%s\n",
record.offset(),record.key(),record.value());
}
}
}
}
测试:
IDEA中,运行消费者,再运行生产者。提示:没有topic,将自动创建。
返回IDEA的消费者控制台,输出类似如下数据
...
offset=30, key=8, value=8
offset=31, key=9, value=9
这里显示的是最后一条数据的offset=31。
2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class KafkaOffsetViewer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "bigdata12";
TopicPartition partition = new TopicPartition(topic, 0);
try {
consumer.assign(Arrays.asList(partition));
consumer.seekToEnd(Arrays.asList(partition));
long offset = consumer.position(partition);
System.out.println("Offset of partition 0 is: " + offset);
} finally {
consumer.close();
}
}
}
IDEA运行结果:
Offset of partition 0 is: 32
看到offset为32,是最新的offset值,也就是下一条数据从32开始。
3. 命令行:kafka-consumer-groups.sh命令查看offset。
在命令行中运行以下命令:
kafka-consumer-groups --bootstrap-server <kafka-broker-list> --describe --group <consumer-group-id>
例如:
[hadoop@node1 ~]$ kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test bigdata12 0 32 32 0 consumer-test-1-64d17e50-69e9-47e3-9380-f2441a09cae2 /117.189.125.24 consumer-test-1
看到offset为32,是最新的offset值。
感兴趣可以再使用生产者发送数据测试,看到三种查看offset方法,offset值的变化情况。
总结
1. API:ConsumerRecord的offset()方法查看offset,查看到最后一条数据的offset,最新offset=最后一条数据offset+1。
2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset,查到最新offset。
3. 命令行:kafka-consumer-groups.sh命令查看offset,查到最新offset。
完成! enjoy it!