Kafka源码解析之索引

news2024/11/17 22:37:00

Kafka源码解析之索引

索引结构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XNNYeyUG-1687105485417)(C:\Users\jimmy\AppData\Roaming\Typora\typora-user-images\image-20230618220841971.png)]

Kafka有两种类型的索引:

  • TimeIndex: 根据时间戳索引,可以通过时间查找偏移量所在位置,目录下以.timeindex结尾
  • Index: 根据偏移量索引,.index结尾

构建索引时机

由log.index.interval.bytes 参数控制,默认4KB构建一条索引

为什么默认值是4kb呢?这里认为与基于磁盘的读写单位是 block(一般大小为 4KB)还有内存管理与分配的最小单位是4kb有关

def append(largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  	.....
    // 判断是否写入索引文件
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      offsetIndex.append(largestOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}

在源码的LogSegment.append方法中,会对当前segement写入大小与上次构建索引时大小差值进行判断,如果超过log.index.interval.bytes,会构建timeIndex以及offsetIndex索引

AbstractIndex

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ze80Lt2U-1687105485418)(C:\Users\jimmy\AppData\Roaming\Typora\typora-user-images\image-20230618222246953.png)]

AbstractIndexl类时TimeIndex以及OffsetIndex文件的父类,其中有一个很重要的成员变量 mmap:

protected var mmap: MappedByteBuffer = {
  val newlyCreated = file.createNewFile()
  val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
  try {
    //提前进行文件的创建
    if(newlyCreated) {
      if(maxIndexSize < entrySize)
        throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
      raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
    }

    /* memory-map the file */
    _length = raf.length()
    val idx = {
      if (writable)
        raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
      else
        raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
    }
    /* set the position in the index for the next entry */
    if(newlyCreated)
      idx.position(0)
    else
      // if this is a pre-existing index, assume it is valid and set position to last entry
      idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
    idx
  } finally {
    CoreUtils.swallow(raf.close(), AbstractIndex)
  }
}

这里用到了Memory Mapped Files即内存映射

mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XyfKUiva-1687105485418)(C:\Users\jimmy\AppData\Roaming\Typora\typora-user-images\image-20230618222936451.png)]

内存映射mmap参考文章:https://zhuanlan.zhihu.com/p/507907660

mmap同样是一种零拷贝的技术,常规的文件操作需要从磁盘到页缓存再到用户主存的两次数据拷贝。而mmap操控文件,只需要从磁盘到用户主存的一次数据拷贝过程,其实也是一种通过磁盘空间代替内存的操作,提供进程间共享内存及相互通信的方式。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JdjQsreD-1687105485419)(C:\Users\jimmy\AppData\Roaming\Typora\typora-user-images\image-20230618235006202.png)]

二分查找与页缓存

Kafka根据索引文件查找offset

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // 判断index文件是否为空
  if(_entries == 0)
    return (-1, -1)

  def binarySearch(begin: Int, end: Int) : (Int, Int) = {
    // 二分查找开始
    var lo = begin
    var hi = end
    while(lo < hi) {
      val mid = (lo + hi + 1) >>> 1
      //parseEntry方法在timeindex与index里有不同实现方式
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        lo = mid
      else
        return (mid, mid)
    }
     //如果lo等于最后一条,那么就返回-1
    (lo, if (lo == _entries - 1) -1 else lo + 1)
  }

  val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
  // 冷热区判断
  if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
    return binarySearch(firstHotEntry, _entries - 1)
  }

  // check if the target offset is smaller than the least offset
  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  binarySearch(0, firstHotEntry)
}

Kafka对二分搜索的优化

kafka (3)

对正常的二分查找来说,假设索引的大小有13个页,我们需要查找的偏移量在页12上,那么我们会依次访问0->12->6->9->11->12这六个页

当生产者继续往分区中生产消息,超过4kb后,又写了一个新的索引项,这个时候索引访问的顺序是:0->13->7->10->11>12->13

kafka (5)

通过对上面mmap的研究可以知道,磁盘到用户主存的映射实际上依赖于页表,只是用户进程可以通过指针操作直接读写page cache,不再需要系统调用和内存拷贝。常用的页表置换算法基本是基于LRU的,当读取页7/10的时候,这两个页可能已经很长时间没有被访问到了,已经从LRU中移除了,这个时候再访问这两个页的时候就可能导致操作系统陷入缺页中断。

Here, we use a more cache-friendly lookup algorithm:
if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
   binarySearch(end - N, end)
else
   binarySearch(begin, end - N)

这里Kafka做了一个优化,保证index文件的最后N个项分为热区,而剩余项则是冷区。这是因为在热区中的索引项可能因为更为频繁的访问,更有可能存在于页表中,可以增加搜索的效率。这里N的值是8192,官方给的解释是:

We set N (_warmEntries) to 8192, because
1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section
   lookup. So that, the entire warm section is really "warm".
   When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
   and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we
   touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS,
   SPARC, Power, ARM etc.).
2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka
   settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.

warmEntries的个数:

protected def _warmEntries: Int = 8192 / entrySize
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
  import OffsetIndex._

  override def entrySize = 8

这样的设计保证了几点:

1、8192的大小保证了三个索引项是在页表中的indexEntry(end), indexEntry(end-N), indexEntry((end*2 -N)/2)

//待续

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/661091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3. redis cluster集群运维与核心原理剖析

分布式缓存技术Redis 1. Redis集群方案比较2. Redis高可用集群搭建 本文是按照自己的理解进行笔记总结&#xff0c;如有不正确的地方&#xff0c;还望大佬多多指点纠正&#xff0c;勿喷。 课程内容&#xff1a; 1、哨兵集群与Redis Cluster架构异同 2、Redis高可用集群快速实…

2023/6/18总结

JS 在document.querySelectorAll(CSS选择器) 选到的集合并没有pop()和push()等数组的方法。是一个伪数组。 如果想要得到里面的每一个对象&#xff0c;需要用for遍历获得 document.getElementById(id名称) 根据id获取一个元素 document.getElementsByTagName(标签名字) 根…

Css面试题:css文字隐藏

文章目录 文字隐藏单行文字隐藏多行文字隐藏基于高度设置多行文字隐藏基于行数设置多行文字隐藏 文字隐藏 单行文字隐藏 主要是通过overflow&#xff0c;text-overflow&#xff0c;white-space三个属性实现。 overflow&#xff1a;visible|hidden|auto|scroll|inherit&#…

【c语言】-- 操作符汇总

&#x1f4d5;博主介绍&#xff1a;目前大一正在学习c语言&#xff0c;数据结构&#xff0c;计算机网络。 c语言学习&#xff0c;是为了更好的学习其他的编程语言&#xff0c;C语言是母体语言&#xff0c;是人机交互接近底层的桥梁。 本章来学习数组。 让我们开启c语言学习之旅…

简单认识web与http协议

文章目录 web基础域名概述DNS&#xff08;Domain Name System域名系统&#xff09; 域名空间结构 域名实际用法 2. 网页的概念2.1 网页&#xff08;HTTP/HTTPS&#xff09;HTML 概述HTML超文本标记语言 HTML文档的结构头标签中常用标签内容标签中常用标签Web概述具体组成web的主…

chatgpt赋能python:Python如何创建窗口——从入门到精通

Python如何创建窗口——从入门到精通 Python是一种高级编程语言&#xff0c;它的易读性和清晰简洁的语法使它成为许多人喜欢学习的编程语言之一。Python的一个主要特色是其丰富的库和模块。在本文中&#xff0c;我们将讨论如何使用Python创建一个窗口&#xff0c;并在其中添加…

【力扣刷题 | 第十一天】

前言&#xff1a; 我将会利用几天把树的经典例题都刷完&#xff0c;希望我可以坚持下去。 226. 翻转二叉树 - 力扣&#xff08;LeetCode&#xff09; 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 解题思路&#xff1a;我们交换每一…

C语言之运算符用法(补充前面运算符中的不足)

设定&#xff1a;int X20,Y10 1、算术运算符 注&#xff1a;自增和自减运算符只能用于变量&#xff0c;不可用于常量或表达式。另&#xff0c;X与X是不同的(–亦同)。以语句a[x]100;为例&#xff1a; a[X]100;执行之后得到&#xff1a;a[20] 100、X 21。//即&#xff0c;先执行…

Windows10下超详细Mysql安装

目录 0. 前言1. 下载mysql2. 开始安装3. 验证安装4. 环境变量配置 0. 前言 Mysql简介&#xff1a; MySQL是一种开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它使用SQL&#xff08;结构化查询语言&#xff09;语言进行数据的存储和访问。MySQL的设计…

git版本管理入门(本地/远程仓库,常用命令)

目录 git简介 安装git 配置SSH key Linux环境下需要命令生成ssh key 本地git管理 多人协作流程 追加 重新提交 git命令 git commit本地和git push远程 git stash和git stash pop暂存 git status查看修改哪些了文件​ git diff 查看修改前后的差异 git log查看提交…

Centos7安装配置Docker

1. 什么是Docker 在开篇之前考虑到阅读人群,我觉得有必要向各位读者朋友简单介绍一下Docker是什么,它解决了什么问题&#xff1f;Docker是基于Go语言实现的云开源项目。它对此给出了一个标准化的解决方案-----系统平滑移植&#xff0c;容器虚拟化技术。让开发者可以打包他们的…

从加密到签名:如何使用Java实现高效、安全的RSA加解密算法?

目录 1. 接下来让小编给您们编写实现代码&#xff01;请躺好 ☺ 1.1 配置application.yml文件 1.2 RSA算法签名工具类 1.3 RSA算法生成签名以及效验签名测试 1.4 RSA算法生成公钥私钥、加密、解密工具类 1.5 RSA算法加解密测试 我们为什么要使用RSA算法来进行加解密&…

React之state详解

目录 执行过程 异步 React18与自动批处理 setState 推荐用法 ()>{return }&#xff0c;this.state. 生命周期 数据没改变时​不渲染 shouldComponentUpdate PureComponent自动&#xff08;推荐&#xff09; 你真的理解setState吗&#xff1f; - 掘金 组件的私有…

《Nature Aging》: 揭示皮肤衰老的分子机制

一个人衰老最直接的体现就是皮肤衰老。人体的皮肤一般从25&#xff5e;30岁以后即随着年龄的增长而逐渐衰老&#xff0c;大约在35&#xff5e;40岁后逐渐出现比较明显的衰老变化。但是&#xff0c;我们的皮肤为什么会衰老呢&#xff1f;要回答这个问题&#xff0c;我们首先要了…

STC单片机存储器介绍和使用

STC单片机存储器介绍和使用 🌿STC15F2K60S2系列内部结构框图 🌿STC12C5A60S2系列内部结构框图 📑程序存储器(ROM/Flash) 🔖STC单片机ROM容量大小可以根据其型号和命名规则了解到。 🌿STC

chatgpt赋能python:Python怎样让画笔变粗

Python怎样让画笔变粗 Python是一门强大的编程语言&#xff0c;不仅适用于数据分析和机器学习等领域&#xff0c;也可以用来进行图像处理。在Python中&#xff0c;我们可以使用Pillow库来进行图像操作。在本篇文章中&#xff0c;我们将介绍如何使用Python和Pillow来让画笔变粗…

基于游客时空行为特征研究(两步路)

1 轨迹计算 1.1 使用geopy geopy模块常用于定位全球地址、以及经纬度相关的转换与计算&#xff0c;详细请参考&#xff1a; https://pypi.org/project/geopy/ 1.2 安装 pip install geopy 1.3 根据经纬度计算距离 Geopy可以使用测地线距离或大圆距离计算两点之间的测地线距离&a…

【C数据结构】无头非循环单向链表_SList

目录 无头非循环单向链表LinkedList 【1】链表概念 【2】链表分类 【3】无头单向非循环链表 【3.1】无头单向非循环链表数据结构与接口定义 【3.2】无头单向非循环链表初始化 【3.3】无头单向非循环链表开辟节点空间 【3.4】无头单向非循环链表销毁 【3.5】 无头单向非…

Qt中以qRegister开头的几个函数的用法说明

目录 1. 前言 2. qRegisterMetaTypeStreamOperators 2.1. 函数功能简述 2.2.用法举例1 3. qRegisterMetaType 1. 前言 Qt通过qRegister开头的函数和Q_DECLARE开头的几个宏向Qt元系统注册、声明一些非基本类型。一旦声明、注册后&#xff0c;在Qt元系统中就可以很方便的利用这…

神秘龙卷风

那道提示 打开后是一个加密压缩包&#xff0c;根据题目提示&#xff0c;这应该是一道暴力破解的题目 暴力破解后得到密码位5463 结果拿到是一串不止到啥的字符&#xff0c;根据提示应该是还要进行解码 经过查询&#xff0c;得知这个编码叫Brainfuck&#xff1a;&#xff08;下面…