六、consumer源码解读

news2024/10/6 14:39:27

Consumer源码解读

本课程的核心技术点如下:

1、consumer初始化
2、如何选举Consumer Leader
3、Consumer Leader是如何制定分区方案

4、Consumer如何拉取数据
5、Consumer的自动偏移量提交

Consumer初始化

image.png

从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置,我们分析源码要抓核心,我把核心代码给摘出来

NetworkClient

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator

协调器,在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher

提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

而在这个核心类中,我们发现有很多很多的参数设置,这些就跟我们平时进行消费的时候配置有关系了,这里我们挑一些核心重点参数来讲一讲

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于10M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

默认是50M

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

这里说一下参数的默认值如何去找:

image.png

image.png

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

备注:1、Kafka入门笔记

image.png

max.poll.records

控制每次poll方法返回的最大记录数量。

默认是500

image.png

如何选举Consumer Leader

回顾之前的内容

image.png

那么如何完成以上的逻辑的,我们跟踪代码:

image.png

1、消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

对Broker的响应进行处理

image.png

image.png

1、消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

Consumer Leader如何制定分区方案

回顾之前的内容

image.png

消费者分区策略

消费者参数

partition.assignment.strategy

分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor

初始分区和RoundRobin是一样

粘性分区:每一次分配变更相对上一次分配做最少的变动.

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

C0: T0P0、T1P1、T3P0

C1: T0P1、T2P0、T3P1

C2: T1P0、T2P1

如果C1下线 、如果按照RoundRobin

image.png

C0: T0P0、T1P0、T2P0、T3P0

C2: T0P1、T1P1、T2P1、T3P1

对比之前

image.png

如果C1下线 、如果按照StickyAssignor

image.png

C0: T0P0、T1P1、T2P0、T3P0

C2: T0P1、T1P0、T2P1、T3P1

对比之前

image.png

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

即可。

消费者分区策略源码分析

接着上个章节的代码。

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

这里就是拉取数据,核心Fetch类

image.png

image.png

image.png

自动提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

从源码上也可以看出

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

这个autoCommitIntervalMs就是auto.commit.interval.ms设置的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/744019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

40.RocketMQ之高频面试题大全

消息中间件如何选型 RabbitMQ erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。 RocketMQ java开发,面向互联网集群化功能丰富,对在线业…

Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长; 但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程; 最后想说一句君子不隐其短,不知则问,不能则…

移动互联网应用程序(app)个人信息安全测试能力验证-流程介绍

ILONGYU 产品简介 为规范检验检测市场,提升检验检测机构技术能力,根据《检验检测机构资质认定管理办法》《实验室能力验证实施办法》等有关规定,市场监管总局决定在社会重点关注的部分检验检测领域,组织开展2020年国家级检验检测…

【IMX6ULL驱动开发学习】17.内核定时器(按键消抖)

1. 内核定时器初始化 setup_timer(struct timer_list *timer, void (*function)(unsigned long), unsigned long data);timer : 定时器结构体 struct timer_list function: 定时器处理函数 data: 参数 2. 设置定时器的超时时间 timer.exp…

数据备份与恢复

目录 数据备份 1、备份单个数据库中的所有表 2、备份数据库中的某些表 3、备份所有数据库 4、只备份emp表结构 数据库恢复 方法1:使用mysql 命令恢复 方法2:进入数据库,使用source加载备份文件恢复 MySQL表的导入导出 数据备份 MySQLdump备份数据库语句的…

报名开启 | DolphinDB 粉丝节,与你相约上海

作为量化爱好者,你是否在寻找更多志同道合的朋友? 作为技术达人,想探索因子挖掘、深度学习、AI领域的前沿技术? 7月22日 机会来了! DolphinDB 首届线下粉丝节将于7月22日下午在上海举行! 来现场&#xf…

VoxelNet End-to-End Learning for Point Cloud Based 3D Object Detection 论文学习

代码:VoxelNet: https://github.com/skyhehe123/VoxelNet-pytorch 论文:VoxelNet End-to-End Learning for Point Cloud Based 3D Object Detection 1. 解决了什么问题? 对点云做 3D 检测是许多应用得以落地的关键,如自动驾驶和…

想要避免项目延期,项目经理要关注这三点

在项目交付的过程中,出现项目进度与计划有较大的偏差是常见的现象。这种偏差的原因可能是多种多样的。 为了避免项目延期,项目经理需要认真分析引起进度延期的原因,以及采取相应的措施进行规避。 1、导致进度延期之计划不清晰 在项目开始…

基于灰色神经网络的订单需求预测代码

目录 1 概述 2 代码 3 结果 1 概述 BP(Back Propagation)神经网络模型是一种信息前向传播,误差反向传播的神经网络模型0,能够通过训练样本反向传播调节网络的阈值和权值,使误差平方最小。 BP神经网络是目前应用最广泛的神经网络模型之一。 灰色人工神经网络模型建模过程: (1)利…

快速排序的三种方法 hoare,挖坑法,前后指针法

文章目录 快速排序的整体介绍hoare思路代码实现 挖坑法思路代码实现 前后指针法思路代码实现 快速排序的整体介绍 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法,其基本思想为:任取待排序元素序列中的某元素作为基准值,按照该排…

spring data jpa(概述、快速入门、内部原理剖析、查询使用方式)

一、概述 1.1 Spring Data JPA概述 Spring Data JPA 是 Spring 基于 ORM 框架、JPA 规范的基础上封装的一套JPA应用框架,可使开发者用极简的代码即可实现对数据库的访问和操作。 它提供了包括增删改查等在内的常用功能,且易于扩展!学习并使…

Hexo博客部署腾讯云服务器

✅作者简介:大家好,我是Cisyam,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Cisyam-Shark的博客 💞当前专栏: 前端相关 ✨特色专栏&…

ModaHub魔搭社区:AI原生云向量数据库MIlvus Cloud的倒置文件索引?

目录 VF 总结 VF 平面索引很不错,但它无法扩展。这就是向量搜索的数据结构发挥作用的地方。通过牺牲准确性来减少运行时间,以便显著提高查询速度和吞吐量。现在有很多索引策略,其中最常用的是倒置文件索引(IVF)。 抛开花哨的名字,IVF 实际上是相当简单的。IVF 通过将…

用C语言进行学生成绩排序(选择排序)

一.选择排序 选择排序的基本思想是:每一趟(如第i趟)在后面n-i1 (i1,2…,n-1) 个待排序元素中选取关键字最小的元素,作为有序子序列的第i个元素,直到第n-1趟做完,待排序元素只剩下1个,就不用再选了。选择排序中的堆排序算法是历年考…

Vue数据项加圆点

目录 Html 样式 方法 Html <el-table-column prop"status" label"数据状态" header-align"center" width"200"><template slot-scope"scope"><div style"display: flex; justify-content: center; a…

六大组件助力大屏一键升级!老板当场拍案叫绝!

上个礼拜参加高中同学聚会&#xff0c;大家在饭桌上聊自己的工作&#xff0c;各自吐槽后发现大家真的是各有各的不容易&#xff01;有个和我一样做数据分析工作的兄弟&#xff0c;喝了点小酒后&#xff0c;情绪上头直接在饭桌上大吐苦水&#xff0c;疯狂diss他领导。 他说本来…

java项目之二手车交易网站(ssm+mysql+jsp)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的二手车交易网站。技术交流和部署相关看文章末尾&#xff01; 开发环境&#xff1a; 后端&#xff1a; 开发语言&#xff1a;Java 框架&a…

【图形学入门】概述(Overview)

本文基于GAMES 101课程进行记录和总结。 概念 计算机图形学&#xff08;Computer Graphics&#xff0c;俗称CG&#xff09;&#xff0c;是一种使用数学算法将二维或三维图形转化为计算机显示器的栅格形式的科学&#xff08;或使用计算机合成和操作视觉/图像图形的信息&#xf…

【Linux操作系统】多线程初步概念

文章目录 多线程初步概念线程的优点线程的缺点线程异常线程用途Linux进程VS线程 多线程初步概念 在一个程序里的一个执行路线就叫做线程&#xff08;thread&#xff09;。更准确的定义是&#xff1a;线程是“一个进程内部的控制序列”。一个进程至少都有一个执行线程。线程是一…

pandas 笔记 style 调整DataFrame格式

1 format 1.0 数据 # Visual Python: Data Analysis > File vp_df pd.read_csv(https://raw.githubusercontent.com/visualpython/visualpython/main/visualpython/data/sample_csv/iris.csv) vp_dfvp_df[:5] vp_df.at[0,sepal_length]np.nan vp_df.at[2,sepal_length]10…