数据传递语义
至少一次:ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
最多一次:ACK级别设置为0
总结:
At Least Once:可以保证数据不丢失,但是不能保证数据不重复
At Most Once:可以保证数据不重复,但是不能保证数据不丢失
精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失,Kafka 0.11版本以后,引入了重大特性:幂等性和事务
幂等性
幂等性就是值Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证不重复。
精确一次=幂等性+至少一次(ack=-1+分区副本数>=2+ISR最小副本数量>=2)
重复数据的判断标准:具有<PID,partition,SeqNumber>相同主键的信息提交时,Broker只会持久化一条,其中PID是Kafka每次重启都会分配一个新的;partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证再单分区会话内不回重复
开启幂等性参数
enable.idempotence默认为true,false关闭
生产者事务
开启事务,必须开启幂等性
producer在使用事务功能前,必须先自定义一个唯一的transactional.id,有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务
1、请求producer id 幂等性需要
2、返回producer id
3、发送消息到TopicA
4、发送commit请求
5、持久化commit请求
6、返回成功
7、后台发送commit请求
_transaction_state-分区-leader存储事务信息的特殊主题
默认又50个分区,每个分区负载一部分事务,事务划分根据transactional,id的hashcode值%50,计算出该分区事务属于哪个分区。该分区leader副本所在的broker节点即为这个transactional.id对应的Transaction Coordinator节点
例子
package com.longer.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerTransactions {
public static void main(String[] args) {
//创建kafka配置对象
Properties properties = new Properties();
// 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
//序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//设置事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");
//创建kafka生产对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//初始化事务
producer.initTransactions();
//开启事务
producer.beginTransaction();
//调用 send 方法,发送消息
try {
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("first", "hi," + i));
}
System.out.println(1/0);
//提交事务
producer.commitTransaction();
}catch (Exception e){
e.printStackTrace();
//终止事务
producer.abortTransaction();
}finally {
//关闭资源
producer.close();
}
}
}