Kafka入门-基础概念及参数

news2025/1/13 10:18:41

一、Kafka术语

        Kafka属于分布式的消息引擎系统,它的主要功能是提供一套完备的消息发布与订阅解决方案。可以为每个业务、每个应用甚至是每类数据都创建专属的主题。

        Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成,Broker负责接收和处理客户端发送过来的请求, 以及对消息进行持久化。常见的做法是将不同的Broker分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有Broker进程都挂掉了,其他机器上的Broker也依然能够对外提供服务。这其实就是Kafka提供高可用的手段之一。

实现高可用的另一个手段就是备份机制(Replication)。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在Kafka中被称为副本 (Replica)。Kafka定义了两 类副本:领导者副本(Leader Replica)追随者副本(Follower Replica)

        领导者副本(Leader Replica):提供与客户端程序进行交互的作用。

        追随者副本(Follower Replica):不能与外界进行交互,只是被动地追随领导者副本。

副本的工作机制生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息追随者副本向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步

虽然有了副本机制可以保证数据的持久化或消息不丢失,但没有解决伸缩性的问题。伸缩性即所谓的Scalability,是分布式系统中非常重要且必须要谨慎对待的问题。什么是伸缩性呢?我们拿副本来说,虽然现在有了领导者副本和追随者副本,但倘若领导者副本积累了太多的数据以至于单台Broker机器都无法容纳了,此时应该怎么办呢?一个很自然的想法就是,能否把数据分割成多份保存在不同的Broker上? 这种机制就是所谓的分区(Partitioning)

Kafka中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区0中,要么在分区1中。Kafka的分区编号是从0开始的,如果Topic有100个分区,那么它们的分区号就是从0到99

副本如何与这里的分区联系在一起呢;实际上,副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有1个领导者副本和N-1个追随者副本。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从0开始,假设一个生产者向一个空分区写入了10条消息,那么这10条消息的位移依次是0、1、2、…、9。

Kafka的三层消息架构:

       1.  第一层是主题层,每个主题可以配置M个分区,而每个分区又可以配置N个副本。

       2.  第二层是分区层,每个分区的N个副本中只能有一个充当领导者角色,对外提供服务;其他N-1个副本是追随者副本,只是提供数据冗余之用。

        3. 第三层是消息层,分区中包含若干条消息,每条消息的位移从0开始,依次递增。

 最后,客户端程序只能与分区的领导者副本进行交互。

Kafka Broker是如何持久化数据的。

        总的来说,Kafka使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Appendonly)消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作,这也是实现Kafka高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此Kafka必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment) 机制。在Kafka底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

总结:

        消息:Record。服务代理节点,Kafka服务实例。 n个组成一个Kafka集群,通常一台机器部署一个Kafka实例,一个实例挂了其他实例仍可以使用,体现了高可用。

        主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

        分区:Partition。一个topic 可以拥有若干个partition(从 0 开始标识partition ),分布在不同的broker 上, 实现发布与订阅时负载均衡。producer 通过自定义的规则将消息发送到对应topic 下某个partition,以offset标识一条消息在一个partition的唯一性。一个partition拥有多个replica,提高容灾能力。partition在机器磁盘上以log 体现,采用顺序追加日志的方式添加新消息、实现高吞吐量

        消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。

        副本:Replica。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。leader副本负责读写请求,follower 副本负责同步leader副本消息,通过副本选举实现故障转移。

        生产者:Producer。向主题发布新消息的应用程序。

        消费者:Consumer。从主题订阅新消息的应用程序。

        消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。

        消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。

        重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。

二、集群参数配置

(1)Broker端参数

Broker端参数也被称为静态参数(Static Configs)。静态参数,是指你必须在Kafka的配置文件server.properties中进行设置的参数,不管你是新增、修改还是删除。同时,你必须重启Broker进程才能令它们生效。

1. 针对存储信息的重要参数

Broker是需要配置存储信息的,即Broker使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:

        log.dirs:这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。

        log.dir:注意这是dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。

只需要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个CSV格式, 也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

        1. 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。

        2. 能够实现故障转移:即Failover。这是Kafka 1.1版本新引入的强大功能。要知道在以前,只要Kafka Broker使用的任何一块磁盘挂掉了,整个Broker进程都会关闭。但是自1.1开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且Broker还能正常工作。

2. 与ZooKeeper相关的设置 

ZooKeeper是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。 

Kafka与ZooKeeper相关的最重要的参数当属zookeeper.connect。这也是一个CSV格式的参数,如指定它的值为zk1:2181,zk2:2181,zk3:2181。2181是ZooKeeper的默认端口。

让多个Kafka集群使用同一套ZooKeeper集群,那么这个参数应该怎么设置呢?

如果你有两套Kafka集群,假设分别叫它们kafka1和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2

3. 与Broker连接相关的

即客户端程序或其他Broker如何与该Broker进行通信的设置。有以下三个参数:

        listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka服务。格式为<协议名称,主机名,端口号>,比如CONTROLLER: //localhost:9092。一旦自定义了协议名称,还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示 CONTROLLER这个自定义协议底层使用明文不加密传输数据。

         advertised.listeners:和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,就是说这组监听器是Broker用于对外发布的。

        host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

最好全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。

4. 第四组参数是关于Topic管理

        auto.create.topics.enable:是否允许自动创建Topic。

        unclean.leader.election.enable:是否允许UncleanLeader选举。

        auto.leader.rebalance.enable:是否允许定期进行Leader选举。

auto.create.topics.enable最好设置成false,即不允许自动创建Topic。

unclean.leader.election.enable是关闭UncleanLeader选举的。何谓Unclean?还记得Kafka有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的Leader副本。这些副本只有保存数据比较多的那些副本才有资格竞选。如果设置成false坚决不能让那些落后太多的副本竞选Leader。这样做的后果是这个分区就不可用了,因为没有Leader了。反之如果是true,那么Kafka允许你从那些“跑得慢”的副本中选一个出来当Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全。建议把它设置成false。

auto.leader.rebalance.enable设置它的值为true表示允许Kafka定期地对一些Topic分区进行Leader重选举,需要满足一定的条件才会发生。严格来说它与上一个参数中Leader选举的最大不同在于,它不是选Leader,而是换Leader!比如Leader A一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后Leader A就要被强行卸任换成Leader B。所以建议设置成false。

4. 数据留存方面

        log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说ms设置最高、minutes次之、hours最低。

        log.retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。

        message.max.bytes:控制Broker能够接收的最大消息大小。

log.retention.{hours|minutes|ms}三兄弟,举例:log.retention.hours=168表示默认保存7天的数据,自动删除7天前的数据。很多公司把Kafka当做存储来使用,那么这个值就要相应地调大。

log.retention.bytes,这个值默认是-1,表明你想在这台Broker上保存多少数据都可以,至少在容量方面Broker绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的Kafka集群:设想你要做一个云上的Kafka服务,每个租户只能使用100GB的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。 

(2)Topic级别参数

如果同时设置了Topic级别参数和全局Broker参数,Topic级别参数会覆盖全局Broker参数的值,而每个Topic都能设置自己的参数值,这就是所谓的Topic级别参数。

Topic级别参数的设置有两种方式可以设置:

        * 创建Topic时进行设置

        * 修改Topic时设置

1. 保存消息(创建时)

        retention.ms:规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦设置了这个值,它会覆盖掉Broker端的全局参数值。

        retention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。

例1:

        设想你的部门需要将交易数据发送到Kafka进行处理,需要保存最近半年的交易数据,同时这些 数据很大,通常都有几MB,但一般不会超过5MB。现在让我们用以下命令来创建Topic: 

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

Kafka开放了kafka-topics命令供我们来创建Topic即可。对于上面这样一条命令,请注意结尾处的--config设置,我们就是在config后面指定了想要设置的Topic级别参数。

例2(推荐使用):

        自带的命令kafka-configs来修改Topic级别参数。假设我们现在要发送最大值是10MB的消息,该如何修改呢?命令如下:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

(3)JVM参数

Kafka服务器端代码是用Scala语言编写的,但终归还是编译成Class文件JVM上运行,因此JVM参数设置对于Kafka集群的重要性不言而喻。

个人通用的建议:将JVM堆大小设置成6GB

kafka设置下面这两个环境变量:

        KAFKA_HEAP_OPTS:指定堆大小。

        KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数。

例:在启动Kafka Broker之前,先设置上这两个环境变量:

$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g

$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true

$> bin/kafka-server-start.sh config/server.properties

(4)操作系统参数

Kafka并不需要设置太多的OS参数,但有些因素最好还是关注一下,比如下面这几个:

        文件描述符限制

        文件系统类型

        Swappiness

        提交时间

文件描述符限制(ulimit -n): 其实设置这个参数一点 都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。例如:ulimit -n 1000000

文件系统类型:指的是如ext3ext4XFS这样的日志型文件系统。根据官网的测试报告,XFS的性能要强于ext4,可以自行设置。

swap的调优:设置其为0,将swap完全禁掉以防止Kafka进程使用swap空间;尽量不要设置成0比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成0,当物理内存耗尽时,操作系统会触发OOMkiller这个组件,它会随机挑选一个进程然后kill掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用swap空间时,你至少能够观测到Broker性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,建议将swappniess配置成一个接近0但不为0的值,比如1。

提交时间:或者说是Flush落盘时间。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1863783.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一款开源、高颜值的AI物联网数据平台

介绍 AIOT人工智能物联网平台是一站式物联网开发基础平台&#xff0c;帮助企业快速实现数字化、精细化数据管理。核心系统为&#xff1a;物联网平台 数据中台&#xff08;数据底座&#xff09; AI。 同时支持文生图、语音合成等。大模型支持陆续也会慢慢开发。 物联系统介绍…

STM32存储左右互搏 模拟U盘桥接QSPI总线FATS读写FLASH W25QXX

STM32存储左右互搏 模拟U盘桥接QSPI总线FATS读写FLASH W25QXX STM32的USB接口可以模拟成为U盘&#xff0c;通过FATS文件系统对连接的存储单元进行U盘方式的读写。 这里介绍STM32CUBEIDE开发平台HAL库模拟U盘桥接Quad SPI总线FATS读写W25Q各型号FLASH的例程。 FLASH是常用的一种…

【学习】科大睿智解读ITSS通过后仍需关注和改进IT服务的原因

为了确保IT服务的质量和效率&#xff0c;很多企业拿到ITSS资质证书后&#xff0c;仍然需要持续关注和改进IT服务&#xff0c;科大睿智总结主要原因有以下几点&#xff1a; 1、随着企业发展业务和市场行情的变化&#xff0c;可能涉及到运维服务中新的业务流程、技术需求或者用户…

[深度学习] 门控循环单元GRU

门控循环单元&#xff08;Gated Recurrent Unit, GRU&#xff09;是一种用于处理序列数据的递归神经网络&#xff08;Recurrent Neural Network, RNN&#xff09;变体&#xff0c;它通过引入门控机制来解决传统RNN在处理长序列时的梯度消失问题。GRU与长短期记忆网络&#xff0…

Arduino 旋转编码器

Arduino 旋转编码器 电位计 Arduino - Rotary Encoder In this tutorial, we are going to learn how to use the incremental encoder with Arduino. In detail, we will learn: 在本教程中&#xff0c;我们将学习如何将增量编码器与Arduino一起使用。详细来说&#xff0c;…

iptables(11)target(SNAT、DNAT、MASQUERADE、REDIRECT)

简介 前面我们已经介绍了ACCEPT、DROP、REJECT、LOG,这篇文章我们介绍SNAT、DNAT、MASQUERADE、REDIRECT,这几个参数的定义我们在上篇文章中都有介绍,我这里再列出回顾一下 DNAT(目标地址转换)和 SNAT(源地址转换) 原理:修改数据包的源或目标 IP 地址。通常用于 NAT(…

Maven高级理解属性

属性 在这一章节内容中&#xff0c;我们将学习两个内容&#xff0c;分别是 属性版本管理 属性中会继续解决分模块开发项目存在的问题&#xff0c;版本管理主要是认识下当前主流的版本定义方式。 4.1 属性 4.1.1 问题分析 讲解内容之前&#xff0c;我们还是先来分析问题: …

Games101 透视投影矩阵推导

目录 齐次坐标 透视投影 透视投影的四棱锥体挤压为正交投影的长方体 变换规定 转换过程 观察1 观察2 关于任意一点挤压后向哪里移动的问题&#xff0c;简单推导了一下 齐次坐标 如下&#xff0c;(x, y, z, 1) 表示空间中的xyz点&#xff0c;让它每个分量乘以k&#…

MySQL 基础概念

MySQL逻辑架构 MySQL 服务器逻辑架构图 最上层的服务并不是MySQL所独有的&#xff0c;大多数基于网络的客户端/服务器的工具或者服务都有类似的架构&#xff0c;比如连接管理、授权认证、安全等等。 大多数MySQL的核心服务都在第二层&#xff0c;包括查询解析、分析、优化、…

空间的维度

空间的维度----中科院科学智慧火花 当今世界科学前沿&#xff0c;最具有挑战性和最激动人心的理论&#xff0c;莫过于弦理论&#xff0c;他不仅仅融合了相对论和量子理论&#xff0c;甚至被认为是终极理论。弦理论最核心的内容就是多维空间。由于时间和空间概念必然要写进物理…

Echarts 图表添加点击事件跳转页面,但只有图表部分点击才会跳转页面,坐标轴,区域缩放等点击不跳转。

默认的点击事件是这样的&#xff1a; myChart.on(click, function (param) {console.log(param) }) 这个事件需要点击具体图形才会触发&#xff0c;例如我上面的图&#xff0c;想选择a柱子&#xff0c;就需要明确点击到柱体才行&#xff0c;明显不符合正常的预期&#xff0c;正…

FullCalendar日历组件集成实战(16)

背景 有一些应用系统或应用功能&#xff0c;如日程管理、任务管理需要使用到日历组件。虽然Element Plus也提供了日历组件&#xff0c;但功能比较简单&#xff0c;用来做数据展现勉强可用。但如果需要进行复杂的数据展示&#xff0c;以及互动操作如通过点击添加事件&#xff0…

【Linux】Wmware Esxi磁盘扩容

目录 一、概述 1.1 磁盘分区概念 1.2 LVM概念 二、扩容步骤 二、报错 一、概述 1.1 磁盘分区概念 在 Linux 中&#xff0c;每一个硬件设备都映射到一个系统的文件&#xff0c;对于硬盘、光驱等 IDE 或 SCSI 设备也不例外。Linux把各种 IDE 设备分配了一个由 hd 前缀组成的文…

使用VBA隐藏图表中的系列

Excel中很多图表相关的操作&#xff0c;并不能通过录制宏得到代码&#xff0c;这个场景中&#xff0c;如下希望开发代码实现自动化&#xff0c;就会无从下手&#xff0c;其实只要找到相关的属性和方法&#xff0c;代码可能并不复杂。 Excel的线图如下所示&#xff0c;其中有三…

每日一学(面试考题)

1、ConCurrentHashMap为什么不允许key为null&#xff1f; 底层 putVal方法 中 如果key || value为空 抛出空指针异常 其实是为了避免在多线程并发场景下的歧义问题 在获取key 返回结果为null 无法判断是 put&#xff08;k&#xff0c;v&#xff09;的时候 value本身是n…

Python+Pytest+Allure+Yaml接口自动化测试框架详解

PythonPytestAllureYaml接口自动化测试框架详解 编撰人&#xff1a;CesareCheung 更新时间&#xff1a;2024.06.20 一、技术栈 PythonPytestAllureYaml 版本要求&#xff1a;Python3.7.0,Pytest7.4.4,Allure2.18.1,PyYaml6.0 二、环境配置 1、安装python3.7&#xff0c;并配置…

探索ONLYOFFICE桌面编辑器8.1:更强大的办公软件(新功能全新详解)

引入 时间到达2024年&#xff0c;办公软件已经成为不可或缺的的一部分。想到办公软件不知道大家首先想到那些产品 office 亦或是 WPS&#xff0c;但一个前者需要购买才能使用完整服务&#xff0c;一个漫天的弹广告不充会员什么都用不了。那难道世面上就没有一块正在好用无广告的…

一天跌20%,近500只下跌,低价可转债为何不香了?

6月以来&#xff0c;Wind可转债低价指数累计下跌7.3%&#xff0c;大幅跑输中价、高价转债。分析认为&#xff0c;市场调整的底层逻辑在于投资者对风险的重新评估和流动性的紧缩&#xff0c;宏观经济的波动和政策环境的不确定性、市场结构性的变化均对低价可转债市场产生了冲击。…

【 IM 服务】IM 翻译服务介绍

融云控制台 IM 翻译功能入口&#xff1a;IM 翻译 融云即时通讯业务提供 IM 翻译 插件&#xff0c;可为 IMLib 与 IMKit SDK 快速接入外部翻译服务&#xff0c;由融云服务端负责对接外部翻译服务供应商的鉴权、API 调用、账号管理、计费等流程。 提示&#xff1a; 1、该插件仅支…

COMSOL - 一个点光源是否总能照亮整个房间?

20 世纪 50 年代&#xff0c;数学家恩斯特施特劳斯&#xff08;Ernst Straus&#xff09;提出了一个有趣的问题&#xff1a;在一个侧壁由理想反射镜构成的任意形状的空房间里&#xff0c;一个点光源是否总能照亮整个房间&#xff1f;诺贝尔奖获得者罗杰彭罗斯&#xff08;Roger…