1.数据可靠
• 0:生产者发送过来的数据,不需要等数据落盘应答。
风险:leader挂了之后,follower还没有收到消息。。。。
• 1:生产者发送过来的数据,Leader收到数据后应答。
风险:leader应答完成之后,还没有开始同步副本。。。。
生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。
可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
2.数据去重
3.生产者事务
说明:开启事务,必须开启幂等性。
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransaction {
public static void main(String[] args) {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop100:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
// 4. 调用 send 方法,发送消息
try{
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","atguigu " + i));
int i1 = 1 / 0;
}
kafkaProducer.commitTransaction();
}catch (Exception e){
kafkaProducer.abortTransaction();
}finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}
4.数据乱序