Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

news2025/1/12 1:44:19

前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。

ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。

在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。

在这里插入图片描述

  • client:NetworkClient对象。
  • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
    简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。
  • metadata:用于管理Kafka集群元数据。
  • unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
  • unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
  • wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
  • wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
    wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:

  • timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
  • now表示当前时间戳;
  • executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

下面介绍其流程,其中简单回顾一下NetworkClient的功能:

  1. 调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。

    具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。

    若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
    在这里插入图片描述

  2. 计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。

  3. 调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。

  4. 调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。

在这里插入图片描述

  1. 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

在这里插入图片描述

  1. 根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。

  2. 再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。

  3. 调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。

在这里插入图片描述
分析完poll方法的详细步骤之后,我们下面来看其实现代码:

在这里插入图片描述
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。

具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。

poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。

在这里插入图片描述
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。

在这里插入图片描述

在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。

在这里插入图片描述
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。

  • isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。

在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:

在这里插入图片描述
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。

当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。

在这里插入图片描述
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:

在这里插入图片描述

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。

介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:

  • awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
  • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
  • put()方法:向unsent中添加请求。
  • schedule()方法:向delayedTasks队列中添加定时任务。
  • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1393333.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

el-table里面存在固定列获取video的ref的时候无法获取原始DOM

el-table里面存在固定列获取video的ref的时候无法获取原始DOM 问题复现 这是通过ref获取的dom实例&#xff0c;却变成了fixed固定出现了表格里面的video的实例 我现在的需求是修改里面的currentTime&#xff0c;但是获取的是固定列的video的ref&#xff0c;修改了&#xff0c…

Redis 笔记一

概览 1.Redis核心数据存储结构 2.Redis底层String编码int&embstr&raw 3.Redis底层压缩列表&跳表&哈希表 4.Redis底层Zset实现压缩列表和跳表如何选择 5.基于Redis实现微博&抢红包&12306核心业务 辅助学习&#xff1a;Redis 教程 | 菜鸟教程 1.Redis为什…

虚拟架桥:SD-WAN企业组网网络的智慧构筑

云桥通SD-WAN企业组网&#xff08;软件定义广域网&#xff09;代表着一项通过软件定义和虚拟化技术&#xff0c;将企业分支机构、数据中心和云服务等多种网络连接有机整合的创新解决方案。其核心框架涵盖了以下关键构成&#xff1a; 边缘设备&#xff1a; 在云桥通SD-WAN企业组…

小程序中使用上传图片,显示、删除、预览

一、功能介绍 需要哦用户点击加号上传图片&#xff0c;并展示所上传图片和能够删除和预览 二、功能实现 采用的uniapp&#xff0c;创建了一个view容器包裹加号图标和展示的图片。 内部展示图片超过9张时候&#xff0c;加号图片隐藏 <view class"img-list">/…

Django实现下载100G的超大CSV文件

关注我的公众号「DevOps724」&#xff0c;获取最新的内容分享&#xff0c;带你探索DevOps的无限可能&#xff01;分享最新的行业趋势、深入的技术分析和实用的工具&#xff0c;帮助你掌握自动化、云计算、持续集成和部署等核心概念。 在处理大数据集的时候&#xff0c;我们经常…

【前沿技术杂谈:智能对话的未来】深入比较ChatGPT与文心一言

【前沿技术杂谈&#xff1a;智能对话的未来】深入比较ChatGPT与文心一言 引言主体智能回复语言准确性知识库丰富度 深入分析&#xff1a;ChatGPT与文心一言的技术对比技术架构和算法数据处理和隐私用户界面和体验 应用场景分析未来展望技术进步的趋势潜在的挑战对社会的影响 结…

[计算机提升] 用户账户控制设置

4.11 用户账户控制设置 用户账户控制设置用来选择何时通知使用者关于计算机更改的消息&#xff0c;是一个比较有用的功能。有时候一些流氓软件在获得权限后可以在后台默认修改注册表或者下载或者安装软件&#xff0c;这个对用户而言&#xff0c;体验不是很好&#xff0c;而且更…

开发实践6_project

要求&#xff1a; ① 页面写入超链接&#xff0c;获取所有数据item&#xff0c;显示在另一个页面&#xff0c;1min内&#xff0c;即使数据有变化&#xff0c;页面内容不变&#xff0c;1min后点击超链接可获取最新信息&#xff1b; ② 使用middleware完成用户请求路径判断 &am…

web前端项目-贪吃蛇小游戏【附源码】

web前端项目-贪吃蛇小游戏 【贪吃蛇】是一款经典的小游戏&#xff0c;采用HTML、CSS和JavaScript技术进行开发&#xff0c;玩家通过控制一条蛇在地图上移动&#xff0c;蛇的目的是吃掉地图上的食物&#xff0c;并且让自己变得更长。游戏的核心玩法是控制蛇的移动方向和长度&am…

C++(12)——string

目录 1.insert: 1.1 string& insert (size_t pos, const string& str)&#xff1a; 1.2 string& insert (size_t pos, const char* s)&#xff1a; 1.3 string& insert (size_t pos, const char* s, size_t n)&#xff1a; 1.4 string& insert (…

【c++】栈(satck)和队列(queue)

目录 一、stack 1.stack的介绍 2.stack的使用 3.stack的模拟实现 二、queue 1.queue的介绍 2.queue的使用 3.queue的模拟实现 三、priority_queue 1.priority_queue的介绍 2.priority_queue的使用 一、stack 1.stack的介绍 &#xff08;1&#xff09;stack是一种容…

Baichuan2百川模型部署的bug汇总

1.4bit的量化版本最好不要在Windows系统中运行&#xff0c;大概原因报错原因是bitsandbytes不支持window&#xff0c;bitsandbytes-windows目前仅支持8bit量化。 2. 报错原因是机器没有足够的内存和显存&#xff0c;offload_folder设置一个文件夹来保存那些离线加载到硬盘的权…

包含广告或宣传性质的内容或参考资料不对应,百度百科词条怎么改

想要修改百度百科词条&#xff0c;却发现在编辑百度百科词条时经常提示“包含广告或宣传性质的内容”&#xff0c;又或者经常遇到“参考资料不对应”的情况&#xff0c;我们该如何正确修改百度百科词条才能推广&#xff0c;洛希爱做百科网为大家分享。 修改百科百度百科词条提示…

基于SSM的校园闲置物品交易平台设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

nexus3 npm-hosted仓库迁移

迁移背景&#xff1a; 从nexus 3.33 升级到 nexus 3.64 过程中&#xff0c;私服 npm-hosted 无法上传。由于这个 npm-hosted 和 npm-proxy 放的同一个 blob存储&#xff0c;无法单独拆除去&#xff0c;所以采用迁移的方式 迁移思路&#xff1a; down下来 npm-hosted 仓库&am…

e2studio开发三轴加速度计LIS2DW12(3)----检测活动和静止状态

e2studio开发三轴加速度计LIS2DW12.3--检测活动和静止状态 概述视频教学样品申请源码下载新建工程工程模板保存工程路径芯片配置工程模板选择时钟设置UART配置UART属性配置设置e2studio堆栈e2studio的重定向printf设置R_SCI_UART_Open()函数原型回调函数user_uart_callback ()…

【征服redis8】Redis的AOF持久化

Redis 支持多种持久化方式来保证数据的可靠性和持久性。前面我们介绍了RDB方式。我们我们介绍第二种方式——AOF&#xff08;Append Only File&#xff09;机制是一种常用的持久化方式&#xff0c;它记录了所有对 Redis 数据库进行修改的命令&#xff0c;在 Redis 重启时可以使…

【Java】HttpServlet类中前后端交互三种方式(query string、form表单、JSON字符串)

在前后端的交互中&#xff0c;前端通过以下三种方式来与后端进行交互&#x1f31f; ✅query string ✅form表单 ✅JSON字符串 下面我们将书写这三种方式的后端代码并进行讲解 1、Query String QueryString即在url中写入键值对&#xff0c;一般用doGet方法进行交互 代码如下 …

读AI3.0笔记02_起源

1. 起源 1.1. 1955年&#xff0c;28岁的麦卡锡进入了达特茅斯学院的数学系 1.2. 该领域的正式确立可以追溯到1956年由一位名叫约翰麦卡锡的年轻数学家在达特茅斯学院举办的一场小型研讨会 1.2.1. 在1956年&#xff0c;即便是最先进的计算机&#xff0c;其速度也达不到现代智…

HarmonyOS —— buildMode 设置(对比 Android Build Varient)

前言 在安卓中 Build Variant 主要依赖模块&#xff08;module&#xff09;中 build.gradle 的 BuildType 和 ProductFlavor 提供的属性和方法&#xff0c;我们可以使用 Build Type 可以配置不同的构建方式、ProductFlavor 主要用来进行多渠道打包。 在鸿蒙中要做到同样像效果…