如何查看Kafka的偏移量offset

news2024/9/20 7:52:09

本文介绍三种方法查看Kafka的偏移量offset。

1. API:ConsumerRecord的offset()方法查看offset。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

前提条件

Kafka安装及基本操作,可参考:Kafka安装及基本操作

Kafka API操作,可参考:Kafka API操作

三种方法查看Kafka的偏移量offset

1. API:ConsumerRecord的offset()方法查看offset。

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class MyProducer {
    public static void main(String[] args) {
        // 1.创建kafka生产者对象
        Properties prop = new Properties();
        prop.put("bootstrap.servers","node1:9092");
        prop.put("acks","all");
        prop.put("retries","0");
        // 16k一个批量
        prop.put("batch.size", 16384);
        prop.put("linger.ms",5);
        prop.put("buffer.memory", 33554432);

        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<Object, Object> producer = new KafkaProducer<>(prop);


        // 2.使用send方法生产数据
        for (int i = 0; i < 10; i++) {
//            producer.send(new ProducerRecord<>("Hello-Kafka", Integer.toString(i), Integer.toString(i)));
            producer.send(new ProducerRecord<>("bigdata12", Integer.toString(i), Integer.toString(i)));
        }

        // 3.关闭生产者
        producer.close();

    }
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        //1.创建消费者对象
        Properties prop = new Properties();
        prop.put("bootstrap.servers","node1:9092");
        prop.put("group.id","test");
        prop.put("enable.auto.commit","true");
        prop.put("auto.commit.interval.ms","1000");
        prop.put("session.timeout.ms","30000");
        prop.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");//注意不是StringSerializer
        prop.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(prop);
        //2.消费者订阅主题
        consumer.subscribe(Arrays.asList("bigdata12"));// 将数组转为List集合

        //3.使用poll方法消费数据
        while (true){
//            ConsumerRecords<Object,Object> records = consumer.poll(Duration.ofSeconds(5));
            ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

            for (ConsumerRecord<Object, Object> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s\n",
                        record.offset(),record.key(),record.value());
            }
        }

    }
}

测试:

IDEA中,运行消费者,再运行生产者。提示:没有topic,将自动创建。

返回IDEA的消费者控制台,输出类似如下数据

...

offset=30, key=8, value=8
offset=31, key=9, value=9

这里显示的是最后一条数据的offset=31。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class KafkaOffsetViewer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "bigdata12";
        TopicPartition partition = new TopicPartition(topic, 0);

        try {
            consumer.assign(Arrays.asList(partition));
            consumer.seekToEnd(Arrays.asList(partition));
            long offset = consumer.position(partition);
            System.out.println("Offset of partition 0 is: " + offset);
        } finally {
            consumer.close();
        }
    }
}

IDEA运行结果:

Offset of partition 0 is: 32

看到offset为32,是最新的offset值,也就是下一条数据从32开始。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

在命令行中运行以下命令:

kafka-consumer-groups --bootstrap-server <kafka-broker-list> --describe --group <consumer-group-id>

例如:

[hadoop@node1 ~]$ kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test
​
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
test            bigdata12       0          32              32              0               consumer-test-1-64d17e50-69e9-47e3-9380-f2441a09cae2 /117.189.125.24 consumer-test-1
​

看到offset为32,是最新的offset值。

感兴趣可以再使用生产者发送数据测试,看到三种查看offset方法,offset值的变化情况。

总结

1. API:ConsumerRecord的offset()方法查看offset,查看到最后一条数据的offset,最新offset=最后一条数据offset+1。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset,查到最新offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset,查到最新offset。

完成! enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1942424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenHarmony 入门——ArkUI 自定义组件之间的状态装饰器小结(一)

文章大纲 引言一、状态管理概述二、基本术语三、状态装饰器总览 引言 前面说了ArkTS 是在TypeScript基础上结合ArkUI框架扩展定制的&#xff0c;状态管理中的各种装饰器就是扩展的功能之一&#xff0c;可以让开发者通过声明式UI快速高效实现组件之间的数据同步&#xff0c;至于…

从PyTorch官方的一篇教程说开去(4 - Q-table来源及解决问题实例)

偷个懒&#xff0c;代码来自比很久之前看的书&#xff0c;当时还在用gym&#xff0c;我做了微调以升级到gymnasium当前版本&#xff0c;确保可以正常演示。如果小伙伴或者原作者看到了麻烦提一下&#xff0c;我好备注一下出处。 您的进步和反馈是我最大的动力&#xff0c;小伙…

Dav_笔记10:Using SQL Plan Management之1

SQL计划基准概述 SQL计划管理是一种预防机制&#xff0c;可以记录和评估SQL语句的执行计划。此机制可以构建SQL计划基准&#xff0c;这是一组SQL语句的已接受计划。已接受的计划已被证明表现良好。 SQL计划基准的目的 SQL计划基准的目标是保持相应SQL语句的性能&#xff0c;…

1-如何挑选Android编译服务器

前几天&#xff0c;我在我的星球发了一条动态&#xff1a;入手洋垃圾、重操老本行。没错&#xff0c;利用业余时间&#xff0c;我又重新捣鼓捣鼓代码了。在接下来一段时间&#xff0c;我会分享我从服务器的搭建到完成Android产品开发的整个过程。这些东西之前都是折腾过的&…

【JAVA】堆、栈的理解

JAVA中的堆和栈 堆和栈的简单描述栈堆 示例1示例2如何判断操作的是原始对象本身还是引用地址的变量&#xff08;个人理解&#xff0c;仅作为记录&#xff09; 引言 在Java中&#xff0c;内存管理是一个重要的概念&#xff0c;它涉及到堆&#xff08;Heap&#xff09;和栈&#…

CTFSHOW game-gyctf web2

【2020年新春战“疫”】game-gyctf web2 参考https://www.cnblogs.com/aninock/p/15408090.html 说明&#xff1a;看见网上好像没多少人写&#xff0c;刚好玩到这道题了&#xff0c;就写一下吧。 一、利用入口 常规套路发现www.zip然后进行代码审计 index可以包含update&…

05 HTTP Tomcat Servlet

文章目录 HTTP1、简介2、请求数据格式3、响应数据格式 Tomcat1、简介2、基本使用3、Maven创建Web项目4、IDEA使用Tomcat Servlet1、简介2、方法介绍3、体系结构4、urlPattern配置5、XML配置 HTTP 1、简介 HTTP概念 HyperText Transfer Protocol&#xff0c;超文本传输协议&am…

浏览器打开抽奖系统html

<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>在线抽奖 随机选取 自动挑选</title> <script src"https://libs.baidu.com/jquery/1.10.2/jquery.min.js"></script> <style> body {…

【LabVIEW作业篇 - 5】:水仙花数、数组与for循环的连接

文章目录 水仙花数数组与for循环的连接 水仙花数 水仙花数&#xff0c;是指一个3位数&#xff0c;它的每个位上的数字的3次幂之和等于它本身。如371 3^3 7^3 1^3&#xff0c;则371是一个水仙花数。 思路&#xff1a;水仙花数是一个三位数&#xff0c;通过使用for循环&#xf…

代码随想录——打家劫舍(Leetcode198)

题目链接 背包问题 class Solution {public int rob(int[] nums) {if(nums.length 0){return 0;}if(nums.length 1){return nums[0];}int[] dp new int[nums.length];dp[0] nums[0];dp[1] Math.max(nums[0], nums[1]);for(int i 2; i < nums.length; i){dp[i] Mat…

人工智能算法工程师(高级)课程5-图像生成项目之对抗生成模型与代码详解

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能算法工程师(高级)课程5-图像生成项目之对抗生成模型与代码详解。本文将介绍对抗生成模型&#xff08;GAN&#xff09;及其变体CGAN、DCGAN的数学原理&#xff0c;并通过PyTorch框架搭建完整可运行的代码&am…

Android 15 之如何快速适配 16K Page Size

在此之前&#xff0c;我们通过 《Android 15 上 16K Page Size 为什么是最坑》 介绍了&#xff1a; 什么是16K Page Size为什么它对于 Android 很坑如何测试 如果你还没了解&#xff0c;建议先去了解下前文&#xff0c;然后本篇主要是提供适配的思路&#xff0c;因为这类适配…

算法——滑动窗口(day7)

904.水果成篮 904. 水果成篮 - 力扣&#xff08;LeetCode&#xff09; 题目解析&#xff1a; 根据题意我们可以看出给了我们两个篮子说明我们在开始采摘到结束的过程中只能有两种水果的种类&#xff0c;又要求让我们返回收集水果的最大数目&#xff0c;这不难让我们联想到题目…

Java 面试相关问题(中)——并发编程相关问题

这里只会写Java相关的问题&#xff0c;包括Java基础问题、JVM问题、线程问题等。全文所使用图片&#xff0c;部分是自己画的&#xff0c;部分是自己百度的。如果发现雷同图片&#xff0c;联系作者&#xff0c;侵权立删。 1 基础问题1.1 什么是并发&#xff0c;什么是并行&#…

Python爬虫知识体系-----Urllib库的使用

数据科学、数据分析、人工智能必备知识汇总-----Python爬虫-----持续更新&#xff1a;https://blog.csdn.net/grd_java/article/details/140574349 文章目录 1. 基本使用2. 请求对象的定制3. 编解码1. get请求方式&#xff1a;urllib.parse.quote&#xff08;&#xff09;2. ur…

数驭未来,景联文科技构建高质大模型数据库

国内应用层面的需求推动AI产业的加速发展。根据IDC数据预测&#xff0c;预计2026年中国人工智能软件及应用市场规模会达到211亿美元。 数据、算法、算力是AI发展的驱动力&#xff0c;其中数据是AI发展的基石&#xff0c;中国的数据规模增长速度预期将领跑全球。 2024年《政府工…

【WAF剖析】10种XSS某狗waf绕过姿势,以及思路分析

原文&#xff1a;【WAF 剖析】10 种 XSS 绕过姿势&#xff0c;以及思路分析 xss基础教程参考&#xff1a;https://mp.weixin.qq.com/s/RJcOZuscU07BEPgK89LSrQ sql注入waf绕过文章参考&#xff1a; https://mp.weixin.qq.com/s/Dhtc-8I2lBp95cqSwr0YQw 复现 网站安全狗最新…

[数据集][目标检测]野猪检测数据集VOC+YOLO格式1000张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1000 标注数量(xml文件个数)&#xff1a;1000 标注数量(txt文件个数)&#xff1a;1000 标注…

如何查看jvm资源占用情况

如何设置jar的内存 java -XX:MetaspaceSize256M -XX:MaxMetaspaceSize256M -XX:AlwaysPreTouch -XX:ReservedCodeCacheSize128m -XX:InitialCodeCacheSize128m -Xss512k -Xmx2g -Xms2g -XX:UseG1GC -XX:G1HeapRegionSize4M -jar your-application.jar以上配置为堆内存4G jar项…

Web前端:HTML篇(二)元素属性

HTML 属性 属性是 HTML 元素提供的附加信息。 HTML 元素可以设置属性属性可以在元素中添加附加信息属性一般描述于开始标签属性总是以名称/值对的形式出现&#xff0c;比如&#xff1a;name"value"。 属性实例 HTML 链接由 <a> 标签定义。链接的地址在 href …