目录
日志存储设计
1. 日志存储的目录结构
2. 日志内容格式设计
3. 日志索引设计
4. 设计优势
消息写入流程
示例
流程图
消息读取流程
示例
关键设计细节
流程图
日志存储设计
Kafka的日志存储是其高吞吐、持久化能力的核心设计,其结构包含目录组织、消息格式、索引设计三部分。
1. 日志存储的目录结构
- 分区目录:每个Topic分区对应一个目录,命名格式为
<topic>-<partition>
,例如orders-0
。 - Segment文件:每个分区目录下包含多个日志段(Segment),每个段由两个文件组成:
-
- .log文件:存储实际消息(如
00000000000000000000.log
),文件名基于基准位移(Base Offset),即该段第一条消息的Offset。 - .index文件:稀疏索引文件(如
00000000000000000000.index
),记录Offset到物理位置的映射。
- .log文件:存储实际消息(如
- 活跃段(Active Segment):当前正在写入的Segment,文件名格式为
<nextOffset>.log
,例如新段的第一条消息Offset为100,则文件名为00000000000000000100.log
。
源码关键类:
Log
:管理分区的所有Segment(ConcurrentNavigableMap<Long, LogSegment>
)。LogSegment
:封装单个Segment的.log和.index文件操作。
2. 日志内容格式设计
Kafka消息以**批次(RecordBatch)**为单位存储,每个批次包含多条消息,减少I/O开销。单条消息格式如下:
RecordBatch Header:
Base Offset (int64)
Length (int32)
Partition Leader Epoch (int32)
Magic (int8)
CRC (int32)
Attributes (int16)
Last Offset Delta (int32)
First Timestamp (int64)
Max Timestamp (int64)
Producer ID (int64)
Producer Epoch (int16)
Base Sequence (int32)
Records Count (int32)
Record (多条):
Length (varint)
Attributes (int8)
Timestamp Delta (varint)
Offset Delta (varint)
Key (varint bytes)
Value (varint bytes)
Headers (varint array)
特点:
- 紧凑二进制格式:通过变长类型(varint)和位移差值(Delta)压缩空间。
- 批量写入:多个Record打包成RecordBatch,减少网络和磁盘I/O。
- 幂等与事务支持:通过
Producer ID
、Epoch
、Sequence
字段实现。
3. 日志索引设计
- 稀疏索引(Sparse Index):.index文件不记录每条消息的Offset,而是每隔一定消息量(如4KB)记录一个索引项。
- 索引项结构:每个索引项占8字节,包含两个字段:
-
- Relative Offset:相对于基准位移的差值(4字节)。
- Physical Position:对应.log文件中的物理位置(4字节)。
索引查询流程(源码见OffsetIndex
类):
- 根据目标Offset,通过二分查找找到最近的索引项。
- 从.log文件的对应位置开始顺序扫描,直到找到目标消息。
4. 设计优势
- 高效查询:稀疏索引+顺序扫描平衡了索引大小与查询速度。
- 快速扩容:Segment文件按基准Offset分割,易于清理旧数据和扩展新文件。
- 高吞吐:批量写入、页缓存、零拷贝等技术最大化磁盘和网络效率。
通过这种设计,Kafka在保证消息持久化的同时,实现了百万级TPS的吞吐能力。
消息写入流程
示例
生产者发送一条消息{"order_id": 1001}
到Topic orders
的Partition 0。
写入流程:
- 选择分区:根据
Partitioner
确定消息写入orders-0
。 - 追加到活跃段:
-
- Broker将消息封装为RecordBatch,追加到当前活跃段(如
00000000000000001000.log
)。 - 更新对应的索引文件
00000000000000001000.index
(每隔4KB或一定时间写入索引项)。
- Broker将消息封装为RecordBatch,追加到当前活跃段(如
- 刷盘策略:根据
log.flush.interval.messages
或log.flush.interval.ms
决定何时将数据从页缓存刷到磁盘。
源码关键方法:
Log.append()
:处理消息追加。LogSegment.append()
:写入.log文件并更新索引。
流程图
消息读取流程
示例
消费者请求读取Offset为1005的消息。
1. 消费者发送FetchRequest:请求包含目标Topic、Partition和Offset(例如Offset=1005)。
2. Broker验证Offset有效性:
- 检查Offset是否在
LogStartOffset
(日志起始位移)和HighWatermark
(已提交消息的最大位移)之间。 - 若Offset无效(如小于LogStartOffset或大于HighWatermark),返回错误
OFFSET_OUT_OF_RANGE
。
- 定位Segment文件:
-
- Broker根据Offset值,在分区的
Log
对象中通过二分查找找到对应的LogSegment
。 - 具体逻辑:在
LogSegments
(一个有序的ConcurrentNavigableMap
)中调用floorEntry(Offset)
,找到基准Offset ≤ 目标Offset的Segment。 - 示例:Offset=1005 → 找到基准Offset=1000的Segment(文件
00000000000000001000.log
)。
- Broker根据Offset值,在分区的
- 加载索引文件:
-
- 打开对应Segment的
.index
文件(稀疏索引),通过内存映射(MappedByteBuffer
)加载到内存。
- 打开对应Segment的
- 解析Offset并查询索引:
-
- 计算相对Offset:
目标Offset - 基准Offset
(如1005 - 1000 = 5)。 - 在
.index
文件中二分查找最接近且≤相对Offset的索引项。 - 示例:索引项可能为
[4 → 4096]
(相对Offset=4,对应.log文件的物理位置4096字节)。
- 计算相对Offset:
- 定位消息物理位置:
-
- 根据索引项中的物理位置(4096),从
.log
文件的该位置开始顺序扫描。 - 逐条解析消息头中的Offset,直到找到目标Offset=1005的消息。
- 根据索引项中的物理位置(4096),从
- 操作系统缓存与文件缓存:
-
- 页缓存(Page Cache):Kafka依赖操作系统的页缓存机制,
.log
和.index
文件会被缓存到内存,后续读取直接从内存访问,避免磁盘I/O。 - 内存映射(Memory-Mapped Files):索引文件通过
MappedByteBuffer
映射到内存,加速索引查询。
- 页缓存(Page Cache):Kafka依赖操作系统的页缓存机制,
- 返回消息数据:
-
- 将找到的消息数据封装为
FetchResponse
返回给消费者,消息内容可能直接从页缓存中读取(零拷贝优化)
- 将找到的消息数据封装为
关键设计细节
1. Offset解析与索引查询
- 相对Offset计算:
索引文件中存储的是相对于Segment基准Offset的差值(如基准Offset=1000,索引项中的相对Offset=5 → 实际Offset=1005)。 - 稀疏索引优化:
索引文件仅记录部分Offset(如每隔4KB),通过二分查找快速定位到近似位置,再顺序扫描少量数据。
2. 文件定位与缓存机制
- Segment文件定位:
LogSegments
使用ConcurrentNavigableMap
维护所有Segment,floorEntry(Offset)
方法通过跳表(Skip List)快速查找。 - 操作系统缓存:
-
- 页缓存:Kafka的.log文件读写完全依赖操作系统的页缓存,消息读取时直接从内存访问,避免磁盘寻址。
- 内存映射文件:.index文件通过
FileChannel.map()
映射到内存,索引查询几乎无磁盘I/O。
3. 顺序扫描优化
- 批量读取:从.log文件的物理位置开始,按块(如8KB)读取数据,减少小文件I/O次数。
- 零拷贝(Zero-Copy):
消息数据通过FileChannel.transferTo()
直接从页缓存发送到网络Socket,无需经过用户态(源码见FileRecords.writeTo()
)。
流程图