1. ack级别
上文中我们提到过kafka是存在确认应答机制的,也就是数据在发送到kafka的时候,kafka会回复一个确认信息,这个确认信息是存在等级的。
ack=0 这个等级是最低的,这个级别中数据sender线程复制完毕数据默认kafka已经接收到数据。
ack=1 这个级别中,sender线程复制完毕数据leader分区拿到数据放入到自己的存储并且返回确认信息
ack= -1 这个级别比较重要,sender线程复制完毕数据,主分区接受完毕数据并且从分区都同步完毕数据然后在返回确认信息
那么以上的等级在使用的时候都会出现什么问题呢?
ack = 0 会丢失数据
ack=0时,在异步复制过程中,leader可能会丢失leader分区和follower分区的数据。
ack=1
ack=1的时候leader虽然接收到数据存储到本地,但是没有同步给follower节点,这个时候主节点宕机,从节点重新选举新的主节点,主节点是不含有这个数据的,数据会丢失.
ack = -1
这个模式不会丢失数据,但是如果leader接受完毕数据并且将数据同步给不同的follower,从节点已经接受完毕,但是还没有返回给sender线程ack的时候,这个时候leader节点宕机了,sender没有接收到这个ack,它人为没有发送成功还会重新发送数据过来,会造成数据重复。
一般前两种都适合在数据并不是特别重要的时候使用,而最后一种效率会比较低下,但是适用于可靠性比较高的场景使用
所以一般使用我们都会使用ack = -1 retries = N 联合在一起使用
那么我们如何能够保证数据的一致性呢?
2. 幂等性
在kafka的0.10以后的版本中增加了新的特性,幂等性,主要就是为了解决kafka的ack = -1的时候,数据的重复问题,设计的原理就是在kafka中增加一个事务编号。
数据在发送的时候在单个分区中的seq事物编号是递增的,如果重复的在一个分区中多次插入编号一致的两个信息,那么这个数据会被去重掉
在单个分区中序号递增,也就是我们开启幂等性也只能保证单个分区的数据是可以去重的
整体代码如下:
pro.put(ProducerConfig.RETRIES_CONFIG,3);
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
设定retries = 3 ,enable.idempotence = true
幂等性开启的时候,ack默认设定为-1。
幂等性的工作原理很简单,每条消息都有一个「主键」,这个主键由 <PID, Partition, SeqNumber> 组成,他们分别是:
- PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
- Partition:消息需要发往的分区号
- SeqNumber:生产者,他会记录自己所发送的消息,给他们分配一个自增的 ID,这个 ID 就是 SeqNumber,是该消息的唯一标识
对于主键相同的数据,Kafka 是不会重复持久化的,它只会接收一条,但由于是原理的限制,幂等性也只能保证单分区、单会话内的数据不重复,如果 Kafka 挂掉,重新给生产者分配了 PID,还是有可能产生重复的数据,这就需要另一个特性来保证了 ——Kafka 事务。
3. kafka的事务
Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。
Kafka 事务分为生产者事务和消费者事务,但它们并不是强绑定的关系,消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务。
3.1 如何开启事务
创建一个 Producer,指定一个事务 ID:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//设置事务ID,必须
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");
//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
使用事务发送消息:
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
//发送10条消息往kafka,假如中间有异常,所有消息都会发送失败
try {
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("topic-test", "a message" + i));
}
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 终止事务
producer.abortTransaction();
} finally {
producer.close();
}
3.2 事务工作原理
1)启动生产者,分配协调器
在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务。
事务协调器的分配涉及到一个特殊的主题 __transaction_state,该主题默认有 50 个分区,每个分区负责一部分事务;Kafka 根据事务ID的hashcode值%50
计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。
分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了。
2)发送消息
生产者分配到 PID 后,要先告诉事务协调器要把消息发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息。
当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应。
3)确认事务
当生产者开始发送消息时,协调器判定事务开始。它会将开始的信息持久化到主题 __transaction_state
中。
当生产者发送完事务内的消息,或者遇到异常发送失败,协调器会收到 Commit 或 Abort 请求,接着事务协调器会跟所有主题通信,告诉它们事务是成功还是失败的。
如果是成功,主题会汇报自己已经收到消息,协调者收到所有主题的回应便确认了事务完成,并持久化这一结果。
如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束;整个放弃事务的过程消费者是无感知的,它并不会收到这些数据。
事物不仅可以保证多个数据整体成功失败,还可以保证数据丢失后恢复。
3.3 代码实现
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerWithTransaction {
public static void main(String[] args) {
Properties pro = new Properties();
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaciton_test");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
producer.initTransactions();
producer.beginTransaction();
try{
for(int i=0;i<5;i++){
producer.send(record);
}
// int a = 1/0;
producer.commitTransaction();
}catch (Exception e){
producer.abortTransaction();
}finally {
producer.close();
}
}
}
4. 一致性语义
在大数据场景中存在三种时间语义,分别为
At Least Once 至少一次,数据至少一次,可能会重复
At Most Once 至多一次,数据至多一次,可能会丢失
Exactly Once 精准一次,有且只有一次,准确的消息传输
那么针对于以上我们学习了ack已经幂等性以及事务。
所以我们做以下分析:
如果设定ack = 0 或者是 1 出现的语义就是At Most Once 会丢失数据
如果设定ack = - 1 会出现At Least Once 数据的重复
在ack = -1的基础上开启幂等性会解决掉数据重复问题,但是不能保证一个批次的数据整体一致,所以还要开启事务才可以。
5. 参数调节
参数 | 调节 |
---|---|
buffer.memory | record accumulator的大小,适当增加可以保证producer的速度,默认32M |
batch-size | 异步线程拉取的批次大小,适当增加可以提高效率,但是会增加延迟性 |
linger.ms | 异步线程等待时长一般根据生产效率而定,不建议太大增加延迟效果 |
acks | 确认应答一般设定为-1,保证数据不丢失 |
enable.idempotence | 开启幂等性保证数据去重,实现exactly once语义 |
retries | 增加重试次数,保证数据的稳定性 |
compression.type | 增加producer端的压缩 |
max.in.flight.requests.per.connection | sender线程异步复制数据的阻塞次数,当没收到kafka的ack之前可以最多发送五个写入请求,调节这个参数可以保证数据的有序性 |
全部代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerWithMultiConfig {
public static void main(String[] args) throws InterruptedException {
Properties pro = new Properties();
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);
pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
pro.put(ProducerConfig.RETRIES_CONFIG, 3);
pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
producer.send(record);
producer.close();
}
}
其中max.in.flight.requests.per.connection参数设定后可以增加producer的阻塞大小
在未开启幂等性的时候,这个值设定为1,可以保证单个批次的数据有序,在分区内部有序
如果开启了幂等性可以设定最大值不超过5,可以保证五个request请求单个分区内有序
因为没有开启幂等性的时候如果第一个请求失败,第二个请求重新发送的时候需要二次排序
要是开启幂等性了会保留原来的顺序性,不需要重新排序
总而言之kafka可以保证单分区有序但是整体是无序的