- 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
- 📕系列专栏:Spring源码、JUC源码、Kafka原理
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
- 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
文章目录
- Kafka系统基本架构
- broker 服务器
- 生产者 producer
- 消费者 consumer
- 主题 topic 和分区 partition
- topic
- partition 分区
- 分区副本 replica
- 分区 leader
- 分区 follower
- 消息偏移量 offset
- ISR 同步副本列表
- kafka 的数据存储结构
- 物理存储目录结构
- 消息 message 存储结构
- kafka 关键原理加强
- 日志分段切分条件
- controller 控制器
- controller 的职责
- 分区的负载分布
- 分区 Leader 的选举机制
- 生产者原理解析
- 生产者工作流程
- 重要的生产者参数
- acks
- Producer 往 Broker 发送消息应答机制
- acks = 0:
- acks = 1:
- acks = all:
- 重要的生产者参数
- max.request.size
- retries 和 retry.backoff.ms
- compression.type
- batch.size
- linger.ms
- enable.idempotence
- partitioner.class
Kafka系统基本架构
自我推导设计:
- kafka 是用来存数据
- 现实世界数据有分类,所以存储系统也应有数据分类管理功能,如 mysql 的表;kafka 有 topic;
- 如一个 topic 的数据全部交给一台 server 存储和管理,则读写吞吐量有限
- 所以,一个 topic 的数据应该可以分成多个部分(partition)分别交给多台 server存储和管理;
- 如一台 server 宕机,这台 server 负责的 partition 将不可用,所以,一个 partition 应有多个副本;
- 一个 partition 有多个副本,则副本间的数据一致性难以保证,因此要有一个 leader 统领读写;
- 一个 leader 万一挂掉,则该 partition 又不可用,因此还要有 leader 的动态选举机制;
- 集群有哪些 topic,topic 有哪几个分区,server 在线情况,等等元信息和状态信息需要在集群内部及客户端之间共享,则引入了 zookeeper;
- 客户端在读取数据时,往往需要知道自己所读取到的位置,因而要引入消息偏移量维护机制;
broker 服务器
一台 kafka 服务器就是一个 broker。一个 kafka 集群由多个 broker 组成。
生产者 producer
消息生产者,就是向 kafka broker 发消息的客户端。
消费者 consumer
consumer :消费者,从 kafka broker 取消息的客户端。
consumer group:消费组,单个或多个 consumer 可以组成一个消费组;
消费组是用来实现消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段
消费者可以对消费到的消息位置(消息偏移量)进行记录;
老版本是记录在 zookeeper 中;新版本是记录在 kafka 中一个内置的 topic 中(__consumer_offsets)
主题 topic 和分区 partition
topic
Kafka 中存储数据的逻辑分类;你可以理解为数据库中“表”的概念;
比如,将 app 端日志、微信小程序端日志、业务库订单表数据分别放入不同的 topic
partition 分区
topic 中数据的具体管理单元;
- 每个 partition 由一个 kafka broker 服务器管理;
- 每个 topic 可以划分为多个 partition,分布到多个 broker 上管理;
- 每个 partition 都可以有多个副本;
分区对于 kafka 集群的好处是:实现 topic 数据的负载均衡。提高写入、读出的并发度,提高吞吐量。
分区副本 replica
每个 topic 的每个 partition 都可以配置多个副本(replica),以提高数据的可靠性;
每个 partition 的所有副本中,必有一个 leader 副本,其他的就是 follower 副本(observer 副本);follower
定期找 leader 同步最新的数据;对外提供服务只有 leader;
分区 leader
partition replica 中的一个角色,在一个 partition 的多个副本中,会存在一个副本角色为 leader;
producer 和 consumer 只能跟 leader 交互(读写数据)。
分区 follower
partition replica 中的一个角色,它通过心跳通信不断从 leader 中拉取、复制数据(只负责备份)。
如果 leader 所在节点宕机,follower 中会选举出新的 leader;
消息偏移量 offset
partition 中每条消息都会被分配一个递增 id(offset);通过 offset 可以快速定位到消息的存储位置;
kafka 只保证按一个 partition 中的消息的顺序,不保证一个 topic 的整体(多个 partition 间)的顺序。
因为broker将数据写入分区存储文件时,永远都是追加,所以kafka把自己的数据存储文件称之为log。
ISR 同步副本列表
ISR 概念:(同步副本)。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编 号 , 只 有 跟 得 上 Leader 的 follower 副 本 才 能 加 入 到 ISR 里 面 , 这 个 是 通 过replica.lag.time.max.ms =10000(默认值)参数配置的,只有 ISR 里的成员才有被选为 leader 的可能
踢出 ISR 和重新加入 ISR 的条件:
- 踢出 ISR 的条件: 由 replica.lag.time.max.ms =10000 决定,如上图;
- 重新加入 ISR 的条件: OSR 副本的 LEO(log end offset)追上 leader 的 LEO;
kafka 的数据存储结构
物理存储目录结构
- 存储目录 名称规范: topic 名称-分区号
注:“t1"即为一个 topic 的名称;
而“t1-0 / t1-1"则表明这个目录是 t1 这个 topic 的哪个 partition;
- 数据文件 名称规范:
生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka采取了分片和索引机制 ;
- 每个 partition 的数据将分为多个 segment 存储
- 每个 segment 对应两个文件:“.index"文件和“.log"文件。
index 和 log 文件以当前 segment 的第一条消息的 offset。
index 索引文件中的数据为: 消息 offset -> log 文件中该消息的物理偏移量位置;
Kafka 中的索引文件以稀疏索引( sparse index )的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引;每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096 ,即 4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes 的值,对应地可以缩小或增加索引项的密度;
查询指定偏移量时,使用二分查找法来快速定位偏移量的位置。
消息 message 存储结构
在客户端编程代码中,消息的封装类有两种:ProducerRecord、ConsumerRecord;
简单来说,kafka 中的每个 massage 由一对 key-value 构成:
Kafka 中的 message 格式经历了 3 个版本的变化了:v0 、 v1 、 v2
各个字段的含义介绍如下:
- crc:占用 4 个字节,主要用于校验消息的内容;(循环冗余校验码)
- magic:这个占用 1 个字节,主要用于标识 Kafka 版本。Kafka 0.10.x magic 默认值为 1;
- attributes:占用 1 个字节,这里面存储了消息压缩使用的编码以及 Timestamp 类型。目前 Kafka 支持 gzip、snappy 以及 lz4(0.8.2 引入) 三种压缩格式;[0,1,2]三位 it 表示压缩类型。[3]位表示时间错类型(0,create time;1,append time),[4,5,6,7]位保留;
- key length:占用 4 个字节。主要标识 Key 的内容的长度;
- key:占用 N 个字节,存储的是 key 的具体内容;(相当于读多少位之后这个就是key,知道key有多长就能读对key)
- value length:占用 4 个字节。主要标识 value的内容的长度;
- value:value 即是消息的真实内容,在 Kafka 中这个也叫做 payload。(知道value有多长就能读对value)
通过这些设置,在发送的时候,如果网络中出现了波动,那么我们就能够知道消息是有问题的。
kafka 关键原理加强
日志分段切分条件
(1)当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值log.segment.bytes 参数的默认值为 1073741824,即 1GB
(2)当前日志分段中消息的最小时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高默认情况下,只配置了 log.roll.hours 参数,其值为 168,即 7 天。
(3)偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size .max.bytes 的默认值为 10485760,即 10MB
(4)追加的消息的偏移量与当前日志分段的起始偏移量之间的差值大于 Integer.MAX_VALUE, 即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。
controller 控制器
Controller 简单来说,就是 kafka 集群的状态管理者
在 Kafka 集群中会有一个或者多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责维护整个集群中所有分区和副本的状态及分区 leader 的选举。
当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中的控制器选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller 这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中 version 在目前版本中固定为 1,brokerid 表示成为控制器的 broker 的 id 编号,timestamp 表示竞选成为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试去读取 zookeeper 上的/controller 节点的 brokerid 的值,如果读取到 brokerid 的值不为-1,则表示已经有其它 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 Zookeeper 中不存在/controller 这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller 这个节点,当前 broker 去创建节点的时候,也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 则表示竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。
controller 竞选机制:简单说,先来先上!
controller 的职责
监听 partition 相关变化
对 Zookeeper 中的/admin/reassign_partitions 节点注册 PartitionReassignmentListener,用来处理分区重分配的动作。
对 Zookeeper 中的/isr_change_notification 节点注册 IsrChangeNotificetionListener,用来处理ISR 集合变更的动作。
对 Zookeeper 中的/admin/preferred-replica-election 节点添加PreferredReplicaElectionListener,用来处理优先副本选举。
监听 topic 增减变化
对 Zookeeper 中的/brokers/topics 节点添加 TopicChangeListener,用来处理 topic 增减的变化;
对 Zookeeper 中的/admin/delete_topics 节点添加 TopicDeletionListener,用来处理删除 topic 的动作
监听 broker 相关的变化
对 Zookeeper 中的/brokers/ids/节点添加 BrokerChangeListener,用来处理 broker 增减的变化。
更新集群的元数据信息
从 Zookeeper 中读取获取当前所有与 topic、partition 以及 broker 有关的信息并进行相应的管理。对各topic 所对应的
Zookeeper 中的/brokers/topics/[topic]节点添加 PartitionModificationsListener,用来监听 topic 中的分区分配变化。并将最新信息同步给其他所有 broker
启动并管理分区状态机和副本状态机。
如果参数 auto.leader.rebalance.enable 设置为 true,则还会开启一个名为“auto-leader-rebalance-task”
的定时任务来负责维护分区的 leader
分区的负载分布
topic中的每个分区的每个副本及其leader副本,在集群中的众多broker中,如何分布的。
有两大策略:
策略1:kafka自动分布
策略2:topic的创建者手动指定
客户端请求创建一个 topic 时,每一个分区副本在 broker 上的分配,是由集群 controller 来决定;其分布策略源码如下:
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)) {
nextReplicaShift += 1
}
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1) {
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex:
Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
副本因子不能大于 Broker 的个数;
partition_0 的第 1 个副本(leader 副本)放置位置是随机从 brokerList 选择的;
其他分区的第 1 个副本(leader)放置位置相对于 paritition_0 分区依次往后移(也就是如果我们有 5 个 Broker,5 个分区,假设 partition0 分区放在 broker4 上,那么 partition1 将会放在 broker5上;patition2 将会放在 broker1 上;partition3 在 broker2,依次类);
各分区剩余的副本相对于分区前一个副本偏移随机数 nextReplicaShift
分区 Leader 的选举机制
分区 leader 副本的选举由控制器 controller 负责具体实施
当创建分区(创建主题或增加分区都有创建分区的动作)或 Leader 下线(此时分区需要选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作。
选举策略:按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中;
一个分区的 AR 集合在 partition 分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变
生产者原理解析
生产者工作流程
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。
在主线程中由 kafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。
Sender 线程负责从 RecordAccumulator 获取消息并将其发送到 Kafka 中;
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B ,即 32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。
主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,RecordAccumulator 内部为每个分区都维护了一个双端队列,即 Deque消息写入缓存时,追加到双端队列的尾部;
Sender 读取消息时,从双端队列的头部读取。注意:ProducerBatch 是指一个消息批次;与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。
ProducerBatch 大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord ) 流入RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch (如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以写入,如果不可以则需要创建一个新的 Producer Batch。在新建ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数大小,如果不超过,那么就以 batch.size参数的大小来创建 ProducerBatch。
如果生产者客户端需要向很多分区发送消息, 则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。
Sender 从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成<Node,List>的形式,其中 Node 表示 Kafka 集群 broker 节点。
对于网络连接来说,生产者客户端是与具体 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络 I/O 层面的转换。
在转换成<Node,List>的形式之后, Sender 会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是 Kafka 各种协议请求;
但是还是存在发送失败的情况,所以请求在从 sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per. connection , 默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。
在上图中还可以看到,当发送成功了之后,会将Response返回给InFlightRequests 中,然后从队列里面将对应的Request移除,说明这个请求就再也不需要存着了。保存的意义其实也就是为了可以重试,当把Response返回的时候,如果失败了,那么其实是可以重试的,并不是无穷的,而是有一个参数的上限。
通过比较 Deque 的 size 这个参数的大小来判断对应的 Node 中是否己经堆积了很多未响应的消息,如果真是此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续发送请求会增大请求超时的可能。
但是可能存在一个有序性的问题。
假如代码中:
send(a);
send(b);
有没有一种可能,到达服务端broker的log数据文件中的时候,文件中的数据长这样?
b
a
比如 a先失败了,进缓存, b成功了,a又发送成功了。也就是producer内部的重试机制,有可能造成数据在服务端存储的乱序。
这个流程不像我们代码里自己设置,如果发送失败了去重试,那样其实就变成同步了,只有一条成功了才继续发吓一条。
重要的生产者参数
acks
acks 是控制 kafka 服务端向生产者应答消息写入成功的条件;
生产者根据得到的确认信息,来判断消息发送是否成功;
那么涉及到ack的有两个问题?其参数起什么作用?ack参数有哪几种类型,分别有什么特点。
Producer 往 Broker 发送消息应答机制
kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决定消息发送到对应分区的几个副 本 才 算 消 息 发 送 成 功 。可 以 在 构 造 producer 时 通 过 acks 参 数 指 定 ( 在 0.8.2.X 前 是 通过 request.required.acks 参数设置的)。这个参数支持以下三种值:
acks = 0:
意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 kafka 。在这种情况下还是有可能发生错误,比如发送的对象不能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,大概率会丢失一些消息。
acks = 1:
意味着 leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 leader 选举,生产者会在选举时收到一个LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 leader,但在消息被复制到 follower 副本之前 leader 发生崩溃。
acks = all:
(这个和 request.required.acks = -1 含义一样):意味着 leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。
acks | 含义 |
---|---|
0 | Producer 往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。 |
1 | Producer 往集群发送数据只要 leader 成功写入消息就可以发送下一条,只确保 Leader 接收成功。 |
-1 或 all | Producer 往集群发送数据需要所有的 ISR Follower 都完成从 Leader 的同步才会发送下一条,确保Leader 发送成功和所有的副本都成功接收。安全性最高,但是效率最低。 |
生产者将 acks 设置为 all,是否就一定不会丢数据呢?(-1是代表所有的ISR副本拿到)
否!如果在某个时刻 ISR 列表只剩 leader 自己了,那么就算 acks=all,收到这条数据还是只有一个节点;
可以配合另外一个参数缓解此情况: 最小同步副本数>=2
BROKER 端参数: min.insync.replicas(默认 1)
如果让min.insync.replicas = 分区副本总数,那么这个分区的可用性就会极大程度的降低(经常不可用,因为只要有一个副本掉队,那么整个分区就不可用了,导致分区经常不能读写)
所以综上所述,kafka中没有一个尽善尽美的方案,都是在取舍中进行的。
但是还存在一个问题,就是如果我的数据又不能丢,产生数据的速度还特别快,那么一定得配置 ack = -1
但是数据写入速度就跟不上产生得速度,那就会在我的生产端出现数据积压。
解决办法就是:**加分区,扩集群 ** 生产者写的时候,有的写0 有的写1 有的写2 这样就增加了吞吐量。也就是加分区增加并行度,扩展了吞吐量。
一般情况 配置 ack = 1,中庸之道,毕竟节点不可能老挂。
kafka本身,作为流失计算的缓冲用的,流式计算,多半是无法保证百分之一百正确的。
重要的生产者参数
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 lMB 一般情况下,这个默认值就可以满足大多数的应用场景了。
这个参数还涉及一些其它参数的联动,比如 broker 端(topic 级别参数)的 message.max.bytes 参数( 默 认 1000012), 如 果 配 置 错 误 可 能 会 引 起 一 些 不 必 要 的 异 常 ; 比 如 将 broker 端 的message.max.bytes 参数配置为 10 ,而 max.request.size 参数配置为 20,那么当发送一条大小为 15B的消息时,生产者客户端就会报出异常;
retries 和 retry.backoff.ms
retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。
重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
如果将 retries 参数配置为非零值,并且 max .in.flight.requests.per.connection 参数配置为大于 1 的值,那可能会出现错序的现象:如果批次 1 消息写入失败,而批次 2 消息写入成功,那么生产者会重试发送批次 1 的消息,此时如果批次 1 的消息写入成功,那么这两个批次的消息就出现了错序。对于某些应用来说,顺序性非常重要 ,比如 MySQL binlog 的传输,如果出现错误就会造成非常严重的后果;
一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为 1 ,而不是把 retries 配置为 0,不过这样也会影响整体的吞吐。
compression.type
这个参数用来指定消息的压缩方式,默认值为“none ",即默认情况下,消息不会被压缩。
该参数还可以配置为 “gzip”,“snappy” 和 “lz4”。
对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。
**消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;这个也是一种权衡 **
batch.size
每个 Batch 要存放 batch.size 大小的数据后,才可以发送出去。比如说 batch.size 默认值是 16KB,那么里面凑够 16KB 的数据才会发送。
理论上来说,提升 batch.size 的大小,可以允许更多的数据缓冲在 recordAccumulator 里面,那么一次Request 发送出去的数据量就更多了,这样吞吐量可能会有所提升。
但是 batch.size 也不能过大,要是数据老是缓冲在 Batch 里迟迟不发送出去,那么发送消息的延迟就会很高。
一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。
所以对应参数的设置需要 最佳实践 和 均衡问题来实现。
linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入ProducerBatch 时间,默认值为 0。生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms
增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
linger.ms = 0
batchsize = 100
那么是不是batchsize 就无效了。
设置了linger.ms = 0 理论上应该马上走,但是此时 sender线程忙不过来,所以只能阻塞着。
linger.ms = 0 相当于一旦有车就立马接走,没车就阻塞着。
enable.idempotence
是否开启幂等性功能,详见后续原理加强;
int a = 1;
a++; // 非幂等操作
val map = new HashMap()
map.put(“a”,1); // 幂等操作
在 kafka 中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是具备幂等性;否则,就不具备幂等性!
所以,producer内部有重试机制
1.有可能造成服务端数据的乱序
2.有可能造成服务端数据的重复
partitioner.class
用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner
默认分区器的分区规则:
- 如果数据中有 key,则按 key 的 murmur hash 值 % topic 分区总数得到目标分区
- 如果数据只有 value,则在各个分区间
自定义 partitioner 需要实现 org.apache.kafka.clients.producer.Partitioner 接口
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
// 这里可以根据自己的业务逻辑计算出要发送到哪个分区
int numPartitions = cluster.partitionCountForTopic(topic);
if (key == null) {
return 0;
}
return Math.abs(key.hashCode() % numPartitions);
}
@Override
public void close() {
// do nothing
}
@Override
public void configure(Map<String, ?> configs) {
// do nothing
}
}
使用自定义的Partitioner也很简单,只需要在创建Producer时指定即可。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");
Producer<String, String> producer = new KafkaProducer<>(props);