Kafka服务端日志详解

news2025/1/25 4:35:27

文章目录

  • 服务端日志
    • Topic消息存储方式
      • 主体介绍
      • log文件追加记录消息
      • index和timeindex索引文件
    • 日志文件清理
    • Kafka的文件高效读写机制
      • Kafka的文件结构
      • 顺序写磁盘
      • 零拷贝
    • 合理配置刷盘频率
    • 客户端消费进度管理

服务端日志

Kafka的日志信息是通过conf/server.properties文件中的log.dirs配置项来配置的

在这里插入图片描述



Topic消息存储方式

主体介绍

进入到上方配置文件中指定的目录下查看,topic的数据都是以topic名 + partition下标的命名方式保存的

在这里插入图片描述



我们现在进入其中一个partition目录

在这里插入图片描述



  • .index

    日志索引文件,采用的稀疏索引提高查询效率,记录的是消息偏移量offset 和 该消息在.log文件中的位置position

  • .log

    消息保存在.log文件中,是以二进制的方式保存的。可以通过.index和.timeindex两个索引文件加速查找消息

    文件大小是(log.segment.bytes参数设定)默认1GB,新文件名是第一条消息的offset.log

  • .timeindex

    日志索引文件,采用的也是稀疏索引结构,每隔一段时间保存一条索引记录,记录的是消息产生的时间戳timestamp和消息偏移量offset

  • .snapshot

    快照,可以理解为一个备份文件

  • leader-epoch-checkpoint

    Leader partition新上任就会往该文件中写入一个epoch,来保证HW的一致性

  • partition.metadata

    保存着该partition对应的topic_id



最后两个文件可以直接查看,就记录的一些简单的信息

[root@worker1 disTopic-0]# cat leader-epoch-checkpoint 
0
3
5 0
9 6
14 18

[root@worker1 disTopic-0]# cat partition.metadata 
version: 0
topic_id: rDUdZBO7RH2GNPgdRXk7Tw



而前三个文件是以二进制的方式保存的,需要通过Kafka提供的kafka-dump-log.sh来查看文件内容,如下所示

# 保存的是消息产生的时间戳 和 消息offset
[root@worker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1723254597947 offset: 51
timestamp: 1723254598224 offset: 102
timestamp: 1723254598501 offset: 152
timestamp: 1723254598816 offset: 201
timestamp: 1723254599085 offset: 250

# 保存是的消息的offset 和 该消息在.log文件中对应的position位置
[root@worker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 51 position: 4160
offset: 102 position: 8324
offset: 152 position: 12428
offset: 201 position: 16544
offset: 250 position: 20660
offset: 299 position: 24776


# 每一条记录保存的是一批消息信息,只不过我下面刚好都只保存一条消息
[root@worker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.log
baseOffset: 50 lastOffset: 50 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4080 CreateTime: 1723254597907 size: 80 magic: 2 compresscodec: none crc: 672861010 isvalid: true
baseOffset: 51 lastOffset: 51 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4160 CreateTime: 1723254597947 size: 80 magic: 2 compresscodec: none crc: 3136762717 isvalid: true
baseOffset: 52 lastOffset: 52 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4240 CreateTime: 1723254597951 size: 80 magic: 2 compresscodec: none crc: 3076149845 isvalid: true



log文件追加记录消息

  • .log文件是以追加的方式写入新的消息日志。position表示写入位置,size表示消息的总长度,通过这两个值就能从一段二进制中获取到一条具体的消息。
  • Kafka中的消息日志,只允许追加,不允许删除和修改。
  • log文件的固定大小是log.segment.bytes参数设定,默认1GB。新创建的文件是以写入第一条消息的offset作为的文件名



index和timeindex索引文件

index和timeindex索引文件目的都是为了加快从log文件中读取消息的效率,如下图所示,

在这里插入图片描述



  • index和timeindex文件是以相对偏移量的方式建立的log消息日志数据索引

    比如说0000.index和 0947.index索引文件中的内容offset都是以0开始计数的,使用的是第一条消息的相对偏移量,而消息绝对偏移量=文件名+相对偏移量

  • index和timeindex文件采用的是类似于数据的跳表,并不是每一条消息都会记录一条索引。

    log.index.interval.bytes决定.log文件中产生多少大小的消息就生成一条index记录 官网 服务端的参数说明

    log.index.interval.bytes
    The interval with which we add an entry to the offset index
    
    Type:	int
    Default:	4096 (4 kibibytes)
    



index文件的作用类似于数据结构中的跳表,他的作用是用来加速查询log文件的效率。而timeindex文件的作用则是用来进行一些跟时间相关的消息处理。比如文件清理。



日志文件清理

Kafka为了防止过多的日志文件给服务器带来过大的压力,他会定期删除过期的log文件。



判断那些日志文件过期了

  • log.retention.check.interval.ms

    定时检测文件是否过期。默认是 300000毫秒,也就是五分钟。 在检查文件是否超时时,是以每个.timeindex中最大的那一条记录为准。

  • log.retention.hours , log.retention.minutes, log.retention.ms 。

    这一组参数表示文件保留多长时间。默认生效的是log.retention.hours,默认值是168小时,也就是7天。如果设置了更高的时间精度,以时间精度最高的配置为准。

官网 服务端的参数说明

# 日志清除程序检查是否有日志符合删除条件的频率(以毫秒为单位)
log.retention.check.interval.ms
Type:	long
Default:	300000 (5 minutes)

# 在删除日志文件之前保留它的小时数(以小时为单位),仅次于log.retention.ms属性
log.retention.hours
Type:	int
Default:	168

# 在删除日志文件之前保留日志文件的分钟数(以分钟为单位),仅次于log.retention.ms属性。如果没有设置,则使用log.retention.hours中的值
log.retention.minutes
Type:	int
Default:	null

# 日志文件删除前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes中的值。如果设置为-1,则不应用时间限制。
log.retention.ms
Type:	long
Default:	null



过期的日志文件如何处理

# 日志清理策略。有两个选项,delete表示删除日志文件。 compact表示压缩日志文件。
log.cleanup.policy
Type:	list
Default:	delete
Valid Values:	[compact, delete]

# 日志删除前的最大容量
# 当log.cleanup.policy选择delete时  当总的日志文件大小超过这个阈值后,就会删除最早的日志文件。默认是-1,表示无限大。
log.retention.bytes
Type:	long
Default:	-1



Kafka的文件高效读写机制

Kafka的文件结构

kafka的数据文件结构可以加速日志文件的读取。

Topic下的多个partition采用的是单独记录日志文件,这样加快了topic下的数据读取

通过.index索引文件的稀疏索引结构,进一步加快日志检索速度。



顺序写磁盘

对每个Log文件,Kafka会提前规划固定的大小,这样在申请文件时,可以提前占据一块连续的磁盘空间。

Kafka的log文件只能以追加的方式往文件的末端添加(这种写入方式称为顺序写)



零拷贝

零拷贝是Linux操作系统提供的一种IO优化机制,而Kafka大量的运用了零拷贝机制来加速文件读写。

零拷贝就是配合内核态的复制机制,减少用户态和内核态之间的内容拷贝

传统的一次硬件IO是这样工作的。如下图所示:

在这里插入图片描述



零拷贝主要有两种实现机制

1、mmap文件映射机制

不再将整个文件复制进用户态,而是用户态只持有一个文件的映射信息,通过这个映射信息控制内核态的文件读写。

java中大量使用该方式 ,可以参考下JDK中的DirectByteBuffer实现机制

适用于文件不超过2G的文件,所以Kafka将日志文件设计成1G

在这里插入图片描述



2、sendfile文件传输机制

用户态连文件索引都不读取,直接向内核态发送一个sendfile指令,让内核态去进行文件拷贝

例如当Consumer要从Broker上poll消息时,Broker不需要对消息进行任何的加工,用户态就只需要往内核态发一个sendfile指令,而不需要有任何的数据拷贝过程。Kafka大量的使用了sendfile机制,用来加速对本地数据文件的读取过程。

在这里插入图片描述

JDK中8中java.nio.channels.FileChannel类提供了transferTo和transferFrom方法,底层就是使用了操作系统的sendfile机制。



合理配置刷盘频率

应用程序读取文件是从内核态的pageCache中读取的,保存文件是最终只能保存到pageCache中,应用程序不能直接操作pageCache。而pageCache中的数据如果还没有来得及刷盘持久化到磁盘,服务器忽然非正常断电,那么pageCache中的数据就会丢失。

应用程序唯一能做的就是频繁的调用OS提供的fsync()通知OS进行刷盘操作,但是则会降低应用的执行性能。所以应用程序需要在数据安全和高性能上做取舍。

Kafka在服务端设计了几个参数,来控制刷盘的频率:

这里可以看到,Kafka为了最大化性能,默认是将刷盘操作交由了操作系统进行统一管理。

# 多长时间进行一次强制刷盘。默认是Long.MAX。
flush.ms
Type:	long
Default:	9223372036854775807
Valid Values:	[0,...]


# 表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
log.flush.interval.messages
Type:	long
Default:	9223372036854775807
Valid Values:	[1,...]


#当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
log.flush.interval.ms
Type:	long
Default:	null


# 日志刷新程序检查是否需要将日志刷新到磁盘的频率(以毫秒为单位),默认也是Long.MAX。
log.flush.scheduler.interval.ms
Type:	long
Default:	9223372036854775807



客户端消费进度管理

消费者消费消息的进度被保存在一个名称为__consumer_offsets内置的topic中,该topic默认会创建50个分区partition

在这里插入图片描述



该topic在zookeeper也能查看到相应的信息,只不过zookeeper上只是简单记录了partition的Leader和ISR列表,并没有看见真实消费者的消费进度

在这里插入图片描述



既然这也是一个topic下的partition,我们启动一个消费者来消费其中的消息看看

可以看到下面记录的消息内容就是一个key-value的格式,key为消费者组+topic+partition 而value保存则offset和一些元数据信息

也就是说这里记录了消费者组在某个topic下的partition的消息消费偏移量offset

[root@worker1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

[test,disTopic,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[5], metadata=, commitTimestamp=1723081907174, expireTimestamp=None)
[test,disTopic,3]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[7], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
[test,disTopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[1], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
[test,disTopic,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[4], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
[test,disTopic,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[5], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
[test,disTopic,3]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[7], metadata=, commitTimestamp=1723081907377, expireTimestamp=None)
[test,disTopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[1], metadata=, commitTimestamp=1723081907377, expireTimestamp=None)



而这些Offset数据,其实也是可以被消费者修改的,在之前章节已经演示过消费者如何从指定的位置开始消费消息。而一旦消费者主动调整了Offset,Kafka当中也会更新对应的记录。

另外,这个系统Topic里面的数据是非常重要的,因此Kafka在消费者端也设计了一个参数来控制这个Topic应该从订阅关系中剔除。

public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether internal topics matching a subscribed pattern should " +
    "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2041071.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用苹果机连接mac后怎么在电脑上调试苹果手机上页面的元素

问: 用苹果机连接mac后怎么在电脑上调试苹果手机上页面的元素, 使用MAC电脑、iPhone 真机调试 H5页面 回答: 使用MAC电脑、iPhone 真机调试 H5页面 简介 Safari 浏览器设置iPhone 手机设置开始调试 简介 为方便在 H5开发过程中在真实手机调试 H5页面,可进行一下…

Mybatis-springBoot

MyBatis 是一个流行的 Java 持久层框架,它简化了与关系型数据库的交互。通过将 SQL 语句与 Java 代码进行映射,MyBatis 提供了一种方便、灵活的方式来执行数据库操作。它支持动态SQL、缓存机制和插件扩展,使得开发人员能够更高效地编写和管理…

重学我的数据结构

二叉树 1. 遍历 (Traversal) 前序遍历 (Preorder Traversal): 先访问根节点,再访问左子树,最后访问右子树。 void preorderTraversal(Node root) {if (root null) return; System.out.print(root.value " "); preorderTraversal(root.left)…

韦东山瑞士军刀项目之I2C控制ssd 1306 OLED显示屏幕驱动源码分析(硬核)

太硬核了,但即便如此,我也只分析了如何实现ssd 1306的控制命令与显示命令的代码。尽管如此,我也了解了如何实现I2C的字符输出显示。意外收获是知道了ASCII码到底是个什么玩意儿。

python pygame如何实现碰撞检测

前言: 在python中,我们实现两个物品的碰撞检测往往是判断两个物体的x、y坐标是否有重合,根据坐标来进行判断,但是这种判断方式往往不太准确,对于一些透明部分,会出现误判的情况,今天介绍的是一…

古印度的未解之谜——哈拉帕印章文字

关注我们 - 数字罗塞塔计划 - 在之前的文章中,我们知晓了古埃及莎草纸的制作工艺(参见《莎草纸——数千年前的信息记录载体》),也了解了由粘土变为陶片可保存数千年的苏美尔泥板书(参见《泥板书:两河文明传…

暑期破防实录——捡漏腾讯

序 经历了整整三个月的折磨,暑期实习终于尘埃落定。 其实还没收到 offer 的时候,还会想着到时候录用了该怎么大写特写小作文,但真到了这一天,只剩下一种解脱感,一种摆脱了漫长的焦虑与压抑的淡淡喜悦。 或许就像久病…

Java垃圾收集底层算法实现

垃圾收集底层算法实现 三色标记 在并发标记的过程中,因为标记期间应用线程还在继续跑,对象间的引用可能发生变化,多标和漏标的情况就有可能发生。漏标的问题主要引入了三色标记算法来解决。 三色标记算法是把Gc roots可达性分析遍历对象过…

QT翻金币小游戏

目录 QT翻金币小游戏 效果展示 图片 视频 实现代码 main.cpp mymainwindow.h mymainwindow.cpp startscene.h startscene.cpp selectscene.cpp playscene.h playscene.cpp mypushbutton.h mypushbutton.cpp dataconfig.h dataconfig.cpp QT翻金币小游戏 效果展示…

什么是进程?C语言

进程的概念 进程就是执行中的程序,是系统资源分配的最小单位。 进程的内存分配 进程的作用 宏观上是并行的,微观上是串行的 进程的状态 对于基本的操作系统:有三个状态: 就绪态->执行态-> 阻塞态 在LInux中有四种&am…

docker数据卷、资源控制

一、docker数据卷: 1.容器和宿主机之间数据共享----挂载卷----容器内的目录和宿主机的目录进行挂载。实现数据文件共享容器的生命周期有限,一旦重启所有对容器内部文件数据的修改以及保存的数据都会被初始化,所以为了防止数据丢失重要的组件…

前端学习大纲 | 主流前端技术 | 学习路线

需要完整的学习路线的宝子可以点击获取:点击即可获取完整的学习路线 第一阶段(页面还原能力) HTML5、CSS3、Git 第二阶段(专攻 JS 逻辑能力) JavaScript 基础、JavaScript 进阶、JavaScript 高级、ES6 第三阶段&a…

【Java】如何使用jdbc连接并操作MySQL,一文读懂不迷路,小白也能轻松学会

JDBC的原理 JDBC(Java Database Connectivity)是Java提供的用于连接和操作数据库的API。它允许Java应用程序与各种数据库进行交互,以下是JDBC的基本原理: 驱动程序管理:JDBC使用不同的数据库驱动程序来连接不同类型的…

微信小程序--24(列表渲染)

一、wx&#xff1a;for 1.作用 根据指定数组&#xff0c;循环渲染重复的组件结构 2.语法 <view wx:for"{{data中的数据}}"> 索引是&#xff1a;{{index}}, item项是&#xff1a;{{item}}</view> index:表索引item&#xff1a;表当前循环项 …

【网络】局域网LAN、广域网WAN、TCP/IP协议、封装和分用

文章目录 局域网 LAN广域网 WAN网络中的重要概念IP 地址端口号 认识协议协议分层是什么OSI 七层网络模型TCP/IP 五层网络模型&#xff08;或四层&#xff09;物理层传输层网络层数据链表层应用层网络设备所在分层 封装和分用[站在发送方视角]&#xff08;封装&#xff09;[站在…

新工种,AI商业化变现思路

本文由 ChatMoney团队出品 AI变现&#xff0c;你我都能成为创收高手! 不必是科技大咖&#xff0c;也无需深厚背景&#xff0c;让我们一起探索Chatmoney全能知识库AI的奥秘&#xff0c;轻松步入收益之门! 想象一下&#xff0c;你的智慧和创意通过ChatmoneyAI技术转化为可观的收益…

如何使用 Go 连接 MO

MatrixOne 是一款超融合异构分布式数据库&#xff0c;与 MySQL 高度兼容&#xff0c;通过云原生化和存储、计算、事务分离的架构构建 HSTAP 超融合数据引擎&#xff0c;实现单一数据库系统支持 OLTP、OLAP、流计算等多种业务负载&#xff0c;通过为用户提供一站式超融合数据解决…

【数据结构与算法】最短路径算法

最短路径算法目录 一.什么是最短路径二.最短路径算法的实现1.准备工作2.拆解为子问题——递归 三.完整代码 一.什么是最短路径 顾名思义根据需求,可以获取的最优的路径. 比如说: 我标的数值,就是时间,那么假如我们是A点到D点. 那么我们可以看到有三条路径: A->E->D所花…

AI绘画:一篇文章带你解析Stable Diffusion 原理!

前言 Stable Diffusion原理 1. Stable Diffusion能做什么 直白地说&#xff0c;SD是一个text-to-image模型&#xff0c;通过给定text prompt&#xff08;文本提示词&#xff09;&#xff0c;它可以返回一个匹配文本的图片。 2. Diffusion 模型 Stable Diffusion属于深度学习…

2003-2023年高铁线路信息数据

2003-2023年高铁线路信息数据 1、时间&#xff1a;2003-2023年 2、来源&#xff1a;高铁航线数据库&#xff08;Chinese High-speed Rail and Airline Database&#xff0c;CRAD&#xff09; 3、指标&#xff1a;高铁线路名称、起点名、终点名、开通时间、线路长度(km)、设计…