Kafka运行机制(二):消息确认,消息日志的存储和回收,生产者消息分区

news2025/1/17 4:12:14

前置知识

Kafka基本概念icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501Kafka运行机制(一):Kafka集群启动,controller选举,生产消费流程icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141307210?spm=1001.2014.3001.5501

​1. 消息确认

生产者端,客户端在通过生产者生产消息时,需要知道消息是否发送成功,防止消息丢失或进行其他操作。消费者端,消费者也需要确认自己在消费数据后,提交偏移量是否成功,防止重复消费。

生产者

生产者的消息确认机制通常只有在JavaApi中,或生产者配置了重试机制时才有意义。在JavaApi中Kafka驱动提供了同步提交和异步提交两种提交方式(因为生产者批处理的机制,Kafka可以配置缓冲区大小,消息可以暂存到缓冲区中,等到时间或消息大小达到指定值时发送),同步提交会阻塞当前线程,等待kafka返回确认信号,并更具生产者配置选择是否重试。而异步提交则不会阻塞代码,Kafka返回确认信号后执行指定的回调函数。命令行配置重试机制,也可以在Kafka返回失败的信号时,触发重新提交。通过下文可以详细了解在JavaApi中如何控制生产者和消费者的消息确认

Kafka快速入门:Kafka驱动JavaApi的使用icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141356436?spm=1001.2014.3001.5501

而Kafka何时或者说何种条件下才会返回确认信息,则是由acks配置项控制。

生产者消息确认策略是由acks配置项控制具体如下:

  • acks=0:生产者发送消息后不会等待任何确认。这种模式下,消息的丢失不可避免,适用于对消息丢失容忍的场景。
  • acks=1:生产者发送消息后,会等待 Leader 副本的确认。Leader 确认后,生产者会认为消息已成功写入。这种模式下,消息丢失的风险较低,但如果 Leader 宕机,消息可能会丢失。
  • acks=allacks=-1:生产者发送消息后,会等待所有 ISR(In-Sync Replicas)副本的确认。只有当所有 ISR 副本都确认了消息,生产者才会认为消息写入成功。这种模式下,消息的可靠性最高,但写入延迟可能增加。

实际生产中并非所有情况都不允许消息丢失,在视频相关的功能中,丢失几帧数据并不影响视频整体流畅度,反而是服务的响应速度对流畅度的影响更大,在这种情况下可以完全不在乎是否丢包,将ack设为0,以达到极致的性能。

其中,0和1都比较好理解,当设置为all或-1时所有的ISR副本是什么呢?

ISR是集群元数据的一部分,其中记录了各个Leader分区以及和其日志偏移量差距不差,上次同步数据时间据当前时间不长的Follower副本,和其日志偏移量和上次同步数据的时间的映射关系。

在上文中我们讲述了主题副本分区中的Leader和Follower的角色和相关功能,其中Leader 副本负责处理所有的生产请求,并将数据写入自己的日志中。Follower 副本则从 Leader 副本同步数据,将其写入到自己的日志中。当Follower在同步Ledaer的数据时,Leader会讲这次同步的Follower,其同步数据的偏移量以及同步的时间的映射关系通过controlelr节点,再由controller节点存入元数据中的ISR数据列表。当Leader将新的偏移量和时间存入ISR中时,ISR不仅会更新数据,还会检查当前内部保存的Follower的同步数据偏移量和时间是否和Leader日志中的偏移量和当前时间相差太多(配置中的replica.lag.max.messages和replica.lag.time.max.ms),如果超过预期值则会将该Follwer从ISR踢出。

踢出ISR并不会影响改分区副本的正常功能,不过当选举新的Leader会从ISR中选取,并且当设置acks为all时,生产消息的确认,也是通过当前生产消息的偏移量与ISR中的值进行比对,当ISR中记录的所有的Follwer偏移量都超过了这个消息对应的偏移量,则认为所有分区都已经成功,返回客户端成功响应,不会确认未再ISR中的分区副本。整体流程如下图

​消费者

消费者的确认机制是依赖于提交偏移量的。不同于生产者生产消息,如果生产成功时不返回任何确认信号,客户端则无法知道自己是否生产成功。而消费者消费成功会获取最终数据,所以只要获得了数据,就是消费成功,不需要什么确认信息。不过消费者在消费成功时需要向__consumer-offsets主题中提交一个自己消费分区当前的偏移量,所以只有当成功向向__consumer-offsets主题提交了偏移量后,才叫消费成功,如果没有成功提交偏移量,后续仍然会重复消费。

消费者提交偏移量分为手动提交和自动提交,手动提交时会马上提交偏移量,即使没有消费成功,这种模式下,当消费者由于某些意外消费失败,偏移量也会加一,就会造成消息丢失。

自动提交会在消费成功时自动提交,在这种模式下,当消费者由于某些意外消费成功,但意外宕机导致没有提交偏移量,则会造成消息重复消费。

2. Kafka消息日志的存储和回收

1. 日志存储机制

Kafka 中的每个主题包含若干分区,每个分区在物理上对应一个日志文件夹,该文件夹包含一系列的日志段文件。

日志段文件
  • 日志段:每个分区的日志被分成多个日志段文件。日志段是一个物理文件,存储在 Kafka 的存储目录下。每个日志段文件以偏移量范围命名。
  • 日志文件命名:日志段文件的命名格式通常是 log-start-offset.log,其中 log-start-offset 是该日志段中第一条消息的偏移量。
  • 索引文件:除了日志段文件外,每个分区还包含两个索引文件:时间索引文件和偏移量索引文件,用于快速查找消息。
写入机制
  • 顺序写入:Kafka 中的消息是顺序写入日志段的,每条消息都有一个唯一的偏移量,表示该消息在分区中的位置。
  • 追加模式:消息总是被追加到当前的日志段文件中。当日志段的大小达到配置的阈值时,Kafka 会滚动创建新的日志段。

2. 日志存储配置

Kafka 提供了一系列配置参数,控制日志的存储行为:

  • log.segment.bytes:每个日志段文件的最大大小,默认为 1GB。当日志段大小达到这个限制时,Kafka 会创建一个新的日志段。

    log.segment.bytes=1073741824 # 每个日志段大小限制为1GB
  • log.roll.ms:日志段滚动的时间限制。即使日志段未达到 log.segment.bytes 限制,Kafka 也会根据时间限制滚动日志段,默认为 7 天。

    log.roll.ms=604800000 # 每7天滚动一次日志段

3. 日志回收机制

Kafka 的日志回收是通过删除策略和压缩策略实现的,用于管理存储空间并保持系统的高效运行。

删除策略(Log Cleanup Policy)

Kafka 提供两种主要的日志回收策略:

  • 基于时间的保留策略

    • log.retention.ms:定义了消息在日志中的最长保留时间。超过这个时间的消息会被标记为可删除。

    • log.retention.minuteslog.retention.hours:这是 log.retention.ms 的简写形式,以分钟和小时为单位配置。

      log.retention.hours=168 # 保留7天的消息
  • 基于大小的保留策略

    • log.retention.bytes:定义每个分区日志的最大存储大小。当日志文件的大小超过这个值时,最旧的消息会被删除。

      log.retention.bytes=1073741824 # 每个分区的日志大小限制为1GB
日志清理策略(Log Cleanup Policy)

Kafka 允许用户为每个主题配置日志清理策略:

  • delete 策略:这是默认策略。当消息超过保留时间或日志大小限制时,Kafka 会自动删除旧消息,以释放存储空间。

    log.cleanup.policy=delete
  • compact 策略:此策略不删除旧消息,而是保留每个键的最新版本。

    log.cleanup.policy=compact

compact策略下,通过生产者生产消息时给该条消息设置一个key,当多条消息都有一个key值时,compact会删除旧的key对应的消息,保存最新的key消息。

compact可以和delete结合使用,也就是同时配置两种策略,在这种策略下会对有key的消息保存最新版本,而对于没有key的消息依据删除配置删除。

Kafka 使用后台线程定期检查日志段,并根据删除策略和保留策略执行清理操作。这些线程负责删除旧的日志段或压缩日志段,以保证 Kafka 的存储空间得到合理管理。

3. 生产者消息分区

当没有使用自定义分区器时,会使用默认分区器,默认分区器会根据消息是否具有key值来进行不同的分区判断。

  • 有key值时:有key值时会通过类似于Hash槽的模式来对消息进行分区,通过对key值取hash值,然后在通过hash值取余分区总数,的到最终的值就是分区位置。
  • 无key值时:无key值时则通过轮询算法来进行消息分区

客户端也可以根据具体要求来自定义分区策略,比如某些服务器更强,我们可以向其中分区投入更多消息,具体的自定义分区实现,下文中有详细讲解

Kafka快速入门:Kafka驱动JavaApi的使用icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141356436?spm=1001.2014.3001.5501​​​​​​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2067702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Verilog刷题笔记57

题目: Exams/2014 q3bfsm Given the state-assigned table shown below, implement the finite-state machine. Reset should reset the FSM to state 000. 解题: module top_module (input clk,input reset, // Synchronous resetinput x,output z );parameter…

ESP32-IDF 在 Ubuntu 下的配置

目录 一、安装准备二、获取 ESP-IDF三、设置工具四、使用案例 参考资料:官方文档:Linux 和 macOS 平台工具链的标准设置。 一、安装准备 参照官方文档,首先下载编译 ESP-IDF 所需要的软件包: sudo apt-get install git wget fl…

如何使用ssm实现基于Java的共享客栈管理系统

TOC ssm058基于Java的共享客栈管理系统jsp 第1章 绪论 1.1 课题背景 互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为人们提供服务。所以各…

一篇快速搞懂 JavaSE 高级特性(代码块,注解,枚举,异常处理,多线程,集合框架,泛型,反射,IO ......)

JavaSE 高级 一、面向对象(高级)1、单例模式(Singleton)2、代码块1)静态代码块2)非静态代码块 3、关键字 final4、抽象类与抽象方法(abstract)5、模板方法设计模式(Templ…

利用GPT绘制流程图(无需下载任何软件

目录 什么是Flowchart Fun?如何利用GPT绘制流程图?步骤1:确定流程图的目的和内容步骤2:训练GPT编写流程图的文本描述步骤3:转换文本格式为可视化的流程图步骤4:调整和优化 结论小结: 什么是Flow…

SpringBoot【重修之HTTP协议】

request 请求,response 响应! HTTP-协议的解析: 客户端:浏览器已经会自动解析了 服务端:通过Web服务器来解析!【Tomcat , Jetty , WebLogic , WebSphere 】 Tomcat的研究学习 SpringBoot Web快速入门…

AIxBoard部署BLIP模型进行图文问答

一、AIxBoard简介 AIxBoard(X板)是一款IA架构的人工智能嵌入式开发板,体积小巧功能强大,可让您在图像分类、目标检测、分割和语音处理等应用中并行运行多个神经网络。它是一款面向专业创客、开发者的功能强大的小型计算机&#xf…

[Linux CMD] 查看系统资源 (持续更新中)

概述 在Linux中,有许多命令和工具可用于查看系统的资源使用情况。以下是一些常用的方式: top:top命令是最常见的实时系统监视工具之一。它显示了当前运行的进程列表,以及每个进程的CPU、内存使用情况、nice值等信息。top命令还会…

【Linux】简易日志工具项目

有些鸟儿是不应该被关在笼子里的, 因为他们的羽毛太丰润了。 当他们飞走,你会由衷地庆贺他获得自由。 --- 肖申克的救赎》--- 从零开始构建简易日志系统 1 日志1.1 什么是日志1.2 日志的意义1.3 为什么要构建自己的日志工具 2 构建自己的日志工具2.1…

带有限制编辑的PDF文件怎么取消编辑限制

在日常工作和学习中,我们经常会遇到一些带有“限制编辑”的PDF文件。这些文件由于设置了密码保护,使得我们无法直接编辑、复制或打印其中的内容,给信息的处理和利用带来了诸多不便。然而,通过一些有效的方法和工具,我们…

C++—八股文总结(25秋招期间一直更新)

1、const 1.1 指针常量和常量指针 说说const int *a, int const *a, const int a, int *const a, const int *const a分别是什么,有什么特点。 const int *aint const *a; //可以通过 a 访问整数值,但不能通过 a 修改该整数的值,指针本身是…

【AI赋能游戏】《黑神话:悟空》专属黑悟空无限创意生成器!(整合包分享)

最近最火的话题,肯定就是《黑神话:悟空》了!这游戏实在是忒火火火了。。。全网破圈霸屏,连官媒央视都给了无死角宣传! 《黑神话:悟空》登顶Steam历史售卖榜,同时在线玩家冲到了最高**222**万人&…

8.22-docker的部署及其使用

docker 1.docker环境部署以及语法 [rootdocker ~]# cat << EOF | tee /etc/modules-load.d/k8s.conf> overlay> br_netfilter> EOFoverlaybr_netfilter[rootdocker ~]# modprobe overlay[rootdocker ~]# modprobe br_netfilter[rootdocker ~]# cat /etc/module…

【个人学习】JVM(8): 对象的实例化、内存布局、访问定位

对象的实例化内存布局与访问定位 对象的实例化 对象创建的方式 new&#xff1a;最常见的方式、单例类中调用getInstance的静态类方法&#xff0c;XXXFactory的静态方法Class的newInstance方法&#xff1a;在JDK9里面被标记为过时的方法&#xff0c;因为只能调用空参构造器&am…

Ignition Gateway配置

Config-System backup和restore&#xff1a; backup可以直接备份整个gateway配置&#xff0c;包括所有项目。 restore可以恢复gateway配置&#xff0c;包括所有项目。

pytorch基础学习

环境安装 mac安装conda&#xff08;为什么安装conda? conda类似沙箱&#xff0c;将一个一个环境隔离起来&#xff0c;解决Python工程之前的包冲突问题&#xff09; 下载Miniconda安装器:https://docs.conda.io/en/latest/miniconda.html 执行dmg安装。 安装完成后&#xff0c…

C++对象初始化

背景 最近在编译一个库的时候发现有个编译错误&#xff0c;最后发现是初始化对象的时候出了问题&#xff0c;这里简单记录一下&#xff1a; #include <iostream>class A { public:int m_a; }; class C { public:int m_c;operator A(){}; }; class B { public:B(){};B (…

探寻孩子自闭症之因:为 “星星的孩子” 寻找答案

在这个丰富多彩的世界里&#xff0c;有一群特殊的孩子&#xff0c;他们仿佛来自遥远的星球&#xff0c;沉浸在自己的独特世界中&#xff0c;难以与外界进行有效的沟通和互动。他们是自闭症儿童&#xff0c;也被称为 “星星的孩子”。那么&#xff0c;为什么孩子会患上自闭症呢&…

大语言模型私有化部署和个性化调优的技术实践

简介 本文介绍如何在不依赖任何三方服务的情况下&#xff0c;私有化部署和使用大语言模型&#xff0c;以及如何以较低成本让大语言模型使用自己的数据来产生个性化输出。 本文偏技术向&#xff0c;读者需要具备一定技术背景&#xff0c;如有不懂之处&#xff0c;欢迎留言交流…

ARM——驱动——inmod加载内核模块

在上一篇文章的代码上添加出错处理 #include <linux/init.h> // 包含初始化宏和函数 #include <linux/kernel.h> // 包含内核函数和变量 #include <linux/fs.h> // 包含文件操作的结构和函数 #include <linux/kdev_t.h> /…