kafka 的内部结构和 kafka 的工作原理

news2024/12/23 20:57:48

基本设置

让我们开始安装kafka。下载最新的 Kafka 版本并解压缩。打开终端并启动 kafka 和 zookeeper。

$ cd $HOME
$ tar -xzf kafka_<version>.tgz
$ cd kafka_<version>
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# open another terminal session and start kafka
$ bin/kafka-server-start.sh config/server.properties

让我们在新的终端选项卡中创建一个主题。

# Open another terminal and create a topic.
$ bin/kafka-topics.sh --create --topic payments --partitions 10 --replication-factor 1 \
 --bootstrap-server localhost:9092

如果您想知道上述命令是如何使用这些参数构造的,那非常简单。照做,bin/kafka-topics.sh --help您将看到所有带有描述的参数。文件夹中存在的所有 shell 实用程序也是如此bin

现在让我们看看幕后发生了什么。

转到/tmp/kafka-logs目录并执行ls我们将看到以下结果。

cleaner-offset-checkpoint        payments-0    payments-3    payments-6     payments-9
log-start-offset-checkpoint      payments-1    payments-4    payments-7     recovery-point-offset-checkpoint
meta.properties                  payments-2    payments-5    payments-8     replication-offset-checkpoint

/tmp/kafka-logs是kafka存储数据的默认目录。config/server.properties我们可以将它配置到kafka 和config/zookeeper.propertieszookeeper的不同目录。

恢复点偏移检查点

kafka 代理在内部使用此文件来跟踪刷新到磁盘的日志数量。文件的格式是这样的。

<version>
<total entries>
<topic name> <partition> offset

复制偏移检查点

该文件由 kafka 代理在内部使用,用于跟踪复制到集群中所有代理的日志数量。recovery-point-offset-checkpoint该文件的格式与上述文件相同。

主题和分区

正如我们从上面的结果中看到的,payments-0payments-1payments-10是文件系统中的目录分区。正如我在之前的博文中强调的那样,主题是 kafka 中的一个逻辑概念。它在物理上不存在,只有分区存在。主题是所有分区的逻辑分组。

Producer

现在,让我们使用以下命令为主题生成一些消息。

$ cd $HOME/kafka
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic payments
> hello
> world
> hello world
> hey there!

我们就该主题制作了四条消息。让我们看看它们是如何存储在文件系统中的。很难找出消息去了哪个分区,因为 kafka 使用循环算法将数据分发到分区。简单的方法是找到所有分区(目录)的大小并选择最大的。

$ cd /tmp/kafka-logs
$ du -hs *
8.0K    payments-0
8.0K    payments-1
 12K    payments-2
8.0K    payments-3
 12K    payments-4
8.0K    payments-5
8.0K    payments-6
 12K    payments-7
8.0K    payments-8
 12K    payments-9

正如我们从上面的代码片段中看到的那样,我们的消息进入了分区 2、4、7 和 9。让我们看看每个分区中有什么。

$ ls payments-7

00000000000000000000.index     00000000000000000000.log
00000000000000000000.timeindex leader-epoch-checkpoint
partition.metadata
$ cat 00000000000000000000.log
=
��Mr���Mr����������������
world%
$ cat partition.metadata
version: 0
topic_id: tbuB6k_uRsuEE03FsechjA
$ cat leader-epoch-checkpoint
0
1
0 0
$ cat 00000000000000000000.index
$ cat 00000000000000000000.timeindex

分区元数据

partition.metadata文件包含一个version和一个topic_id。此主题 ID 对于所有分区都是相同的。

日志文件

这是生产者写入的数据以二进制格式存储的地方。下面我们尝试使用kafka提供的命令行工具来查看这些文件的内容。

$ bin/kafka-dump-log.sh --files data/kafka/payments-7/00000000000000000000.log,data/kafka/payments-7/00000000000000000000.index --print-data-log

Dumping data/kafka/payments-7/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0
CreateTime: 1672041637310 size: 73 magic: 2 compresscodec: none crc: 456919687 isvalid: true | offset: 0
CreateTime: 1672041637310 keySize: -1 valueSize: 5 sequence: -1 headerKeys: [] payload: world

除了一些属性外,以上输出的解释是不言自明的。payload是推送到kafka的实际数据。offset告诉当前消息离零索引有多远。producerIdproduerEpoch用于交付保证语义。我们将在以后的博文中讨论它们。我们将在下面了解.index.timeindex文件。

分区键

我们了解到,kafka 以循环方式将数据分发到分区。但是,如果我们想发送按键分组的数据怎么办?这就是分区键的用武之地。当我们将数据与分区键一起发送时,kafka 将它们放在一个分区中。kafka是如何找到partition key的?它使用计算hash(partition_key) % number_of_partitions。如果不存在分区键,则它使用循环算法。

我们可能想知道,分区键的用例是什么?Kafka 只保证分区级别的消息排序,而不是主题级别。分区键的应用是为了确保消息跨所有分区的顺序。

让我们看看它是如何工作的。让我们生成一些消息。

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic payments --property parse.key=true --property key.separator=|
> lokesh1729|{"message": "lokesh1729 : order placed"}
> lokesh1729|{"message": "lokeh1729 : logged in"}
> lokesh1729|{"message": "lokesh1729 : logged out"}
> lokesh1729|{"message": "lokesh1729 : payment success"}

parse.key告诉 kafka 通过分隔符解析密钥。默认情况下key.separator设置为选项卡,我们重写为管道。

让我们使用相同的命令查看数据kafka-dump-log。我们需要在所有 10 个分区中执行命令来找到分区,因为我们不知道它去了哪个分区。

$ $ bin/kafka-dump-log.sh --files data/kafka/payments-7/00000000000000000000.log,data/kafka/payments-7/00000000000000000000.index --print-data-log
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 147 CreateTime: 1672057287522 size: 118 magic: 2 compresscodec: none crc: 2961270358
isvalid: true | offset: 2 CreateTime: 1672057287522 keySize: 10 valueSize: 40 sequence: -1 headerKeys: [] key: lokesh1729
payload: {"message": "lokesh1729 : order placed"}

baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 265 CreateTime: 1672057301944 size: 114 magic: 2 compresscodec: none crc: 204260463
isvalid: true | offset: 3 CreateTime: 1672057301944 keySize: 10 valueSize: 36 sequence: -1 headerKeys: [] key: lokesh1729
payload: {"message": "lokeh1729 : logged in"}

baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 379 CreateTime: 1672057311110 size: 116 magic: 2 compresscodec: none crc: 419761401
isvalid: true | offset: 4 CreateTime: 1672057311110 keySize: 10 valueSize: 38 sequence: -1 headerKeys: [] key: lokesh1729 payload: {"message": "lokesh1729 : logged out"}

baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 495 CreateTime: 1672057327354 size: 121 magic: 2 compresscodec: none crc: 177029556
isvalid: true | offset: 5 CreateTime: 1672057327354 keySize: 10 valueSize: 43 sequence: -1 headerKeys: [] key: lokesh1729 payload: {"message": "lokesh1729 : payment success"}

正如我们从上面的日志中看到的,所有带有键的消息都lokesh1729去了同一个分区,即分区 7。

索引和时间索引文件

让我们使用此脚本生成更多消息并使用上述命令转储数据。

$ bin/kafka-dump-log.sh --files data/kafka/payments-8/00000000000000000000.log,data/kafka/payments-8/00000000000000000000.index --print-data-log
Dumping data/kafka/payments-8/00000000000000000000.index
offset: 33 position: 4482
offset: 68 position: 9213
offset: 100 position: 13572
offset: 142 position: 18800
offset: 175 position: 23042
offset: 214 position: 27777
offset: 248 position: 32165
offset: 279 position: 36665
offset: 313 position: 40872
offset: 344 position: 45005
offset: 389 position: 49849
offset: 422 position: 54287
offset: 448 position: 58402
offset: 485 position: 62533

正如我们从上面的输出中看到的,索引文件存储了偏移量及其在文件中的位置.log。为什么需要它?我们知道消费者是顺序处理消息的。当消费者请求消息时,kafka 需要从日志中获取它,即它需要执行磁盘 I/O。想象一下,kafka 逐行读取每个日志文件以找到偏移量。它需要O(n)(其中 n 是文件中的行数)磁盘 I/O 的时间和延迟。当日志文件达到千兆字节大小时,它将成为瓶颈。因此,为了优化它,kafka 将偏移量存储到文件中的位置映射.index,这样如果消费者要求任意偏移量,它只需.index及时对文件进行二进制搜索O(log n),然后转到.log文件并再次执行二进制搜索。

让我们举个例子,假设消费者正在读取第 190 个偏移量。首先,kafka broker 读取索引文件(参考上面的日志)并进行二分查找,要么找到确切的偏移量,要么找到最接近的偏移量。在这种情况下,它发现偏移量为 175,其位置为 23042。然后,它转到文件.log并再次执行二进制搜索,因为该.log文件是按偏移量升序存储的仅追加数据结构。

现在,让我们看一下.timeindex文件。让我们使用以下命令转储文件。

$ bin/kafka-dump-log.sh --files data/kafka/payments-8/00000000000000000000.timeindex --print-data-log

Dumping data/kafka/payments-8/00000000000000000000.timeindex
timestamp: 1672131856604 offset: 33
timestamp: 1672131856661 offset: 68
timestamp: 1672131856701 offset: 100
timestamp: 1672131856738 offset: 142
timestamp: 1672131856772 offset: 175
timestamp: 1672131856816 offset: 213
timestamp: 1672131856862 offset: 247
timestamp: 1672131856901 offset: 279
timestamp: 1672131856930 offset: 312
timestamp: 1672131856981 offset: 344
timestamp: 1672131857029 offset: 388
timestamp: 1672131857076 offset: 419
timestamp: 1672131857102 offset: 448
timestamp: 1672131857147 offset: 484
timestamp: 1672131857185 offset: 517
timestamp: 1672131857239 offset: 547

从上面的结果我们可以看出,.timeindex文件中存储了纪元时间戳和文件中偏移量的映射关系.index。当消费者想要根据时间戳重放事件时,kafka首先通过对文件进行二分查找找到偏移量.timeindex,找到偏移量,通过对文件进行二分查找找到位置.index

表示 .index 和 .timeindex 文件如何在 kafka 中工作的图像

消费者

让我们使用以下命令启动消费者

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic payments --group payments-consumer --from-beginning
{"message": "lokesh1729 : order placed"}
{"message": "lokeh1729 : logged in"}
{"message": "lokesh1729 : logged out"}
{"message": "lokesh1729 : payment success"}

请注意,--from-beginning参数用于从头开始读取。如果不使用,消费者读取最新的消息,即消费者启动后产生的消息。

现在,让我们看一下文件系统。我们可以观察到将创建名称为 … 的新__consumer_offsets-0文件__consumer_offsets-1__consumer_offsets-49。Kafka 将每个消费者偏移量的状态存储在一个名为__consumer_offsets默认分区大小为 50 的主题中。如果我们查看文件夹中的内容,将会出现与payments我们在上面看到的主题中相同的文件。

一张描绘kafka broker和consumer之间交互的图片

正如我们从上图中看到的,消费者轮询记录并在处理完成时提交偏移量。Kafka 非常灵活,我们可以配置在单个轮询中获取多少条记录、自动提交间隔等…我们将在单独的博客文章中讨论所有这些配置。

当消费者提交偏移量时,它会发送主题名称、分区和偏移量信息。然后,代理使用它来构造键 as<consumer_group_name>, <topic>, <partition>和值 as<offset>,<partition_leader_epoch>,<metadata>,<timestamp>并将其存储在__consumer_offsets主题中。

当消费者崩溃或重启时,它向kafka broker发送请求,broker__consumer_offsets通过doing找到分区hash(<consumer_group_name>, <topic>, <partition> ) % 50并获取最新的偏移量并将其返回给消费者。

[磁盘 I/O 优化

Kafka 使用硬盘作为其主要数据存储。我们知道磁盘 I/O 比主存慢。因此,我们可能想知道 kafka 是如何在高吞吐量下实现低延迟的。让我们深入研究它。

  1. 顺序磁盘读取比随机内存访问更快。现代操作系统提供以多个块的形式从磁盘读取数据的功能。
  2. 现代操作系统使用空闲主内存进行磁盘缓存,并通过此缓存转移磁盘 I/O。
  3. 依赖磁盘缓存比主内存更优化,因为即使服务崩溃或重新启动,磁盘缓存也会保持温暖。
  4. Kafka 使用索引文件来加快访问速度。我们已经在上面讨论过它们。
  5. Kafka 批处理磁盘写入。

以下是文件中的示例日志.log。让我们剖析一下。

baseOffset- 开始的起始偏移量

lastOffset- 不言自明

count- 批次中的消息总数

CreateTime- 创建日期的纪元时间戳

size- 批处理中消息的总大小(以字节为单位)

baseOffset: 1992 lastOffset: 1995 count: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 260309 CreateTime: 1672131859025 size: 474 magic: 2 compresscodec: none crc: 36982599 isvalid: true
| offset: 1992 CreateTime: 1672131859022 keySize: 12 valueSize: 84 sequence: -1 headerKeys: [] key: craigpearson payload: {"username": "craigpearson", "address": "0414 Fischer Rest\nZacharyshire, MN 38196"}
| offset: 1993 CreateTime: 1672131859024 keySize: 11 valueSize: 80 sequence: -1 headerKeys: [] key: gregoryjoel payload: {"username": "gregoryjoel", "address": "827 Nelson Burg\nSherrimouth, OK 49255"}
| offset: 1994 CreateTime: 1672131859025 keySize: 11 valueSize: 83 sequence: -1 headerKeys: [] key: gregoryjoel payload: {"username": "gregoryjoel", "address": "8306 Reed Trail\nFitzgeraldstad, PA 18715"}
| offset: 1995 CreateTime: 1672131859025 keySize: 12 valueSize: 84 sequence

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/595479.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web安全之常见攻防

前言&#xff1a; 在当下&#xff0c;数据安全与个人隐私受到了前所未有的挑战。如何才能更好地保护我们的数据&#xff1f;接下来分析几种常见的攻击的类型以及防御的方法。 一、XSS&#xff08;Cross Site Script&#xff09; 首先了解最常见的 XSS 漏洞&#xff0c;XSS (Cr…

Nginx的网站服务

Nginx网站服务 一、Nginx与apache的差异二、编译安装Nginx服务三、Nginx命令四、向系统添加nginx服务五、nginx配置文件六、http块的配置 一、Nginx与apache的差异 nginx相对于apache的优点&#xff1a; 轻量级&#xff0c;同样是web服务&#xff0c;比apache占用内存更少及资…

chatgpt赋能python:Python为什么运行不出结果?

Python为什么运行不出结果&#xff1f; 简介 Python是一种高级编程语言&#xff0c;可以帮助程序员快速开发软件应用。不过&#xff0c;在使用Python的过程中&#xff0c;你可能会遇到一些麻烦。其中一个常见的问题是Python运行不出结果。 如果你正在遇到这个问题&#xff0…

DAY18_基础加强-XMLDTDschema注解单元测试

目录 1 xml1.1 概述1.2 标签的规则1.3 语法规则1.4 xml解析1.5 DTD约束1.6 schema约束 2 注解2.1 概述2.2 自定义注解2.3 元注解 3 单元测试3.1 概述3.2 特点3.3 使用步骤3.4 相关注解 1 xml 1.1 概述 万维网联盟(W3C)官网&#xff1a;https://www.w3school.com.cn 万维网联盟…

解释公有云、私有云、混合云、边缘云、分布式云

Author:skate Time:2023/06/01 云计算是一种基于互联网的计算模式&#xff0c;它通过网络提供IT资源和服务&#xff0c;包括计算、存储、网络、应用等&#xff0c;以服务的形式向用户提供&#xff0c;用户可以按需获取和使用这些资源和服务&#xff0c;而无需拥有和管理这些资…

chatgpt赋能python:Python主类:一个强大的编程工具

Python主类&#xff1a;一个强大的编程工具 介绍 Python是一种高级编程语言&#xff0c;它已经成为了世界上最流行和广泛使用的编程语言之一。Python在各种领域都有着广泛的应用&#xff0c;包括科学计算、数据分析、网络编程、机器学习等等。 虽然Python具有许多有用的功能…

iPhone技巧之「合并PDF文档」

有时候我们在手机上需要将多个PDF文档合并为一个&#xff0c;方便发送或者观看。 如果电脑不在身边&#xff0c;或只想直接在手机上合并PDF&#xff0c;是不是需要下载什么新的app或者工具呢&#xff1f;答案是不需要的&#xff0c;如果你用的是苹果手机&#xff0c;用苹果自带…

点云数据处理方法的应用PCL函数库为例的设计与实现_kaic

摘 要 在计算机视觉和虚拟现实技术的发展过程中&#xff0c;点云数据已成为主要的三维数据表达形式。将点云数据转换成灰值图时&#xff0c;测算每一个像素点周边领域的点云&#xff0c;但是其效率精密度比较低。文中探讨了点云数据的多视图拼凑和滤波处理&#xff0c;明确提出…

网络编程知识点总结(1)

TCP/UDP对比 1.TCP面向连接 (如打电话要先拨号建立连接);UDP是无连接的&#xff0c;即发送数据之前不需要建立连接 2.TCP提供可靠的服务。也就是说&#xff0c;通过TCP连接传送的数据&#xff0c;无差错&#xff0c;不丢失&#xff0c;不重复&#xff0c;且按序到达;UDP尽最大…

【蓝桥刷题】备战国赛——区间修改、区间查询

蓝桥杯线段树模板题——区间修改、区间查询 &#x1f680; 每日一题&#xff0c;冲刺国赛 &#x1f680; 题目导航&#xff1a; 区间修改、区间查询 &#x1f387;思路&#xff1a;线段树 &#x1f531;思路分析&#xff1a; 本题涉及到了对区间操作的问题&#xff0c;因此&a…

《商用密码应用与安全性评估》第四章密码应用安全性评估实施要点4.2密码应用基本要求与实现要点

4.2.1 标准介绍 2018年2月8日&#xff0c;GM/T0054-2018《信息系统密码应用基本要求》由国家密码 管理局发布并实施。 ① 总体要求规定了密码算法、密码技术、密码产品和密码服务应当符合商用密码管理的相关规定&#xff0c;满足标准规范的相关要求&#xff0c;即合规性。 ②密…

校园综合能效管理平台建设的意义

摘要&#xff1a;为响应国家绿色校园建设的号召&#xff0c;切实提高高校能源利用水平&#xff0c;推动学校能源资源合理配置&#xff0c;服务学校高质量发展大局&#xff0c;根据教育部印发的《关于开展节能减排学校行动的通知》《关于勤俭节约办教育建设节约型校园的通知》《…

ESP8285 多个bin文件合并烧录

可通过两种方式烧录固件&#xff0c;一种是基于esp-idf开发时&#xff0c;中命令终端执行make flash命令烧录&#xff1b;二是使用ESPFlashDownloadTool工具。 bin文件说明 ESP8285/ESP8266的固件一般包含4个bin文件。 查看各bin文件的路径 以带OTA的固件为例&#xff0c;在…

chatgpt赋能python:Python中的乘方操作

Python中的乘方操作 作为一种流行的编程语言&#xff0c;Python内置了许多强大的数学运算工具。其中&#xff0c;乘方操作是一个非常常见的数学操作&#xff0c;它可以快速地计算一个数的任意次幂。本文将介绍Python中乘方操作的用法&#xff0c;并提供了一些相关的示例代码。…

Git服务器集成 · GitHub 服务器(二)

本篇文章旨在分享本人在学习Git时的随笔记&#x1f929; 文章目录 1、注册账号2、创建新的仓库3、本地仓库的基本操作指令4、SSH 免密操作4.1、本地生成 SSH 密钥4.2、集成用户公钥 5、设定全局用户6、创建本地库以远程地址7、新增&#xff0c;提交本地仓库文件8、推送到 GitHu…

某马大数据全套视频

某马大数据 需要的私信&#xff1a;某马大数据 01、阶段一 Python大数据开发基础 01、第一章大数据介绍及开发环境 02、第二章 linux命令 03、第三章 MySQL数据库 04、第四章 excel的使用 05、第五章 kettle的使用 06、第六章 数据分析及可视化 07、第七章 大数据框架与…

修改gd32f305时钟

如题&#xff0c;移植GD32F305芯片驱动&#xff0c;修改时钟的方法。 硬件外部时钟为8MHz&#xff0c;官方demo文件为25MHz&#xff0c;基于此修改时钟配置。 1、选择system_gd32f30x.c并修改 如下图 1.1&#xff09;注意查看 __HXTAL 的时钟是否为硬件电路对应的实际大小。…

2023 华为 Datacom-HCIE 真题题库 11--含解析

单项选择题 1.[试题编号&#xff1a;190685] &#xff08;单选题&#xff09;通过iMasterNCE-Campus部署的虚拟化园区网络场景中&#xff0c;以下关于“添加设备”的描述中&#xff0c;错误的是哪一项&#xff1f; A、IMaster NCE-Campus支持通过设备角色添加设备 B、IMaster …

概率论:样本与总体分布,Z分数与概率

参考书目&#xff1a;《行为科学统计精要》&#xff08;第八版&#xff09;——弗雷德里克J格雷维特 数据及其样本的分布 描述一组数据分布 描述一组样本数据的分布 描述样本数据的均值和整体数据一样&#xff0c;但是样本标准差的公式除以了n-1&#xff0c;这里引入自由度的…

DAY05_Maven

目录 1 Maven1.1 Maven简介 2 Maven安装配置2.1 下载2.2 配置maven的环境变量2.3 MavenSetting.xml文件配置2.4 检测 3 Maven基本使用3.1 Maven常用命令3.2 Maven生命周期3.2.1 default 构建生命周期 4 IDEA配置Maven4.1 IDEA配置 Maven 环境4.2 Maven 坐标详解4.3 IDEA 创建 M…