Kafka的消息存储机制

news2025/1/23 4:39:01

前面咱们简单讲了K啊开发入门相关的概念、架构、特点以及安装启动。 今天咱们来说一下它的消息存储机制。

前言:

Kafka通过将消息持久化到磁盘上的日志文件来实现高吞吐量的消息传递。

这种存储机制使得Kafka能够处理大量的消息,并保证消息的可靠性。

1、消息存储机制概述:

1.1 分区与副本:

Kafka将每个主题划分为一个或多个分区,每个分区可以有多个副本。分区和副本的概念为Kafka提供了水平扩展和故障容错的能力。

1.2 消息日志:

Kafka的消息存储机制基于消息日志的概念。消息被追加到一个或多个分区的日志文件中,每个分区都有一个单独的日志文件,其中的消息按顺序存储。

1.2.1 消息发送

每当往某个Topic发送数据时,数据会被hash到不同的partition,这些partition位于不同的集群节点上,所以每个消息都会被记录一个offset消息号,随着消息的增加逐渐增加,这个offset也会递增,同时,每个消息会有一个编号,就是offset号。消费者通过这个offset号去查询读取这个消息。

  • 发送消息流程

    1. 首先获取topic的所有Patition

    2. 如果客户端不指定Patition,也没有指定Key的话,使用自增长的数字取余数的方式实现指定的Partition。这样Kafka将平均的向Partition中生产数据。

    3. 如果想要控制发送的partition,则有两种方式,一种是指定partition,另一种就是根据Key自己写算法。继承Partitioner接口,实现其partition方法。

alt
  • 消息消费

    消费者有消费者族群的概念,当生产者将数据发布到topic时,消费者通过pull的方式,定期从服务器拉取数据,当然在pull数据的时候,,服务器会告诉consumer可消费的消息offset。

alt

2、源码解析与技术细节:

2.1 日志文件格式:

Kafka使用一种特殊的文件格式来存储消息日志,该格式称为“分段的日志(segmented log)”。

alt 根据你的需求,这里给出一个简单的针对Kafka日志文件源码解析的示例:

Kafka日志文件的源码实现位于Kafka项目的core模块中,主要包括以下几个关键类和接口:

  1. Log类:代表一个分区的日志文件,它负责对消息的追加、读取和索引等操作。在Log类中,核心的数据结构是Segment,它表示一个日志分段。

  2. Segment类:代表日志文件中的一个分段,是Kafka用于存储消息的基本单元。每个分段都有一个起始偏移量和一个结束偏移量,用于定位消息的位置。分段由多个消息组成,按照消息的追加顺序顺序存储。

  3. OffsetIndex类:用于支持高效的偏移量查找。Kafka在每个分段中维护一个偏移量索引,使得可以通过偏移量快速定位到消息的物理位置。

  4. OffsetPosition类:表示一个偏移量在日志文件中的位置信息,包括分段文件名、消息在文件中的位置和消息的大小等信息。

2.2 消息追加与索引:

Kafka使用追加写的方式将消息写入日志文件,并使用索引结构来提供高效地消息检索。我们将通过源码解析,详细讲解消息追加和索引的实现原理及相关技术细节。

下面是一个简化的示例,包括消息的追加、读取和索引等操作:

// Log类的部分关键源码
class Log(dir: File, config: LogConfig{
  // 初始化日志目录
  private val logDir = CoreUtils.createDirectory(dir)

  // 初始化日志片段
  private val segments: mutable.Map[LongLogSegment] = loadSegments()

  // 向日志中追加消息
  def append(messages: Seq[Message]): LogAppendInfo = {
    // ...
    // 将消息追加到当前活跃的日志片段中
    val currentSegment = segments(activeSegmentIndex)
    currentSegment.append(messages)
    // ...
  }

  // 从日志中读取消息
  def read(offset: Long, maxLength: Int): FetchDataInfo = {
    // ...
    // 根据偏移量找到对应的分段文件
    val segment = segments.floorEntry(offset).getValue
    segment.read(offset, maxLength)
    // ...
  }

  // 根据配置删除老旧的日志片段
  def deleteOldSegments(): Unit = {
    // ...
    // 删除所有小于日志保留大小的分段
    val deletableSegments = segments.filter(segment => segment.getSize <= config.retentionSize)
    deletableSegments.foreach {
      case (_, segment) => segment.delete()
    }
    // ...
  }

  // 加载已存在的日志片段
  private def loadSegments(): mutable.Map[LongLogSegment] = {
    // ...
    // 遍历日志目录下的所有分段文件,加载到内存中
    val segments = mutable.Map[LongLogSegment]()
    val segmentFiles = logDir.listFiles.filter(_.isFile).sortBy(_.getName)
    segmentFiles.foreach { file =>
      // 解析文件名中的偏移量
      val offset = parseOffset(file)
      // 创建并加载分段
      val segment = new LogSegment(file, offset)
      segment.load()
      // 添加到分段列表中
      segments.put(offset, segment)
    }
    // ...
    segments
  }
}

2.3 日志压缩:

在Kafka中,可以通过启用日志压缩来减小存储空间的占用和网络传输的开销。Kafka支持多种压缩算法,包括Gzip、Snappy和LZ4等。下面是一个简单的步骤,说明如何在Kafka中启用日志压缩:

  1. 在Kafka服务器配置文件中,找到以下配置项:

    compression.type = producer

    将该配置项的值设置为所需的压缩算法,例如:

    compression.type = gzip
  2. 如果你的Kafka集群有多个副本(replica),你还需要在Kafka服务器配置文件中为每个副本设置以下配置项:

    min.insync.replicas = 2

    该配置项指定了进行压缩的最小副本数,确保至少有指定数量的副本处于同步状态。这是为了防止数据丢失,在进行日志压缩时仍然能够保持高可靠性。

  3. 重启Kafka服务器,以使配置生效。

在启用日志压缩后,Kafka将会自动对生产者发送的消息进行压缩,并在消费者读取消息时自动解压缩。这样可以显著减小消息的存储空间和网络传输开销,提高系统的性能和效率。

需要注意的是,在启用日志压缩后,读写数据的性能会受到一些影响,因为压缩和解压缩需要一定的计算资源。因此,在选择压缩算法和配置压缩参数时,需要权衡存储空间的节省和性能的需求。

此外,还有一种在Kafka中压缩日志的方法是使用外部工具(如Hadoop的hadoop-archive-logs命令),先将日志文件打包成压缩文件(如tar.gz),然后再进行存储。这种方式需要额外的步骤和工具,并且不支持实时的压缩和解压缩。因此,如果需要实时的压缩和解压缩功能,建议使用Kafka内置的日志压缩功能。

3、存储性能优化:

优化Kafka存储性能可以提高消息的写入和读取速度,以及减少存储占用。下面是一些常见的Kafka存储性能优化策略建议:

  1. 批量发送:通过将多条消息合并为一个批次进行发送,可以减少网络传输开销和降低磁盘IO。在生产者端,可以设置batch.size参数来调整批次大小。较大的批次大小可以提高吞吐量,但可能会增加延迟。在消费者端,可以使用fetch.min.bytes参数来配置批量拉取的最小字节数,默认为1字节。

  2. 合理的副本因子:Kafka的消息是以副本的形式存储在不同的节点上。通过合理配置副本因子,可以在保证消息的可靠性的同时,提高写入性能。较小的副本因子可以减少副本间的同步开销。如有必要,可以将min.insync.replicas参数设置为小于副本因子的值。但同时要注意,较小的副本因子可能会增加消息的丢失风险。

  3. 启用压缩:Kafka支持对消息进行压缩,在减小存储占用和网络传输开销方面具有很大优势。可以配置生产者的compression.type参数来启用压缩功能,并选择合适的压缩算法(如Gzip、Snappy或LZ4)。压缩会增加一些额外的CPU开销,但通常能在存储和传输方面带来明显的性能收益。

  4. SSD存储:Kafka使用大量的磁盘IO,因此使用固态硬盘(SSD)可以显著提高性能。SSD具有更低的读写延迟和更高的吞吐量,适合处理大量的随机读写操作。

  5. 分区和副本的平衡:合理设置分区和副本的数量,可以提高负载均衡和并行处理能力。如果某个分区或副本的读写速度较慢,可以考虑增加其数量。

  6. 优化日志清理:Kafka会定期清理日志段文件来释放磁盘空间。通过调整log.retention.hourslog.retention.bytes等参数,可以控制日志的保留时间和大小。合理设置这些参数可以避免过早的数据清理和降低磁盘压力。

  7. 确保足够的磁盘带宽:Kafka的存储性能受限于磁盘带宽。确保磁盘子系统具有足够的带宽和IO吞吐量,可以避免磁盘成为性能瓶颈。

以上是一些常见的Kafka存储性能优化策略,根据实际情况和需求,可以选择适合的优化方法,并进行配置和调整。同时,定期监控系统性能并进行性能测试,可以帮助发现潜在的性能问题并进行进一步优化。

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1038901.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

西门子S7-1200使用LRCF通信库与安川机器人进行EthernetIP通信的具体方法示例

西门子S7-1200使用LRCF通信库与安川机器人进行EthernetIP通信的具体方法示例 准备条件: PLC:S7-1200 1214C DC/DC/DC 系统版本4.5及以上。 机器人控制柜:安川YRC1000。 软件:TIA V17 PLC做主站,机器人做从站。 具体方法可参考以下内容: 使用的库文件为西门子 1200系列…

[React] react-hooks如何使用

react-hooks思想和初衷&#xff0c;也是把组件&#xff0c;颗粒化&#xff0c;单元化&#xff0c;形成独立的渲染环境&#xff0c;减少渲染次数&#xff0c;优化性能。 文章目录 1.为什么要使用hooks2.如何使用hooks2.1 useState2.2 useEffect2.3 useLayoutEffect2.4 useRef2.5…

【前段基础入门之】=>HTML5 的新增特性!

这里写目录标题 HTML5简介HTML5 优势新增语义化标签新增布局标签新增状态标签meter 标签progress 标签 新增列表标签新增文本标签文本注音文本标记 新增表单功能表单控件新增属性input 新增属性值form 标签新增属性 新增多媒体标签视频播放标签音乐播放标签 新增全局属性&#…

ICCV 2023 | 噪声关联鲁棒的图匹配方法

©PaperWeekly 原创 作者 | 林义杰 单位 | 四川大学 研究方向 | 多模态、多视角学习 论文标题&#xff1a; Graph Matching with Bi-level Noisy Correspondence 论文地址&#xff1a; https://arxiv.org/pdf/2212.04085.pdf 开源代码&#xff1a; https://github.com/XLe…

word文档莫名其妙的丢失了怎么办?7个方案恢复

不知道你是否曾经遇到过相似的情况&#xff1a;花费了数小时甚至数天编辑的Word文档&#xff0c;却莫名其妙的丢失了。这时的心情可能非常复杂。如果你不知道该怎么办&#xff0c;以下是几种恢复方案&#xff0c;希望能对你有所帮助&#xff01; 关于Word文档 Word文档通常是由…

Vue路由与nodejs下载安装及环境变量的配置

目录 前言 一、Vue路由 1.路由简介 是什么 作用 应用场景 2.SPA简介 SPA是什么 SPA的优点 注意事项 3.路由实现思路 1.引入路由的js依赖 2.定义组件 3.定义组件与路径的对应关系 4.通过路由关系获取路由对象router 5.将路由对象挂载到实例中 6.触发路由事…

学生选课系统基础版

目录 一.Java 中的集合框架&#xff08;上&#xff09; 1.Java中的集合框架概述 2.Collection接口&接口简介 3.学生选课——创建学生类和课程类 4.学生选课——添加课程Ⅰ 5.学生选课——添加课程Ⅱ 6.学生选课——课程查询 7.学生选课——课程修改 8.学生选课——课程删…

Java日志源码详解,SpringBoot日志 slf4j、logback、log4j

日志视频讲解—上日志视频讲解—下学习文档集合 一、前提 在Java中说起日志&#xff0c;定听过这样几个名词&#xff1a;slf4j、logback、log4j&#xff0c;在正式开始之前&#xff0c;先了解几个简单的概念 slf4j、logback、log4j 的作者都是一个人slf4j 的全名是 Simple Log…

ByteTrack 论文学习

1. 解决了什么问题&#xff1f; 多目标跟踪是在给定的视频片段中&#xff0c;预测出目标的边框和 ID 信息。现有方法需要在 true positives 和 false positives 之间做取舍&#xff0c;将高于一定阈值的检测框关联起来&#xff0c;获取其 ID。而那些低得分的目标&#xff08;如…

全新的Windows12上线抢先体验

AIGC专栏/AI绘画教程/java面试题领取 win12太离谱了&#xff0c;win11还没用几天&#xff0c;win12就已经出来了&#xff0c;如此流畅的页面&#xff0c;很具有和苹果一拼的效果&#xff0c;流畅度也是一流。文末有领取方式。 WIN12系统在色差表现方面也超越了苹果。它采用了前…

华为数通方向HCIP-DataCom H12-831题库(单选题:141-160)

第141题 R3与R1的IS-IS邻居没有建立,根据本图的信息,可能的原因是? A、R3与R1的IS-Level不匹配 B、R3与R1的互连接口circuit-type不匹配 C、R3与R1的IIH认证失 D、R3与R1的System ID重复 答案: B 解析: 从上图的Bad Circuit Type:16 可知道R3与R1的互连接口circuit-type…

【EI会议征稿】第三届计算机图形学、人工智能与数据处理国际学术会议 (ICCAID 2023)

第三届计算机图形学、人工智能与数据处理国际学术会议 2023 3rd International Conference on Computer Graphics, Artificial Intelligence and Data Processing (ICCAID 2023) 第三届计算机图形学、人工智能与数据处理国际学术会议&#xff08;ICCAID 2023&#xff09;将于…

基于数据驱动的成本洞察,趣丸科技的FinOps进阶之路~

今年以来&#xff0c;我们注意到越来越多的单位开始积极实践FinOps&#xff0c;而随着FinOps的发展&#xff0c;大家对于其落地过程的关注也更加具体和深入&#xff0c;涉及了账单波动、FinOps的边际效应、成本模型、依赖工具等多个关键问题。 本月「UGeek大咖说」线上直播活动…

机器学习之泛化与过拟合的概念

文章目录 泛化&#xff08;Generalization&#xff09;&#xff1a;过拟合&#xff08;Overfitting&#xff09;&#xff1a;例子 泛化&#xff08;Generalization&#xff09;&#xff1a; 泛化是指机器学习模型在未见过的新数据上表现良好的能力。换句话说&#xff0c;一个好…

【软件测试】Junit5

Selenium自动化测试框架Junit单元测试框架拿着一个技术写自动化测试用例 (Selenium3)拿着一个技术管理已经编写好的测试用例 (Junit5) Junit相关技术 Junit是针对java的一个单元测试框架。 注解 Test 表示当前的这个方法是一个测试用例 添加依赖&#xff1a; 不需要main方…

解决react使用redux toolkits时出现的数组对象长度始终为0的怪异问题

有个react项目在添加购物车后&#xff0c;立马白屏&#xff0c;看一下console报错properties of undefined(reading length) 那意思是说数组没有长度&#xff0c;然后定位Header.tsx的182行&#xff0c;果然是数组长度报错 回到具体代码中&#xff1a;发现shoppingCartItems实…

大模型存在“反转诅咒”现象,无法处理反向问题;Langchain课程资源

&#x1f989; AI新闻 &#x1f680; 大模型存在“反转诅咒”现象&#xff0c;无法处理反向问题 摘要&#xff1a;最新研究发现&#xff0c;大语言模型存在“反转诅咒”现象&#xff0c;即明知道“A 是 B”&#xff0c;却答不出“B 是 A”。研究人员进行了两项实验&#xff0…

【RocketMQ专题】快速实战及集群架构原理详解

目录 课程内容一、MQ简介基本介绍*作用&#xff08;解决什么问题&#xff09; 二、RocketMQ产品特点2.1 RocketMQ介绍2.2 RocketMQ特点2.3 RocketMQ的运行架构2.4 消息模型 三、RocketMQ快速实战3.1 快速搭建RocketMQ服务3.2 快速实现消息收发3.3 搭建Maven客户端项目3.4 搭建R…

Linear Feedback Shift Register

线性反馈移位寄存器&#xff08;Linear Feedback Shift Register&#xff0c;简称LFSR&#xff09;是一种数字电路设计和密码学中常用的寄存器类型。它是一种简单而高效的方式&#xff0c;用于生成伪随机的二进制序列&#xff0c;并在数据混淆、错误检测和加密等领域中有应用。…

怎么用蜂邮EDM和Outlook批量发送邮件带附件

蜂邮EDM和Outlook批量发送邮件带附件的流程&#xff1f;有哪些邮件批量发送邮件附件的方法&#xff1f; 在现代社会中&#xff0c;电子邮件是一种广泛应用的沟通工具&#xff0c;而批量发送邮件带附件则是许多商业和个人用户的常见需求。本文将介绍如何使用蜂邮EDM和Outlook这…