14、Kafka 请求是怎么被处理的

news2024/11/18 8:35:40

Kafka 请求是怎么被处理的

  • 1、处理请求的 2 种常见方案
    • 1.1、顺序处理请求
    • 1.2、每个请求使用单独线程处理
  • 2、Kafka 是如何处理请求的?
  • 3、控制类请求和数据类请求分离

无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过 “请求 / 响应” 的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。

Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。

下面会详细讨论一下 Kafka Broker 端处理请求的全流程。

1、处理请求的 2 种常见方案

1.1、顺序处理请求

缺陷是,吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。

1.2、每个请求使用单独线程处理

好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。
缺陷是,为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。

2、Kafka 是如何处理请求的?

kafka 处理请求使用 Reactor 模式。Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景

Reactor 模式的架构如下图所示:
在这里插入图片描述
从这张图中,我们可以发现,多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。

在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。

为 Kafka 画一张类似的图的话,那它应该是这个样子的:
在这里插入图片描述
这两张图长得差不多。Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,只不过在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。

Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。

好了,你现在了解了客户端发来的请求会被 Broker 端的 Acceptor 线程分发到任意一个网络线程中,由它们来进行处理。那么,当网络线程接收到请求后,它是怎么处理的呢?你可能会认为,它顺序处理不就好了吗?实际上,Kafka 在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。

在这里插入图片描述

当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。

IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。

比如,如果你的机器上 CPU 资源非常充裕,你完全可以调大该参数,允许更多的并发请求被同时处理。当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。

细心的你一定发现了请求队列和响应队列的差别:请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。

我们再来看看刚刚的那张图,图中有一个叫 Purgatory 的组件,这是 Kafka 中著名的 “炼狱” 组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

到这里,Kafka 请求流程解析的故事就已经讲完了

到目前为止,这里提及的请求处理流程对于所有请求都是适用的,也就是说,Kafka Broker 对所有请求是一视同仁的。

3、控制类请求和数据类请求分离

在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。也就是说,它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。

我来举个例子说明一下。假设我们有个主题只有 1 个分区,该分区配置了两个副本,其中 Leader 副本保存在 Broker 0 上,Follower 副本保存在 Broker 1 上。假设 Broker 0 这台机器积压了很多的 PRODUCE 请求,此时你如果使用 Kafka 命令强制将该主题分区的 Leader、Follower 角色互换,那么 Kafka 内部的控制器组件(Controller)会发送 LeaderAndIsr 请求给 Broker 0,显式地告诉它,当前它不再是 Leader,而是 Follower 了,而 Broker 1 上的 Follower 副本因为被选为新的 Leader,因此停止向 Broker 0 拉取消息。

这时,一个尴尬的场面就出现了:如果刚才积压的 PRODUCE 请求都设置了 acks=all,那么这些在 LeaderAndIsr 发送之前的请求就都无法正常完成了。就像前面说的,它们会被暂存在 Purgatory 中不断重试,直到最终请求超时返回给客户端。

设想一下,如果 Kafka 能够优先处理 LeaderAndIsr 请求,Broker 0 就会立刻抛出 NOT_LEADER_FOR_PARTITION 异常,快速地标识这些积压 PRODUCE 请求已失败,这样客户端不用等到 Purgatory 中的请求超时就能立刻感知,从而降低了请求的处理时间。即使 acks 不是 all,积压的 PRODUCE 请求能够成功写入 Leader 副本的日志,但处理 LeaderAndIsr 之后,Broker 0 上的 Leader 变为了 Follower 副本,也要执行显式的日志截断(Log Truncation,即原 Leader 副本成为 Follower 后,会将之前写入但未提交的消息全部删除),依然做了很多无用功。

再举一个例子,同样是在积压大量数据类请求的 Broker 上,当你删除主题的时候,Kafka 控制器(我会在专栏后面的内容中专门介绍它)向该 Broker 发送 StopReplica 请求。如果该请求不能及时处理,主题删除操作会一直 hang 住,从而增加了删除主题的延时。

那么,社区是如何解决的呢?社区实现了两类请求的分离。也就是说,Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1322710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉搜索树第大K节点,剑指offer,力扣

目录 题目地址: 题目: 我们直接看题解吧: 解题方法: 难度分析: 审题目事例提示: 解题分析: 解题思路: 代码实现: 代码补充: 代码实现(非递归&…

20倍压缩比!微软提出大模型提示压缩框架LLMLingua

近期,越来越多研究在探索大型语言模型(LLM)在实际应用中的推理和生成能力。随着 ChatGPT 等模型的广泛研究与应用,如何在保留关键信息的同时,压缩较长的提示成为当前大模型研究的问题之一。 为了加速模型推理并降低成本…

ViewBinding与DataBinding(视图绑定与数据双向绑定)

前言:心中纵是有所盼 严寒没有减 风很冷 我的手已渐蓝 前言 控件查找对于Android开发来说也是一部血泪史,一直为更有效的方案进行了多种方案的研究和探讨。findViewById() 过于繁琐,强制转换不安全;butterkniife 会存在众多臃肿的…

【【UART 传输数据实验】】

UART 传输数据实验 通信方式在日常的应用中一般分为串行通信(serial communication)和并行通信(parallel communication)。 我们再来了解下串行通信的特点。串行通信是指数据在一条数据线上,一比特接一比特地按顺序传…

随笔记录-springboot_LoggingApplicationListener+LogbackLoggingSystem

环境:springboot-2.3.1 加载日志监听器初始化日志框架 SpringApplication#prepareEnvironment SpringApplicationRunListeners#environmentPrepared EventPublishingRunListener#environmentPrepared SimpleApplicationEventMulticaster#multicastEvent(Applicati…

字符设备驱动的加载与卸载

一. 简介 前面几篇文章编写了 字符设备驱动模块加载与卸载框架代码,设置了开发板启动方式。文章地址如下: 字符设备驱动框架的编写-CSDN博客 字符设备驱动模块的编译-CSDN博客 字符设备驱动的加载与卸载前工作-CSDN博客 本文学习如何加载与卸载驱动…

windows10 固定电脑IP地址操作说明

windows10 固定电脑IP地址操作说明 一、无线网络的IP地址设置方法二、有线网络的IP地址设置方法 本文主要介绍,windows10操作系统下,不同的网络类型,对应的电脑IP地址设置方法。 一、无线网络的IP地址设置方法 在桌面右下角,点击…

st.pp.normalize_total(data) # NOTE: no log1p

这段代码在使用 stlearn 包中的 st.pp.normalize_total 函数对数据进行总体计数标准化。标准化后,每个细胞的总计数都将等于 median(total_counts)。 NOTE: no log1p 这行注释表示在标准化后,数据不会进行 log1p 转换。log1p 转换将每个计数值增加 1&a…

Java如何创建线程?到底有几种方式创建线程?

文章目录 继承Thread类实现Runnable接口实现Callable接口匿名内部类形式的线程创建实现接口 VS 继承Thread到底有几种创建线程的方式?参考 继承Thread类 定义一个线程类,重写实现run方法(因为 Thread类也实现了 Runable接口),在其中定义线程…

Pytorch神经网络的参数管理

目录 一、参数访问 1、目标参数 2、一次性访问所有参数 3、从嵌套块收集参数 二、参数初始化 1、内置初始化 2、自定义初始化 3、参数绑定 在选择了架构并设置了超参数后,我们就进入了训练阶段。此时,我们的目标是找到使损失函数最小化的模型参数…

矩阵式键盘实现的电子密码锁

#include<reg51.h> //包含51单片机寄存器定义的头文件 sbit P14P1^4; //将P14位定义为P1.4引脚 sbit P15P1^5; //将P15位定义为P1.5引脚 sbit P16P1^6; //将P16位定义为P1.6引脚 sbit P17P1^7; //将P17位定义为P1.7引脚 sbit soundP3^7; //将so…

新媒体宣传与广州迅腾文化传播有限公司:品牌知名度提升的新动力

新媒体宣传与广州迅腾文化传播有限公司&#xff1a;品牌知名度提升的新动力 随着科技的飞速发展和互联网的普及&#xff0c;新媒体已经成为现代社会不可或缺的一部分。新媒体平台具有传播速度快、覆盖面广的特点&#xff0c;为企业品牌宣传提供了前所未有的机会。广州迅腾文化…

黑马点评07 秒杀优化 加阻塞队列

实战篇-22.秒杀优化-异步秒杀思路_哔哩哔哩_bilibili 1.流程回顾 1.1超卖问题 判断秒杀时间&#xff0c;加乐观锁&#xff08;比较标记/版本&#xff09;&#xff0c;检查库存是否大于0 1.2一人一单问题 看看数据库里有没有这个这个人下的订单&#xff1a; 1.单机模式中…

自动化测试 (五) 读写64位操作系统的注册表

自动化测试经常需要修改注册表 很多系统的设置&#xff08;比如&#xff1a;IE的设置&#xff09;都是存在注册表中。 桌面应用程序的设置也是存在注册表中。 所以做自动化测试的时候&#xff0c;经常需要去修改注册表 Windows注册表简介 注册表编辑器在 C:\Windows\regedit…

第二百一十五回 如何创建单例模式

文章目录 1. 概念介绍2. 思路与方法2.1 实现思路2.2 实现方法 3. 示例代码4. 内容总结 我们在上一章回中介绍了"分享三个使用TextField的细节"沉浸式状态样相关的内容&#xff0c;本章回中将介绍 如何创建单例模式.闲话休提&#xff0c;让我们一起Talk Flutter吧。 …

@KafkaListener 注解配置多个 topic

见如下示例 主要见 KafkaListener 中 topics 属性的配置 其中 ${xxxx.topic1} 为从springBoot 配置文件中读取的属性值 KafkaListener(topics {"${xxxx.topic1}", "${xxxx.topic2}"}, groupId "${xxxx.groupId}",containerFactory "xxx…

易点易动打通OA系统,实现固定资产高效管理

近年来,随着信息化建设的不断深入,OA系统在企业管理工作中的应用也日趋广泛。传统的固定资产管理存在数据分散,管理效率低等问题。深度整合易点易动和OA系统,可以打通各系统之间的数据通道,实现固定资产通过OA系统的全流程管理。这不仅可以提升管理效率,减轻人工管理成本,也更方…

部署LVS的NAT模式

实验准备 #负载调度器# 192.168.116.40 #内网 12.0.0.100 #外网 先添加双网卡 #web服务器# 192.168.116.20 #web1 192.168.116.30 #web2 #nfs共享服务# 192.168.116.10 #nfs systemctl stop firewalld setenforce 0 1.nfs共享文件 1…

Python-Selenium-使用 pywinauto 实现 Input 上传文件

当前环境&#xff1a;Win10 Python3.7 pywinauto0.6.8&#xff0c;selenium3.14.1 示例代码 from pywinauto import Desktop import osapp Desktop() dialog app[打开] dialog[Edit].set_edit_text(os.getcwd() .\\example-01.jpg) dialog[Button].click() 其他方法&…

接口测试的工具(3)----postman+node.js+newman

1.安装newman&#xff1a;输入命令之后 一定注意 什么都不要操作 静静的等待结束就行了。 2.安装失败的对此尝试不行 在用下面的方法 解压一下就行了 3.验证是否成功 多次尝试是可以在线安装成功的