Kafka入门(六)

news2024/9/28 7:17:20

下面聊聊Kafka中的Offset位移

1、Offset位移概述

在引入Kafka服务后,当consumer消费完数据后需要进行位移提交,那么提交的位移数据究竟存储到那里,有以何种方式进行存储?
Kafka旧版本(<0.8)是重度依赖Zookeeper来实现各种各样的协调管理,在旧版本的consumer group是把位移保存在Zookeeper中,减少broker端状态存储开销,鉴于Zookeeper的存储架构设计来说,它不适合频繁写更新,而consumer group的位移提交又是频繁写操作,这样会拖慢Zookeeper集群的性能,于是在Kafka新版本中,社区重新设计了consumer group的位移管理方式,采用了将位移保存在Kafka内部,就出现了大名鼎鼎的_consumer_offsets。

1.1、_consumer_offsets

_consumer_offsets是用来保存Kafka consumer提交的位移信息,且它是由Kafka自动创建的,和普通的Topic相同,它的消息格式也是Kafka自己定义的,无法进行修改。

_consumer_offsets消息格式可以简单理解为一个K-V对,Key和Value分别表示消息的键值和消息体。_consumer_offsets存储consumer的位移信息,在Kafka中consumer数量很多,这时通过公共且唯一的Group ID来标识属于那个consumer,consumer提交位移是在分区的维度进行,这时通过分区号来标识要提交的位移分区。

总结,_consumer_offsets主题的Key应该包含3部分内容:<Group ID,主题名,分区号>,_consumer_offsets主题的Value可以简单认为存储的是offset值,当然还有其他一些元数据。

_consumer_offsets消息格式:
在这里插入图片描述

1.2、_consumer_offsets创建过程

当Kafka集群中的第一个consumer启动时,Kafka会自动创建_consumer_offsets。前面说过,它和普通Topic相同,它也有对应的分区数,若是由Kafka自动创建,这个依赖broker端参数offsets.topic.num.partitions(默认值为50),因此Kafka会自动创建一个有50个分区的_consumer_offsets。这就是在Kafka日志路径下看到很多_consumer_offsets-xxx这样目录的原因,既然有分区数,比如就会有对应的副本数,这个依赖broker端参数offsets.topic.replication.factor(默认值为3)。
总结,_consumer_offsets由Kafka自动创建,该Topic的分区数是50,副本数是3,而具体Group的消费情况要存储到那个Partition,根据abs(GroupId.hashCode())%NumPartitions来计算,这样就可以保证consumer offset信息与consumer group对应的coordinator处于同一个broker节点上。

2、数据测试

2.1、创建Topic

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test_1

2.2、生产数据

public class KafkaProducerTest {

    public static final String bootStrap = "localhost:9092";
    public static final String topic = "test_1";
    public static final String key = "test";

    public static void main(String[] args) {
        // 1、配置客户端参数
        Properties properties = new Properties();
        // 指定生产者客户端连接Kafka集群所需的broker地址列表,具体的内容格式为host1:port1,host2:port2
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrap);
        // key序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 配置重试次数,10次之后抛异常,可以在回调中处理
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 配置客户端id
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.1");

        // 2、构造KafkaProducer客户端实例
        KafkaProducer kafkaProducer = new KafkaProducer(properties);

        // 3、同一个key的消息放到同一个分区,不指定key则均衡分布,消息分区的选择是在客户端进行的
        for (int i = 0; i < 100; i++) {
            try {
                String message = "hello world " + i;
                ProducerRecord producerRecord = new ProducerRecord(topic, key, message);
                Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
                List<PartitionInfo> partitionsFor = kafkaProducer.partitionsFor(topic);
                for (PartitionInfo partitionInfo : partitionsFor) {
                    System.out.println(partitionInfo);
                }
                RecordMetadata recordMetadata = future.get();
                System.out.println(recordMetadata.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

2.3、消费者

public class KafkaConsumerTest {

    public static final String bootStrap = "localhost:9092";
    public static final String topic = "test_1";
    public static final String groupId = "group_1";

    public static void main(String[] args) {
        // 1、配置客户端参数
        Properties properties = new Properties();
        // 设置offset初始位置
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 指定消费者客户端连接Kafka集群所需的broker地址列表,具体的内容格式为host1:port1,host2:port2
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrap);
        // key序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 关闭自动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 自动提交时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // session会话响应时间,超过这个时间kafka可以选择放弃消费或消费下一条消息
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // 配置消费组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 配置客户端id
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.1");

        // 2、构造KafkaConsumer客户端实例
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);

        // 3、订阅主题
        kafkaConsumer.subscribe(Collections.singletonList(topic));

        // 4、消费消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
                for (ConsumerRecord<String, String> item : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", item.offset(), item.key(), item.value());
                }
                // 手动提交
                kafkaConsumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }
    }

}

2.4、查看Offset数据

在这里插入图片描述
从上面可以看出指定的key为"test"的消息被分配到test_1的3分区下,并且当前的消费的offset是100。

3、重新消费

在Kafka新版本中位移偏移量会保存在Kafka内部的Topic(_consumer_offsets)中,该topic默认有50个partition,每个partition有3个副本,通过abs(GroupId.hashCode())%NumPartitions来确定某个消费者组已消费的offset保存到那个分区中。
重新消费数据时的方式如下:

  • 修改偏移量offset
  • 通过consumer.subscribe()指定偏移量offset
  • 通过auto.offset.reset指定偏移量offset
  • 通过指定时间来消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/381795.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【python学习】批量从含有多列数据的txt文件中提取某个数据,并存入csv文件

批量从含有多列数据的txt文件中提取某个数据&#xff0c;并存入csv文件任务需求与解读代码实现导入相关库提取txt文件的三列数据存为列表按条件提取某个数据存入字典将字典写入csv文件任务需求与解读 昨天收到一个需求&#xff0c;希望能将电化学工作站的数据文件(.bin后缀)转…

欧文数据建模师 erwin Data Modeler Crack

欧文数据建模师 erwin Data Modeler 是一款屡获殊荣的数据建模工具&#xff0c; 用于查找、可视化、设计、部署和标准化高质量的企业数据资产。从任何地方发现和记录任何数据&#xff0c;以在大规模数据集成、主数据管理、元数据管理、大数据、商业智能和分析计划中实现一致性、…

kubernetes--安全沙箱运行容器gVisor

gVisor介绍 所知&#xff0c;容器的应用程序可以直接访问Linux内核的系统调用&#xff0c;容器在安全隔离上还是比较弱&#xff0c;虽然内核在不断的增强自身的安全特性&#xff0c;但由于内核自身代码极端复杂&#xff0c;CVE漏洞层出不穷。 所以要想减少这方面安全风险&#…

MATLAB | 这些花里胡哨的热图怎么画

好早之前写过一个绘制相关系数矩阵的代码&#xff0c;但是会自动求相关系数&#xff0c;而且画出来的热图只能是方形&#xff0c;这里写一款允许nan值出现&#xff0c;任意形状的热图绘制代码&#xff0c;绘制效果如下&#xff1a; 如遇到bug请后台提出&#xff0c;并去gitee下…

Spring Boot+Vue前后端分离项目练习02之网盘项目利用token进行登陆验证

1.添加依赖 首先需要添加jwt对应的依赖。 <dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version></dependency>2.添加配置 JWT由三部分构成&#xff0c;分别是 header, pa…

详解数据结构中的顺序表的手动实现,顺序表功能接口【数据结构】

文章目录线性表顺序表接口实现尾插尾删头插头删指定位置插入指定位置删除练习线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列…

Freemarker动态模板渲染flyingsaucer将html转PDF(多页固定头尾)

目录一、序言二、CSS样式控制打印模板三、代码示例1、pom.xml2、application.yml3、PdfGenerationController4、Freemarker模板内容四、展示效果一、序言 一般正常来说&#xff0c;生成PDF的操作都是通过将HTML转成PDF&#xff0c;HTML动态渲染可以借助模板引擎&#xff0c;如…

从外行到外包,从手工测试到知名互联大厂测开,我经历了什么...

本人本科就读于某普通一本院校&#xff08;非985&#xff0c;211&#xff09;&#xff0c;经管类专业&#xff0c;从大四实习到15年毕业后前两年一直在从事自己专业相关的工作。17年时决定想要转业从事计算机相关领域工作&#xff0c;在17年9月的一个机遇大跨度转行到测试行业&…

vue子组件监听父组件数据变化并作出改变(亲测有效)

vue子组件监听父组件数据变化并作出改变&#xff08;亲测有效&#xff09; 1. 问题 1.1 封装组件时经常会遇到子组件需要根据父组件数据变化并执行对应的操作逻辑 1.2 监听方法中加了deep、immediate 等参数监听数组/对象还是没有生效 1.3 类型table组件需要根据父组件数据…

Java多线程学习——线程的创建、Thread类以及多线程状态

文章目录学习目标一、认识线程1、线程是什么&#xff1f;2、为什么要有线程3、进程和线程的区别二、Thread类以及常见方法1.创建线程的几种方式2、Thread类属性及方法2.1、Thread的常见构造方法2.2、Thread的常见属性3、线程的中断-interrupt()中断一个线程&#xff1a;4、等待…

前端面试题 —— 浏览器原理(一)

目录 一、进程与线程的概念 二、如何实现浏览器内多个标签页之间的通信? 三、浏览器资源缓存的位置有哪些&#xff1f; 四、对浏览器内核的理解 五、常见的浏览器内核比较 六、浏览器的主要组成部分 七、渲染过程中遇到 JS 文件如何处理&#xff1f; 八、什么情况会阻塞…

【C语言】动态内存管理

我们之前开辟的空间&#xff0c;大小固定&#xff0c;且在申明数组的时候&#xff0c;必须指定数组的长度。但是有时候我们需要的空间大小在程序运行的时候才知道&#xff0c;这就得动态内存开辟出马了。 目录 1.malloc和free 2.calloc 3.realloc 4.常见动态内存错误 5.经…

TCP 握手过程 三次 四次

蛋老师视频 SYN 同步 ACK 确认 FIN 结束 核心机制是确定哪些请求或响应需要丢弃 SYN、ACK、FIN 通过 1/0 设置开启/关闭 开启SYN后&#xff0c;报文中会随机生成 Sequence序号 用于校验 &#xff08;应用可能发起多个会话&#xff0c;可以区分&#xff09; 服务器的同步序…

2023版D盾防火墙v2.1.7.2,主动防御保护,以内外保护的方式 防止网站和服务器给入侵。限制了常见的入侵方法,让服务器更安全

v2.1.7.2 (20230107) 2023-1-7 1.修正PHP一处文件检测的bug。 2.修正某些情况下无法文件加白问题。 v2.1.7.2 2022-10-13 1.针对aspx的样本加入了新的识别。 2.针对上传 doc格式文件提示“上传格式不符” 的修正。 3.工具“HTTPS安全”,把 TLS 1.1 和 TLS 1.0 设置为默认不选中…

杰理AD16N简介

一、概述&#xff1a; AD16N是杰理新出的一个MP3解码芯片&#xff0c;是高集成度的 32 位通用音频 SOC&#xff0c; 集成 40KByte SRAM&#xff0c; 时钟源可选内部 RC 或外部12MHz 晶振&#xff0c; 最高主频可达 160MHz&#xff1b; 主要是替代AC109N系列和AC608N、AC104N系列…

Python爬虫书写时遇到的问题汇总

文章目录python的xpath插件需要的库下载出现问题懒加载python 爬取图片,网址都正确但是下不下来的原因:爬取下来的文字包含Windows不能识别的特殊字符selenium的find_element_by_id()出现的问题爬虫信息写入mysql时的1045号错误python的xpath插件需要的库下载出现问题 ERROR: C…

MySQL特殊语法insert into ... on duplicate key update ...

一、前言 在日常开发中&#xff0c;经常会遇到这样的需求&#xff1a;查看某条记录是否存在&#xff0c;不存在的话创建一条新记录&#xff0c;存在的话更新某些字段。 比如下列伪代码&#xff1a; $row mysql_query($result);if($row){mysql_execute(update ...);}else{my…

MongoDB复习

目录 1.docker安装 2.mondo概念解析 3.数据库操作 4.基本数据类型 5. 适合使用场景 6.对集合操作 7.常用操作 1.docker安装 docker pull mongo:latest docker run -d --restartalways -p 27017:27017 --name mymongo -v /data/db:/data/db -d mongo docker exec -it m…

【SpringBoot高级篇】SpringBoot集成Sharding-JDBC分库分表

【SpringBoot高级篇】SpringBoot集成Sharding-JDBC分库分表Apache ShardingSphere分库分表分库分表的方式垂直切分垂直分表垂直分库水平切分水平分库水平分表分库分表带来的问题分库分表中间件Sharding-JDBCsharding-jdbc实现水平分表sharding-jdbc实现水平分库sharding-jdbc实…

数据结构-考研难点代码突破(查找算法 - 散列表(哈希表)C++实现除留余数法拉链法哈希)

文章目录1. 哈希表与解决哈希冲突的方法2. C实现除留余数法拉链法哈希1. 哈希表与解决哈希冲突的方法 散列表(Hash Table)&#xff0c;又称哈希表。是一种数据结构。 特点&#xff1a;数据元素的关键字与其存储地址直接相关。 关键字通过散列函数&#xff08;哈希函数&#…