项目实战总结-Kafka实战应用核心要点

news2025/1/27 12:39:18

Kafka实战应用核心要点

  • 一、前言
  • 二、Kafka避免重复消费
    • 2.1 消费者组机制
    • 2.2 幂等生产者
    • 2.3 事务性生产者/消费者
    • 2.4 手动提交偏移量
    • 2.5 外部存储管理偏移量
    • 2.6 去重逻辑
    • 2.7 幂等消息处理逻辑
    • 2.8 小结
  • 三、Kafka持久化策略
    • 3.1 持久化文件
    • 3.2 segment 分段策略
    • 3.3 数据文件刷盘策略
    • 3.4 日志清理策略
    • 3.5 Kafka消息查找策略
  • 四、Kafka零复制(Zero-copy)
  • 五、Kafka设计实现延迟消息
  • 六、Kafka与ZooKeeper依赖性

一、前言

在这里插入图片描述
记录Kafka在项目中应用的核心要点,面试可食用。

二、Kafka避免重复消费

在 Apache Kafka 应用于项目中时,避免重复消费是个重要且常见的问题,尤其是在处理消息时需要确保每条消息只被处理一次。总结而言,避免重复消费的方式有七种:

2.1 消费者组机制

Kafka消费者组(Consumer Group)机制可以确保每个分区的消息只被一个消费者实例消费。通过合理的分区和消费者组设计,可以避免同一消息被多个消费者重复消费。
优点:

  • 简单易用,Kafka内置支持。
  • 适用于简单的负载均衡和扩展。

缺点:

  • 不能完全避免重复消费,比如在消费者重启或重新平衡的过程中可能会有些消息被重复消费。
  • 需要额外处理消费者重平衡带来的复杂性。

2.2 幂等生产者

Kafka 0.11.0版本引入幂等生产者(Idempotent Producer),可确保相同的消息在网络或其他错误导致重试时不会被重复写入Kafka。
启用幂等生产者只需要在生产者配置中设置enable.idempotence=true。幂等生产者确保消息在网络或其他错误导致重试时不会被重复写入 Kafka,通过为每个消息分配唯一的序列号来实现幂等性。
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

优点:

  • 简化生产者端的去重逻辑。
  • 可以确保消息在Kafka中只写入一次。

缺点:

  • 需要Kafka 0.11.0及以上版本。
  • 在某些情况下可能会增加生产者的延迟。

2.3 事务性生产者/消费者

Kafka支持事务性消息,允许生产者和消费者在一个事务中一起工作。生产者可以将一组消息作为一个事务写入Kafka,消费者也可以在一个事务中读取和处理消息。这样可确保消息处理的原子性和一致性。要使用事务性生产者,需要配置transactional.id
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

优点:

  • 提供强一致性保证。
  • 避免消息处理中的部分提交问题。

缺点:

  • 复杂度较高,需Kafka 0.11.0及以上版本。
  • 性能开销较大,适用于对一致性要求高的场景。

2.4 手动提交偏移量

Kafka消费者默认会自动提交偏移量(auto commit),为更好地控制消息处理和偏移量提交,可关闭自动提交(enable.auto.commit=false),并在确保消息处理成功后手动提交偏移量。这可通过commitSync()commitAsync()方法来实现。
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
    }
    consumer.commitSync();
}

优点:

  • 精细控制偏移量提交时机,确保消息处理成功后才提交。
  • 提高处理的可靠性。

缺点:

  • 增加消费者代码的复杂性。
  • 如果处理逻辑很慢,可能导致偏移量提交延迟。

2.5 外部存储管理偏移量

在某些特定场景下,可将偏移量存储在外部存储(如数据库)中,而不是依赖 Kafka的内部偏移量管理。这样可在消息处理和偏移量提交之间建立更强的关联,确保只有当消息处理成功后才更新偏移量。
优点:

  • 可以在消息处理和偏移量提交之间建立更强的关联。
  • 灵活性高,可根据业务需求自定义偏移量管理。

缺点:

  • 需要额外的存储和管理逻辑。
  • 增加系统的复杂性。

2.6 去重逻辑

在消息处理逻辑中引入去重机制。
例如,可以使用消息的唯一标识符(如消息ID)在处理前检查是否已经处理过该消息,从而避免重复处理。
优点:

  • 灵活性高,可根据业务逻辑自定义去重策略。
  • 适用于需要严格去重的场景。

缺点:

  • 需要额外的存储和管理去重信息。
  • 增加处理逻辑的复杂性。

2.7 幂等消息处理逻辑

设计消息处理逻辑时,尽量使其成为幂等操作,即相同的消息即使被处理多次也不会产生副作用。
例如,在数据库操作时,可以使用UPSERT操作(更新插入)来确保数据的一致性。
优点:

  • 简化重复消费问题的处理。适
  • 用于可以设计为幂等操作的业务场景。

缺点:

  • 并不是所有业务逻辑都能设计为幂等操作。
  • 需要仔细设计和验证处理逻辑的幂等性。

2.8 小结

对于大多数场景,结合使用消费者组、手动提交偏移量和幂等处理逻辑可以有效避免重复消费,而在需要更严格一致性的场景下,可以考虑使用幂等生产者和事务性消息
具体选择方案取决于具体的应用场景和需求。

三、Kafka持久化策略

Kafka实际上就是日志消息存储系统, 根据offset获取对应的消息,消费者获取到消息之后该消息不会立即从mq中移除,而是继续存储在磁盘中

3.1 持久化文件

topic有分区(partition)的概念,Kafka 会将topic分成多个不同的分区,生产者往同一个topic发送的消息最终是发送到不同的分区里面,每个分区中拆分成多个不同的segment文件存储日志。
每个segment文件包含:

  • .index 文件 (消息偏移量索引文件)
  • .log 文件(消息物理存放文件)
  • .timeindex文件(时间索引文件)

每个segment文件容量最大默认为500MB,如果超过500MB就生成新的 segment文件,且文件命名后几位表示上个segment文件最后offset值,如:segment01 、segment500 、segment1000
由此可知:一个topic里的消息是由该topic下所有分区里的消息组成的。在同一个分区内部,消息是有序的,而不同分区之间,消息是不能保证有序的

存储的消息日志文件在 server.properties 配置文件的 log.dirs 参数指定的目录下,以" t o p i c − topic- topicpartition"为名称的目录:
在这里插入图片描述
注:由于每个分区都有leader的概念,而不同分区的leader可能位于不同的broker上,除leader外,分区还有副本(replica)的概念,因此每个broker只会存储分区leader或副本位于该broker中的topic的消息。

3.2 segment 分段策略

在 server.properties 配置文件中,分段文件配置默认是500MB ,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件),文件较多时性能会稍微降低。下面是相关配置参数:

##日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment
log.roll.hours=72
##segment的索引文件最大尺寸限制,即时log.segment.bytes没达到,也会生成一个新的segment
log.index.size.max.bytes=10*1024*1024
##控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1024*1024*1024

3.3 数据文件刷盘策略

当把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。若此时操作系统宕机,数据就会丢失。
这里可根据消息的数量log.flush.interval.messages和时间log.flush.interval.ms进行配置,如果时间设置的过大,有没达到指定的数量的情况下,如果系统宕机,数据就会丢失。
Kafka官方并不建议通过Broker端的log.flush.interval.messageslog.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。

##每当producer写入10000条消息时,刷数据到磁盘 配置
log.flush.interval.messages=10000
##每间隔5秒钟时间,刷数据到磁盘
log.flush.interval.ms=5000

3.4 日志清理策略

##   是否开启日志清理
log.cleaner.enable=true
##  日志清理运行的线程数
log.cleaner.threads = 2
##  日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖,默认 delete
log.cleanup.policy = delete
##  数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
##  log.retention.bytes和 log.retention.minutes或 log.retention.hours任意一个达到要求,都会执行删除
log.retention.minutes=300
log.retention.hours=24
##   topic每个分区的最大文件大小,-1没有大小限制
log.retention.bytes=-1
##  文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes

3.5 Kafka消息查找策略

前文提到每个segment file有命名规则,且在.index文件中,存储的是key-value格式的,key代表在.log中按顺序开始顺序消费的offset值,value代表该消息的物理消息存放位置。
但是在.index中不是对每条消息都做记录,它是每隔一些消息记录一次,避免占用太多内存。即使消息不在index记录中,在已有的记录中查找,范围也大大缩小。
kafka就是利用分段+索引的方式来解决查找效率的问题,kafka没有对每个文件建立索引,而是利用kafka 消息写入磁盘的顺序性,对其中部分的消息建立偏移量索引和时间戳索引,这就是稀疏索引,目的是节约空间的资源,定位到相邻.log文件,再根据顺序遍历查找,此方式的时间复杂度是O(n)。
其中,偏移量索引源码:

offsetIndex.append(largestOffset, physicalPosition)
 
def append(offset: Long, position: Int) {
  inLock(lock) {
    // 索引位置
    mmap.putInt(relativeOffset(offset))
    // 日志位置
    mmap.putInt(position)
    _entries += 1
    _lastOffset = offset
  }
}

// 用当前offset减去基准offset
def relativeOffset(offset: Long): Int = {
  val relativeOffset = offset - baseOffset
}

时间戳索引源码:

timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)

def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
    inLock(lock) {
      if (timestamp > lastEntry.timestamp) {
        // 添加时间戳
        mmap.putLong(timestamp)
        // 添加相对位移(偏移量索引)
        mmap.putInt(relativeOffset(offset))
        _entries += 1
        _lastEntry = TimestampOffset(timestamp, offset)
      }
    }
}

Kafka使用改进版的二分查找,改的不是二分查找的内部,而是把所有索引项分为热区和冷区 这个改进可以让查询热数据部分时,遍历的Page永远是固定的,这样能避免缺页中断。
整体流程:
在这里插入图片描述

四、Kafka零复制(Zero-copy)

Kafka信息复制的原因:确保任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
Kafka之所以能够快速地处理大量数据,其中一个重要原因就是采用零拷贝(Zero-copy)技术。Kafka采用两种零拷贝技术来提高性能:mmap(memory-map)sendfile
主要有两个大的场景:

  • Broker 读写.index文件,用 mmap零复制
  • Broker 向Consumer发消息,用 sendfile 零复制

mmap (memory-map):把文件映射到进程的虚拟内存空间。通过对这段内存的读取和修改,可以实现对文件的读取和修改,而不需要用read和write系统调用。
sendfile:直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。

五、Kafka设计实现延迟消息

Kafka延时操作的实现方式:基于时间戳的延时和基于特殊Topic的延时
(1)基于时间戳的延时:通过设置消息的时间戳来实现延时操作。Producer在发送消息时,可以为消息设置一个未来的时间戳,指定消息在该时间点之后才能被消费者消费。Kafka会根据消息的时间戳进行延时推送,直到时间点到达后才将消息发送给消费者。
(2)基于特殊Topic的延时:通过创建专门的延时Topic来实现延时操作。可以将需要延时的消息发送到延时Topic中,然后设置一个定时任务来定期检查延时Topic中的消息,并将到期的消息转发到目标Topic供消费者消费。

简单步骤:

1.创建正常的topic(即即时消费的消息)。
2.创建延迟的topic,并设置合适的副本因子和参数以支持延迟消费。
3.发送消息到正常的topic,同时指定消息需要被延迟消费。
4.使用Kafka的消费者API从延迟topic拉取消息并处理。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class DelayedMessageProducer {
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String, String> producer = new KafkaProducer<>(props);
 
        // 正常的topic
        String immediateTopic = "immediate-messages";
        // 延迟的topic
        String delayedTopic = "delayed-messages";
 
        // 消息内容
        String value = "This is a delayed message";
 
        // 延迟消费的时间,例如10秒
        long delayTime = 10000;
 
        // 发送消息到延迟的topic
        producer.send(new ProducerRecord<>(delayedTopic, 0, System.currentTimeMillis() + delayTime, value));
 
        producer.close();
    }
}

六、Kafka与ZooKeeper依赖性

从Kafka 2.8版本开始,Kafka提供KRaft模式,需要配置Quorm控制器,可以在没有ZooKeeper的情况下运行Kafka集群。
之前版本,Zookeeper是Kafka的核心组件之一,负责集群元数据的管理和控制器的选举等任务。Zookeeper存储和管理着Kafka的元数据信息和配置信息,包括broker的IP地址、端口号、主题分区的分配方案等。此外,Zookeeper还帮助Kafka集群实现自动故障转移和负载均衡等功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2161522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode面试经典150题-39.组合总和

给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返回这些组合。 candidates 中的 同一个 数字可以 无限制重复被选取 。如…

加固与脱壳01 - 环境搭建

虚拟机 VMWare 多平台可用&#xff0c;而且可以直接激活&#xff0c;需要先注册一个账号 https://support.broadcom.com/group/ecx/productdownloads?subfamilyVMwareWorkstationPro KALI 类Ubuntu系统&#xff0c;官方提供了 vmware 版本&#xff0c;直接下载就可以使用。…

【Python机器学习】NLP信息提取——提取人物/事物关系

目录 词性标注 实体名称标准化 实体关系标准化和提取 单词模式 文本分割 断句 断句的方式 使用正则表达式进行断句 词性标注 词性&#xff08;POS&#xff09;标注可以使用语言模型来完成&#xff0c;这个语言模型包含词及其所有可能词性组成的字典。然后&#xff0c;该…

三子棋小游戏

使用C语言编写代码&#xff0c;实现一个简单小游戏---三子棋 这里创建1个game.h文件&#xff0c;用来声明函数、宏的文件&#xff0c;一个game.c文件用来实现函数game&#xff08;&#xff09;&#xff0c;一个play.h文件用来作为该游戏的源文件。 具体代码如下&#xff1a; …

利用大型语言模型轻松打造浪漫时刻

当情人节年年如约而至&#xff0c;每每都需费尽心思为对方营造一场令人难忘的仪式&#xff0c;却因缺乏创意与思路而倍感困扰。今天&#xff0c;我决定让大型语言模型为我们提供一些灵感和建议&#xff0c;让我们能够轻松实现这一目标。让我们开始行动吧&#xff01;此前&#…

问卷是否要做信效度分析,5类信度与4类效度常用指标及评价标准

论文问卷进行分析时&#xff0c;大家是否有这样的疑惑—— 我收集的问卷是否需要进行信效度分析呢&#xff1f; 下面一文给大家梳理问卷信效度分析的相关内容&#xff0c;包括什么样的题目需要进行信效度分析、5类信度分析与4类效度分析常用指标及评价标准。 一、问卷是否需…

JW01二氧化碳传感器(串行通信 STM32)

目录 一、介绍 二、传感器原理 1.工作原理介绍 2.串口数据流格式 三、程序设计 main.c文件 usart3.h文件 usart3.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 JW01-CO2检测模块是一种用于检测空气中二氧化碳浓度的传感器模块。它可以广泛应用于室内空气质量…

java算法OJ(1)位运算

目录 1.前言 2.正文 2.1位运算符号 2.1俩数相除 2.1.1题目 2.1.2示例 2.1.3题解 2.2二进制求和 2.2.1题目 2.2.2示例 2.2.3题解 2.3只出现一次的数字 2.3.1题目 2.3.2示例 2.3.3题解 2.4只出现一次的数字&#xff08;进阶版&#xff09; 2.4.1题目 2.4.2示例…

glb数据格式

glb数据格式 glb 文件格式只包含一个glb 文件&#xff0c;文件按照二进制存储&#xff0c;占空间小 浏览 浏览glb工具的很多&#xff0c;ccs&#xff0c;3D查看器等都可以&#xff0c;不安装软件的话用下面网页加载就可以&#xff0c;免费 glTF Viewer (donmccurdy.com) glb…

uniapp小程序中通过uni.setClipboardData实现复制功能无效的原因和解决方案

// 复制下载链接const shareFile (filePath) > {const pdfUrl 复制内容uni.showModal({title: 下载提示,content: 请复制链接到浏览器中下载,confirmColor: #eb2444,confirmText: 复制链接,success(res) {if (res.confirm) {uni.setClipboardData({data: pdfUrl, // url地…

C++: unordered系列关联式容器

目录 1. unordered系列关联式容器1.1 unordered_map1.2 unordered_set 2. 哈希概念3. 哈希冲突4. 闭散列5. 开散列 博客主页: 酷酷学 感谢关注!!! 正文开始 1. unordered系列关联式容器 在C98中&#xff0c;STL提供了底层为红黑树结构的一系列关联式容器&#xff0c;在查询时…

【笔记】自动驾驶预测与决策规划_Part4_时空联合规划

文章目录 0. 前言1. 时空联合规划的基本概念1.1 时空分离方法1.2 时空联合方法 2.基于搜索的时空联合规划 &#xff08;Hybrid A* &#xff09;2.1 基于Hybrid A* 的时空联合规划建模2.2 构建三维时空联合地图2.3 基于Hybrid A*的时空节点扩展2.4 Hybrid A* &#xff1a;时空节…

多线程——“死锁”

目录 前言 一、一个线程&#xff0c;一把锁 1.问题介绍 2.可重入锁 二、两个线程&#xff0c;两把锁 1.问题介绍 2.解决方式 三、N个线程&#xff0c;M把锁 1.哲学家就餐问题 2.解决方式 结尾 前言 “死锁”是多线程代码中一类常见的问题&#xff0c;加锁是能解决线…

plt的简单使用

目录 介绍示例 介绍 plt 是 Python 中 Matplotlib 库的一个常用别名&#xff0c;它表示 pyplot&#xff0c;这是一个用于创建图形和图形的可视化表示的工具。下面是一些 plt 函数的详解和示例&#xff0c;以帮助大家理解和使用。 示例 示例1&#xff1a; import matplotlib…

AV1 Bitstream Decoding Process Specification--[9]:语法结构语义-5

原文地址&#xff1a;https://aomediacodec.github.io/av1-spec/av1-spec.pdf 没有梯子的下载地址&#xff1a;AV1 Bitstream & Decoding Process Specification摘要&#xff1a;这份文档定义了开放媒体联盟&#xff08;Alliance for Open Media&#xff09;AV1视频编解码…

loadrunner个人笔记

创建场景配置&#xff1a; 两个同时 去四&#xff1a;日志、时间、模拟、其他自动事务 加一&#xff1a;首选项 1、写脚本&#xff0c;沟通官方、文件打印扫描 MFI-sw.support.gsd.imsc.sda.globalopentext.com support.casemicrofocus.com 支持资源 | Micro Focus | OpenT…

【毕业论文+源码】基于ASP的课程指导平台的开发

引 言 随着全球信息化技术的兴起&#xff0c;特别是Internet的日益普及&#xff0c;解决了信息Internet上传递的问题&#xff0c;建立了一个组织得很好的信息结构框架&#xff0c;使得Internet用户能够在Internet上的任何一个终端&#xff0c;以一种简单、统一的方式来访问超…

Leetcode Hot 100刷题记录 -Day18(反转链表)

反转链表&#xff1a; 问题描述&#xff1a; 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&a…

视频监控相关笔记

一、QT 之 QTreeWidget 树形控件 Qt编程指南&#xff0c;Qt新手教程&#xff0c;Qt Programming Guide 一个树形结构的节点中的图表文本 、附带数据的添加&#xff1a; QTreeWidgetItem* TourTreeWnd::InsertNode(NetNodeInfo node, QTreeWidgetItem* parent_item) { // …

回文子串通用做法

647. 回文子串 先引出力扣链接 给定一个字符串&#xff0c;你的任务是计算这个字符串中有多少个回文子串。 具有不同开始位置或结束位置的子串&#xff0c;即使是由相同的字符组成&#xff0c;也会被视作不同的子串。示例 1&#xff1a; 输入&#xff1a;"abc" 输出…