目录
- 一、ACK应答原理
- 1.1、应答级别
- 1.1.1、acks = 0
- 1.1.2、acks = 1
- 1.1.3、acks = -1(all)
- 1.2、问题思考
- 二、数据可靠性
- 2.1、数据可靠性分析
- 2.2、 数据完全可靠条件
- 2.3、ACK应答级别可靠性总结
- 三、数据可靠性代码示例
一、ACK应答原理
1.1、应答级别
1.1.1、acks = 0
- 生产者发送过来的数据 ,不需要等数据落盘应答。
- 数据可靠性分析:丢数据。
1.1.2、acks = 1
- 生产者发送过来的数据,Leader 收到数据后应答 。
- 数据可靠性分析:丢数据。
1.1.3、acks = -1(all)
- -1和all等价,生产者发送过来的数据,Leader 和ISR 队列里面的所有节点收齐数据后应答
1.2、问题思考
Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?
- Leader维护了一个动态的in-sync replica set(ISR),意味着和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
- 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。
- 这样就不用等长期联系不上或者已经故障的节点。
二、数据可靠性
2.1、数据可靠性分析
- 如果分区副本设置为1个,或者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。
2.2、 数据完全可靠条件
- 数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR 里应答的最小 副本 数量大于等于2
2.3、ACK应答级别可靠性总结
- acks=0,生产者发送过来数据就不管了 , 可靠性差 , 效率高;
- acks=1,生产者发送过来数据Leader 应答 , 可靠性中等 ;
- acks=-1,生产者发送过来数据Leader 和ISR 队列里面所有Follwer 应答,可靠性高,效率低;
- 在生产环境中,acks=0 很少使用;acks=1,一般用于传输普通日志,允许丢个别据;acks=-1,一般用于传输和钱相关的数据 ,对可靠性要求比较高的场景 。
三、数据可靠性代码示例
-
示例代码中添加ack(确认)和retries(重试)次数设置即可。
// acks properties.put(ProducerConfig.ACKS_CONFIG,"1"); // 重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,3);
-
示例代码
package com.xz.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducerAcks { public static void main(String[] args) { //1、创建 kafka 生产者的配置对象 Properties properties = new Properties(); //2、给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); //3、指定对应的key和value的序列化类型 key.serializer value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // acks properties.put(ProducerConfig.ACKS_CONFIG,"1"); // 重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,3); //4、创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //5、调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("news","hello kafka"+i)); } //6、关闭资源 kafkaProducer.close(); } }
-
在kafka集群上开启 Kafka 消费者
[root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.27:9092 --topic news
-
在 IDEA 中执行代码,观察kafka集群控制台中是否接收到消息。