Kafka运行机制(二):消息确认,消息日志的存储和回收

news2025/1/5 9:19:20

前置知识

Kafka基本概念icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501Kafka运行机制(一):Kafka集群启动,controller选举,生产消费流程icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141307210?spm=1001.2014.3001.5501

​1. 消息确认

生产者端,客户端在通过生产者生产消息时,需要知道消息是否发送成功,防止消息丢失或进行其他操作。消费者端,消费者也需要确认自己在消费数据后,提交偏移量是否成功,防止重复消费。

生产者

生产者的消息确认机制通常只有在JavaApi中,或生产者配置了重试机制时才有意义。在JavaApi中Kafka驱动提供了同步提交和异步提交两种提交方式(因为生产者批处理的机制,Kafka可以配置缓冲区大小,消息可以暂存到缓冲区中,等到时间或消息大小达到指定值时发送),同步提交会阻塞当前线程,等待kafka返回确认信号,并更具生产者配置选择是否重试。而异步提交则不会阻塞代码,Kafka返回确认信号后执行指定的回调函数。命令行配置重试机制,也可以在Kafka返回失败的信号时,触发重新提交。而Kafka何时或者说何种条件下才会返回确认信息,则是由acks配置项控制。

生产者消息确认策略是由acks配置项控制具体如下:

  • acks=0:生产者发送消息后不会等待任何确认。这种模式下,消息的丢失不可避免,适用于对消息丢失容忍的场景。
  • acks=1:生产者发送消息后,会等待 Leader 副本的确认。Leader 确认后,生产者会认为消息已成功写入。这种模式下,消息丢失的风险较低,但如果 Leader 宕机,消息可能会丢失。
  • acks=allacks=-1:生产者发送消息后,会等待所有 ISR(In-Sync Replicas)副本的确认。只有当所有 ISR 副本都确认了消息,生产者才会认为消息写入成功。这种模式下,消息的可靠性最高,但写入延迟可能增加。

实际生产中并非所有情况都不允许消息丢失,在视频相关的功能中,丢失几帧数据并不影响视频整体流畅度,反而是服务的响应速度对流畅度的影响更大,在这种情况下可以完全不在乎是否丢包,将ack设为0,以达到极致的性能。

其中,0和1都比较好理解,当设置为all或-1时所有的ISR副本是什么呢?

ISR是集群元数据的一部分,其中记录了各个Leader分区以及和其日志偏移量差距不差,上次同步数据时间据当前时间不长的Follower副本,和其日志偏移量和上次同步数据的时间的映射关系。

在上文中我们讲述了主题副本分区中的Leader和Follower的角色和相关功能,其中Leader 副本负责处理所有的生产请求,并将数据写入自己的日志中。Follower 副本则从 Leader 副本同步数据,将其写入到自己的日志中。当Follower在同步Ledaer的数据时,Leader会讲这次同步的Follower,其同步数据的偏移量以及同步的时间的映射关系通过controlelr节点,再由controller节点存入元数据中的ISR数据列表。当Leader将新的偏移量和时间存入ISR中时,ISR不仅会更新数据,还会检查当前内部保存的Follower的同步数据偏移量和时间是否和Leader日志中的偏移量和当前时间相差太多(配置中的replica.lag.max.messages和replica.lag.time.max.ms),如果超过预期值则会将该Follwer从ISR踢出。

踢出ISR并不会影响改分区副本的正常功能,不过当选举新的Leader会从ISR中选取,并且当设置acks为all时,生产消息的确认,也是通过当前生产消息的偏移量与ISR中的值进行比对,当ISR中记录的所有的Follwer偏移量都超过了这个消息对应的偏移量,则认为所有分区都已经成功,返回客户端成功响应,不会确认未再ISR中的分区副本。整体流程如下图

​消费者

消费者的确认机制是依赖于提交偏移量的。不同于生产者生产消息,如果生产成功时不返回任何确认信号,客户端则无法知道自己是否生产成功。而消费者消费成功会获取最终数据,所以只要获得了数据,就是消费成功,不需要什么确认信息。不过消费者在消费成功时需要向__consumer-offsets主题中提交一个自己消费分区当前的偏移量,所以只有当成功向向__consumer-offsets主题提交了偏移量后,才叫消费成功,如果没有成功提交偏移量,后续仍然会重复消费。

消费者提交偏移量分为手动提交和自动提交,手动提交时会马上提交偏移量,即使没有消费成功,这种模式下,当消费者由于某些意外消费失败,偏移量也会加一,就会造成消息丢失。

自动提交会在消费成功时自动提交,在这种模式下,当消费者由于某些意外消费成功,但意外宕机导致没有提交偏移量,则会造成消息重复消费。

2. Kafka消息日志的存储和回收

1. 日志存储机制

Kafka 中的每个主题包含若干分区,每个分区在物理上对应一个日志文件夹,该文件夹包含一系列的日志段文件。

日志段文件
  • 日志段:每个分区的日志被分成多个日志段文件。日志段是一个物理文件,存储在 Kafka 的存储目录下。每个日志段文件以偏移量范围命名。
  • 日志文件命名:日志段文件的命名格式通常是 log-start-offset.log,其中 log-start-offset 是该日志段中第一条消息的偏移量。
  • 索引文件:除了日志段文件外,每个分区还包含两个索引文件:时间索引文件和偏移量索引文件,用于快速查找消息。
写入机制
  • 顺序写入:Kafka 中的消息是顺序写入日志段的,每条消息都有一个唯一的偏移量,表示该消息在分区中的位置。
  • 追加模式:消息总是被追加到当前的日志段文件中。当日志段的大小达到配置的阈值时,Kafka 会滚动创建新的日志段。

2. 日志存储配置

Kafka 提供了一系列配置参数,控制日志的存储行为:

  • log.segment.bytes:每个日志段文件的最大大小,默认为 1GB。当日志段大小达到这个限制时,Kafka 会创建一个新的日志段。

    log.segment.bytes=1073741824 # 每个日志段大小限制为1GB
  • log.roll.ms:日志段滚动的时间限制。即使日志段未达到 log.segment.bytes 限制,Kafka 也会根据时间限制滚动日志段,默认为 7 天。

    log.roll.ms=604800000 # 每7天滚动一次日志段

3. 日志回收机制

Kafka 的日志回收是通过删除策略和压缩策略实现的,用于管理存储空间并保持系统的高效运行。

删除策略(Log Cleanup Policy)

Kafka 提供两种主要的日志回收策略:

  • 基于时间的保留策略

    • log.retention.ms:定义了消息在日志中的最长保留时间。超过这个时间的消息会被标记为可删除。

    • log.retention.minuteslog.retention.hours:这是 log.retention.ms 的简写形式,以分钟和小时为单位配置。

      log.retention.hours=168 # 保留7天的消息
  • 基于大小的保留策略

    • log.retention.bytes:定义每个分区日志的最大存储大小。当日志文件的大小超过这个值时,最旧的消息会被删除。

      log.retention.bytes=1073741824 # 每个分区的日志大小限制为1GB
日志清理策略(Log Cleanup Policy)

Kafka 允许用户为每个主题配置日志清理策略:

  • delete 策略:这是默认策略。当消息超过保留时间或日志大小限制时,Kafka 会自动删除旧消息,以释放存储空间。

    log.cleanup.policy=delete
  • compact 策略:此策略不删除旧消息,而是保留每个键的最新版本。

    log.cleanup.policy=compact

compact策略下,通过生产者生产消息时给该条消息设置一个key,当多条消息都有一个key值时,compact会删除旧的key对应的消息,保存最新的key消息。

compact可以和delete结合使用,也就是同时配置两种策略,在这种策略下会对有key的消息保存最新版本,而对于没有key的消息依据删除配置删除。

Kafka 使用后台线程定期检查日志段,并根据删除策略和保留策略执行清理操作。这些线程负责删除旧的日志段或压缩日志段,以保证 Kafka 的存储空间得到合理管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2054596.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt 0816作业

一、思维导图 二、将day1做的登录界面升级优化【资源文件的添加】 三、在登录界面的登录取消按钮进行一下设置 使用手动连接,将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中,在自定义的槽函数中调用关闭函数 将登录按钮使用qt5版本的连接到…

C++ | Leetcode C++题解之第350题两个数组的交集II

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> intersect(vector<int>& nums1, vector<int>& nums2) {sort(nums1.begin(), nums1.end());sort(nums2.begin(), nums2.end());int length1 nums1.size(), length2 nums2…

函数递归VS操作符深入?

1>>前言 函数递归函数递归&#xff0c;当小白听到这样的词会感到无比陌生&#xff0c;请不要惊慌&#xff0c;这是正常的&#xff0c;以至于都不是很经常用到&#xff0c;但是它的算法&#xff0c;它的思想是值得我们深入思考的。还有一些复杂操作符&#xff0c;如按位与…

【原创】java+swing+mysql共享充电宝管理系统设计与实现

个人主页&#xff1a;程序员杨工 个人简介&#xff1a;从事软件开发多年&#xff0c;前后端均有涉猎&#xff0c;具有丰富的开发经验 博客内容&#xff1a;全栈开发&#xff0c;分享Java、Python、Php、小程序、前后端、数据库经验和实战 文末有本人名片&#xff0c;希望和大家…

PyTorch之TensorBoard使用

接回上一篇&#xff1a;PyTorch深度学习框架-CSDN博客 在学习这篇之前建议先按照上一篇搭建好整个PyTorch环境 然后这一篇讲怎么用TensorBoard&#xff0c;这个玩意是Tensorflow官方推出的一个可视化工具&#xff0c;当使用Tensorflow训练大量深层的神经网络时&#xff0c;我们…

全局锁、表级锁、行级锁

锁的作用和特点 WHY&#xff1a;锁的出现是为了解决并发场景下不同用户同时对共享资源进行操作&#xff0c;而可能引发的并发问题。 HOW&#xff1a;控制不同线程对资源访问的规则。 全局锁 顾名思义&#xff0c;全局锁就是对整个数据库实例加锁。一般在进行全库备份的时候…

prometheus + grafana + 告警

配置环境 准备三台主机&#xff0c;将三台主机的信息分别写入/etc/hosts文件中 192.168.100.115 server.example.com server 192.168.100.116 agent1.example.com agent1 192.168.100.117 grafana.example.com grafana [rootserver ~]# cat /etc/hosts 127.0.0.1 localhos…

【MySQL 08】内置函数 (带思维导图)

文章目录 &#x1f308; 一、日期函数⭐ 1. 常见日期函数⭐ 2. 日期函数使用示例⭐ 3. 日期函数综合案例 &#x1f308; 二、字符串函数⭐ 1. 常见字符串函数⭐ 2. 字符串函数使用示例⭐ 3. 字符串函数综合案例 &#x1f308; 三、数值函数⭐ 1. 常见数值函数⭐ 2. 数值函数使用…

探索GitHub的无限可能:从注册到Linux环境下的库分支链接

在这个数字化时代&#xff0c;GitHub已成为开发者们不可或缺的宝藏库。无论你是编程新手还是资深开发者&#xff0c;GitHub都能为你打开一扇通往无限创意与协作的大门。今天&#xff0c;就让我们一起踏上这段探索之旅&#xff0c;从GitHub的注册开始&#xff0c;再到如何在Linu…

google transalte api的使用,V2服务账户方式(google-cloud-java)

Google Cloud Translation API 有几个不同的使用方式&#xff0c;其中之一是使用最新的 Google Cloud Client Library。这些库提供了简化的 API&#xff0c;使得与 Google Cloud 服务的交互变得更加容易。 对于gcp平台的创建方式&#xff0c;我记得得绑定真信用卡了&#xff0c…

Debug-021-el-table实现分页多选的效果(切换分页,仍可以保持前一页的选中效果)

前情提要&#xff1a; 这个功能实现很久了&#xff0c;但是一直没有留意如何实现&#xff0c;今天想分享一下。具体就是我们展示table数据的时候&#xff0c;表格中的数据多数情况是分页展示&#xff0c;毕竟数据量太多&#xff0c;分页的确是有必要的。那么我们有业务需要给表…

portswigger的Exploiting DOM clobbering to enable XSS

目录 尝试一下看看可不可以XSS DOM破坏 查看源码确定DOM破坏漏洞点以及代码分析 首先查看/resources/labheader/js/labHeader.js&#xff0c;没有什么作用 然后domPurify这东西是一个过滤框架也没啥子用 看/resources/js/loadCommentsWithDomClobbering.js尝试分析代码(对…

使用Poi-tl对word模板生成动态报告

一、pom依赖问题&#xff1a; <dependency> <groupId>com.deepoove</groupId> <artifactId>poi-tl</artifactId> <version>1.12.2</version> </dependency> 使用 poi-tl 的 1.12.2版本&#xff0c;如果使用了poi依赖&#x…

【编程之路:在 Bug 的迷宫中寻找出口】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Mysql-linux通过rpm安装、linux离线安装mysql

新建用户 useradd mysql passwd mysqlmysql用户增加sudo权限 Linux-创建用户、给普通用户sudo权限、设置不需要密码执行sudo 卸载旧版本软件包 卸载mariadb --查询mariadb版本 rpm -qa|grep mariadb --控制台输出 mariadb-libs-5.5.68-1.el7.x86_64 --执行卸载 sudo rpm -…

系规学习第13天

1、规划设计的主要目的不包括() A、设计满足业务需求的IT服务 B、设计SLA、测量方法和指标。 C、设计服务过程及其控制方 D、设计实施规划所需要的进度管理过程 [答案] D [解析]本题考察的是规划设计的目的&#xff0c;建议掌握。 (1)设计满足业务需求的IT服务。 (2)设…

Python计算机视觉 第1章-基本的图像操作和处理

Python计算机视觉 第1章-基本的图像操作和处理 本章讲解操作和处理图像的基础知识&#xff0c;将通过大量示例介绍处理图像所需的Python工具包&#xff0c;并介绍用于读取图像、图像转换和缩放、计算导数、画图和保存结果等的基本工具。 1.1 PIL&#xff1a;Python图像处理类…

零基础学习Redis(4) -- 常用数据结构介绍

我们之前提到过&#xff0c;redis中key只能是字符串类型&#xff0c;而value有多种类型。 redis中的数据结构有自己独特的实现方式能根据特定的场景进行优化 1. string(字符串) 内部编码&#xff1a; raw&#xff1a;最基本的字符串&#xff0c;类比我们平常使用的Stringin…

MFC读取 Excel

2.添加读取excel数据的接口类&#xff1a; 添加读取excel的接口类&#xff1a; 3、添加完成后&#xff0c;找到这几个接口类的头文件&#xff0c;注释/删除 下图红框中的引入语句 注意&#xff1a;每个接口类的头文件都需进行处理。 4、添加源文件 excel.h文件&#xff1a; …

Spring websocket并发发送消息异常的解决

https://www.jb51.net/program/297186nkq.htm本文主要介绍了 Spring websocket并发发送消息异常的解决,当多个线程同时尝试通过 WebSocket 会话发送消息时,会抛出异常,下面就来解决一下,感兴趣的可以了解一下https://www.jb51.net/program/297186nkq.htm