以上是kafka的数据的存储方式。
这些数据可以在服务器集群上对应的文件夹中查看到。
[hexuan@hadoop106 __consumer_offsets-0]$ ll
总用量 8
-rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index
-rw-rw-r--. 1 hexuan hexuan 0 10月 28 22:21 00000000000000000000.log
-rw-rw-r--. 1 hexuan hexuan 10485756 10月 28 22:21 00000000000000000000.timeindex
-rw-rw-r--. 1 hexuan hexuan 8 10月 28 22:21 leader-epoch-checkpoint
-rw-rw-r--. 1 hexuan hexuan 43 10月 28 22:21 partition.metadata
每个文件夹以topic+partition进行命名,更加便于管理和查询检索,因为kafka的数据都是按照条进行处理和流动的一般都是给流式应用做数据供给和缓冲,所以检索速度必须要快,分块管理是最好的方式。
消费者在检索相应数据的时候会非常的简单。
consumer检索数据的过程。
首先文件的存储是分段的,那么文件的名称代表的就是这个文件中存储的数据范围和条数。
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
代表存储的数据是从0条开始的00000000000000100000.index
00000000000000100000.log
00000000000000100000.timeindex
代表存储的数据是从100000条开始的
所以首先检索数据的时候就可以跳过1G为大小的块,比如检索888这条数据的,就可以直接去00000000000000000000.log中查询数据
那么查询数据还是需要在1G大小的内容中找寻是比较麻烦的,这个时候可以从index索引出发去检索,首先我们可以通过kafka提供的工具类去查看log和index中的内容
# 首先创建一个topic_b
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_b --partitions 5 --replication-factor 2
# 然后通过代码随机向不同的分区中分发不同的数据1W条
package com.hainiu.kafka.consumer;
/**
* ClassName : test1
* Package : com.hainiu.kafka.consumer
* Description
*
* @Author HeXua
* @Create 2024/11/3 22:45
* Version 1.0
*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class test1 {
public static void main(String[] args) throws InterruptedException {
Properties pro = new Properties();
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);
pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
pro.put(ProducerConfig.RETRIES_CONFIG, 3);
pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
for (int i = 0; i < 10000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", ""+i,"this is hainiu");
producer.send(record);
}
producer.close();
}
}
然后去查看log和index中的内容
# kafka查看日志和索引的命令
kafka-run-class.sh kafka.tools.DumpLogSegments --files xxx
查看日志.log
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
baseOffset: 606 lastOffset: 1205 count: 600 baseSequence: 606 lastSequence: 1205 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 5149 CreateTime: 1730645208577 size: 4929 magic: 2 compresscodec: snappy crc: 1974998903 isvalid: true
baseOffset: 1206 lastOffset: 1439 count: 234 baseSequence: 1206 lastSequence: 1439 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 10078 CreateTime: 1730645208584 size: 2085 magic: 2 compresscodec: snappy crc: 1665550202 isvalid: true
查看索引.index
内容即:
index索引
offset 第几条 | position 物理偏移量位置,也就是第几个字 |
---|---|
1187 | 5275 |
1767 | 10140 |
2022 | 15097 |
log日志
# 打印日志内容的命令 --print-data-log 打印数据
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
| offset: 0 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 0 headerKeys: [] key: 14 payload: this is hainiu
| offset: 1 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 1 headerKeys: [] key: 19 payload: this is hainiu
| offset: 2 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 2 headerKeys: [] key: 24 payload: this is hainiu
| offset: 3 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 3 headerKeys: [] key: 26 payload: this is hainiu
可以看到刷写的日志
baseOffset: 0 lastOffset: 605 count: 606
从0 到605 条一次性刷写606条
lastSequence: 605 producerId
刷写事务日志编号,生产者的编号
通过名称跳过1G的端,然后找到相应的index的偏移量,然后根据偏移量定位log位置,不断向下找寻数据。
大家可以看到index中的索引数据是轻量稀疏的,这个数据是按照4KB为大小生成的,一旦刷写4KB大小的数据就会写出相应的文件索引。
官网给出的默认值4KB
一个数据段大小是1G
timeIndex
我们看到在数据中还包含一个timeindex的时间索引
# 查询时间索引
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1730645208577 offset: 1205
timestamp: 1730645208584 offset: 1439
可以看到和index索引一样,这个也是4Kb写出一部分数据,但是写出的是时间,我们可以根据时间进行断点找寻数据,指定时间重复计算
也就是说,写到磁盘的数据是按照1G分为一个整体部分的,但是这个整体部分需要4KB写一次,并且一次会生成一个索引问题信息,在检索的时候可以通过稀疏索引进行数据的检索,效率更快。