kafka系列之消费后不提交offset情况的分析总结

news2025/1/12 20:46:13

概述
每当我们调用Kafka的poll()方法或者使用@KafkaListener(其实底层也是poll()方法)时,它都会返回之前被写入Kafka的记录,即我们组中的消费者还没有读过的记录。 这意味着我们有一种方法可以跟踪该组消费者读取过的记录。 如前所述,Kafka的一个独特特征是它不会像许多JMS队列那样跟踪消费过的记录。 相反,它允许消费者使用Kafka跟踪每个分区中的位置(偏移)。

我们将更新分区中当前位置的操作称为提交(commits)。

那么消费者是如何提交偏移量(offset)的呢? 它向Kafka生成一条消息,指向一个特殊的 __consumer_offsets主题,包含每个分区需要提交的偏移量。 但是,如果消费者崩溃或新的消费者加入消费者群体,这将触发重新平衡(rebalance)。 在重新平衡之后,可以为每个消费者分配一组新的分区而不是之前处理的分区。 然后消费者将读取每个分区的已提交偏移量并从那里继续。

如果提交的偏移量小于客户端处理的最后一条消息的偏移量,那么最后处理的偏移量与提交的偏移量之间的消息将被处理两次,如下图:
在这里插入图片描述

如果提交的偏移量大于客户端实际处理的最后一条消息的偏移量,那么消费者组将忽略上次处理的偏移量与提交的偏移量之间的所有消息,如下图:
在这里插入图片描述
自动提交(Automatic Commit)
提交偏移量的最简单方法是允许消费者来完成。 如果配置 enable.auto.commit=true,则消费者每五秒钟将提交客户端从poll()收到的最大偏移量。 五秒间隔是默认值,可通过设置auto.commit.interval.ms来控制。 就像消费者中的其他机制一样,自动提交由poll loop驱动。 无论您何时轮询,消费者都会检查是否需要提交,如果是,它将提交它在上次轮询中返回的偏移量。
虽然这个选取很方便,但是它也有一定的不足。
请注意,默认情况下,自动提交每五秒钟发生一次。 假设我们在最近的提交之后三秒钟并且触发了重新平衡。 在重新平衡之后,所有消费者将从最后提交的偏移开始消费。 在这种情况下,偏移量是三秒钟之前偏移量,因此在这三秒内到达的所有事件将被处理两次。 可以将提交间隔配置为更频繁地提交并减少记录将被复制的窗口,但是不可能完全消除它们。
启用自动提交后,对poll的调用将始终提交上一轮询返回的最后一个偏移量。 它不知道实际处理了哪些事件,因此在再次调用poll()之前,始终处理完poll()返回的所有事件至关重要, 因为和poll()一样,close()方法也会自动提交偏移量。

自动提交很方便,但它们不能给开发人员足够的控制以避免重复的消息。

故最近在使用kafka的过程中遇到了一个疑问,在查阅了一些资料和相关blog之后,做一下总结和记录。

问题:消费者在消费消息的过程中,配置参数spring.kafka.listener .ackMode设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?

spring.kafka.listener.ackMode:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。

ackMode是个枚举类型:

  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME
    每次间隔ackTime的时间去commit
  • COUNT
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。最终也是批量提交。
  • MANUAL_IMMEDIATE
    每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交

参考Kafka系列之SpringBoot集成Kafka

————————————————————————————————————————————————————————————

首先简单的介绍一下消费者对topic的订阅。客户端的消费者订阅了topic后,如果是单个消费者,那么消费者会顺序消费这些topic分区中的数据,如果是创建了消费组有多个消费者,那么kafak的服务端将这些topic平均分配给每个消费者。比如有2个topic,每个topic有2个分区,总共有4个分区,如果一个消费组开了四个消费者线程,那么每个消费者将被分配一个分区进行消费。一般建议是一个消费组里的消费者的个数与订阅的topic的总分区数相等,这样可以达到最高的吞吐量。如果消费者的个数大于订阅的topic的总分区,那么多出的消费者将分配不到topic的分区,等于是白白创建了一个消费者线程,浪费资源。

我们进入正题,对开头提出的问题的总结如些:
  
注意:以下情况均基于kafka的消费者关闭自动提交offset的条件下。亦是基于同一个消费者组的情况,因为不同的消费者组之间,他们彼此的offset偏移量是完全独立的。

  1. 如果消费端在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。

  2. 如果在消费的过程中有几条或者一批数据数据没有提交offset(比如异常情况程序没有走到手动提交的代码),后面其他的消息消费后正常提交offset至服务端,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序或者rebalance也不会重新消费。

  3. 消费端如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你没有提交offset的消息时你新增或者减少消费端,此时会发生rebalance现象,即可再次消费到这个未提交offset的数据,产生重复消费问题。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在发生rebalance现象之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1897457.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自闭症儿童的治疗方法有哪些?

身为星贝育园自闭症儿童康复学校的资深教育者,我深知自闭症谱系障碍(ASD)儿童的教育与治疗需要一个全面、个性化的方案。在星贝育园,我们致力于为孩子们提供一个充满爱与理解的环境,采用多种科学验证的教育方法&#x…

【Linux】动态库的制作与使用

💐 🌸 🌷 🍀 🌹 🌻 🌺 🍁 🍃 🍂 🌿 🍄🍝 🍛 🍤 📃个人主页 :阿然成长日记 …

asp.net公交司机管理系统-计算机毕业设计源码96696

摘 要 公交司机是公交运输系统中的重要组成部分,他们的管理和运营对于公交运输的正常运行和服务质量起着至关重要的作用。本文提出了一种基于C#(asp.net)的公交司机管理系统。该系统利用计算机技术和网络通信技术,实现了公交司机信…

Ollama:本地大模型运行指南_ollama运行本地模型

Ollama 简介 Ollama 是一个基于 Go 语言开发的可以本地运行大模型的开源框架。 官网:ollama.com/ GitHub 地址:github.com/ollama/olla… Ollama 安装 【一一AGI大模型学习 所有资源获取处一一】 ①人工智能/大模型学习路线 ②AI产品经理入门指南 ③…

【 香橙派 AIpro评测】大语言模型实战教程:香橙派 AIpro部署LLMS大模型实站(保姆级教学)

引言 OrangePi AIpro 这块板子作为业界首款基于昇腾深度研发的AI开发板,一经发布本博主就火速去关注了,其配备的 8/20TOPS澎湃算力是目前开发板市场中所具备的最大算力,可谓是让我非常眼馋啊!这么好的板子那必须拿来用用&#xff…

Java面试八股之如何提高MySQL的insert性能

如何提高MySQL的insert性能 提高MySQL的INSERT性能可以通过多种策略实现,以下是一些常见的优化技巧: 批量插入: 而不是逐条插入,可以使用单个INSERT语句插入多行数据。例如: INSERT INTO table_name (col1, col2) V…

用Python轻松转换PDF为CSV

数据的可访问性和可操作性是数据管理的核心要素。PDF格式因其跨平台兼容性和版面固定性,在文档分享和打印方面表现出色,尤其适用于报表、调查结果等数据的存储。然而,PDF的非结构化特性限制了其在数据分析领域的应用。相比之下,CS…

AI时代下 AI搜索成“兵家必争之地”

当下,海量信息爆发性增长,用户的搜索需求也从找不到信息转变成找不到“需要的”信息。不过随着AI技术的迅速发展,这个需求将会得到解决,AI搜索也将成为“兵家必争之地”。 在全球范围内,谷歌作为全球最大的搜索引擎公司…

国衍科技——梅雨季节文物保护专家

尊敬的文物保护者们 随着梅雨季节的脚步渐近,湿润的空气和连绵的雨水不仅为我们的生活带来了不便,更为文物保护工作带来了严峻的挑战。在这个季节,文物发霉的风险急剧上升,每一件珍贵的文化遗产都面临着被时间侵蚀的威胁。然而&am…

使用Mybatis批量插入大量数据的实践

前言 在项目开发过程中,我们经常会有批量插入的需求,例如:定时统计任务 但是受限于MySQL中 max_allowed_packet 参数限制,5.7版本默认值为4M,这显然不太符合我们的需求,当然我们也可以通过修改此值来适应…

ChatGPT如何提升论文写作(附指令集合)

先讲前提: ChatGPT无论是3.5还是4.0都存在非常严重的幻觉问题,目前ChatGPT无法替代搜索引擎。 如果你希望得到更加优质的体验,请用GPT-4.0,幻觉问题上比3.5大幅降低 ChatGPT中文版,一站式AI创作平台​aibox365.com …

昇思MindSpore学习笔记4-01生成式--CycleGAN图像风格迁移互换

摘要: 记录了昇思MindSpore AI框架用循环对抗生成网络模型CycleGAN实现图像匹配的方法、步骤。包括环境准备、数据集下载、数据加载和预处理、构建生成器和判别器、优化、模型训练和推理等。 1.模型介绍 1.1模型简介 CycleGAN(Cycle Generative Adversarial Netwo…

Yolo系列——动态卷积

一、为什么要提出动态卷积? 为了更好的将模型部署在边端设备上,需要设计轻量级网络模型。轻量级卷积网络因其较低的运算而限制了CNN的深度(卷积层层数)和宽度(通道数),限制了模型的表达能力&am…

3dmax全景图用什么渲染软件好?渲染100邀请码1a12

全景图是常见的效果图类型,常用于展示大型空间,如展厅、会议室等。全景图的制作需要渲染,下面我介绍几个常用的渲染软件分享给大家。 1、V-Ray:十分流行的渲染引擎,功能强大,它提供了高质量的光线追踪技术…

gitee代码初次上传步骤

ps. 前提是已经下载安装gitee 一、在本地项目目录下空白处右击,选择“Git Bash Here” 二、初始化 git init 三、添加、提交代码(注意add与点之间的空格) git add . git commit -m 添加注释 四、连接、推送到gitee仓库 git remote add …

计算机网络——数据链路层(以太网)

目录 局域网的数据链路层 局域网可按照网络拓扑分类 局域网与共享信道 以太网的两个主要标准 适配器与mac地址 适配器的组成与运作 MAC地址 MAC地址的详细介绍 局域网的mac地址格式 mac地址的发送顺序 单播、多播,广播mac地址 mac帧 如何取用…

C++基石:掌握高效编程的艺术

C 关于命名空间:namespace 上述文档详细介绍了C标准库(Standard C Library)的一些关键约定,这些约定不仅帮助开发者理解如何正确使用库中的功能,也明确了实现者在设计库时的灵活性和限制。下面是对文档中提到的几个要点…

2024-07-05 base SAS programming学习笔记9(variables)

1.在数据集增加累加变量值(SUM) 求和语句(SUM STATEMENT):variableexpression variable是累积求和的变量名,为数值型,默认初始值为0;该variable值则会保留到一个观测 当expression有缺失值,在求…

深度学习Week19——学习残差网络和ResNet50V2算法

文章目录 深度学习Week18——学习残差网络和ResNet50V2算法 一、前言 二、我的环境 三、论文解读 3.1 预激活设计 3.2 残差单元结构 四、模型复现 4.1 Residual Block 4.2 堆叠Residual Block 4.3. ResNet50V2架构复现 一、前言 🍨 本文为🔗365天深度学…

v-html 空格/换行不生效

接口返回的内容如下&#xff1a;有空格有换行&#xff0c;但 使用v-html无效 需加css样式 white-space: pre-wrap; <div class"pretty-html" v-html"Value"></div>.pretty-html {white-space: pre-wrap; /* 保留空格和换行&#xff0c;并允许…