Kafka -- 日志存储
- 日志文件目录
- 日志索引
- 偏移量索引
- 时间戳索引
- 日志清理
- 日志删除
- 基于时间
- 基于日志大小
- 基于日志起始偏移量
- 日志压缩
日志文件目录
Kafka 中的消息以主题为单位进行基本归类,而每个主题又可以划分为一个或者多个分区。在不考虑多副本的情况下,每个分区对应一个日志 Log。为防止日志过大,Kafka 又引入了日志分段 LogSegment 的概念,即将大的日志文件均分为多个较小的文件,便于消息的维护和清理。
Log 在物理上以文件夹的形式存在,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能存在的其他功能文件,如下图所示:
向 Log 中追加消息是按顺序写入的,只有最后一个 LogSegment 才能执行写入操作,因此最新的一个 LogSegment 也称为 activeSegment。随着消息不断的写入,当 activeSegment 满足一定限制条件时,就会创建新的 activeSegment。LogSegment 的切分包含以下几个条件:
-
当前 LogSegment 问价文件的大小超过了 broker 端参数
log.segment.bytes
配置的值,该参数默认 1GB; -
当前 LogSegment 中消息的最大时间戳与当前系统时间的时间戳的差值大于
log.roll.hours
或log.roll.ms
的值(若同时配置则后者优先级高)。默认情况下只配置了后者,其值为 168(7 天); -
偏移量索引文件或时间戳索引文件的大小达到 broker 端参数
log.index.size.max.bytes
的值,该参数默认为 10 MB; -
新增消息的偏移量与当前 LogSegment 的偏移量之间的差值大于
Integer.MAX_VALUE
,即要追加消息的偏移量无法转换为相对偏移量;
为了便于消息的检索,每个 LogSegment 都对应着两个索引文件:偏移量索引文件(以.index为后缀)和时间戳索引文件(以.timeindex为后缀)。另外,每个 LogSegment 都有一个基准偏移量 baseOffset,用于表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整型数,日志以及索引文件都是以基准偏移量命名的,名称长度固定为 20 位,不足的位数以 0 补充。查看 LogSegment 的内容如下所示:
日志索引
每个日志分段文件对应了两个索引文件,用于提高查找消息的效率:
-
偏移量索引用于建立消息偏移量 offset 到物理地址之间的映射关系;
-
时间戳索引根据指定的时间戳来查找对应的偏移量信息;
Kafka 中的索引以稀疏索引的方式来构造消息的索引,每当写入一定量的消息时,偏移量索引和时间戳索引则分别增加一个偏移量索引项和时间戳索引项。该值由 broker 端参数log.index.interval.bytes
指定,默认为 4096,即 4KB。
偏移量索引
偏移量索引的格式如下:
relativeOffset | position |
---|---|
4B | 4B |
每个索引项占据 8 个字节,分为两部分:
-
relativeOffset:相对偏移量,表示消息当对于 baseOffset 的偏移量,占用 4 个字节。当前索引的文件名即 baseOffset;
-
position:物理地址,即消息在日志分段文件中对应的物理地址;
消息的绝对偏移量 offset 占用 8 个字节,此处采用占据 4 个字节的相对偏移量以减少索引文件占据的空间,relativeOffset = offset - baseOffset。
索引和日志的对应关系如下所示:
倘若存在下图所示的几个分段日志文件:
若要查找偏移量为 268 的消息:
-
首先需要定位到 baseOffset 为 251 的日志分段。Kafka 的每一个日志对象中都采用了 ConcurrentSkipListMap (跳跃表)来存储各个日志分段,每个日志分段的 baseOffset 作为 key 用于快速查找。
-
确定日志分段文件后再计算相对偏移量=268-251=17,利用二分查找快速定位到消息的 position。
-
最后根据 position 定位到日志文件的相应位置查询目标消息。
时间戳索引
时间戳索引的格式如下:
timestamp | relativeOffset |
---|---|
8B | 4B |
每个索引项占据 12 个字节,分为两部分:
-
timestamp:当前日志分段的最大时间戳;
-
relativeOffset:时间戳所对应的消息的相对偏移量;
时间戳索引、偏移量索引与日志的对应关系如下所示:
若要查找目标时间戳为 1526384718288 开始的消息:
-
首先需要依次对比日志分段中的最大时间戳与目标时间戳,直到找到最大时间戳不小于目标时间戳的日志分段文件;
-
确定日志分段文件后利用二分法查找不大于目标时间戳的最大索引项,获取其相对偏移量;
-
在偏移量索引文件中使用二分法查找不大于上一步查到的相对偏移量的最大索引项,获取物理位置 position;
-
在步骤一的日志分段文件中,从上一步查询到的物理位置处开始查找不小于目标时间戳的消息;
日志清理
Kafka 的消息存储在磁盘当中,为控制磁盘占用空间的不断增加,其提供了两种日志清理策略:
-
日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段;
-
日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于 key 相同的不同 value 值,只保留最新的版本;
通过 broker 端参数log.cleanup.policy
可以设置日志的清理策略,默认值为 “delete”,即采用删除策略。设置为 “compact” 即采用压缩策略。还可以设置为 “delete,compact” 以同时支持删除和压缩策略。注意,还需要将log.cleaner.enable
设置为 true,默认为 true。
日志清理的细粒度可以控制到主题级别。
日志删除
Kafka 日志管理器中存在一个周期性检测和删除不符合保留条件的日志分段文件的任务,该任务的检测周期可以通过 broker 端参数log.retention.check.interval.ms
来配置,默认值为 300000,即 5 分钟。
日志分段的保留策略有 3 种:基于时间、基于日志大小以及基于日志起始偏移量。
基于时间
在该策略中,日志删除任务会检查当前日志文件中是否存在保留时间超过所设定阈值 retentionMs 以寻找可删除的日志分段文件集合 deletableSegments。如下图:
其中,retentionMs 可以通过 broker 端参数log.retention.hours
、log.retention.minutes
以及log.retention.ms
来配置,各参数优先级按顺序依次增高。Kafka 默认配置log.retention.hours
为 168,即 7 天。
若全部的日志分段都已经过期,则首先会切分出一个新的活跃日志分段 activeSegment,然后再执行删除操作。删除时,首先将 Log 对象维护的日志分段跳跃表中移除待删除的日志分段,以保证不会有线程对这些日志分段执行读取操作。然后再将待删除的日志分段及其索引文件标记“.deleted”的后缀。最后交由一个延迟任务执行文件的删除,该任务的延迟执行时间由file.delete.delay.ms
参数配置,默认值为 1 分钟。
基于日志大小
在该策略中,日志删除任务会检查当前日志分段文件的大小是否超过设定的阈值 retentionSize 以查找可删除的日志分段文件集合 deletableSegments。如下图:
其中,retentionMs 可以通过 broker 端参数log.retention.bytes
来配置,默认值为 -1,表示无穷大。
查询到可删除的日志分段文件集合 deletableSegments 后,其删除过程与基于时间策略的删除过程相同。
基于日志起始偏移量
在该策略中,日志删除任务会检查当前日志分段文件的的下一日志起始分段文件的起始偏移量 baseOffset 是否小于等于 logStartOffset 以查找可删除的日志分段文件集合 deletableSegments。如下图:
其中,logStartOffset 的值可以通过 DeleteRecordsRequest 请求、日志的清理和截断等操作进行修改。
查询到可删除的日志分段文件集合 deletableSegments 后,其删除过程与基于时间策略的删除过程相同。
日志压缩
日志压缩是另外一种清理过时数据的方式,其对于具有相同 key 的不同 value,只保留 key 中最新的 value 值,如下图所示。若应用只关心 key 对应的最新 value 值,则可以开启日志压缩的功能。
在日志压缩前后,日志分段文件中的每条消息的偏移量与写入时的偏移量保持一致,只是日志压缩后的偏移量不再是连续的。
在 Kafka 日志的存放目录当中,存在着名为“cleaner-offset-checkpoint”的文件,该文件用于记录每个主题中的每个分区中已清理的偏移量。文件中记录的 cleaner checkpoint 可以将日志划分为两部分:已经清理过的 clean 部分和未清理过的 dirty 部分,如下图。
其中,dirty 部分的消息偏移量是逐一递增的,而 clean 部分的消息偏移量是不连续的。firstDirtyOffset 表示 dirty 部分的起始偏移量,firstUncleanableOffset 是 dirty 部分的终止偏移量。activeSegment 不会参与日志压缩的过程。
此外,Kafka 支持通过参数log.cleaner.min.compaction.lag.ms
配置消息被压缩清理前的最小保留时间,默认为 0,即默认情况下 firstUncleanableOffset 等于 activeSegment 的 baseOffset。
注意:日志压缩是针对 key 的,故在使用时应注意每个消息的 key 不为 null。每个 broker 会启动log.cleaner.thread
所配置个数的日志清理线程负责执行清理任务,这些线程会选择污浊率(=dirty日志大小/(clean日志大小+dirty日志大小)
)最大的日志文件进行清理。为防止日志的频繁清理,可以使用参数clean.cleanable.ratio
来限定可进行清理操作的最小污浊率,默认值为 0.5。
在执行清理操作时,Kafka 日志清理线程会使用 SkimpyOffsetMap 对象来构建 key 与 offset 的映射关系表。清理过程需遍历日志文件两次:
-
第一次遍历将每一个 key 和最后出现的 offset 保存在 SkimpyOffsetMap 对象中;
-
第二次遍历判断每一个消息的 offset,将其与消息对应的 kay 在 SkimpyOffsetMap 中存储的值作比较,若前者大于后者则保留,反之进行清理;