Kafka 消费进度
- Kafka 自带命令
- Java Consumer API
- JMX 监控指标
监控消费进度 : 看滞后程度:消费者 Lag , Consumer Lag
滞后程度 : 消费者落后于生产者的程度
- 如 : Kafka 生产者向某主题成功生产 100 万条消息,消费者消费 80 万条消息
- 那消费者就滞后 20 w条,即 Lag = 20 w
Kafka 监控 Lag 是在分区上的层级 :
- 主题的 Lag = 手动汇总主题下所有分区的 Lag
Lag : 反映消费者的运行情况
- 正常工作的消费者,它的 Lag 很小,表明能及时消费消息,滞后程度很小
- 当消费者 Lag 值很大,表明它无法跟上生产者的速度,最终 Lag 会越来越大,导致拖慢下游消息的处理速度
当消费者的速度无法匹及生产者的速度 :
- 可能出现消费数据不在页缓存中,就无法享受 Zero Copy
- 消费者就要从磁盘上读取数据,会拉大了与生产者的差距,出现马太效应
- 那些 Lag 大的消费者会越来越慢,Lag 会越来越大
生产环境中要时刻关注消费者的消费进度 :
- 出现 Lag 逐步增加的趋势,要定位问题,及时处理,避免造成业务损失
消费进度的监控方法 :
- 命令行工具
kafka-consumer-groups
- Java Consumer API 编程
- JMX 监控指标
Kafka 自带命令
Kafka 自带的命令行工具 : bin/kafka-consumer-groups.sh
kafka-consumer-groups
: 监控消费者消费进度的工具
该脚本在 Kafka bin 目录下,查看某个给定消费者的 Lag 值:
bin/kafka-consumer-groups.sh \
--bootstrap-server <Kafka broker 连接信息> \
--describe --group <group 名称>
Kafka 连接信息 = < 主机名:端口 >
对
- group 名 : 消费者设置的 group.id 值
例子:
- 主题、分区
- LOG-END-OFFSET : 每个分区当前最新生产的消息的位移值
- CURRENT-OFFSET : 该消费者组当前最新消费消息的位移值
- LAG 值(前两者的差值)、消费者实例 ID
- 消费者连接 Broker 的主机名 , 消费者的 CLIENT-ID 信息
Java Consumer API
Java Consumer API :
- 查询当前分区最新消息位移
- 查询消费者组最新消费消息位移
用 Consumer API 监控消费者组的 Lag 值:
- 只适用于 Kafka 2.0.0 及以上的版本
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
// 获取给定消费者组的最新消费消息的位移
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// 获取订阅分区的最新消息位移
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
// 执行减法操作,获取 Lag 值并封装进一个 Map 对象
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理 ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
JMX 监控指标
Kafka 消费者的 JMX 指标 : kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
- records-lag-max : 窗口内曾经达到的最大的 Lag 值
- records-lead-min : 最小的 Lead 值
Lead : 消费者最新消费消息的位移与分区当前第一条消息位移的差值
- Lag 越大,Lead 越小
- Lead 快接近于 0 时,消费者就可能丢消息
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
- records-lag-avg : 平均的 Lag 值
- records-lead-avg : 平均的 Lead 值