文章目录
- 一、Controller选举
- 二、生产消息
- 2.1、创建待发送数据
- 2.2、创建生产者对象,发送数据
- 2.3、发送回调
- 2.3.1、异步发送
- 2.3.2、同步发送
- 2.4、拦截器
- 2.5、序列化器
- 2.6、分区器
- 2.7、消息可靠性
- 2.7.1、acks = 0
- 2.7.2、acks = 1(默认)
- 2.7.3、acks = -1或all
- 2.8、部分重要的生产者参数
- 2.8.1、linger.ms
- 2.9、Producer总结
如果想要先了解kafka的基础基础架构,可以看 什么是topic、broker、分区、副本
一、Controller选举
一句话总结:先到先得。
Controller,是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。下面介绍一下Zookeeper的特点:
- 一个是在ZooKeeper软件中创建节点Node,创建一个Node时,我们会设定这个节点是持久化创建,还是临时创建。所谓的持久化创建,就是Node一旦创建后会一直存在,而临时创建,是根据当前的客户端连接创建的临时节点Node,一旦客户端连接断开,那么这个临时节点Node也会被自动删除,所以这样的节点称之为临时节点。
- ZooKeeper节点是不允许有重复的,所以多个客户端创建同一个节点,只能有一个创建成功。
- 另外一个是客户端可以在ZooKeeper的节点上增加监听器,用于监听节点的状态变化,一旦监听的节点状态发生变化,那么监听器就会触发响应,实现特定监听功能。
集群中的任意一台Broker都能充当Controller的角色,但是,在整个集群运行过程中,只能有一个Broker成为Controller。也就是说,每个正常运行的Kafka集群,在任何时刻都有且只有一个Controller。
最先在Zookeeper上创建临时节点/controller成功的Broker就是Controller。Controller重度依赖Zookeeper,依赖zookeepr保存元数据,依赖zookeeper进行服务发现。Controller大量使用Watch功能实现对集群的协调管理。如果此时,作为Controller的Broker节点宕掉了。那么zookeeper的临时节点/controller就会因为会话超时而自动删除。而监控这个节点的Broker就会收到通知而向ZooKeeper发出创建/controller节点的申请,一旦创建成功,那么创建成功的Broker节点就成为了新的Controller。
有一种特殊的情况,就是Controller节点并没有宕掉,而是因为网络的抖动,不稳定,导致和ZooKeeper之间的会话超时,那么此时,整个Kafka集群就会认为之前的Controller已经下线(退出)从而选举出新的Controller,而之前的Controller的网络又恢复了,以为自己还是Controller了,继续管理整个集群,那么此时,整个Kafka集群就有两个controller进行管理,那么其他的broker就懵了,不知道听谁的了,这种情况,我们称之为脑裂现象,为了解决这个问题,Kafka通过一个任期(epoch:纪元)的概念来解决,也就是说,每一个Broker当选Controller时,会告诉当前Broker是第几任Controller,一旦重新选举时,这个任期会自动增1,那么不同任期的Controller的epoch值是不同的,那么旧的controller一旦发现集群中有新任controller的时候,那么它就会完成退出操作(清空缓存,中断和broker的连接,并重新加载最新的缓存),让自己重新变成一个普通的Broker。
二、生产消息
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并发送到kafka中。
RecordAccumulator主要是用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为32MB
。
我们采用Java代码通过Kafka Producer API
的方式生产数据
2.1、创建待发送数据
在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型ProducerRecord(其实就是一个类):
相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而Key可以用于数据的分区定位。
2.2、创建生产者对象,发送数据
根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。
1、数据生产者
生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者。这里我们简单介绍一下内部的数据转换处理:
- 如果配置拦截器栈,那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
- 因为发送的数据为key-value数据,所以需要根据配置信息中的序列化对象对数据中key和value分别进行序列化处理。
- 计算数据所发送的分区位置。
- 将数据追加到数据收集器中。
2、数据收集器
用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。
- 默认情况下,一个发送批次(下图中的
ProducerBatch
)的数据容量为16K
(不是指一个批次的最大容量就是16K,而是指超过16K后当前批次就不接收新数据了,每个批次是要保证数据完整性的),这个可以通过参数batch.size
进行改善。 - 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
- 如果当前批次能容纳数据,那么直接将数据追加到批次中即可,如果不能容纳数据,那么会产生新的批次放入到当前分区的批次队列中,这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送。
3、数据发送器(Sender线程)
线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中。
- 对网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层到网络IO层面的转换:即将原本
<分区,Deque<ProducerBatch>>
保存形式转变为<node,List<ProducerBatch>>
)。 - 将组合后的
<节点,List<ProducerBatch>>
的数据封装成客户端请求发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。 - Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。
1.数据量超过批次大小阈值(可通过参数配置),2.数据在缓冲区停留时间超过时间阈值(可通过参数配置),满足任一条件sender线程即发数据到kafka服务器。
2.3、发送回调
Kafka发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理,具体代码实现采用匿名类或Lambda表达式都可以。
public class KafkaProducerASynTest {
public static void main(String[] args) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 循环生产数据
for ( int i = 0; i < 10; i++ ) {
// TODO 创建数据
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
// TODO 发送数据
producer.send(record, new Callback() {
// TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO 当数据发送成功后,会回调此方法
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
});
}
producer.close();
}
}
结果:
2.3.1、异步发送
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。
如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送一下条数据的场合,我们就称之为异步发送。
public class KafkaProducerASynTest {
public static void main(String[] args) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 循环生产数据
for ( int i = 0; i < 10; i++ ) {
// TODO 创建数据
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
// TODO 发送数据
producer.send(record, new Callback() {
// TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO 当数据发送成功后,会回调此方法
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
});
// TODO 发送当前数据
System.out.println("发送数据");
}
producer.close();
}
}
结果:
2.3.2、同步发送
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。
如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。
采用的是JDK1.5增加的JUC并发编程的Future接口的get方法实现的。实际上send()方法就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送结果。下面的示例中在执行send方法后直接调用get方法来阻塞等待kafka的响应,直到消息发送成功,或者发生异常。
public class KafkaProducerASynTest {
public static void main(String[] args) throws Exception {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 循环生产数据
for ( int i = 0; i < 10; i++ ) {
// TODO 创建数据
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
// TODO 发送数据
producer.send(record, new Callback() {
// TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO 当数据发送成功后,会回调此方法
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
}).get();
// TODO 发送当前数据
System.out.println("发送数据");
}
producer.close();
}
}
结果:
通常,一个kafkaProducer不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用kafkaProducer的close()方法来回收资源。close()方法会阻塞等待之前所有的发送请求完成后再关闭kafkaProducer。
消息在通过send()方法发往broker的过程中,有可能经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正的发往broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
2.4、拦截器
生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验、整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以大家可以根据实际的情况自行选择使用。
但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。也可以通过实现ProducerInterceptor
接口来自定义拦截器。
2.5、序列化器
生产者需要使用序列化器(Serializer)把对象转换为字节数组才能通过网络发送给kafka,而在对侧,消费者需要使用反序列化器(Desirializer)把从kafka获得的字节数组转换为相应的对象。在上面的代码例子中,为了方便,消息的key和value都使用了字符串,对应程序中的序列化器也使用了客户端自带的StringSerializer,除了使用String类型的序列化器,还有其他类型的序列化器。当然也可以通过实现Serializer
接口来自定义序列化器。
2.6、分区器
Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时,也可以指定数据存储的分区编号。
指定分区传递数据是没有任何问题的。Kafka会进行基本简单的校验,比如是否为空,是否小于0之类的,但是你的分区是否存在就无法判断了,所以需要从Kafka中获取集群元数据信息,此时会因为长时间获取不到元数据信息而出现超时异常。所以如果不能确定分区编号范围的情况,不指定分区还是一个不错的选择。
如果不指定分区,Kafka会根据集群元数据中的主题分区来通过算法来计算分区编号并设定,当然还可以通过实现Partitioner来自定义分区器,kafka分区的选择流程如下:
- 如果指定了分区,直接使用。
- 如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用。
- 如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号。
- 如果未指定数据Key,或不使用Key选择分区,那么Kafka会采用优化后的粘性分区策略进行分区选择。
2.7、消息可靠性
对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。
但是在更多的场景中,我们是需要确定数据是否已经发送成功了且Kafka正确接收到数据的,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。
而这个确定的过程一般是通过Kafka给我们返回的响应确认结果(Acknowledgement)来决定的,这里的响应确认结果我们也可以简称为ACK应答。根据场景,Kafka提供了3种应答处理,可以通过配置对象进行配置。acks是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。
2.7.1、acks = 0
当生产数据时,生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,明显可以看出,这种应答方式,数据已经通过网络给Kafka发送了,但这其实并不能保证Kafka能正确地接收到数据,在传输过程中如果网络出现了问题,那么数据就丢失了。也就是说这种应答确认的方式,数据的可靠性是无法保证的。不过相反,因为无需等待Kafka服务节点的确认,通信效率倒是比较高的,也就是系统吞吐量会非常高。
2.7.2、acks = 1(默认)
当生产数据时,Kafka Leader副本将数据接收到并写入到了日志文件(保存到磁盘)后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,可以看出,这种应答方式,数据已经存储到了分区Leader副本中,那么数据相对来讲就比较安全了,也就是可靠性比较高。之所以说相对来讲比较安全,就是因为现在只有一个节点存储了数据,而数据并没有来得及进行备份到follower副本,那么一旦当前存储数据的broker节点出现了故障,数据也依然会丢失。
2.7.3、acks = -1或all
当生产数据时,Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,可以看出,这种应答方式,数据已经同时存储到了分区Leader副本和follower副本中,那么数据已经非常安全了,可靠性也是最高的。此时,如果Leader副本出现了故障,那么follower副本能够开始起作用,因为数据已经存储了,所以数据不会丢失。
不过这里需要注意,如果假设我们的分区有5个follower副本,编号为1,2,3,4,5
但是此时只有3个副本处于和Leader副本之间处于数据同步状态,那么此时分区就存在一个同步副本列表,我们简称为ISR
。此时,Kafka只要保证ISR中所有的4个副本接收到了数据,就可以对数据请求进行响应了。无需5个副本全部收到数据。
2.8、部分重要的生产者参数
2.8.1、linger.ms
这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0。生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。增大这个参数的值会增加消息的延迟,但是同事能提升一定的吞吐量。
还有很多其他重要的生产者参数,可以参考相关帖子或书籍。
2.9、Producer总结
对于KafkaProducer而言,它是线程安全的,我们可以在多线程的环境中复用它,但是对于消费者客户端KafkaConsumer而言,它是非线程安全的,因为它具备了状态,具体怎么使用可以参见后面的文章。