分布式 - 消息队列Kafka:Kafka 消费者的消费位移

news2025/1/10 3:16:25

文章目录

      • 01. Kafka 分区位移
      • 02. Kafka 消费位移
      • 03. kafka 消费位移的作用
      • 04. Kafka 消费位移的提交
      • 05. kafka 消费位移的存储位置
      • 06. Kafka 消费位移与消费者提交的位移
      • 07. kafka 消费位移的提交时机
      • 08. Kafka 维护消费状态跟踪的方法

01. Kafka 分区位移

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。

每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9。

02. Kafka 消费位移

对于kafka中的消费者而言,也有一个offset的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

消费位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。消费者会定期地将已经消费的消息的位移提交到Kafka集群中,以便在下一次启动时从上次消费的位置继续消费。

每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。注意,这和分区位移完全不是一个概念。“分区位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器嘛。另外每个消费者有着自己的消费者位移。

03. kafka 消费位移的作用

消费者位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。它的作用主要有以下几个方面:

① 消费者可以通过记录偏移量来实现断点续传。当消费者下线或者重启时,它可以通过记录的偏移量来恢复之前的消费状态,从而避免重复消费已经处理过的消息。

② Kafka 通过偏移量来保证消息的顺序性。在同一个分区中,消息的顺序是有序的,消费者可以通过记录偏移量来保证消费的顺序性。

③ Kafka 还可以通过偏移量来实现消息的回溯。消费者可以通过指定偏移量来重新消费之前的消息,这在某些场景下非常有用,比如重新处理之前出现的错误。

总之,Kafka 消息偏移量是非常重要的一个概念,它可以帮助消费者实现断点续传、保证消息的顺序性以及实现消息的回溯等功能。

04. Kafka 消费位移的提交

消费者可以通过订阅一个或多个主题来拉取消息。当消费者调用 poll() 方法时,它会从 Kafka 集群中拉取一批消息,这些消息会被缓存在消费者的本地缓存中,等待消费者进一步处理。在消费者处理完这批消息后,它可以再次调用 poll() 方法来拉取下一批消息。如果消费者在处理消息时发生了错误,那么这批消息将会被重新拉取,直到消费者成功地处理它们为止。

因此每次调用poll()方法,它总是会返回还没有被消费者读取过的记录,这意味着我们可以追踪哪些记录是被群组里的哪个消费者读取过的。要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题 __consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

05. kafka 消费位移的存储位置

消费者默认将 offset 保存在Kafka一个内置的 topic 中,该 topic 为 __consumer_offsets。

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --list
__consumer_offsets

消费者会向一个叫作 __consumer_offset 的内置主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。

消费 offset 案例:

① __consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

② 创建主题 haha

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic haha
Created topic test1.

③ 启动生产者生产数据:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic haha
>hello,haha!
>你好,haha!
>

④ 启动消费者消费数据:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic haha --group group-haha --from-beginning
hello,haha!
你好,haha!

⑤ 查看消费者消费主题__consumer_offsets:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

# key是[消费者组,消费的主题,消费的分区],value中已经消费的消息在当前分区的offset+1
[group-haha,haha,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)

06. Kafka 消费位移与消费者提交的位移

如下图,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,不过当前消费者需要提交的消费位移并不是 x,而是 x+1,它表示下一条需要拉取的消息的位置。
在这里插入图片描述
如果使用自动提交或不指定提交的偏移量,那么将默认提交poll()返回的最后一个位置之后的偏移量,即提交比客户端从poll()返回的最后一个位置大1的偏移量。在进行手动提交或需要提交特定的偏移量时,一定要记住这一点。

07. kafka 消费位移的提交时机

当前一次 poll() 操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。
在这里插入图片描述
① 如果最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会丢失:

如图,如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。

② 如果最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理:

如图,如果消费完所有拉取到的消息之后才进行位移提交,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

08. Kafka 维护消费状态跟踪的方法

在Kafka中,消费者组可以通过消费者偏移量(consumer offset)来跟踪它们在分区中消费的消息。消费者偏移量是一个整数,表示消费者已经成功读取的消息的位置。当消费者读取消息时,它会将偏移量保存在内存中,以便在下一次读取消息时能够从正确的位置开始读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/886666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每日一题——移动零

移动零 题目链接 思路——双指针 如果可以开辟额外的空间,那这题十分好做。我们开辟和nums同样大小的空间,将遍历数组,将非零元素从头放置,将零从后往前放置,这样就可以将所有的零放到后面,同时保证非零元…

安全狗获批成为算网融合产业及标准推进委员会伙伴单位

近日,安全狗获批成为中国通信标准化协会算网融合产业及标准推进委员会伙伴单位。 据悉,中国通信标准化协会算网融合产业及标准推进委员会,致力于算网融合、数字化转型、SDN/NFV、SD-WAN、新基建、信息安全、边缘计算、高性能计算领域及典型应…

品牌营销|所有产品都值得用 AI 再做一遍

微软 CEO Satya Nadella 曾经说过:“所有的产品都值得用 AI 重做一遍。” AI 大模型的出现,开启了一个全新的智能化时代,重新定义了人机交互。这让生成式 AI 技术变得「触手可得」,也让各行业看到 AGI 驱动商业增长的更大可能性。…

基于注册中心如何实现全链路灰度

1. 为什么需要服务发现? 2. 微服务注册中心 3. 基于注册中心如何实现全链路灰度 4. GRPC 如何结合注册中心 GRPC服务发现与全链路灰度 为什么需要服务发现? 服务拆分 配置调用 如果有很多服务怎么办? 服务注册 服务发现 注册中心的架构 配置与使用 常见的…

西瓜书之神经网络

一,神经元模型 所谓神经网络, 目前用得最广泛的一个定义是“神经网络是由具有适应性的简单单元组成的广泛并行互连的网络,它的组织能够模拟生物神经系统对真实世界物体所做出的交互反应”。 M-P神经元 M-P神经元:接收n个输入(…

存算分离实践:构建轻量、云中立的大数据平台

今天我们将分享社区用户多点 DMALL 的案例。多点 DMALL 是亚洲领先的全渠道数字零售解决方案服务商,目前已与 380 家零售企业达成合作,覆盖 6 个国家和地区。 面对 B 端客户日益增长的企业数据,存算一体的架构显得力不从心。计算资源冗余浪费…

CAS问题汇总

CAS的执行流程? CAS比较比替换的大致流程是这样的: 首先它有三个参数 : V 内存值 A 预期的旧值 B 新值比较V的值与A的值是否相等如果相等的话,则将V的值替换成B,否则就提示修改失败。 一般正常情况的话就是没有其他线程修改内存…

解决内网GitLab 社区版 15.11.13项目拉取失败

问题描述 GitLab 社区版 发布不久,搭建在内网拉取项目报错,可能提示 unable to access https://github.comxxxxxxxxxxx: Failed to connect to xxxxxxxxxxxxxGit clone error - Invalid argument error:14077438:SSL routines:SSL23_GET_S 15.11.13ht…

工业互联网产业联盟发布《2023可信工业数据流通应用案例集》

导读 随着新一代信息技术与制造业的深度融合发展,全球工业数据应用已经进入纵深发展的新阶段,数据作为新型生产要素和重要战略资源,正在制造业数字化转型过程中发挥出更大的作用。在这一进程中,工业数据的流通共享受到广泛关注。…

深入探索JavaEE单体架构、微服务架构与云原生架构

课程链接: 链接: https://pan.baidu.com/s/1xSI1ofwYXfqOchfwszCZnA?pwd4s99 提取码: 4s99 复制这段内容后打开百度网盘手机App,操作更方便哦 --来自百度网盘超级会员v4的分享 课程介绍: 🔍【00】模块零:开营直播&a…

LeetCode集

目录 1、算法1.1 排序1.1.1 冒泡排序1.1.1.1 简单交换排序1.1.1.2 冒泡排序 1.1.2 简单选择排序1.1.3 直接插入排序1.1.4 希尔排序1.1.5 堆排序1.1.6 归并排序1.1.7 快速排序 1.1 位运算/二进制1.1.1 Java中的正数、负数1.1.2 Java中的位运算1.1.3 比特位计数1.1.4 2的幂1.1.5 …

【腾讯云Cloud Studio实战训练营】Cloud Studio + iPad,让代码之舞飞扬在指尖

Cloud Studio iPad,让代码之舞飞扬在指尖 妙手偶得,开启神奇之旅立即反馈,一切尽在掌握版本控制,简单易用MetaWork 协作,摸鱼变得不再轻松 ​一直以来,开发者大多都习惯在电脑端开发,而iPad只是…

Swift 基础

工程目录 请点击下面工程名称,跳转到代码的仓库页面,将工程 下载下来 Demo Code 里有详细的注释 点击下载代码:swift-01

windows 删除桌面右键菜单多余项

Step1:打开注册表 winr输入 regedit Step2:输入以下路径,跳转到相应位置 计算机\HKEY_CLASSES_ROOT\Directory\Background\shell 即可查看当前所有的右键选项,如下图所示 Step3:删除不需要的选项 直接删掉相应的文…

javaScript:数组检测

目录 一.前言 二.数组检测方法 1.every() 2.some() 3.filter() 一.前言 数组检测是指在编程中对数组进行验证和检查的过程。数组检测可以涉及以下方面: 确定数组的存在:在使用数…

NLP中的RNN、Seq2Seq与attention注意力机制

目录 NLP自然语言处理 的RNN、Seq2Seq与attention注意力机制 RNN循环神经网络 前馈网络入门 前馈网络 循环网络 多层感知器架构示例 循环神经网络的运作原理 展开 RNN seq2seq模型 Attention(注意力机制) 总结 引用 NLP自然语言处理 的RNN、…

Vite更新依赖缓存失败,强制更新依赖缓存

使用vitets开发一段时间了,感觉并不是想象中的好用,特别是出现些稀奇古怪的问题不好解决,比如下面这个问题 上午9:50:08 [vite] error while updating dependencies: Error: ENOENT: no such file or directory, open E:/workspace-dir/node…

2023年备受欢迎的5款团队任务管理工具

任务管理是团队协作的重要环节,选择合适的团队任务管理软件可以提高工作效率、明确责任分工、加强沟通协作。在互联网领域,有许多好用的团队任务管理软件可供选择。下面介绍5款比较知名的团队任务管理软件,并从互联网场景的相关背景内容上进行…

懵了,面试官问我Redis怎么测,直接凉了...

前言 有些朋友来问我,redis要怎么测试?首先我们需要知道,redis是什么?它能做什么? redis是一个key-value类型的高速存储数据库。 redis常被用做:缓存、队列、发布订阅等。 所以,“redis要怎么测…

JavaEE初阶:多线程 - Thread 类的基本用法

上次我们了解了多线程的五种创建方法,今天来学习Thread的基本用法。 目录 run和start Thread常见的构造方法 Thread的几个常见属性 后台线程 是否存活 线程终止 1.使用标志位 2.使用Thread自带的标志 等待线程 run和start 首先需要理解Thread的run和star…