🌻🌻 目录
- 一、Kafka 消费者
- 1.1 Kafka 消费方式
- 1.2 Kafka 消费者工作流程
- 1.2.1 消费者总体工作流程
- 1.2.2 消费者组原理
- 1.2.3 消费者重要参数
- 1.3 消费者 API
- 1.3.1 独立消费者案例(订阅主题)
- 1.3.2 独立消费者案例(订阅分区)
- 1.3.3 消费者组案例
- 1.4 生产经验——分区的分配以及再平衡
- 1.4.1 Range以及再平衡
- 1.4.2 RoundRobin以及再平衡
- 1.4.3 Sticky以及再平衡
- 1.5 offset 位移
- 1.5.1 offset 的默认维护位置
- 1.5.2 自动提交 offset
- 1.5.3 手动提交 offset
- 1.5.4 指定 Offset 消费
- 1.5.5 指定时间消费
- 1.5.6 漏消费和重复消费
- 1.6 生产经验——消费者事务
- 1.7 生产经验——数据积压(消费者如何提高吞吐量)
一、Kafka 消费者
1.1 Kafka 消费方式
1.2 Kafka 消费者工作流程
1.2.1 消费者总体工作流程
1.2.2 消费者组原理
1.2.3 消费者重要参数
1.3 消费者 API
1.3.1 独立消费者案例(订阅主题)
1)需求:
创建一个独立消费者,消费first主题中数据。
注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。
2)实现步骤
(1)创建包名:com.gansu.kafka.consumer
(2)编写代码:类名:CustomConsumer
package com.gansu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
// 1.创建消费者的配置对象
Properties properties = new Properties();
// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
// 配置序反列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅主题 first 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
// 设置1s中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
3)测试
(1)在IDEA中执行消费者程序。
(2)在Kafka集群控制台,创建Kafka生产者,并输入数据。
(3)在IDEA控制台观察接收到的数据。
① 使用服务器生产者发送
② 使用 idea 生产者发送
1.3.2 独立消费者案例(订阅分区)
1)需求:创建一个独立消费者,消费first主题0号分区的数据。
2)实现步骤
(1)代码编写。类名:CustomConsumerPartition
package com.gansu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerPartition {
public static void main(String[] args) {
//配置 反序列化 必须
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置消费组 必须 名字任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//创建消费
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//消费某个主题的 某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
//这里如果配置主题 分区为 1则对应的消费主题分区 是收不到消息的
topicPartitions.add(new TopicPartition("first",0));
kafkaConsumer.assign(topicPartitions);
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
3)测试
(1)在IDEA中执行消费者程序。
(2)在IDEA中执行前面创建的生产者程序CustomProducterCallbackPartitions
在控制台观察生成几个0号分区的数据,如果是1,再次查看消费者是否还能接收到数据。
(3)在IDEA控制台,观察接收到的数据,只能消费到0号分区数据表示正确。
1.3.3 消费者组案例
1)需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
2)案例实操
(1)复制一份基础消费者的代码,在IDEA中同时启动,即可启动同一个消费者组中的两个消费者。
(2)启动代码中的生产者发送消息,在IDEA控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码Thread.sleep(2);
)。
这里消费者没有出现,我直接把集群修改成了单体的
再次查看测试:
(3)重新发送到一个全新的主题中,由于默认创建的主题分区数为1,可以看到只能有一个消费者消费到数据。
1.4 生产经验——分区的分配以及再平衡
1.4.1 Range以及再平衡
1)Range分区策略原理
2)Range分区分配策略案例
- (1)修改主题first为7个分区。
注意:分区数可以增加,但是不能减少。
bin/kafka-topics.sh --bootstrap-server linux-102:9092 --alter --topic second --partitions 7
- (2)复制CustomConsumer类,创建CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2组成消费者组,组名都为“test”,同时启动3个消费者。
(3)启动CustomProducer生产者,发送500条消息,随机发送到不同的分区。
说明:Kafka默认的分区分配策略就是Range + CooperativeSticky,所以不需要修改策略。
(4)观看3个消费者分别消费哪些分区的数据。
3)Range分区分配再平衡案例
- (1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到3、4号分区数据。
2号消费者:消费到5、6号分区数据。
0号消费者的任务会整体被分配
到1号消费者或者2号消费者。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
- (2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、1、2、3号分区数据。
2号消费者:消费到4、5、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。
1.4.2 RoundRobin以及再平衡
1)RoundRobin分区策略原理
2)RoundRobin分区分配策略案例
(1)依次在CustomConsumer
、CustomConsumer1
、CustomConsumer2
三个消费者代码中修改分区分配策略为RoundRobin
。并修改分区策略组。
官网 ctrl+f RoundRobinAssignor
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
(2)重启3个消费者,重复发送消息的步骤,观看分区结果。(我只启动了1,2)
3)RoundRobin分区分配再平衡案例
- (1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者
:消费到2、5号分区数据
2号消费者
:消费到4、1号分区数据
0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
- (2)再次重新发送消息观看结果(45s以后)。
1号消费者
:消费到0、2、4、6号分区数据
2号消费者
:消费到1、3、5号分区数据
说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。
1.4.3 Sticky以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
- 粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
1)需求
- 设置主题为first,7个分区;准备3个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
2)步骤
- (1)修改分区分配策略为粘性。
注意:3个消费者都应该注释掉,之后重启3个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
官网 ctrl+f StickyAssignor
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
- (2)使用同样的生产者发送500条消息。
可以看到会尽量保持分区的个数近似划分分区。
3)Sticky分区分配再平衡案例
- (1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者
:消费到2、5、3号分区数据。
2号消费者
:消费到4、6号分区数据。
0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
- (2)再次重新发送消息观看结果(45s以后)。
1号消费者
:消费到2、3、5号分区数据。
2号消费者
:消费到0、1、4、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。
1.5 offset 位移
1.5.1 offset 的默认维护位置
__consumer_offsets
主题里面采用key和value
的方式存储数据。key
是group.id+topic+分区号
,value就是当前offset的值。每隔一段时间,kafka内部会对这个topic
进行compact
,也就是每个group.id+topic+分区号
就保留最新数据。
1)消费offset案例
- (0)思想:
__consumer_offsets
为Kafka
中的topic
,那就可以通过消费者进行消费。- (1)在配置文件
config/consumer.properties
中添加配置exclude.internal.topics=false
,
默认是true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。
官网 ctrl+f exclude.internal.topics
cd /usr/local/kafka/config
vi consumer.properties
①
②
再无需重启 kafka
(2)采用命令行方式,创建一个新的topic
。
bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic Daniel88 --partitions 1 --replication-factor 1
(3)启动生产者往Daniel88
生产数据。
(4)启动消费者消费Daniel88
数据。
注意:指定消费者组名称,更好观察数据存储位置(key是group.id+topic+分区号)。
(5)查看消费者消费主题__consumer_offsets
。
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server linux-102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
1.5.2 自动提交 offset
参数名称 | 描述 |
---|---|
enable.auto.commit | 默认值为true ,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s 。 |
1)消费者自动提交offset
复制之前的的类 CustomConsumer
重命名为一份为 CustomConsumerAutoOffset
//是否自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
测试:
1.5.3 手动提交 offset
1)同步提交 offset
由于同步提交
offset
有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交offset
的示例。
2)异步提交offset
虽然同步提交
offset
更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset
的方式。
以下为异步提交offset的示例:
复制前面的类CustomConsumer
重命名为 CustomConsumerByHandSync
package com.gansu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 1.创建消费者的配置对象
Properties properties = new Properties();
// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
// 配置序反列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
//关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅主题 first 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("second");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
// 设置1s中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
//同步提交offset
//kafkaConsumer.commitSync();
//异步提交 offset
kafkaConsumer.commitAsync();
}
}
}
1.5.4 指定 Offset 消费
auto.offset.reset = earliest | latest | none
默认是latest
。
- 当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
- (1)
earliest
:自动将偏移量重置为最早的偏移量,–from-beginning。- (2)
latest
(默认值):自动将偏移量重置为最新偏移量。- (3)
none
:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
(4)任意指定offset位移开始消费
注意:每次执行完,需要修改消费者组名;
创建类:CustomConsumerSeek
package com.gansu.kafka.consumer;
import com.gansu.kafka.producter.CustomProducter;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerSeek {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接 key value反序列化
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test66");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("second");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定offset从1700的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 0);
}
// 3 消费该主题数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
指定offSet 从400开始消费
1.5.5 指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
操作步骤:
复制类:CustomConsumerSeek
重命名为 CustomConsumerSeekTime
package com.gansu.kafka.consumer;
import com.gansu.kafka.producter.CustomProducter;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerSeekTime{
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接 key value反序列化
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test6");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("second");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从1天前开始消费的每个分区的offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
// 3 消费该主题数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
1.5.6 漏消费和重复消费
重复消费
:已经消费了数据,但是offset
没提交。漏消费
:先提交offset
后消费,有可能会造成数据的漏消费。
思考:怎么能做到既不漏消费也不重复消费呢?详看消费者事务。
1.6 生产经验——消费者事务
1.7 生产经验——数据积压(消费者如何提高吞吐量)
参数名称 | 描述 |
---|---|
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者 获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条 |