分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

news2024/11/23 12:14:10

文章目录

    • 1. 自动提交消费位移
    • 2. 自动提交消费位移存在的问题?
    • 3. 手动提交消费位移
      • 1. 同步提交消费位移
      • 2. 异步提交消费位移
      • 3. 同步和异步组合提交消费位移
      • 4. 提交特定的消费位移
      • 5. 按分区提交消费位移
    • 4. 消费者查找不到消费位移时怎么办?
    • 5. 如何从特定分区位移处读取消息?
    • 6. 如何优雅地退出轮询循环消费?

1. 自动提交消费位移

最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数:

  • enable.auto.commit:是否开启自动提交 offset 功能,默认为 true;
  • auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒;

如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返回的最大偏移量,即将拉取到的每个分区中最大的消息位移进行提交。提交时间间隔通过 auto.commit.interval.ms 来设定,默认是5秒。与消费者中的其他处理过程一样,自动提交也是在轮询循环中进行的。消费者会在每次轮询时检查是否该提交偏移量了,如果是,就会提交最后一次轮询返回的偏移量。

① 启动消费者消费程序,并设置为自动提交消费者位移的方式:

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-ni");

        // 显式配置消费者自动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        // 显式配置消费者自动提交位移的事件间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,4);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        consumer.subscribe(Arrays.asList("ni"));

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("主题 = %s, 分区 = %d, 位移 = %d, " + "消息键 = %s, 消息值 = %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

② 启动生产者程序发送3条消息,消息的内容都为 hello,kafka

③ 查看消费者消费的消息记录:

主题 = ni, 分区 = 0, 位移 = 0, 消息键 = null, 消息值 = hello,kafka
主题 = ni, 分区 = 0, 位移 = 1, 消息键 = null, 消息值 = hello,kafka
主题 = ni, 分区 = 0, 位移 = 2, 消息键 = null, 消息值 = hello,kafka

可以看到,消费者消费分区的最新消息的位移为 offset= 2,即消费者的消息位移为 offset =2;

④ 查看消费者提交的位移:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

[group-ni,ni,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1692168114999, expireTimestamp=None)

可以看到,消费者的消息位移为 offset =2,但是消费者的提交位移为 offset =3;

2. 自动提交消费位移存在的问题?

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用,再均衡完成之后,接管分区的消费者将从最后一次提交的偏移量的位置开始读取消息)。可以通过修改提交时间间隔来更频繁地提交偏移量,缩小可能导致重复消息的时间窗口,但无法完全避免。

在使用自动提交时,到了该提交偏移量的时候,轮询方法将提交上一次轮询返回的偏移量,但它并不知道具体哪些消息已经被处理过了。所以,在再次调用poll()之前,要确保上一次poll()返回的所有消息都已经处理完毕(调用close()方法也会自动提交偏移量)。通常情况下这不会有什么问题,但在处理异常或提前退出轮询循环时需要特别小心。

虽然自动提交很方便,但是没有为避免开发者重复处理消息留有余地。

3. 手动提交消费位移

在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。

开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false,让应用程序自己决定何时提交偏移量。手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。

① 同步提交位移是指消费者在提交位移时会阻塞,直到提交完成并收到确认。它会提交 poll() 返回的最新偏移量,提交成功后马上返回,如果由于某些原因提交失败就抛出异常。 commitAsync() 方法有四个不同的重载方法,具体定义如下:

public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) 
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) 

② 异步提交位移在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:

public void commitAsync() 
public void commitAsync(OffsetCommitCallback callback) 
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) 

1. 同步提交消费位移

在消费消息的循环中,处理完当前批次的消息后,在轮询更多的消息之前,调用 commitSync() 方法提交当前批次最新的偏移量,这会阻塞当前线程,直到位移提交完成并收到确认。 只要没有发生不可恢复的错误,commitSync() 方法就会一直尝试直至提交成功。如果提交失败,就把异常记录到错误日志里。

public void commitSync()
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // 业务处理拉取的消息
            }
            try{
                // 消费者手动提交消费位移:同步提交方式
                consumer.commitSync();
            }catch (CommitFailedException exception){
                log.error("commit failed....");
            }
        }
    }
}

还可以将消费者程序修改为批量处理+批量提交的方式:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            int minSize = 200;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            for (ConsumerRecord<String, String> record : consumerRecords) {
                buffer.add(record);
            }
            try{
                // 消费者手动提交消费位移:同步提交方式
                if(buffer.size()>minSize){
                    // 批量处理消息
                    // ...
                }
                // 手动提交位移:同步方式
                consumer.commitSync();
            }catch (CommitFailedException exception){
                log.error("commit failed....");
            }
        }
    }
}

上面的示例中将拉取到的消息存入缓存 buffer,等到积累到足够多的时候,也就是大于等于200个的时候,再做相应的批量处理,之后再做批量提交。

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等,我们可以将其捕获并做针对性的处理。

需要注意的是,同步提交位移时需要确保在处理完消息后再进行提交,因为 commitSync() 将会提交 poll() 返回的最新偏移量,如果你在处理完所有记录之前就调用了 commitSync(),那么一旦应用程序发生崩溃,就会有丢失消息的风险(消息已被提交但未被处理)。如果应用程序在处理记录时发生崩溃,但 commitSync() 还没有被调用,那么从最近批次的开始位置到发生再均衡时的所有消息都将被再次处理——这或许比丢失消息更好,或许更坏。

2. 异步提交消费位移

同步提交有一个缺点,在broker对请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,则会增加潜在的消息重复。这个时候可以使用异步提交API。只管发送请求,无须等待broker做出响应。

public void commitAsync() 
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 业务逻辑处理
            }
            // 异步提交消费位移
            consumer.commitAsync();
        }
    }
}

在提交成功或碰到无法恢复的错误之前,commitSync() 会一直重试,但commitAsync()不会,这是commitAsync() 的一个缺点。之所以不进行重试,是因为 commitAsync() 在收到服务器端的响应时,可能已经有一个更大的位移提交成功。假设我们发出一个提交位移2000的请求,这个时候出现了短暂的通信问题,服务器收不到请求,自然也不会做出响应。与此同时,我们处理了另外一批消息,并成功提交了位移3000。如果此时 commitAsync() 重新尝试提交位移2000,则有可能在位移3000之后提交成功。这个时候如果发生再均衡,就会导致消息重复。

之所以提到这个问题并强调提交顺序的重要性,是因为 commitAsync() 也支持回调,回调会在broker返回响应时执行。回调经常被用于记录位移提交错误或生成指标,如果要用它来重试提交位移,那么一定要注意提交顺序。

public void commitAsync(OffsetCommitCallback callback)
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 业务逻辑处理
            }
            // 异步提交消费位移
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap, Exception exception) {
                    if(exception!=null){
                        log.info("fail to commit offsets:{}",offsetAndMetadataMap,exception);
                    }
                }
            });
        }
    }
}

异步提交中如何实现重试:我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;如果两者相同,则说明可以进行重试提交。

3. 同步和异步组合提交消费位移

一般情况下,偶尔提交失败但不进行重试不会有太大问题,因为如果提交失败是由于临时问题导致的,后续的提交总会成功。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;但如果这是发生在消费者被关闭或再均衡前的最后一次提交,则要确保提交是成功的,可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll( Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
					// 业务逻辑处理
                }
                // 异步提交位移
                consumer.commitAsync();
            }
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            try {
                // 同步提交位移
                consumer.commitSync();
            }finally{
                consumer.close();
            }
        }
    }
}

4. 提交特定的消费位移

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的。但如果想要更频繁地提交位移该怎么办?如果 poll() 返回了一大批数据,那么为了避免可能因再均衡引起的消息重复,想要在批次处理过程中提交位移该怎么办?这个时候不能只是调用 commitSync() 或commitAsync(),因为它们只会提交消息批次里的最后一个位移。

幸运的是,消费者API允许在调用 commitSync() 和 commitAsync() 时传给它们想要提交的分区和位移:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

在这里插入图片描述

如图:消费者的提交位移=当前一次poll拉取的分区消息的最大位移offset + 1,这个提交位移就是下次

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        ConcurrentHashMap<TopicPartition,OffsetAndMetadata> offsets = new ConcurrentHashMap<>();
        int count = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 消息所属的主题和分区
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                // 消费者提交的消费位移=当前消费消息的位移+1
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                offsets.put(topicPartition, offsetAndMetadata);
                if(count % 1000 == 0){
                    consumer.commitAsync(offsets,null);
                }
                count++;
            }
        }
    }
}

5. 按分区提交消费位移

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 获取拉取的消息包含的所有分区列表
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition partition : partitions) {
                // 获取当前分区要消费的消息
                List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                // 获取当前分区消息的最大位移
                long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 当前分区的消费位移提交 = 当前分区消息的最大位移 + 1
                Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset + 1));
                consumer.commitSync(topicPartitionOffsetAndMetadataMap);
            }
        }
    }
}

4. 消费者查找不到消费位移时怎么办?

当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当__consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。当 Kafka 中没有初始位移或服务器上不再存在当前位移时,该怎么办?

此时会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,auto.offset.reset 参数的取值如下:

  • latest(默认值):表示从分区末尾开始消费消息。
  • earliest: 表示消费者会从起始处,也就是0开始消费。
  • none:查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常。如果能够找到消费位移,那么配置为“none”不会出现任何异常。

如果配置的不是“latest”、“earliest”和“none”,则会报出ConfigException异常。

auto.offset.reset 参数用于指定消费者在启动时,如果找不到消费位移应该从哪里开始消费消息。 如果能够找到消费位移,那么消费者会从该位移处开始消费消息,那么 auto.offset.reset 参数并不会奏效,只有在找不到消费位移时才会生效。如果发生位移越界,即消费位移超出了消息队列中消息的数量或位置范围,那么 auto.offset.reset 参数也会生效。

5. 如何从特定分区位移处读取消息?

如果消费者能够找到消费位移,使用 poll() 可以从各个分区的最新位移处读取消息, 而且提供的 auto.offset.reset 参数也可以在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。但是有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们得以追前消费或回溯消费。

public void seek(TopicPartition partition, long offset)
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

① seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的。也就是说,在执行 seek() 方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        // 执行一次poll() 方法完成分区分配的逻辑
        //  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        Set<TopicPartition> topicPartitions = consumer.assignment();
        for (TopicPartition topicPartition : topicPartitions) {
            consumer.seek(topicPartition,10);
        }

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // ...
        }
    }
}

② 如果 poll() 方法中的参数为0,此方法立刻返回,那么 poll() 方法内部进行分区分配的逻辑就会来不及实施,也就是说,消费者此时并未分配到任何分区,那么 topicPartitions 便是一个空列表。那么这里的 timeout 参数设置为多少合适呢?太短会使分配分区的动作失败,太长又有可能造成一些不必要的等待。我们可以通过 KafkaConsumer的 assignment()方法来判定是否分配到了相应的分区:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        Set<TopicPartition> topicPartitions = consumer.assignment();
        // 此时说明还未完成分区分配
        while (topicPartitions.size()==0){
            consumer.poll(Duration.ofMillis(100));
            topicPartitions = consumer.assignment();
        }
        for (TopicPartition topicPartition : topicPartitions) {
            // 重置每个分区的消费位置为10
            consumer.seek(topicPartition,10);
        }

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // 消费消息
        }
    }
}

③ 如果对未分配到的分区执行seek() 方法,那么会报出 IllegalStateException 的异常。类似在调用subscribe() 方法之后直接调用seek() 方法:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));


        // 未完成分区分配,直接调用seek方法,重置分区1的消费位置为10
        consumer.seek(new TopicPartition("topic-01",1),10);

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // 消费消息
        }
    }
}

报错:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition topic-01-1

④ 如果消费组内的消费者在启动的时候能够找到消费位移,那么消费者就会从该位移处开始消费消息。除非发生位移越界,即消费位移超出了消息队列中消息的数量或位置范围,否则 auto.offset.reset 参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要seek() 方法的帮助了,指定从分区末尾开始消费:

endOffsets() 方法用来获取指定分区的末尾的消息位置, endOffsets 的具体方法定义如下:

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)

其中 partitions 参数表示分区集合,而 timeout 参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值,那么 endOffsets() 方法的等待时间由客户端参数 request.timeout.ms 来设置,默认值为 30000。与 endOffsets 对应的是 beginningOffset() 方法,一个分区的起始位置起初是0,但并不代表每时每刻都为0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加,beginningOffsets() 方法的具体定义如下:

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) 
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)

beginningOffsets() 方法中的参数内容和含义都与 endOffsets() 方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning() 方法和seekToEnd() 方法来实现这两个功能,这两个方法的具体定义如下:

public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

⑤ 有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek() 方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes() 方法,通过timestamp来查询与此对应的分区位置:

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)

offsetsForTimes() 方法的参数 timestampsToSearch 是一个Map类型,key为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp字段。下面的示例演示了 offsetsForTimes() 和 seek() 之间的使用方法,首先通过 offsetsForTimes() 方法获取一天之前的消息位置,然后使用 seek() 方法追溯到相应位置开始消费:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        Map<TopicPartition,Long> timestampToSearch = new HashMap<>();
        Set<TopicPartition> topicPartitionSet = consumer.assignment();
        // 查询的分区以及查询的时间戳
        for (TopicPartition topicPartition : topicPartitionSet) {
            timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);
        }

        // 获取时间戳大于等于待查询时间的第一条消息对应的位置和时间戳
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampToSearch);
        for (TopicPartition topicPartition : topicPartitionSet) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            // seek 方法重置消费的位移
            if(offsetAndTimestamp != null){
                consumer.seek(topicPartition,offsetAndTimestamp.offset());
            }
        }
    }
}

⑥ 位移越界也会触发 auto.offset.reset 参数的执行,位移越界是指知道消费位置却无法在实际的分区中查找到,比如原本拉取位置为101(fetch offset 101),但已经越界了(out of range),所以此时会根据 auto.offset.reset 参数的默认值来将拉取位置重置(resetting offset)为100,我们也能知道此时分区中最大的消息 offset 为99。

我们讲述了如何进行消费位移的提交,正是有了消费位移的持久化,才使消费者在关闭、崩溃或者在遇到再均衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。

试想一下,当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当 __consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。按照默认的配置,消费者会从9开始进行消费(9是下一条要写入消息的位置),更加确切地说是从9开始拉取消息。如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费。

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,这个poll()方法中的逻辑对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的起始位置。提供的auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。seek()方法的具体定义如下:

public void seek(TopicPartition partition, long offset)  
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)  

seek()方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。seek()方法的使用示例如代码清单3-5所示(只列出关键代码)。

6. 如何优雅地退出轮询循环消费?

如何优雅地退出轮询循环,如果你确定马上要关闭消费者(即使消费者还在等待一个poll()返回),那么可以在另一个线程中调用consumer.wakeup()。如果轮询循环运行在主线程中,那么可以在ShutdownHook里调用这个方法。需要注意的是,consumer.wakeup() 是消费者唯一一个可以在其他线程中安全调用的方法。调用 consumer.wakeup() 会导致poll()抛出WakeupException,如果调用 consumer.wakeup() 时线程没有在轮询,那么异常将在下一次调用 poll() 时抛出。不一定要处理WakeupException,但在退出线程之前必须调用consumer.close() 。消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/893844.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaSE-17 【异常】

第一章 什么是异常 1.1 异常的概念 异常&#xff1a;指的是程序在执行过程中&#xff0c;出现的非正常的情况&#xff0c;最终会导致JVM的非正常停止 在Java中&#xff0c;异常本身就是一个类&#xff0c;产生异常就是创建一个异常对象并且抛出一个异常对象的过程 Java处理…

如何快速高效地进行 API 自动化测试

我们的研发团队最需要应对的就是各种新需求。软件越来越快的更新速度也让整个系统也变得越来越复杂&#xff0c;这让 测试 工作面临着巨大的挑战。测试人员必须与开发人员沟通&#xff0c;确定测试范围&#xff0c;并及时获取最新的接口用例数据来验证功能。但是&#xff0c;由…

【Apollo】Apollo版本特点与改进

特点与改进 概述里程碑6.0版本特点及改进7.0版本特点及改进8.0版本特点及改进代码差异 主页传送门&#xff1a;&#x1f4c0; 传送 概述 Apollo (阿波罗)是一个开放的、完整的、安全的平台&#xff0c;将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快…

【Web开发指南】MyEclipse XML编辑器的高级功能简介

MyEclipse v2023.1.2离线版下载 1. 在MyEclipse中编辑XML 本文档介绍MyEclipse XML编辑器中的一些可用的函数&#xff0c;MyEclipse XML编辑器包括高级XML编辑&#xff0c;例如&#xff1a; 语法高亮显示标签和属性内容辅助实时验证(当您输入时)文档内容的源&#xff08;Sou…

协同过滤推荐算法-基于Django+mysql的智能水果销售系统设计(可做计算机毕设)

随着科技的不断发展&#xff0c;智能化已经成为各行各业的趋势&#xff0c;水果销售行业也不例外。智能水果销售系统就是应运而生的一种智能化解决方案&#xff0c;它可以为用户提供更加便捷、高效的购物体验。其中&#xff0c;系统模块是智能水果销售系统的重要组成部分。 系…

postgresql 谨慎使用正则删除(%,_)

建表 CREATE TABLE public.ellistest (id bigserial NOT NULL,"name" varchar null,primary key (id) );插入数据 删除含有_线的数据 你会发现表被清空了 delete from ellistest where name like %_%原因 百分号(%)用于表示0、1或多个字符或数字。 下划线通配符…

Java【动态规划】图文详解 “路径问题模型“ , 教你手撕动态规划

文章目录 一、不同路径I1, 题目2, 思路分析2.1, 状态表示2.2, 状态转移方程2.3, 初始化2.4, 填表顺序2.5, 返回值 3, 代码 二、不同路径II1, 题目2, 思路分析2.1, 状态表示2.2, 状态转移方程2.3, 初始化2.4, 填表顺序2.5, 返回值 3, 代码 三、礼物最大价值1, 题目2, 思路分析2.…

从零基础到精通IT:探索高效学习路径与成功案例

文章目录 导语&#xff1a;第一步&#xff1a;明确学习目标与方向选择适合的IT方向设定具体的学习目标咨询和调研 第二步&#xff1a;系统学习基础知识选择适合的编程语言学习数据结构和算法掌握操作系统和计算机网络基础 第三步&#xff1a;实践项目锻炼技能选择合适的项目编写…

C语言:初阶测试错题(查漏补缺)

题一&#xff1a;字符串倒置 示例1 输入 I like beijing. 输出 beijing. like I 思路一&#xff1a; 定义字符串数组arr[ ] ,利用gets()将要倒置的字符串输入&#xff0c;记录字符串长度len&#xff0c;此时写一个逆置函数Inversion()&#xff0c;第一步将整个字符串逆置&…

基于决策树(Decision Tree)的乳腺癌诊蚓

决策树(DecisionTree)学习是以实例为基础的归纳学习算法。算法从--组无序、无规则的事例中推理出决策树表示形式的分类规则,决策树也能表示为多个If-Then规则。一般在决策树中采用“自顶向下、分而治之”的递归方式,将搜索空间分为若千个互不相交的子集,在决策树的内部节点(非叶…

C语言刷题训练DAY.7

1.及格分数 解题思路&#xff1a; 这里直接用while语句控制循环&#xff0c;if else语句判断即可。 解题代码&#xff1a; #include<stdio.h> int main() {int a 0;while(scanf("%d", &a) ! EOF){if (a >60)printf("Pass\n");elseprintf…

HCIP学习--交换技术

前置学习 HICA学习&#xff08;第一天&#xff09;--网络基础_板栗妖怪的博客-CSDN博客 HCIA学习--VLAN一些常识及在ensp上实现VLAN配置_ensp vlan_板栗妖怪的博客-CSDN博客 一个小知识 在一个公司内部使用的路由技术很少&#xff0c;用的是交换技术&#xff0c;使用几个三…

【gitkraken】gitkraken自动更新问题

GitKraken 会自动升级&#xff01;一旦自动升级&#xff0c;你的 GitKraken 自然就不再是最后一个免费版 6.5.1 了。 在安装 GitKraken 之后&#xff0c;在你的安装目录&#xff08;C:\Users\<用户名>\AppData\Local\gitkraken&#xff09;下会有一个名为 Update.exe 的…

【从零学习python 】47. 面向对象编程中的继承概念及基本使用

文章目录 继承的基本使用代码逐行讲解说明:进阶案例 继承的基本使用 在现实生活中&#xff0c;继承一般指的是子女继承父辈的财产&#xff0c;父辈有的财产&#xff0c;子女能够直接使用。 程序里的继承 继承是面向对象软件设计中的一个概念&#xff0c;与多态、封装共为面向对…

在远程服务器上安装环境

第一步&#xff1a;下载anaconda 进入官网https://www.anaconda.com/download#downloads,点击linux的小企鹅 选择下载linux64位版本。 第二步&#xff1a;安装 打开xftp&#xff0c;将文件上传到服务器中。 然后在你自己的文件夹中输入bash Anaconda3-2023.07-2-Linux-x86…

视频云存储/视频汇聚/视频监控EasyCVR平台CDN转推的操作流程

视频汇聚/视频云存储/集中存储/视频监控管理平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;实现视频资源的鉴权管理、按需调阅、全网分发、云存储、智能分析等&#xff0c;视频智能分析平台EasyCVR融合性强、开放度…

ARM体系结构学习笔记:寄存器

前段时间通篇概览一遍汇编语言程序设计——基于ARM体系结构(第4版), 总感觉纸上得来终觉浅, 并不能够让我产生一种读汇编就跟读C代码一样那种流畅的感觉. 如果我们越熟悉, 越发觉得他们是有规律可循的, 这里做一下对应的记录, 互相共勉. 通用寄存器并不通用 表面上arm为我们提…

git版本管理加合并笔记

1.创建空文件夹&#xff0c;右键Bash here打开 2.打开链接&#xff0c;点击克隆下载&#xff0c;复制SSH链接 3.输入git SSH链接 回车 遇到问题&#xff1a; 但明明我已经有权限了&#xff0c; 还是蹦出个这 4.换成https在桌面上进行克隆仓库就正常了 5.去vscode里改东西 …

暑期关爱儿童安全“守护儿童远离烧烫伤 我是小小宣导员”活动走进德安社区

夏季是烧烫伤的高发季节&#xff0c;随着气温的升高&#xff0c;衣物的减少&#xff0c;皮肤外漏多&#xff0c;儿童自我保护能力弱&#xff0c;更容易受到烧烫伤害。为了守护儿童安全&#xff0c;8月11日下午&#xff0c;由中国社会福利基金会烧烫伤关爱公益基金主办&#xff…