Kafka 消费进度

news2025/1/21 18:06:03

Kafka 消费进度

  • Kafka 自带命令
  • Java Consumer API
  • JMX 监控指标

监控消费进度 : 看滞后程度:消费者 Lag , Consumer Lag

滞后程度 : 消费者落后于生产者的程度

  • 如 : Kafka 生产者向某主题成功生产 100 万条消息,消费者消费 80 万条消息
  • 那消费者就滞后 20 w条,即 Lag = 20 w

Kafka 监控 Lag 是在分区上的层级 :

  • 主题的 Lag = 手动汇总主题下所有分区的 Lag

Lag : 反映消费者的运行情况

  • 正常工作的消费者,它的 Lag 很小,表明能及时消费消息,滞后程度很小
  • 当消费者 Lag 值很大,表明它无法跟上生产者的速度,最终 Lag 会越来越大,导致拖慢下游消息的处理速度

当消费者的速度无法匹及生产者的速度 :

  • 可能出现消费数据不在页缓存中,就无法享受 Zero Copy
  • 消费者就要从磁盘上读取数据,会拉大了与生产者的差距,出现马太效应
  • 那些 Lag 大的消费者会越来越慢,Lag 会越来越大

生产环境中要时刻关注消费者的消费进度 :

  • 出现 Lag 逐步增加的趋势,要定位问题,及时处理,避免造成业务损失

消费进度的监控方法 :

  • 命令行工具 kafka-consumer-groups
  • Java Consumer API 编程
  • JMX 监控指标

Kafka 自带命令

Kafka 自带的命令行工具 : bin/kafka-consumer-groups.sh

  • kafka-consumer-groups : 监控消费者消费进度的工具

该脚本在 Kafka bin 目录下,查看某个给定消费者的 Lag 值:

bin/kafka-consumer-groups.sh \
--bootstrap-server <Kafka broker 连接信息> \
--describe --group <group 名称>

Kafka 连接信息 = < 主机名:端口 >

  • group 名 : 消费者设置的 group.id 值

例子:

  • 主题、分区
  • LOG-END-OFFSET : 每个分区当前最新生产的消息的位移值
  • CURRENT-OFFSET : 该消费者组当前最新消费消息的位移值
  • LAG 值(前两者的差值)、消费者实例 ID
  • 消费者连接 Broker 的主机名 , 消费者的 CLIENT-ID 信息

在这里插入图片描述

Java Consumer API

Java Consumer API :

  • 查询当前分区最新消息位移
  • 查询消费者组最新消费消息位移

用 Consumer API 监控消费者组的 Lag 值:

  • 只适用于 Kafka 2.0.0 及以上的版本
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
    try (AdminClient client = AdminClient.create(props)) {
        // 获取给定消费者组的最新消费消息的位移
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // 获取订阅分区的最新消息位移
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), 
                                                                               // 执行减法操作,获取 Lag 值并封装进一个 Map 对象
                                                                               entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 处理中断异常
            // ...
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            // 处理 ExecutionException
            // ...
            return Collections.emptyMap();
        } catch (TimeoutException e) {
        	throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
        }
    }
}

JMX 监控指标

Kafka 消费者的 JMX 指标 : kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"

  • records-lag-max : 窗口内曾经达到的最大的 Lag 值
  • records-lead-min : 最小的 Lead 值

Lead : 消费者最新消费消息的位移与分区当前第一条消息位移的差值

  • Lag 越大,Lead 越小
  • Lead 快接近于 0 时,消费者就可能丢消息

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

  • records-lag-avg : 平均的 Lag 值
  • records-lead-avg : 平均的 Lead 值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/397447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ccc-pytorch-卷积神经网络实战(6)

文章目录一、CIFAR10 与 lenet5二、CIFAR10 与 ResNet一、CIFAR10 与 lenet5 第一步&#xff1a;准备数据集 lenet5.py import torch from torch.utils.data import DataLoader from torchvision import datasets from torchvision import transformsdef main():batchsz 128C…

基于嵌入式libxml2的ARM64平台的移植(aarch64)

由于libxml在移植过程中依赖于zlib的库文件&#xff0c;因此本节内容包含zlib&#xff08;V1.2.13&#xff09;的移植libxml2(V2.10.3)的移植两部分组成。 &#xff08;一&#xff09;zlib的移植&#xff08;基于arm64&#xff09; 1、在github上下载zlib的最新源码压缩包&am…

【C++的OpenCV】第十课-OpenCV图像常用操作(七):直方图和直方图同等化(直方图均衡化)

&#x1f389;&#x1f389;&#x1f389;欢迎各位来到小白piao的学习空间&#xff01;\color{red}{欢迎各位来到小白piao的学习空间&#xff01;}欢迎各位来到小白piao的学习空间&#xff01;&#x1f389;&#x1f389;&#x1f389; &#x1f496;&#x1f496;&#x1f496…

看完书上的链表还不会实现?不进来看看?

1.1链表的概念定义&#xff1a;链表是一种物理存储上非连续&#xff0c;数据元素的逻辑顺序通过链表中的指针链接次序&#xff0c;实现的一种线性存储结构。特点&#xff1a;链表由一系列节点组成&#xff0c;节点在运行时动态生成 &#xff08;malloc&#xff09;&#xff0c;…

【react】类组件

React类创建组件&#xff0c;通过继承React内置的Component来实现的 class MyComponent extends React.Component{render() {console.log(this)// render是放在哪里的 —— 类(即&#xff1a;MyComponent)的原型对象上&#xff0c;供实例使用return <h2>我是用函数定义的…

python实现波士顿房价预测

波士顿房价预测 目标 这是一个经典的机器学习回归场景&#xff0c;我们利用Python和numpy来实现神经网络。该数据集统计了房价受到13个特征因素的影响&#xff0c;如图1所示。 对于预测问题&#xff0c;可以根据预测输出的类型是连续的实数值&#xff0c;还是离散值&#xff…

QGraphicsItem的简单自定义图形项

QGraphicsItem的继承重写序言重点函数QRectF boundingRect() constQPainterPath shape() constvoid paint(QPainter *painter, const QStyleOptionGraphicsItem *option, QWidget *widget 0)序言 学习途中记录一下&#xff0c;可谓是精华点 重点函数 QRectF boundingRect()…

农产品销售系统/商城,可运行

文章目录项目介绍一、项目功能介绍1、用户模块主要功能包括&#xff1a;2、商家模块主要功能包括&#xff1a;3、管理员模块主要功能包括&#xff1a;二、部分页面展示1、用户模块部分功能页面展示2、商家模块部分功能页面展示3、管理员模块部分功能页面展示三、部分源码四、底…

蓝牙 - 设备类型设置: Class of Device

在电脑或手机上&#xff0c;搜寻和连接蓝牙设备时&#xff0c;不同的蓝牙设备显示的图标是不同的&#xff0c;比如搜到或连接上的设备是一个蓝牙键盘&#xff0c;显示的就会是键盘图标&#xff0c;如果搜索到的设备是一个手柄&#xff0c;显示的就是一个手柄图标。 显示的图标是…

进程(操作系统408)

进程的概念和特征 概念&#xff1a; 进程的多个定义&#xff1a; 进程是程序的一次执行过程 进程是一个程序及其数据在处理机上顺序执行时所发生的活动 进程时具有独立功能的程序在一个数据集合上运行的过程&#xff0c;它是系统进行资源分配和调度的一个独立单位 上面所说…

JVM的基本知识

JVM JVM是java的虚拟机,是一个十分复杂的东西,所以掌握的要求比较高.本文主要是研究JVM的三大话题 JVM内存划分JVM类加载JVM的垃圾回收 JVM内存划分 java程序要执行的时候,JVM会先申请一块空间,这里就涉及到JVM的内存划分 堆 : 放的是new 出来的对象栈: 放的是方法之间的调…

rabbitmq集群-镜像模式

上文参考&#xff1a; rabbitmq集群-普通模式 1. 什么是镜像模式 它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上&#xff0c;而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据&#xff08;副本数据&#xff09;。每次写入…

3月8号作业

题目&#xff1a;题目一&#xff1a;vmlinux可执行文件如何产生题目二&#xff1a;整理内核编译流程&#xff1a;uImage&#xff0c;zImage,Image,vmlinux之间的关系答案一&#xff1a;在内核源码目录下vi Makefile&#xff0c;搜索vmlinux目标&#xff0c;vmlinux: scripts/li…

MongoDB学习(java版)

MongoDB概述 结构化数据库 ​ 结构化数据库是一种使用结构化查询语言&#xff08;SQL&#xff09;进行管理和操作的数据库&#xff0c;它们的数据存储方式是基于表格和列的。结构化数据库要求数据预先定义数据模式和结构&#xff0c;然后才能存储和查询数据。结构化数据库通常…

Android Camera SDK NDK NDK_vendor介绍

Android Camera JNI NDK NDK_vendor介绍前言主要有哪几种interface&#xff1f;Android SDKCamera API 1Camera API 2小结Android NDKNDK InterfaceNDK Vendor Interface小结Camera VTS Testcase总结Reference前言 本篇博客是想介绍Android camera从application layer到camera…

谷歌插件Fetch在不同页面之间Cookie携带情况详解

content script 和 script inject 表现情况 在碰到content script 注入和用script标签注入一样&#xff0c;即使服务端有写入Cookie到域名下在该tab标签应用下也不会被保存&#xff0c;所以在发送时也无法自动携带&#xff0c;所以通过content script和<script>这种方式…

微信小程序第二节 —— 自定义组件。

&#x1f449;微信小程序第一节 —— 自定义顶部、底部导航栏以及获取胶囊体位置信息。 一、前言 &#x1f4d6;&#x1f4d6;&#x1f4d6;书接上回 &#xff0c;dai ga hou啊&#xff01;我是 &#x1f618;&#x1f618;&#x1f618;是江迪呀。在进行微信小程序开发中&am…

多维数组的地址,通过指针引用多维数组详解

通过指针引用一维数组可以参考这篇文章&#xff1a; 通过指针引用数组的几种方法的原理和差异&#xff1b;以及利用指针引用数组元素的技巧_juechen333的博客-CSDN博客一个数组包含若干元素&#xff0c;每个数组元素都占用存储单元&#xff0c;所以他们都有相应的地址&#xf…

《Ansible模块篇:debug模块详解》

一、简介 平时我们在使用ansible执行playbook时&#xff0c;经常可能会遇到一些错误&#xff0c;有的时候不知道问题在哪里 &#xff0c;这个时候可以使用-vvv参数打印出来详细信息&#xff0c;不过很多时候-vvv参数里很多东西并不是我们想要的&#xff0c;这时候就可以使用官方…

python第四天作业~函数练习

目录 作业4、判断以下哪些不能作为标识符 A、a B、&#xffe5;a C、_12 D、$a12 E、false F、False 作业5&#xff1a; 输入数&#xff0c;判断这个数是否是质数&#xff08;要求使用函数 for循环&#xff09; 作业6&#xff1a;求50~150之间的质数是…