1、回顾发送流程
2、ACK应答原理
0:生斥责发送过来的数据,不需要等数据落盘应答(数据可靠性分析:丢数)
1:生产者发送过来的数据,leader收到数据后应答(数据可靠性分析:丢数)
-1(all):生产者发送过来的数据,leader和ISR队列里面的所有节点收起后应答
思考:leader收到数据,所有follower都开始同步数据,但是因为某种故障,迟迟不能与leader进行同步,那这个问题怎么解决
3、可靠性总结
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,
对可靠性要求比较高的场景。
4、例子
package com.longer.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerAck {
public static void main(String[] args) {
//创建kafka配置对象
Properties properties = new Properties();
// 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
//序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//设置acks
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数retries,默认是int最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG,3);
//创建kafka生产对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("first", "hi," + i));
}
//关闭资源
producer.close();
}
}