kafka数据在服务端时怎么写入的

news2024/12/26 19:02:20

学习背景

接着上篇,我们来聊聊kafka数据在服务端怎么写入的

服务端写入

在介绍服务端的写流程之前,我们先要理解服务端的几个角色之间的关系。

假设我们有一个由3个broker组成的kafka集群,我们在这个集群上创建一个topic叫做shitu-topic,他有10个分区,每个分区有3个副本。那么partition和broker的关系假设如下。

kafka partition关系

因为每个partition有3个副本,所以每个partition的副本都会均匀的分布在这三台机器上,我们取shitu-topic-0的副本来观察。

在三个broker上,每个broker的log存储目录都有一个shitu-topic-0目录,我们可以成为shitu-topic-0分区,但是同一个时间,只有broker-0上的leader副本对外提供服务,broker-1和broker-2需要去到broker-0上同步消息。在shitu-topic-0目录下就是存储的实际的日志文件。日志文件里包含三个主要的文件内容.log文件存储实际的消息,.index文件存储索引,.timeindex文件存储时间索引。我们把这三个文件合称为一个logsegment日志段,每个log文件只要超过1G就会产生一个新的段文件。日志段文件的命名是以当前段内第一条消息的offset来命名的,这里因为是新创建的topic,第一条消息是0,所以都是0。因为消息是顺序写入的,所以只有最后一个日志段是激活的我们称为active segemnt,活跃段。比如这里活跃段就是00000000000020123000开头的段。

kafka leader-flower关系

研究消息的写入,就是研究这些文件时怎么产生的,让我们来看一下段文件里每个文件的组织格式。

写入文件

log文件

.log文件存储实际的写入日志,也就是实际的数据存储位置。kafka的log文件存储格式经过了3次变化,目前使用的日志格式称为V2版本,我们取这个版本的日志格式来做讲解。

log文件格式

上图左侧显示的是log文件的格式,我们把log文件内存储的的消息集合称为record batch,而每条消息我们称为一条record,每条record的格式如右边所示。record batch内的字段主要记录的整个log文件的全局属性,比如log文件的起始偏移量,文件长度,epoch,时间戳等等,不做详细解释,也不是重点。

我们说一下每条消息的格式,我们知道每条消息除了实际的消息内容value外,伴随着每条消息的产生,还会产生这条消息的额外附带的信息,比如消息的偏移量,offsset,时间戳timestamp等等。kafka在设计消息的存储时花了很大的心思。

这里我解释一下varint,varlong类型,简单的说,就是可变长的类型。比如一条消息的偏移量是int存储容量是4字节,比如存储10这个偏移量,虽然前面大部分是0,但是实际存储还是需要4字节。而varint则可以根据数据的范围选择合适的存储,比如还是10,那么实际记录这个值1个字节就够了。这样,当写入消息时,比如写入2条消息,偏移量分别是10和11,如果分别存储这两个偏移量,需要

2 * 4B = 8b

而如果使用varint存储,则只需要

2 * 1B + 4B(基础偏移量) = 6B

这里如果不是2条消息,而是10000条消息,那么这个优化就会非常有用。kafka这么做是为了尽最大的可能使用存储空间。当然除了数据格式上的优化,kafka还对数据进行了压缩,也就是records是可以配置不同的压缩算法进行压缩的,比如ZIP。

index文件

.index记录偏移量到实际消息的映射关系。一个很简单的述求,我们想知道某个偏移量的日志的内容,那么我们就需要一种能根据偏移量定位到消息的格式。

index文件的格式由相对偏移量realtive offset和物理偏移量position组成。当一条消息写入时,根据消息的偏移量计算出这条消息的相对偏移量,比如写入的是20123025这条消息,那么用20123025-20123000 = 25得到相对偏移量25,再记录下这消息的起始物理地址1024,即可组成对这条消息的索引。需要·注意的是,这里的索引是稀疏索引,也就是不是每条消息都会产生索引,而是每隔一些消息产生索引,这样能减少索引的文件大小。

每一条索引的需要4B的相对偏偏移量和4B的物理地址偏移量,一共8B,kafka的服务端在设置index文件最大大小时要求index文件必须是索引项的整数倍,如果不是,则会自动转换成最接近的整数倍的数字。

index文件

大家这里肯定很好奇那么怎么利用相对偏移量来查找消息,我们解释一下,其实对消息的查找可以概述为根据二分法查找。比如想要查找20123050这条偏移量的消息,先根据这个偏移量,去到我们当前副本的segement集合中根据segement的起始偏移量找到对应的segement,所有的segement的信息是根据相对偏移量以跳表的形式记录的。找到的对应的segement后先计算出相对偏移量20123050-20123000 = 50,然后根据50这个相对偏移量,我们去到相对偏量数组里,使用二分查找找到[20,75]这个相对偏移量范围,那么我们可以在log文件里从1024字节开始,逐条消息的解析,并计算出消息的偏移量是不是50,直到2147字节这个结束的位置为止。如果能找到,说明消息在本partition内,不能我们再换另外的partition查找。

timeindex文件

timeidnex记录时间戳到实际消息的映射关系,我们介绍了index文件的格式,再来理解timeindex文件的格式就容易多了。timeindex文件和index文件的格式类似,由时间戳相对偏移量和消息相对偏移量组成。时间戳相对偏移量根据消息的写入时间来计算,比如写入时间是1733001000,用这个写入的时间减去timeindex文件的起始时间1733000000得到1000这个相对时间戳偏移量。

timeindex文件

timeindex文件的查找我们就不说了,大家可以参考index文件。需要注意一点timeindex文件的时间戳是可以设置的,虽然一般kafka服务端会采取自动设置消息写入时间的配置,即log.message.timestamp.type=LogAppendTime,这种情况下因为时间戳由服务器端设置,能够保证时间戳递增。但是如果服务端设置的是CreateTime,并且producer自己设置了消息的生产时间,那么有可能造成timeIndex的写入失败,因为timeindex要求写入的时间必须是递增的。如果不递增,则拒绝本次写入。还有就是,timeindex文件和index文件虽然都是索引,但是他们并不是每条索引项一一对应的,大家从图中也能看出来。

根据timeindex查找对应消息的过程也和index文件的查找类似,不过因为timeindex本身是根据时间戳来查找,所以会有一步先查找每个timeindex文件的最大时间戳,直到找到一个大于查找时间并且最接近查找时间的timeindex文件。这里有点绕,举个例子,第一个timeindex文件的最大时间戳10000,第二个timeindex文件最大时间戳23000,第三个timeindex文件最大时间戳50000,要查找时间戳为15000的消息,那么因为timeindex文件的时间戳是顺序递增的,很明显,第三个文件的消息都是在15000之后产生的,第一个文件的消息都是在15000之前产生的,那么理所应当的,正好拥有大于15000的时间戳23000的第二个文件理论上应该包含15000这个时间戳写入的消息,所以找到第二个文件。找到对应的文件后再去到到对应的这个timeindex文件根据时间偏移量索引找到这个对应的消息(找不到就换partition)。

写入过程

介绍完毕实际的文件内容,我们再来归纳一下数据的写入过程。这里不会介绍副本之间的同步的问题,只介绍在leader副本上数据的写入。

当消息通过client发送到broker上时,broker根据消息的topic找到这个topic的leadder副本。leadter副本根据消息的信息计算出消息归属的parititon。找到parititon后根据偏移量设置计算出消息的偏移量和时间戳,再找到对应的active segement,在index文件中追加消息,并根据需要决定是否写入index文件和timeindex文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2252045.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Rook入门:打造云原生Ceph存储的全面学习路径(上)

文章目录 一.Rook简介二.Rook与Ceph架构2.1 Rook结构体系2.2 Rook包含组件2.3 Rook与kubernetes结合的架构图如下2.4 ceph特点2.5 ceph架构2.6 ceph组件 三.Rook部署Ceph集群3.1 部署条件3.2 获取rook最新版本3.3 rook资源文件目录结构3.4 部署Rook/CRD/Ceph集群3.5 查看rook部…

[STM32] ADC 模数转换器 (十)

文章目录 1.ADC概述1.1 转换模式(规则组)1.2 数据对齐1.3 转换时间1.4 校准 2.代码步骤 STM32F103C8T6的12位逐次逼近型ADC的工作原理,包括转换模式、数据对齐、转换时间、校准以及程序配置流程,同时涵盖了关键的库函数和中断管理…

Web3.0安全开发实践:代理合约最佳实践总结

代理模式使智能合约能够升级其逻辑,同时维持其链上地址和状态值。对代理合约的调用会通过delegateCall的方式执行来自逻辑合约的代码,以修改代理合约的状态。 本文将为大家概述代理合约的类型、相关的安全事件和建议,以及使用代理合约的最佳…

第29天 MCU入门

目录 MCU介绍 MCU的组成与作用 电子产品项目开发流程 硬件开发流程 常用元器件初步了解 硬件原理图与PCB板 常见电源符号和名称 电阻 电阻的分类 贴片电阻的封装说明: 色环电阻的计算 贴片电阻阻值计算 上拉电阻与下拉电阻 电容 电容的读数 二极管 LED 灯电路 钳位作…

【人工智能基础05】决策树模型

文章目录 一. 基础内容1. 决策树基本原理1.1. 定义1.2. 表示成条件概率 2. 决策树的训练算法2.1. 划分选择的算法信息增益(ID3 算法)信息增益比(C4.5 算法)基尼指数(CART 算法)举例说明:计算各个…

数据结构与算法(排序算法)

我本将心向明月,奈何明月照沟渠。 排序的概念 1. 排序是指将一组数据,按照特定的顺序进行排列的过程。 2. 这个过程通常是为了使数据更加有序,从而更容易进行搜索、比较或其他操作。 常见的排序算法 插入排序 1. 把待排序的记录&#xff0c…

思科实现网络地址转换(NAT)和访问控制列表(ACL)和动态路由配置并且区分静态路由和动态路由配置。

实验拓扑(分为静态路由和动态路由两种) 静态路由互通 动态路由互通 实验背景 这个是想实现外网与内网的连接跟网络的探讨,最终实现互通以及使用并且在网络地址转换后能使用网络然后再这个基础上再配置访问控制列表和网络地址转换的的学习过程。 实验需了解的知识…

开发一套ERP 第八弹 RUst 插入数据

更全面的报错,方便检查错误在哪里,现代高级语言越来越智能 还是得看下原文档怎么操作的 src 目录为crate 的根目录 想在crate 中模块相互引入需要在 main 中声明,各个模块,然后才能在各个模块中相互引入和使用 原始工程引入,避免直接使用 lib.rs 回合cargo 中的一些 工程管理出…

课程答疑微信小程序设计与实现

私信我获取源码和万字论文,制作不易,感谢点赞支持。 课程答疑微信小程序设计与实现 摘要 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了课程答疑微信小程序设计与实现的开发全过程。通过分析…

【时间之外】IT人求职和创业应知【53】-东莞也转型

目录 新闻一:Freysa挑战赛:人类智慧与策略战胜AI,奖金高达4.7万美元 新闻二:中国生成式AI用户规模突破2.3亿,行业应用广泛 新闻三:2024东莞智能终端新技术推广会圆满举行,聚焦AI与智能终端融…

ARP欺骗-监控网络

kali: 使用arp-scan -l 查看同个局域网 windows arp -a 查看地址的物理地址 192.168.21.2 对应的是00-50-56-f5-d5-f0 攻击利用: 我们要让目标ip的流量经过我的网卡,从网关出去 使用的开启 echo 1 > /proc/sys/net/ipv4/ip_forward 当为0时,我们不转发&…

Cesium 当前位置矩阵的获取

Cesium 位置矩阵的获取 在 3D 图形和地理信息系统(GIS)中,位置矩阵是将地理坐标(如经纬度)转换为世界坐标系的一种重要工具。Cesium 是一个强大的开源 JavaScript 库,用于创建 3D 地球和地图应用。在 Cesi…

网络编程项目1

基于TCP通信控制红色机械臂和蓝色机械臂的运作 1、项目要求&#xff1a;通过w&#xff08;红色臂角度增大&#xff09;s&#xff08;红色臂角度减小&#xff09;d&#xff08;蓝色臂角度增大&#xff09;a&#xff08;蓝色臂角度减小&#xff09; #include <myhead.h> #…

使用GitZip for github插件下载git仓库中的单个文件

背景&#xff1a;git仓库不知道抽什么疯&#xff0c;下载不了单个文件&#xff0c;点击下载没有反应&#xff0c;遂找寻其他方法&#xff0c;在这里简单记录下。 使用GitZip for github插件下载仓库中的单个文件 1、首先在浏览器安装插件&#xff0c;并确保为打开状态。 2、然…

Y20030022基于PHP+MYSQL疫苗预约管理网站的设计与实现 源码 初稿

旅游度假区微信小程序 1.摘要2.研究背景和意义3. 系统功能4.界面展示5.源码获取 1.摘要 疫苗预约管理系统是为了提供一个高效、便捷、安全的平台&#xff0c;方便用户进行疫苗预约接种&#xff0c;并优化疫苗接种的管理工作。通过该系统&#xff0c;用户可以随时随地进行预约操…

C#中判断两个 List<T> 的内容是否相等

ET实现游戏中邮件系统逻辑思路&#xff08;服务端&#xff09;_游戏邮件系统设计-CSDN博客 场景&#xff1a;今天遇到一个BUG&#xff0c;在服务器重启的时候&#xff08;体验服&#xff09;&#xff0c;玩家之前接收的邮件又重新接收了一次&#xff0c;但是两封邮件的ID是不同…

算法编程题-优势洗牌

算法编程题-优势洗牌 原题描述方法一、排序二分查找思路简述代码实现复杂度分析 方法二、红黑树思路简述代码实现复杂度分析 方法三、贪心思路简述代码实现复杂度分析 摘要&#xff1a;本文将对LeetCode原题优势洗牌进行介绍&#xff0c;从最容易想到的方法开始&#xff0c;循序…

【从零开始的LeetCode-算法】3264. K 次乘运算后的最终数组 I

给你一个整数数组 nums &#xff0c;一个整数 k 和一个整数 multiplier 。 你需要对 nums 执行 k 次操作&#xff0c;每次操作中&#xff1a; 找到 nums 中的 最小 值 x &#xff0c;如果存在多个最小值&#xff0c;选择最 前面 的一个。将 x 替换为 x * multiplier 。 请你…

ERROR in [eslint] Invalid Options ‘extensions‘ has been removed.

看着这个报错 感觉是版本不对引起的 ERROR in [eslint] Invalid Options: - Unknown options: extensions - extensions has been removed. ERROR in Error: Child compilation failed: [eslint] Invalid Options: - Unknown options: extensions - extensions has b…

SpringCloud书单推荐

重新定义SpringCloud实战 129 疯狂SpringCloud微服务架构实战 SpringBootSpringCloud微服务开发实战 点餐系统 SpringCloud微服务架构实战派 日访问量3000W平台 SpringCloud Alibaba微服务原理与实战 Spring CloudNginx 极简spring cloud实战 Spring Cloud 微服…