Kafka 位移

news2025/1/14 18:39:15

Consumer位移管理机制

Consumer的位移数据作为一条条普通的Kafka消息,提交到__consumer_offsets中。可以这么说,__consumer_offsets的主要作用是保存Kafka消费者的位移信息。使用Kafka主题来保存位移。

消息格式

位移主题就是普通的Kafka主题。也是一个内部主题,但它的消息格式却是Kafka自己定义的KV对(Key和Value分别表示消息的键值和消息体),用户不能修改,Kafka Consumer有API去提交位移,也就是向位移主题写消息。不要自己写个Producer随意向该主题发送消息。

主题消息的Key中应该保存标识Consumer的字段,也就是Consumer GroupGroup ID,标识唯一的Consumer Group,因为Consumer提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key中还应该保存 Consumer要提交位移的分区

总结:位移主题的Key中应该保存3部分内容:<Group ID,主题名,分区号>

还有2种格式:

        1. 用于保存Consumer Group信息的消息,用来注册Consumer Group

        2. tombstone消息,即墓碑消息,也称delete mark:用于删除Group过期位移甚至是删除Group的消息。

位移主题的创建

当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。

分区数是怎么设置的呢?这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是50,因此Kafka会自动创建一个50分区的位移主题。Broker端另一个参数offsets.topic.replication.factor 控制副本数,默认为3。所以:如果位移主题是Kafka自动创建的,那么该主题的分区数是50,副本数是3。

提交位移(Committing Offsets)

Consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。当Consumer发生故障重启之后,就能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍

从用户的角度来说,位移提交分为自动提交手动提交从Consumer端的角度来说,位移提交分为同步提交异步提交

Kafka Consumer提交位移的方式有两种:自动提交位移手动提交位移

手动提交位移

enable.auto.commit 如果值是false,则为手动提交,它能够把控位移提交的时机和频率可以使用Kafka Consumer API的consumer.commitSync等方法,当调用这些方法时,Kafka会向位移主题写入相应的消息。

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}

调用consumer.commitSync()方法的时机,是在处理完了poll()方法返回的所有消息之后。如果过早提交了位移,就可能会出现消费数据丢失的情况。它还也有一个缺陷,就是在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,影响整个应用程序的TPS。

Kafka社区为手动提交位移提供了另一个API方法:KafkaConsumer#commitAsync() ,这是一个异步操作。调用commitAsync()之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。由于它是异步的,Kafka提供了回调函数(callback),在实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用commitAsync()的方法:

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}

commitAsync是否能够替代commitSync呢?

        答案是不能。commitAsync的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过 期”或不是最新值了。因此,异步提交的重试其实没有意义,所以commitAsync是不会重试的。 

自动提交位移

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer 定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。但是没法把控Consumer端的位移管理。

 一旦设置了enable.auto.commit为true,Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer每5秒自动提交一次位移。现在,我们假设提交位移之后的3秒发生了Rebalance操作。在Rebalance之后,所有Consumer从上一次提交的位移处继续消费但该位移已经是3秒前的位移数据了,故在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然能够通过减少auto.commit.interval.ms的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。 

自动提交位移问题:

自动提交位移,那么就可能存在一个问题:只要Consumer一直启动着,它就会无限期地向位移主题写入消息。

假设Consumer当前消费到了某个主题的最新一条消息,位移是100,之后该主题没有任何新消息产生,故Consumer无消息可消费了,所以位移永远保持在100。由于是自动提交位移位移主题中会不停地写入位移=100的消息。显然Kafka只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

Kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义Compact策略中的过期呢?对于同一个Key的两条消息M1M2,如果M1的发送时间早于 M2,那么M1就是过期消息。Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起

图中位移为0、2和3的消息的Key都是K1。Compact之后,分区只需要保存位移为3的消息,因为它是最新发送的。 

Kafka提供了专门的后台线程定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据。这个后台线程叫LogCleaner

参考:Kafka 核心技术与实战 (geekbang.org)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1875188.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

windows MSVC编译安装libcurl

$ git clone https://github.com/curl/curl.git $ cd curl/winbuild依照curl/winbuild/README.md的指示&#xff0c; 启动visual studio的命令行工具&#xff0c;这里要注意别选错. 如果要编译出x64版本的libcurl&#xff0c;就用x64的命令行工具&#xff1b;如果要编译出x86…

VSCode插件开发经验小结

从零基础接手DX扩展开发维护&#xff0c;到完成DX扩展从O2平台迁移到 VSCode 平台&#xff0c;现在也积累了一些经验&#xff0c;本文将对这一过程中的学习经历做一个简单小结&#xff0c;也希望可以通过本文帮助想要开发 VSCode 扩展的同学可以更快速的上手。 VSCode (Visual …

福昕阅读器再打开PDF文件时,总是单页显示,如何设置打开后就自动显示单页连续的模式呢

希望默认进入连续模式 设置方法 参考链接 如何设置使福昕阅读器每次启动时不是阅读模式 每次启动后都要退出阅读模式 麻烦_百度知道 (baidu.com)https://zhidao.baidu.com/question/346796551.html#:~:text%E5%9C%A8%E3%80%90%E5%B7%A5%E5%85%B7%E3%80%91%E9%87%8C%E6%9C%89%E…

使用ROS2的RCLCPP客户端库来实现话题通信

1.创建发布者目录文件 cd d2lros2/ mkdir -p chapt3/chapt3_ws/src cd chapt3/chapt3_ws/src 2.创建发布者节点 ros2 pkg create example_topic_rclcpp --build-type ament_cmake --dependencies rclcpp 3.新建发布者类 touch example_topic_rclcpp/src/topic_publisher_01.…

[CAN] Intel 格式与 Motorola 格式的区别

编码格式 数据传输规则一、Intel 格式编码二、Motorola 格式编码三、分析总结🙋 前言 CAN 总线信号的编码格式有两种定义:Intel 格式与 Motorola 格式。究竟两种编码格式有什么样的区别呢?设计者、dbc 文件编辑者或者测试人员又该如何判断两种格式,并进行有效正确的配置和解…

下载旧版本vscode及扩展,离线下载远程linux服务器插件

背景 工作的内网没有网络&#xff0c;无法使用网络来下载插件和vscode软件&#xff0c;且有远程linux服务器需求&#xff0c;linux服务器中lib相关库比较旧且无法更新&#xff0c;所以需要选择一个旧版本的vscode&#xff0c;相应插件也需要选择旧版本的 旧版本vscode下载 没…

SQL 29 计算用户的平均次日留存率题解

问题截图如下&#xff1a; SQL建表代码&#xff1a; drop table if exists user_profile; drop table if exists question_practice_detail; drop table if exists question_detail; CREATE TABLE user_profile ( id int NOT NULL, device_id int NOT NULL, gender varchar…

金融科技如何以细颗粒度服务提升用户体验与满意度

在金融科技迅速发展的当下&#xff0c;各种技术手段被广泛应用于提升用户体验与满意度。这些技术手段不仅提供了更为精准、个性化的服务&#xff0c;还通过优化操作流程、提升服务效率等方式&#xff0c;显著改善了用户的金融生活。以下将详细探讨金融科技如何运用这些技术手段…

短视频哪个软件好用?成都柏煜文化传媒有限公司

短视频哪个软件好用&#xff1f;一文带你了解各大平台特色 随着移动互联网的飞速发展&#xff0c;短视频已经成为现代人生活中不可或缺的一部分。市面上涌现出众多短视频平台&#xff0c;它们各具特色&#xff0c;满足了不同用户的需求。那么&#xff0c;短视频哪个软件好用呢…

Python学习笔记五

1.当循环执行完整后&#xff0c;就会执行else里面的代码 s0 i1 while i<100:sii1 else:print(s) 当循环不完整就会如下 s0 i1 while i<100:sii1if s6:break; else:print(s) 2. 实现密码匹配&#xff0c;可以输入三次&#xff0c;若输入三次错误会退出&#xff0c;或者输…

Linux高并发服务器开发(六)线程

文章目录 1. 前言2 线程相关操作3 线程的创建4 进程数据段共享和回收5 线程分离6 线程退出和取消7 线程属性&#xff08;了解&#xff09;8 资源竞争9 互斥锁9.1 同步与互斥9.2 互斥锁 10 死锁11 读写锁12 条件变量13 生产者消费者模型14 信号量15 哲学家就餐 1. 前言 进程是C…

vue3-openlayers 图标闪烁、icon闪烁、marker闪烁

本篇介绍一下使用vue3-openlayers 图标闪烁、icon闪烁、marker闪烁 1 需求 图标闪烁、icon闪烁、marker闪烁 2 分析 图标闪烁、icon闪烁、marker闪烁使用ol-animation-fade组件 3 实现 <template><ol-map:loadTilesWhileAnimating"true":loadTilesWh…

PyScript:在浏览器中释放Python的强大

PyScript&#xff1a;Python代码&#xff0c;直接在网页上运行。- 精选真开源&#xff0c;释放新价值。 概览 PyScript是一个创新的框架&#xff0c;它打破了传统编程环境的界限&#xff0c;允许开发者直接在浏览器中使用Python语言来创建丰富的网络应用。结合了HTML界面、Pyo…

美国总统对决影响比特币价格

刚刚&#xff0c;2024 年首场总统辩论之后&#xff0c;政治格局发生了翻天覆地的变化&#xff0c;数字货币市场也感受到了这种震动。这场辩论的时间安排史无前例&#xff0c;交锋激烈&#xff0c;在民主党内部引发了一系列猜测和战略。正如我们的 CNN 快报民意调查和摇摆州焦点…

STM32人体心电采集系统

资料下载地址&#xff1a;STM32人体心电采集系统 1、项目功能介绍 此项目主要实现了以STM32为核心的人体心电采集系统软硬件的设计。软件设计过程是在STM32上移植的uCGUI做图形界面&#xff0c;并如实显示采集到的心电波形信号&#xff0c;有SD卡存储和USB数据传输功能。 2、实…

1.SQL注入-数字型

SQL注入-数字型(post) 查询1的时候发现url后面的链接没有传入1的参数。验证为post请求方式&#xff0c;仅显示用户和邮箱 通过图中的显示的字段&#xff0c;我们可以猜测传入数据库里面的语句&#xff0c;例如&#xff1a; select 字段1,字段2 from 表名 where id1; 编辑一个…

RabbitMQ 的经典问题

文章目录 前言一、防止消息丢失1.1 ConfirmCallback/ReturnCallback1.2 持久化1.3 消费者确认消息 二、防止重复消费三、处理消息堆积四、有序消费消息五、实现延时队列六、小结推荐阅读 前言 当设计和运维消息队列系统时&#xff0c;如 RabbitMQ&#xff0c;有几个关键问题需…

“Hello, World!“ 历史由来

布莱恩W.克尼汉&#xff08;Brian W. Kernighan&#xff09;—— Unix 和 C 语言背后的巨人 布莱恩W.克尼汉在 1942 年出生在加拿大多伦多&#xff0c;他在普林斯顿大学取得了电气工程的博士学位&#xff0c;2000 年之后取得普林斯顿大学计算机科学的教授教职。 1973 年&#…

海南聚广众达电子商务咨询有限公司专业电商服务代名词

在数字化浪潮席卷全球的今天&#xff0c;电子商务行业日新月异&#xff0c;各大平台纷纷崭露头角。其中&#xff0c;抖音电商以其独特的短视频直播模式&#xff0c;迅速崛起成为电商领域的新星。而在这股浪潮中&#xff0c;海南聚广众达电子商务咨询有限公司凭借其专业的服务和…

华为实训案例

案例下载 案例内包含空拓扑图、配置完整的拓扑、以及步骤脚本文档&#xff0c;可按需下载。 拓扑图 任务清单 &#xff08;一&#xff09;基础配置 根据附录1拓扑图、附录2地址规划表、附录3设备编号表&#xff0c;配置设备接口及主机名信息。 将所有终端超时时间设置为永不…