Kafka中Consumer源码解读

news2025/1/11 12:57:24

Consumer源码解读

本课程的核心技术点如下:

1、consumer初始化
2、如何选举Consumer Leader
3、Consumer Leader是如何制定分区方案

4、Consumer如何拉取数据
5、Consumer的自动偏移量提交

Consumer初始化

image.png

从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置,我们分析源码要抓核心,我把核心代码给摘出来

NetworkClient

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator

协调器,在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher

提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

而在这个核心类中,我们发现有很多很多的参数设置,这些就跟我们平时进行消费的时候配置有关系了,这里我们挑一些核心重点参数来讲一讲

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于10M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

默认是50M

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

这里说一下参数的默认值如何去找:

image.png

image.png

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

备注:1、Kafka入门笔记

image.png

max.poll.records

控制每次poll方法返回的最大记录数量。

默认是500

image.png

如何选举Consumer Leader

回顾之前的内容

image.png

那么如何完成以上的逻辑的,我们跟踪代码:

image.png

1、消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

对Broker的响应进行处理

image.png

image.png

1、消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

Consumer Leader如何制定分区方案

回顾之前的内容

image.png

消费者分区策略

消费者参数

partition.assignment.strategy

分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor

初始分区和RoundRobin是一样

粘性分区:每一次分配变更相对上一次分配做最少的变动.

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

C0: T0P0、T1P1、T3P0

C1: T0P1、T2P0、T3P1

C2: T1P0、T2P1

如果C1下线 、如果按照RoundRobin

image.png

C0: T0P0、T1P0、T2P0、T3P0

C2: T0P1、T1P1、T2P1、T3P1

对比之前

image.png

如果C1下线 、如果按照StickyAssignor

image.png

C0: T0P0、T1P1、T2P0、T3P0

C2: T0P1、T1P0、T2P1、T3P1

对比之前

image.png

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

即可。

消费者分区策略源码分析

接着上个章节的代码。

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

这里就是拉取数据,核心Fetch类

image.png

image.png

image.png

自动提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

从源码上也可以看出

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

这个autoCommitIntervalMs就是auto.commit.interval.ms设置的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/993435.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ubuntu22.04 设置网卡开机自启

配置文件路径 在Ubuntu中,网络配置文件通常位于/etc/netplan/目录下,其文件名以.yaml为后缀。Netplan是Ubuntu 17.10及更高版本中默认的网络配置工具,用于配置网络接口、IP地址、网关、DNS服务器等。 我们可以看到配置文件为 01-network-ma…

【K210+ESP8266图传上位机开发】TCP server + JPEG图像解析上位机开发

本文章主要记录基于 【K210-ESP8266】 图传和显示的过程,上位机开发过程,系统架构和下位机开发请参考文章: 【K210-ESP8266】开发板上传图像数据到服务器并实时显示 💖 作者简介:大家好,我是喜欢记录零碎知…

easyrecovery 2023最新数据恢复软件免费下载激活教程

对于每天电脑不离身,键盘不离手的法律人来说,最惨痛的事故莫过于“没保存”了,意外断电、系统崩溃、介质故障、计算机病毒、文件误删除、系统升级、文件同步更新、程序运行意外中止、人为故意删改等各种原因都可能导致我们的文件数据损坏和丢…

eNSP-抓包实验

拓扑结构图: 实验需求: 1. 按照图中的设备名称,配置各设备名称 2. 按照图中的IP地址规划,配置IP地址 3. 使用Wireshark工具进行抓ping包,并分析报文 4. 理解TCP三次握手的建立机制 实验步骤: 1、配置P…

synchronized,volatile关键字

目录 一,synchronized的特性 1.1 互斥性 1.2 可重入性 二, 死锁 2.1 死锁产生的原因 三,volatile 关键字 3.1 能保证内存可见性 一,synchronized的特性 1.1 互斥性 当两个线程对同一个对象加锁时,后加锁的线程…

Error: [WinError 10013] 以一种访问权限不允许的方式做了一个访问套接字的尝试

Error: [WinError 10013] 以一种访问权限不允许的方式做了一个访问套接字的尝试 django运行时报错 报错原因 django运行时的端口被其他服务占用了,此时需要关闭占用端口的服务, django的默认端口为8000 解决办法1 netstat -ano | findstr 8000 // cmd操作获取占用…

天翼云HBlock:盘活存储资源,释放数据价值

马克安德森曾言:“软件正在吞噬世界。” 回顾数据存储过去二十年发展,硬件的更新迭代固然重要,但软件的价值亦不可低估。从2013年软件定义存储首次入围Gartner技术成熟度曲线,再到在虚拟化和云计算环境中大显身手,软件…

运维Shell脚本小试牛刀(八): case模式忽略命令行参数大小写演示

运维Shell脚本小试牛刀(一) 运维Shell脚本小试牛刀(二) 运维Shell脚本小试牛刀(三)::$(cd $(dirname $0); pwd)命令详解 运维Shell脚本小试牛刀(四): 多层嵌套if...elif...elif....else fi_蜗牛杨哥的博客-CSDN博客 Cenos7安装小火车程序动画 运维Shell脚本小试…

SpringCloud-GetWay 路由网关

接上文 SpringCloud-Hystrix 服务降级与熔断 微服务也是如此,不是所有微服务需要直接暴露给外部调用,就需要使用路由机制,添加一层防护,让所有的请求全部通过路由来转发到各个微服务,并转发给多个相同微服务实例&#…

Linux编辑器 VI VIM

vim 命令模式 插入模式 ex模式 \ 命令模式 /查找关键字后,按n键在找到的结果之前来换的切换、 EX模式

响应式布局(3种) + flex计算

响应式布局 1.媒体查询2.使用百分比、rem、vw、vh等相对单位来设置元素的宽度、高度、字体大小等1.rem与em2.vw、vh、vmax、vmin 3.Flexboxflexbox计算题 响应式布局是指同一个页面在不同屏幕尺寸下有不同的布局。 1.媒体查询 媒体查询是最基础的实现响应式的方式 使用media关键…

再见 MySQL 5.7 !

点击下方名片,设为星标! 回复“1024”获取2TB学习资源! 对从事互联网 IT 技术岗位的来说,数据库也是我们日常必备的技能之一,而 MySQL 数据库更是常见、常用的数据库之一。 根据 DB-Engines 的数据显示,MyS…

Matlab之DICOM(数字图像和通信医学)格式图像数据读取函数dicomread

一、DICOM是什么? DICOM是数字图像和通信医学格式的图像数据,在MATLAB中,可以使用dicomread函数读取DICOM格式的图像数据。 二、dicomread函数 使用方法如下: imageData dicomread(filename);其中,filename表示DI…

fail-safe 机制与 fail-fast 机制

Fail-fast 表示快速失败,在集合遍历过程中,一旦发现容器中的数据被修改了,会立刻抛出 ConcurrentModificationException 异常,从而导致遍历失败,像这种情况 定义一个 Map 集合,使用 Iterator 迭代器进行数据…

live-server安装

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

【算法专题突破】滑动窗口 - 长度最小的子数组(9)

目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:209. 长度最小的子数组 - 力扣(Leetcode) 要注意的是,题目给的是正整数, 而题目要求并不难理解,就是找最短的…

听说背包问题很难? 这篇总结篇来拯救你了

文章转自代码随想录 已经把背包问题都讲完了,那么现在要对背包问题进行总结一番。 背包问题是动态规划里的非常重要的一部分,所以我把背包问题单独总结一下,等动态规划专题更新完之后,我们还会在整体总结一波动态规划。 关于这…

【HCIE】02.IGP高级特性

OSPF转发地址 5类LSA报文格式 5类LSA有一个Forwarding address字段,防止次优路由产生 次有路径的产生 如图,A与C运行OSPF协议,A与B运行ISIS协议,现在将ISIS导入到了OSPF中,C去访问X默认会先经过ASBR即R1,…

【多线程】Timer任务定时器实现与盲等原子性问题的解决

目录 一、定时器 二、标准库中的Timer 三、代码实现 四、死锁 一、定时器 代码中的定时器通常是在一定的时间执行对应的代码逻辑 二、标准库中的Timer public static void main(String[] args){Timer timer new Timer();timer.schedule(new TimerTask() {Overridepublic…

【周末闲谈】如何利用AIGC为我们创造有利价值?

个人主页:【😊个人主页】 系列专栏:【❤️周末闲谈】 系列目录 ✨第一周 二进制VS三进制 ✨第二周 文心一言,模仿还是超越? ✨第二周 畅想AR 文章目录 系列目录前言AIGCAI写作AI绘画AI视频生成AI语音合成 前言 在此之…