本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)
PS:本节内容尚硅谷的视频讲的不太友好,又查了很多资料才搞明白
文章目录
- 数据可靠性
- 数据不重复
- 数据有序性
数据可靠性
首先数据的可靠性指的是:
- 消息不会意外丢失
- 消息不会重复传递
那回顾我们的数据发送流程,在确认数据发送成功的这一步,也就是ack应答这里,不同的参数对应着不同的策略,如果选择了0和1,则存在丢数的问题,如图:
0: 如果数据发送到某个主题的leader时,leader所在节点挂了,那么这条消息就丢失了
1: 同理,leader收到了,还没应答时挂了,也会丢数据
-1(all): 使用-1能保证数据落配盘后才回答,保证数据不丢失
但是,如果Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?
这就引出了ISR队列的概念了
ISR,是一个机制,也代表着一个同步合集,是由Leader维护的一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的
那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?
主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。
我们将ISR与ACK应答结合起来使用,就形成了数据可靠条件
- 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
数据不重复
上面讲解的,只能保证数据可靠,但是这又引出了一个新的问题
如果,leader在同步完成之后,向生产者回答时,挂掉了,这时候剩下的备份分区会自动选举出一个新leader出来,但是生产者并不知道它挂掉了,只会以为是消息发送失败了,触发重试,又将数据发送了一遍,然后新的leader就又接受了一遍消息,然后在备份分区上再存一遍。这就导致了这条消息存在两份,产生数据重复问题。
那么kafka是怎么保证数据不重复的呢?
其实这就是数据的幂等性问题了,幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
kafka默认启用数据幂等性,即设置 enable.idempotence = true
在生产者发消息时,这条消息是有它自己的属性的,其中有三个数据被拿来作为数据的主键,kafka会以此来判断这条消息是否重复,若重复,则只保留一条
PID:又叫生产者编号(producerid), Producer在初始化的时候(只有初始化的时候会随机生成PID,也就是重启就会再次生成)会被分配一个PID
Partition:又叫分区编号,即这条消息要发往的分区的paritionid
SeqNumber:又叫序列号,发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息)
这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条,这样就保证了数据的唯一,不重复。
但是幂等性只能保证的是在单分区单会话内不重复,如果发消息时生产者挂掉了,重启后它不知道是否发送成功了,又将这个消息再发送一遍,此时它的PID发生变化,那么这条消息就被认为是一条新的消息,导致重复存储,这种情况怎么解决呢?
这就要引入kafka的事务机制了,事务这个东西大家都知道啥意思,不再重复解释
我们通过事务,让客户端挂掉后继续处理,而不是重新从头来过,保证消息的仅一次发送
注意:开启事务,必须开启幂等性。
kafka使用事务,有5个API
// 初始化事务
void initializeTransactions ();
// 开启事务
void beginTransaction () throws ProducerFencedException;
// 在事务中提交已消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction () throws ProducerFencedException;
// 放弃事务(类似于回滚事务的操作)
void abortTransaction () throws ProducerFencedException;
举个例子:
package com.atguigu.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Test {
public static void main(String[] args) {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put("bootstrap.servers", "hadoop102:9092");
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
properties.put("transactional.id", "transaction_id_0");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 4. 调用 send 方法,发送消息
// 发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
}
// int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}
数据有序性
如果某主题TOPIC只有一个分区,那么它天生有序,因为分区其实就是一个有序队列
如果是多分区的,kafka是通过滑动窗口的思想解决这个问题的
我们知道kafka发送请求时,最多缓存5个,其实在发送时,每个请求都有自己的单调递增编号,kafka broker在接收数据时,会自动按照编号将数据排序,并且如果其中一个编号的请求失败时,后续再次成功,数据过来后,会自动的根据编号插入到应该在的位置上