Kafka之Consumer源码

news2025/1/16 1:02:51

Consumer源码解读

Consumer初始化

image.png

从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置,我们分析源码要抓核心,我把核心代码给摘出来

NetworkClient

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator

协调器,在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher

提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

而在这个核心类中,我们发现有很多很多的参数设置,这些就跟我们平时进行消费的时候配置有关系了,这里我们挑一些核心重点参数来讲一讲

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于10M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

默认是50M

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

这里说一下参数的默认值如何去找:

image.png

image.png

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

备注:1、Kafka入门笔记

image.png

max.poll.records

控制每次poll方法返回的最大记录数量。

默认是500

image.png

选举Consumer Leader

跟踪代码:

image.png

1、消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

对Broker的响应进行处理

image.png

image.png

1、消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

Consumer Leader如何制定分区方案

image.png

消费者分区策略

消费者参数

partition.assignment.strategy

分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor

初始分区和RoundRobin是一样

粘性分区:每一次分配变更相对上一次分配做最少的变动.

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

C0: T0P0、T1P1、T3P0

C1: T0P1、T2P0、T3P1

C2: T1P0、T2P1

如果C1下线 、如果按照RoundRobin

image.png

C0: T0P0、T1P0、T2P0、T3P0

C2: T0P1、T1P1、T2P1、T3P1

对比之前

image.png

如果C1下线 、如果按照StickyAssignor

image.png

C0: T0P0、T1P1、T2P0、T3P0

C2: T0P1、T1P0、T2P1、T3P1

对比之前

image.png

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

即可。

消费者分区策略源码分析

接着上个章节的代码。

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

这里就是拉取数据,核心Fetch类

image.png

image.png

image.png

自动提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

从源码上也可以看出

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

这个autoCommitIntervalMs就是auto.commit.interval.ms设置的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1470721.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】周末作业

1.new(struct list_head*)malloc(sizeof(struct list_head*)); if(newNULL) { printf("失败\n"); return; } new->nextprev->next; prev->nextnew; return; 2.struct list_head* pprev->next; prev->nextp->next; p->next->prevpr…

[AutoSar]BSW_Com03 DBC详解 (二)

目录 关键词平台说明一、前言二、DBC Attributes2.1 添加方法2.2 GenMsgCycleTime2.3 GenMsgSendType2.4 GenSigStartValue 三、DBC Attributes 对照表 关键词 嵌入式、C语言、autosar、OS、BSW 平台说明 项目ValueOSautosar OSautosar厂商vector ,芯片厂商TI 英…

通俗易懂理解EfficicentNet系列网络模型

一、参考资料 原始论文:[1] github代码:efficientnet EfficientNet系列(1): EfficientNetV2网络详解 EfficientNet网络详解 二、EfficientNet相关介绍 在之前的文章中,单独增加图像分辨率或增加网络深度或单独增加网络的宽度&#xff0…

matlab滤波器设计

1、内容简介 略 51-可以交流、咨询、答疑 2、内容说明 略 3、仿真分析 略 matlab滤波器设计-butter、ellip、cheby1、cheby2_哔哩哔哩_bilibili 4、参考论文 略

自动驾驶---行业发展及就业环境杂谈

进入21世纪以来,自动驾驶行业有着飞速的发展,自动驾驶技术(L2---L3)也逐渐落地量产到寻常百姓家。虽然最早期量产FSD的特斯拉有着深厚的技术积累,但是进入2010年以后,国内的公司也逐渐发展起来自己的自动驾…

CSS 的盒模型

CSS 的盒模型 在HTML里,每一个元素就相当于是一个矩形的 “盒子” ,这个盒子由以下这几个部分构成:1.边框border,2.内容content,3.内边距padding,4.外边距margin 边框border 基础属性描述border-width粗…

基于容器和集群技术的数据自动化采集设计和实现

目标:部署mysql服务容器并使用docker构建包含python爬虫脚本的容器采集数据到mysql数据库。 环境:Centos7、已配置Kubernetes集群及docker。 环境配置请参考以下文章: CentOS7搭建Kubernetes集群 Kubernetes集群信息如下(虚拟机主机名和IP…

apache 模式、优化、功能 与 nginx优化、应用

一、I/O模型——Input/Output模型 1.同步/异步 A程序需要调用B程序的某一个功能,A发送一个请求需要B完成一个任务 同步:B不会主动去通知A是否完成需要A自己去问 异步:B会主动通知A是否完成 2.阻塞/非阻塞 A发送一个请求需要B完成一个任务 …

【大数据】Flink SQL 语法篇(四):Group 聚合

Flink SQL 语法篇(四):Group 聚合 1.基础概念2.窗口聚合和 Group 聚合3.SQL 语义4.Group 聚合支持 Grouping sets、Rollup、Cube 1.基础概念 Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 G…

Python及Pycharm专业版下载安装教程(Python 3.11版)附JetBrains学生认证教程

目录 一、Python下载及安装1、Python下载2、Python安装3、验证是否安装成功 二、PyCharm下载及安装1、PyCharm下载2、PyCharm安装3、激活PyCharm 三、JetBrains学生认证 本篇主要介绍Python和PyCharm专业版的下载及安装方式,以及通过两种方式进行JetBrains学生认证。…

第十篇【传奇开心果系列】Python的文本和语音相互转换库技术点案例示例:Microsoft Azure开发语音翻译应用程序经典案例

传奇开心果博文系列 系列博文目录Python的文本和语音相互转换库技术点案例示例系列 博文目录前言一、雏形示例代码二、扩展思路介绍三、Azure多语种支持示例代码四、Azure实时对话模式示例代码五、Azure自定义翻译模型示例代码六、Azure语音合成示例代码七、Azure用户界面优化示…

openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore

文章目录 openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore229.1 设计原理229.2 核心优势229.3 使用指导 openGauss学习笔记-229 openGauss性能调优-系统调优-配置Ustore Ustore存储引擎,又名In-place Update存储引擎(原地更新&#xff09…

java面试(网络)

TCP和UDP有什么区别?TCP三次握手不是两次? TCP:面向连接,可靠的,传输层通信协议。点对点,占用资源多,效率低。 UDP:无连接,不可靠,传输层通信协议。广播&…

Nvidia Jetson Orin NX配置环境

Nvidia Jetson Orin NX配置环境配置环境 一、安装jetson5.1.2二、安装jtop三、配置CUDA和cuDNN四、安装Pytorch 先导片:Jetson采用arm64架构 一、安装jetson5.1.2 安装好jetson自带cuda、cudnn和tensorRT 官方文档 更换源 sudo vi /etc/apt/sources.list.d/nvidia…

力扣技巧题:丢失的数字

先排后找可以让结果更简单 int cmp(const void* a, const void* b){return *(int*)a - *(int*)b; } int missingNumber(int* nums, int numsSize){qsort(nums, numsSize, 4, cmp);for(int i0; i<numsSize; i){if(nums[i] i){continue;}else{return i;}}return numsSize; }…

10 在线逻辑分析仪的使用

在线逻辑分析仪简介 传统的 FPGA 板级调试是将逻辑分析仪连接到 FPGA 的 IO 引脚上 &#xff0c;然后将内部信号引出至 IO 引脚&#xff0c;再进行板级调试&#xff0c;这种方法的缺点是我们需要一个逻辑分析仪&#xff0c;且还要在 PCB 中预留测试点。在线逻辑分析仪克服了以…

使用 C++23 协程实现第一个 co_yield 同步风格调用接口--Qt计算排列组合

在C23的协程特性里&#xff0c; co_yield 用于从协程执行过程中返回值。这个功能乍一听起来很奇怪&#xff0c;网上的例子大多是用一个计数器来演示多次中断协程函数&#xff0c;返回顺序的计数值。这看起来毫无意义。 其实这个功能主要想演示的就是协程 co_yield 具备打断一个…

【数据结构】图——最短路径

最短路径问题&#xff1a;从在带权有向图G中的某一顶点出发&#xff0c;找出一条通往另一顶点的最短路径&#xff0c;最短也就是沿路径各边的权值总和达到最小。 最短路径分为图中单源路径和多源路径。 本文会介绍Dijkstra和Bellman-Ford解决单源路径的问题 Floyd-Warshall解…

SCI一区 | Matlab实现ST-CNN-MATT基于S变换时频图和卷积网络融合多头自注意力机制的多特征分类预测

SCI一区 | Matlab实现ST-CNN-MATT基于S变换时频图和卷积网络融合多头自注意力机制的故障多特征分类预测 目录 SCI一区 | Matlab实现ST-CNN-MATT基于S变换时频图和卷积网络融合多头自注意力机制的故障多特征分类预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍…

【牛客】2024牛客寒假算法基础集训营6ABCDEGHIJ

文章目录 A 宇宙的终结题目大意主要思路代码 B 爱恨的纠葛题目大意主要思路代码 C 心绪的解剖题目大意主要思路代码 D 友谊的套路题目大意主要思路代码 E 未来的预言题目大意主要思路代码 G 人生的起落题目大意主要思路代码 I 时空的交织题目大意主要思路代码 J 绝妙的平衡题目…