生产者原理解析
生产者工作流程图:
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。
在主线程中由kafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。
Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka 中;
RecordAccumulator主要用来缓存消息以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B ,即32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即60秒。
主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,
RecordAccumulator内部为每个分区都维护了一个双端队列,即Deque。
消息写入缓存时,追加到双端队列的尾部;
Sender读取消息时,从双端队列的头部读取。注意:ProducerBatch 是指一个消息批次;
与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。
ProducerBatch 大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord ) 流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch (如果没有则新建),查看 ProducerBatch中是否还可以写入这个ProducerRecord,如果可以写入就直接写入,如果不可以则需要创建一个新的Producer Batch。在新建 ProducerBatch时评估这条消息的大小是否超过 batch.size 参数大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch。
Sender从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群broker节点。对于网络连接来说,生产者客户端是与具体broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node, List>的形式之后, Sender会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个Node了,这里的Request是Kafka各种协议请求;
请求在从sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<Nodeld, Deque>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per. connection ,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。通过比较 Deque 的size与这个参数的大小来判断对应的 Node中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续发送请求会增大请求超时的可能。
Producer往Broker发送消息应答机制
kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在构造producer 时通过acks参数指定(在 0.8.2.X 前是通过 request.required.acks 参数设置的)。这个参数支持以下三种值:
- acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 kafka 。在这种情况下还是有可能发生错误,比如发送的对象不能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,大概率会丢失一些消息。
- acks = 1:意味着leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 leader,但在消息被复制到 follower 副本之前 leader发生崩溃。
- acks = all(这个和 request.required.acks = -1 含义一样):意味着 leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
acks | 含义 |
---|---|
0 | Producer往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。 |
1 | Producer往集群发送数据只要 leader成功写入消息就可以发送下一条,只确保Leader 接收成功。 |
-1或all | Producer往集群发送数据需要所有的ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。 |
生产者将acks设置为all,是否就一定不会丢数据呢?
否!如果在某个时刻ISR列表只剩leader自己了,那么就算acks=all,收到这条数据还是只有一个点;
可以配合另外一个参数缓解此情况: 最小同步副本数>=2
其他的生产者参数
-
acks
acks是控制kafka服务端向生产者应答消息写入成功的条件;生产者根据得到的确认信息,来判断消息发送是否成功; -
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 1MB
一般情况下,这个默认值就可以满足大多数的应用场景了。
这个参数还涉及一些其它参数的联动,比如 broker 端(topic级别参数)的 message.max.bytes参数(默认1000012),如果配置错误可能会引起一些不必要的异常;比如将 broker 端的 message.max.bytes 参数配置为10B ,而 max.request.size参数配置为20B,那么当发送一条大小为 15B 的消息时,生产者客户端就会报出异常; -
retries和retry.backoff.ms ==> 间隔时间 避免无效的重试
retries参数用来配置生产者重试的次数,默认值为2147483647,即在发生异常的时候进行任何重试动作。
消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试 。如果将 retries参数配置为非零值,并且 max .in.flight.requests.per.connection 参数配置为大于1的值,那可能会出现错序的现象:如果批次1消息写入失败,而批次2消息写入成功,那么生产者会重试发送批次1的消息,此时如果批次1的消息写入成功,那么这两个批次的消息就出现了错序。
对于某些应用来说,顺序性非常重要 ,比如MySQL binlog的传输,如果出现错误就会造成非常严重的后果;一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection 配置为1 ,而不是把retries配置为0,不过这样也会影响整体的吞吐。 -
compression.type
这个参数用来指定消息的压缩方式,默认值为“none",即默认情况下,消息不会被压缩。该参数还可以配置为 “gzip”,“snappy” 和 “lz4”。对消息进行压缩可以极大地减少网络传输、降低网络I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩; -
batch.size
每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说batch.size默认值是16KB,那么里面凑够16KB的数据才会发送。理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在recordAccumulator里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。 -
linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
ProducerBatch 时间,默认值为0。生产者客户端会在ProducerBatch填满或等待时间超过linger.ms 值时发送出去。 -
enable.idempotence
是否开启幂等性功能,详见后续原理加强;
幂等性,就是一个操作重复做,也不会影响最终的结果!
int a = 1;
a++; // 非幂等操作
val map = new HashMap()
map.put(“a”,1); // 幂等操作
在kafka中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是具备幂等性;否则,就不具备幂等性! -
partitioner.class
用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner
自定义partitioner需要实现org.apache.kafka.clients.producer.Partitioner接口
消费者组再均衡分区分配策略
会触发rebalance(消费者)的事件可能是如下任意一种:
- 有新的消费者加入消费组。
- 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
- 有消费者主动退出消费组(发送LeaveGroupRequest 请求):比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
- 消费组所对应的 GroupCoorinator节点发生了变更。
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何rebalance也涉及到分区分配策略。
kafka有两种的分区分配策略:range(默认) 和 roundrobin(新版本中又新增了另外2种)
我们可以通过partition.assignment.strategy参数选择 range 或 roundrobin。
partition.assignment.strategy参数默认的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
Range Strategy
- 先将消费者按照client.id字典排序,然后按topic逐个处理;
- 针对一个topic,将其partition总数/消费者数得到商n和 余数m,则每个consumer至少分到n个分区,且前m个consumer每人多分一个分区;
Round-Robin Strategy
- 将所有主题分区组成TopicAndPartition列表,并对TopicAndPartition列表按照其hashCode 排序
- 然后,以轮询的方式分配给各消费者
Sticky Strategy
对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特点:
- 要去达成最大化的均衡
- 尽可能保留各消费者原来分配的分区
再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)
Cooperative Sticky Strategy
对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特点:
- 逻辑与sticky策略一致
- 支持cooperative再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配)
消费者组再均衡流程
消费组在消费数据的时候,有两个角色进行组内的各事务的协调;
角色1: Group Coordinator (组协调器) 位于服务端(就是某个broker)
组协调器的定位:
coordinator在我们组记偏移量的__consumer_offsets分区的leader所在broker上
查找Group Coordinator的方式:
先根据消费组groupid的hashcode值计算它应该所在__consumer_offsets 中的分区编号; 分区数
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount为__consumer_offsets的分区总数,这个可以通过broker端参数offset.topic.num.partitions来配置,默认值是50;
找到对应的分区号后,再寻找此分区leader副本所在broker节点,则此节点即为自己的Grouping Coordinator;
角色2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)
再均衡流程
eager协议的再均衡过程整体流程如下图:
Cooperative协议的再均衡过程整体流程如下图:
再均衡监听器
代码示例:
package com.doitedu;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
/**
* 消费组再均衡观察
*/
public class ConsumerDemo2 {
public static void main(String[] args) {
//1.创建kafka的消费者对象,附带着把配置文件搞定
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.订阅主题(确定需要消费哪一个或者多个主题)
//我现在想看看如果我的消费者组里面,多了一个消费者或者少了一个消费者,他有没有给我做再均衡
consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
/**
* 这个方法是将原来的分配情况全部取消,或者说把所有的分区全部回收了
* 这个全部取消很恶心,原来的消费者消费的好好的,他一下子就给他全部停掉了
* @param collection
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
System.out.println("我原来的均衡情况是:"+collection + "我已经被回收了!!");
}
/**
* 这个方法是当上面的分配情况全部取消以后,调用这个方法,来再次分配,这是在均衡分配后的情况
* @param collection
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
System.out.println("我是重新分配后的结果:"+collection);
}
});
while (true){
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
}
}
}