基于Kafka2.1解读Consumer原理

news2025/1/12 12:07:15

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
      • coordinator
      • fetcher
      • client
      • consumer#poll的主要流程
    • 全局总览
    • 小结

概要

继上一篇讲Producer原理的文章过去已经一个多月了,今天来讲讲Consumer的原理。
其实源码早就读了部分了,但是最近工作比较忙,一直没空写文章。

整体架构流程

Consumer组件图

技术名词解释

  • coordinator:Consumer协调器,负责管理Consumer需要加入到哪个消费组、消费哪个partition、提交offset等操作
  • fetcher:主要作用是获取待消费的records,也是Consumer端最重要的组件
  • keyDeserializer:对record中的key进行反序列化
  • valueDeserializer:对record中的value进行反序列化
  • client:执行RPC请求时的网络client,当然会包括一些Kafka内部的操作

技术细节

coordinator

其实协调器对于Consumer的处理分为几个阶段:

  1. Consumer加入的时候:负责判断Consumer加入到哪个Consumer group、协调消费哪个partition
  2. Consumer消费过程中:负责记录Consumer消费的partition的元数据、partition的消费状态、消费offset;更新partition的offset

fetcher

fetcher的数据结构
从Fetcher的数据结构里其实就可以猜到它的作用:缓存已Fetch到的records、去fetch更多的records

  • completedFetch:每次fetch请求得到的数据,拆分到topicPartition维度。因为fetch请求是基于server的node维度,请求回来的数据按照tp维度拆分,得到不同的completedFetch
  • completedFetchs: 已经fetch到的所有completedFetch
  • nextInLineRecords:当前正在被消费消息的completedFetch对应的所有records,由于对于同一个tp,当时Producer发消息时,是按照batch维度发送的,所以此时completedFetch里也包含多个batch,每个batch包含多个record,也就是records
    如果缓存里没有消息呢?
    也就是completedFetchs和nextInLineRecords都是空

client

类型是ConsumerNetworkClient,里面包含了一个NetWorkClient。至于NetWorkClient是如何进行数据处理及RPC的,可以参考Producer原理解析那篇文章
client示意图

  • unsent:保存的是当前需要发送的fetchRequest
  • pendingCompletion:需要被处理的已完成的请求,其实也就是之前的fetchRequest的response
  • client:该client是NetWorkClient,Producer端是直接使用了该client
    所以ConsumerNetworkClient的主要作用:1. 处理之前fetch回来的数据;2. 调用NetWorkClient将当前的fetchRequest发送出去

consumer#poll的主要流程

  1. 判断是否需要commit offset(默认情况下,5秒进行一次异步offset的commit)

  2. 读取Fetcher的缓存,如果有数据,直接跳转到5

  3. 缓存里没有数据,基于coordinator里保存的partition元数据,封装fetchRequest
    创建fetchRequest示意图

  4. 执行client#poll:1. 处理之前fetch回来的数据,解析为completedFetchs;2. 调用NetWorkClient将当前的fetchRequest发送出去;
    client#poll逻辑示意图

  5. 调用自定义的消费逻辑(程序员自己写的Consumer),处理records

全局总览

kafkaConsumer示意图

小结

可以看到Consumer和Producer在逻辑处理上还是有较大不同的。

组件处理请求处理方式
producer主要处理发送消息。对应RPC,主要是写请求将业务逻辑和IO逻辑解耦。业务逻辑:组装batch;IO逻辑:基于batch组装request并发送request
consumer既要发送fetchRequest,同时还要处理fetchResponse。对于RPC,读写请求都占比较大业务逻辑和IO逻辑解耦,但是串行化。业务逻辑:从fetcher里poll已经fetch到的数据;IO逻辑:基于partition元数据组装fetchRequest,处理fetchResponse,发送fetchRequest

Producer的IO是一个Sender线程在异步运行,为什么Consumer不这么干呢?
笔者觉得原因是:
Producer的逻辑是把消息往外发,所以Sender运行的越快,client这边为了维护batch而消耗的资源(内存和CPU越少);而如果Consumer也这么干,实际消费速度赶不上fetch速度的话,会需要额外的内存和CPU资源来维持更多的completedFetchs,更别说如果发生了rebalance的话,fetch过来的completedFetchs可能都是白fetch了。所以,总结下:1. 兼顾消费速度;2. 兼顾client的资源消耗&性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244724.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试使用vite搭建的uni-app打包app区分开发环境和生产环境

用脚手架搭建的uniapp项目,打包H5和小程序可以和web端一样,能够通过env.dev和env.prod区分开发环境和生产环境,但是不知道打包成app时如何区分开发环境和生产环境,在此做一个测试记录。 打开package.json文件,在scrip…

【提效工具开发】管理Python脚本执行系统实现页面展示

Python脚本执行:工具管理Python脚本执行系统 背景 在现代的软件开发和测试过程中,自动化工具和脚本的管理变得至关重要。为了更高效地管理工具、关联文件、提取执行参数并支持动态执行Python代码,我们设计并实现了一套基于Django框架的工具…

Qt-常用的显示类控件

QLabel QLabel有如下核心属性&#xff1a; 关于文本格式的验证&#xff1a; 其中<b>xxx<b>&#xff0c;就是加粗的意思。 效果&#xff1a; 或者再把它改为markdown形式的&#xff1a; 在markd中&#xff0c;#就是表示一级标题&#xff0c;我们在加上##后&#x…

2024 RISC-V中国峰会 安全相关议题汇总

安全之安全(security)博客目录导读 第四届 RISC-V 中国峰会(RISC-V Summit China 2024)于8月21日至23日在杭州成功举办。此次峰会汇聚了 RISC-V 国际基金会、百余家重点企业及研究机构,约3000人线下参与,并在19日至25日间举办了超过20场同期活动,与全球开发者共同…

聊一聊Elasticsearch的索引分片的恢复机制

1、什么是索引分片的恢复&#xff1f; 所谓索引分片的恢复指的是在某些条件下&#xff0c;索引分片丢失&#xff0c;ES会把某索引的分片复制一份来得到该分片副本的过程。 2、触发分片恢复的场景有哪些&#xff1f; 分片的分配 当集群中节点的数量发生变化&#xff0c;或者配…

典型的 SOME/IP 多绑定用例总结

SOME/IP 部署中 AP SWC 不自行打开套接字连接的原因 在典型的 SOME/IP 网络协议部署场景里&#xff0c;AP SWC 不太可能自己打开套接字连接与远程服务通信&#xff0c;因为 SOME/IP 被设计为尽可能少用端口。这一需求源于低功耗 / 低资源的嵌入式 ECU&#xff0c;并行管理大量…

MySQL查询执行(八):Memory引擎

思考&#xff1a;两个group by语句都用了order bynull&#xff0c; 为什么使用内存临时表得到的语句结果里&#xff0c; 0这个值在最后一行&#xff1b; 而使用磁盘临时表得到的结果里&#xff0c; 0这个值在第一行&#xff1f; 答&#xff1a;答案对应第一小节&#xff1a;内…

canva 画图 UI 设计

起因&#xff0c; 目的: 来源: 客户需求。 目的&#xff1a; 用数据讲故事。 数据可以瞎编&#xff0c;图表一定要漂亮。 文件分享地址 读者可以在此文件的基础上&#xff0c;继续编辑。 效果图 过程: 我还是喜欢 canva. figma&#xff0c; 我用的时候&#xff0c;每每都想…

ES分词环境实战

文章目录 安装下载1.1 下载镜像1.2 单节点启动 防火墙设置异常处理【1】iptable链路中断 参考文档 参加完2024年11月软考&#xff0c;对ES的分词进行考查&#xff0c;前期有【 Docker 环境下安装部署 Elasticsearch 和 kibana】和【 Docker 环境下为 Elasticsearch 安装IK 分…

论文精读: PRB LiVSe2 Zigzag链序实验与理论计算

DOI: 10.1103/PhysRevB.108.094107 摘要节选 在具有轨道自由度的过渡金属化合物中&#xff0c;组成元素在低温下自组装形成分子的现象普遍存在。 在本研究中从实验和理论两方面讨论了钒二维三角形晶格层状LiVX2 &#xff08;X O&#xff0c; S, Se&#xff09;体系中出现的三…

修改一下达梦disql 提示符

经常用disql的有时某些信息希望提示一下&#xff0c;默认的只显示SQL> 为了方便使用&#xff0c;可以在 glogin.sql 中增加些内容。 vi $DM_HOME/bin/disql_conf/glogin.sql增加以下几行 set time on set lineshow offcol global_name new_value global_name SELECT ins…

【蓝桥杯备赛】123(前缀和的复杂应用)

5. 前缀和的复杂应用 5.1. 123&#xff08;4 星&#xff09; 5.1.1. 题目解析 这道题仍然是求一段区间的和&#xff0c;很容易能够想到前缀和找规律&#xff1a; 1------------------1 号块 1 2----------------2 号块 1 2 3--------------3 号块 1 2 3 4------------4 号…

机器学习—学习曲线

学习曲线是帮助理解学习算法如何工作的一种方法&#xff0c;作为它所拥有的经验的函数。 绘制一个符合二阶模型的学习曲线&#xff0c;多项式或二次函数&#xff0c;画出交叉验证错误Jcv&#xff0c;以及Jtrain训练错误&#xff0c;所以在这个曲线中&#xff0c;横轴将是Mtrai…

数据库基础(MySQL)

1. 数据库基础 1.1 什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库? 文件保存数据有以下几个缺点&#xff1a; 文件的安全性问题文件不利于数据查询和管理文件不利于存储海量数据文件在程序中控制不方便 数据库存储介质&#xff1a; 磁盘内存 为…

2024年11月HarmonyOS应用开发者基础认证全新题库

注意事项&#xff1a;切记在考试之外的设备上打开题库进行搜索&#xff0c;防止切屏三次考试自动结束&#xff0c;题目是乱序&#xff0c;每次考试&#xff0c;选项的顺序都不同 更新时间&#xff1a;2024年11月1日 这是基础认证题库&#xff0c;不是高级认证题库注意看清楚标…

静态时序分析--时序约束

目录 1.时钟约束1.1创建时钟1.2.生成时钟1.3虚拟时钟1.4 最小时钟脉宽 2.I/O延时约束2.1设置输入延时2.2设置输出延时 3.I/O环境建模约束3.1输入驱动建模3.2输出负载建模 4.时序例外4.1多周期路径设置&#xff08;multicycle path&#xff09;4.2伪路径设置&#xff08;false_p…

51单片机基础05 实时时钟-思路及代码参考2、3

目录 一、思路二 1、原理图 2、代码 二、思路三 1、原理图 2、代码 一、思路二 所有设定功能相关的操作均在矩阵键盘进行实现&#xff0c;并在定时器中扫描、计数等 1、原理图 2、代码 #include <AT89X52.h> //调用51单片机的头文件 //------------------…

【C++篇】深入剖析C++ Vector底层源码及实现机制

文章目录 须知 &#x1f4ac; 欢迎讨论&#xff1a;如果你在学习过程中有任何问题或想法&#xff0c;欢迎在评论区留言&#xff0c;我们一起交流学习。你的支持是我继续创作的动力&#xff01; &#x1f44d; 点赞、收藏与分享&#xff1a;觉得这篇文章对你有帮助吗&#xff1…

【代码pycharm】动手学深度学习v2-04 数据操作 + 数据预处理

数据操作 数据预处理 1.数据操作运行结果 2.数据预处理实现运行结果 第四课链接 1.数据操作 import torch # 张量的创建 x1 torch.arange(12) print(1.有12个元素的张量&#xff1a;\n,x1) print(2.张量的形状&#xff1a;\n,x1.shape) print(3.张量中元素的总数&#xff1…

鸿蒙HarmonyOS开发:一次开发,多端部署(工程级)三层工程架构

文章目录 一、工程创建1、先创建出最基本的项目工程。2、新建common、features、 products 目录 二、工程结构三、依赖关系1、oh-package.json52、配置ohpm包依赖 四、引用ohpm包中的代码1、定义共享资源2、在common模块index文件中导出3、在phone模块oh-package.json5文件中引…