文章目录
- 1. 分区的分配以及再平衡
- 2. Range 分区分配以及再平衡
- 3. RoundRobin 分区分配以及再平衡
- 4. Sticky 分区分配以及再平衡
1. 分区的分配以及再平衡
一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range +CooperativeSticky。Kafka可以同时使用多个分区分配策略。
2. Range 分区分配以及再平衡
Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。
通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜。
需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用Range 分区分配策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
Range 分区分配策略案例:
① 修改主题 hh为 7 个分区
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: hh Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 2,1,0
Topic: hh Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 3,0,2
Topic: hh Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic hh --partitions 7
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh PartitionCount:7 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: hh Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 2,0,1
Topic: hh Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3
Topic: hh Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 2,1,3
Topic: hh Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: hh Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: hh Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: hh Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
[root@hadoop101 kafka_2.12-2.2.1]#
② 创建3个消费者,消费者组名相同,这样3个消费者属于同一个组
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("hh");
consumer.subscribe(topics);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
"分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value()
}
}
}
}
③ 创建生产者用来向主题hh发送消息,随机发送到不同的分区
public class CustomProducerCallbackPartitions {
public static void main(String[] args) throws InterruptedException {
// kafka生产者属性配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// kafka生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for(int i=0;i<50;i++){
kafkaProducer.send(new ProducerRecord<>("hh" ,"hello,kafka"), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if(exception==null){
// 消息发送成功
System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
}else{
// 消息发送失败
exception.printStackTrace();
}
}
});
Thread.sleep(2);
}
// 关闭资源
kafkaProducer.close();
}
}
④ 测试:先启动3个消费者,然后启动生产者发送消息
Consumer1消费者消费 0、1、2 号分区的数据
Consumer消费者消费 3、4 号分区的数据
Consumer2消费者消费 5、6分区的数据
Range 分区分配再平衡案例:
① 停止掉 Consumer 消费者,快速重新发送消息观看结果(45s 以内,越快越好):Consumer 消费者的任务会整体被分配到 Consumer1 消费者或者 Consumer2 消费者。
说明:Consumer 消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
② 再次重新发送消息观看结果(45s 以后):
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
3. RoundRobin 分区分配以及再平衡
RoundRobin 针对集群中所有Topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用RoundRobin 分区分配策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
RoundRobin 分区分配策略案例:
① 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin,同时修改消费者组名为 test-group
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("hh");
consumer.subscribe(topics);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value());
}
}
}
}
② 启动生产者发送消息测试
RoundRobin 分区分配再平衡案例:
① 停止掉Consumer消费者,快速重新发送消息观看结果(45s 以内,越快越好):
Consumer消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 2号和5号分区数据,分别由Consumer1消费者和 Consumer2消费者消费。
② 再次重新发送消息观看结果(45s 以后):
说明:消费者 Consumer 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。
4. Sticky 分区分配以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
需求:设置主题为 hh,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
① 修改3 个消费者分区分配策略为粘性,重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("hh");
consumer.subscribe(topics);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+"数据:"+consumerRecord.value());
}
}
}
}
② 启动生产者发送消息测试:可以看到会尽量保持分区的个数近似划分分区。
Sticky 分区分配再平衡案例:
① 停止掉 Consumer 消费者,快速重新发送消息观看结果(45s 以内,越快越好):Consumer消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 Consumer1 消费者或者 Consumer2 消费者消费。
② 再次重新发送消息观看结果(45s 以后):消费者 Consumer 已经被踢出消费者组,所以重新按照粘性方式分配。
消费者 Consumer 已经被踢出消费者组,所以重新按照粘性方式分配。