Kafka源码分析(四) - Server端-请求处理框架

news2024/11/16 1:55:42

系列文章目录

Kafka源码分析-目录

一. 总体结构

先给一张概览图:
在这里插入图片描述

服务端请求处理过程涉及到两个模块:kafka.networkkafka.server

1.1 kafka.network

该包是kafka底层模块,提供了服务端NIO通信能力基础。

有4个核心类:SocketServer、Acceptor、Processor、RequestChannel。各自角色如下:

  • SocketServer:服务端的抽象,是服务端通信的入口;

  • Acceptor:Reactor通信模式中处理连接ACCEPT事件的线程/线程池所执行的任务;

  • Processor:Reactor通信模式中处理连接可读/可写事件的线程/线程池所执行的任务;

  • RequestChannel:请求队列,存储已经解析好的请求以等待处理;

对于上层模块而言,该基础模块有两个输入和一个输出

  1. 输入:IP+端口号,该模块会对目标端口实现监听;

  2. 输出:解析好的请求,通过RequestChannel进行输出;

  3. 输入:待发送的Response,通过Processor.responseQueue来完成输入;

1.2 kafka.server

该包在kafka.network的基础上实现各种请求的处理逻辑,主要包含KafkaServer和KafkaApis两个类。其中:

  • KafkaServer:Kafka服务端的抽象,统一维护Kafka服务端的各流程和状态;

  • KakfaApis:维护了各类请求对应的业务逻辑,通过KafkaServer.apis字段组合到KafkaServer之中;

二. Server的端口监听

整体流程如图:
在这里插入图片描述

接下来按调用顺序依次分析各方法

2.1 KafkaServer.startup()

关于端口监听的核心逻辑分4步,代码如下(用注释说明各部分的目的):

def startup() {
  // 省略无关代码
  ... ...

  // 1. 创建SocketServer
  socketServer = new SocketServer(config, metrics, time, credentialProvider)

  // 2. 启动端口监听
  // (在这里完成了Acceptor的创建和端口ACCEPT事件的监听)
  // (startupProcessors = false表示暂不启动Processor处理线程)
  socketServer.startup(startupProcessors = false)

  // 3. 启动请求处理过程中的相关依赖
  // (这也是第2步中不启动Processor处理线程的原因,有依赖项需要处理)
  ... ...

  // 4. 启动端口可读/可写事件处理线程(即Processor线程)
  socketServer.startProcessors()

  // 省略无关代码
  ... ...
}

2.2 SocketServer.startup(Boolean)

代码及说明性注释如下:

def startup(startupProcessors: Boolean = true) {
  this.synchronized {
    // 省略无关代码
    ... ...

    // 1. 创建Accetpor和Processor的实例,
    // 同时页完成了Acceptor对端口ACCEPT事件的监听
    createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)

    // 2. [可选]启动各Acceptor对应的Processor线程
    if (startupProcessors) {
      startProcessors()
    }
  }
}

2.3 ScocketServer.createAcceptorAndProcessor()

直接上注释版的代码,流程分3步:

// 入参解释
// processorsPerListener: 对于每个IP:Port, 指定Reactor模式子线程池大小, 
//                        即处理端口可读/可写事件的线程数(Processor线程);
// endpoints: 接收请求的IP:Port列表;
def createAcceptorAndProcessors(processorsPerListener: Int,
                                endpoints: Seq[EndPoint]): Unit = synchronized {
    // 省略无关代码
    ... ...

    endpoints.foreach { endpoint =>
      // 省略无关代码
      ... ...

      // 1. 创建Acceptor对象
      // 在此步骤中调用Acceptor.openServerSocket, 完成了对端口ACCEPT事件的监听
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)

      // 2. 创建了与acceptor对应的Processor对象列表
      // (这里并未真正启动Processor线程)
      addProcessors(acceptor, endpoint, processorsPerListener)

      // 3. 启动Acceptor线程
      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()

      // 省略无关代码
      ... ...
    }
  }

2.4 Acceptor.openServerSocket()

该方法中没什么特殊点,就是java NIO的标准流程:

def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  // 1. 构建InetSocketAddress对象
  val socketAddress =
    if (host == null || host.trim.isEmpty)
      new InetSocketAddress(port)
    else
      new InetSocketAddress(host, port)

  // 2. 构建ServerSocketChannel对象, 并设置必要参数值
  val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)

  // 3. 端口绑定, 实现事件监听
  try {
    serverChannel.socket.bind(socketAddress)
    info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
  } catch {
    case e: SocketException =>
      throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)
  }

  // 4. 返回ServerSocketChannel对象, 用于后续register到Selector中
  serverChannel
}

2.5 SocketServer.startProcessor()

从这步开始,仅剩的工作就是启动Processor线程,代码都非常简单。比如本方法只是遍历Acceptor列表,并调用Acceptor.startProcessors()

def startProcessors(): Unit = synchronized {
  acceptors.values.asScala.foreach { _.startProcessors() }
  info(s"Started processors for ${acceptors.size} acceptors")
}

2.6 Acceptor.startProcessors()

该方法很简明,直接上代码

def startProcessors(): Unit = synchronized {
  if (!processorsStarted.getAndSet(true)) {
    startProcessors(processors)
  }
}

def startProcessors(processors: Seq[Processor]): Unit = synchronized {
  processors.foreach { processor =>
    KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
      processor).start()
  }
}

三. 请求/响应的格式

3.1 格式概述

在这里插入图片描述
请求和响应都由两部分组成:Header和Body。RequestHeader中包含ApiKey、ApiVersion、CorrelationId、ClientId;ResponseHeader中只包含CorrelationId字段。接下来逐个讲解这些字段。

  • ApiKey

    2字节整型,指明请求的类型;比如0代表Produce请求,1代表Fetch请求;具体id和请求类型之间的映射关系可在 org.apache.kafka.common.protocol.ApiKeys 中找到;

  • ApiVersion

    随着API的升级迭代,各类型请求的请求体格式可能有变更;这个2字节的整型指明了请求体结构的版本;

  • CorrelationId

    4字节整型,在Response中传回,Kafka Server端不处理,用于客户端内部关联业务数据;

  • ClientId

    可变长字符串,标识客户端;

3.2 请求体/响应体的具体格式

各业务操作(比如Produce、Fetch等)对应的请求体和响应体格式都维护在 org.apache.kafka.common.protocol.ApiKeys 中。接下来以Produce为例讲解ApiKeys是如何表达数据格式的。

ApiKeys是个枚举类,其核心属性如下:

public enum ApiKeys {
  // 省略部分代码
  ... ...

  // 上文提到的请求类型对应的id
  public final short id;

  // 业务操作名称
  public final String name;

  // 各版本请求体格式
  public final Schema[] requestSchemas;

  // 各版本响应体格式
  public final Schema[] responseSchemas;

  // 省略部分代码
  ... ...
}

其中PRODUCE枚举项的定义如下

PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions())

可以看到各版本的请求格式维护在 ProduceRequest.schemaVersions(),代码如下

public static Schema[] schemaVersions() {
  return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
    PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
}

这里只是简单返回了一个Schema数组。一个Schema对象代表了一种数据格式。请求头中的ApiVersion指明了请求体的格式对应数组的第几项(从0开始)。

接下来我们看看Schema是如何表达数据格式的。其结构如下
在这里插入图片描述
Schema有两个字段:fields和fieldsByName。其中fields是体现数据格式的关键,它指明了字段的排序和各字段类型;而fieldsByName只是按字段名重新组织的Map,用于根据名称查找对应字段。

BoundField只是Field的简单封装。Field有两个核心字段:name和type。其中name表示字段名称,type表示字段类型。常见的Type如下:

Type.BOOLEAN;
Type.INT8;
Type.INT16;
Type.INT32;

// 可通过org.apache.kafka.common.protocol.types.Type查看全部类型
... ...

回到PRODUCE API,通过查看Schema的定义,能看到其V0版本的请求体和响应体的结构如下:
在这里插入图片描述

四. 请求的处理流程

在这里插入图片描述

  1. Acceptor监听到ACCEPT事件(TCP创建连接"第一次握手"的SYN);

  2. Acceptor将将连接注册到Processor列表内的其中一个,由该Processor监听这个连接的后续可读可写事件;

  3. Processor接收到完整请求后,会将Request追加到RequestChannel中进行排队,等待后续处理;

  4. KafkaServer中有个requestHandlerPool的字段,KafkaRequestHandlerPool类型,代表请求处理线程池;KafkaRequestHandler就是其中的线程,会从RequestChannel拉请求进行处理;

  5. KafkaRequestHandler将拉到的Request传入KafkaApis.handle(Request)方法进行处理;

  6. KafkaApis根据不同的ApiKey调用不同的方法进行处理,处理完毕后会将Response最终写入对应的Processor的ResponseQueue中等待发送;KafkaApis.handle(Request)的方法结构如下:

    def handle(request: RequestChannel.Request) {
      try {
        // 省略部分代码
        ... ...
        request.header.apiKey match {
          case ApiKeys.PRODUCE => handleProduceRequest(request)
          case ApiKeys.FETCH => handleFetchRequest(request)
          case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
          case ApiKeys.METADATA => handleTopicMetadataRequest(request)
          case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
          // 省略部分代码
          ... ...
        }
      } catch {
        case e: FatalExitError => throw e
        case e: Throwable => handleError(request, e)
      } finally {
        request.apiLocalCompleteTimeNanos = time.nanoseconds
      }
    }
    
  7. Processor从自己的ResponseQueue中拉取待发送的Respnose;

  8. Processor将Response发给客户端;

五. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)
感谢阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1622019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring的bean创建流程源码解析

文章目录 IOC 和 DIBeanFactoryApplicationContext实现的接口1、BeanFactory接口2、MessageSource 国际化接口3、ResourcePatternResolver,资源解析接口4、EnvironmentCapable接口,用于获取环境变量,配置信息5、ApplicationEventPublisher 事…

GAN详解,公式推导解读,详细到每一步的理论推导

在看这一篇文章之前,希望熟悉掌握熵的知识,可看我写的跟熵相关的一篇博客https://blog.csdn.net/m0_59156726/article/details/138128622 1. GAN 原始论文:https://arxiv.org/pdf/1406.2661.pdf 放一张GAN的结构,如下&#xff1…

索引超详细解析

目录 索引概述 无索引时: 索引: 索引结构 介绍: 二叉树: B-Tree(多路平衡查找树): 经典BTree MySQL中B树 Hash索引 hash索引的特点: 存储引擎支持: 为什么InnoDB存储选择使用BTree…

升级 jQuery:努力打造健康的 Web 生态

jQuery 对 Web 的影响始终是显而易见的。当 jQuery 在 2006 年首次推出时,几乎立即成为 Web 开发人员的基本工具。它简化了 JavaScript 编程,使操作 HTML 文档、处理事件、执行动画等变得更加容易。从那时起,它在 Web 标准和浏览器功能的演变…

深度学习基础:循环神经网络中的Dropout

深度学习基础:循环神经网络中的Dropout 在深度学习中,过拟合是一个常见的问题,特别是在循环神经网络(RNN)等复杂模型中。为了应对过拟合问题,研究者们提出了许多方法,其中一种被广泛应用的方法…

CSS渐变色理论与分类、文字渐变色方案、炸裂渐变色方案以及主流专业渐变色工具网站推荐

渐变色彩可以增加视觉层次感和动态效果,使网页界面更加生动有趣,吸引用户注意力。另外,相较于静态背景图片,CSS渐变无需额外的HTTP请求,减轻服务器负载,加快页面加载速度;同时CSS渐变能够根据容…

应用软件运维服务方案(word原件)

信息化项目运维服务方案(投标,实施运维,交付) 1.项目整体介绍 2.服务简述 3.资源提供 软件全过程性,标准型,规范性文档(全套资料包)获取:本文末个人名片直接获取&…

WPS二次开发系列:WPS SDK打开在线文档

作者持续关注WPS二次开发专题系列,持续为大家带来更多有价值的WPS开发技术细节,如果能够帮助到您,请帮忙来个一键三连,更多问题请联系我(QQ:250325397) 目录 需求场景 效果展示 3、实现步骤 3.1 步骤一、申…

spring boot3单模块项目工程搭建-下(个人开发模板)

⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 🌊山高路远,行路漫漫,终有归途 目录 写在前面 上文衔接 常用依赖介绍以及整合 web组件 测试组件 样板代码生成 数据库连接器 常用工具包 面向切面编…

《QT实用小工具·三十九》仿 Windows10 画图3D 的颜色选择器, 但更加强大

1、概述 源码放在文章末尾 该项目实现了仿 Windows10 画图3D 的颜色选择器,功能更加丰富更加强大。 项目部分代码如下所示: import QtQuick 2.15 import QtQuick.Controls 2.15 import QtQuick.Layouts 1.15 import QtGraphicalEffects 1.15Item {id…

【踩坑】libtorch load 报错 No such file or directory

转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,请不吝给个[点赞、收藏、关注]哦~ 目录 报错背景 报错原因 解决方法 方法一:把你的编译配置转为release版本 方法二:安装debug版本的libtorch 报错背景…

算法学习001-圆桌问题 中小学算法思维学习 信奥算法解析 c++实现

目录 算法学习001-圆桌问题 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 七、推荐资料 算法学习001-圆桌问题 一、题目要求 1、编程实现 圆桌边围坐着2n个人,其中n个人是好人&#xff0c…

Redis 安装及配置教程(Windows)【安装】

文章目录 一、简介一、 下载1. GitHub 下载2. 其它渠道 二、 安装1. ZIP2. MSI 软件 / 环境安装及配置目录 一、简介 Redis 官网地址:https://redis.io/   Redis 源码地址:https://github.com/redis/redis   Redis 官网安装地址(无Windo…

基于SSM的物业管理系统(含源码+sql+视频导入教程+文档+PPT)

👉文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于SSM的物业管理系统2拥有三种角色 管理员:用户管理、物业管理、房产信息管理、小区概况管理、开发商管理、收费标准管理、物业公司管理等 物业:住户管理、收费…

vector的使用(部分接口)

1.vector的使用 1.1vector的定义 (constructor)构造函数声明接口说明vector()无参构造vector (const vector& x)拷贝构造 1.2vector iterator 的使用 iterator的使用接口说明begin end获取第一个数据位置的iterator/const_iterator, 获取最后一个数据的下一个位…

【数据结构】单链表的特点

🎈个人主页:豌豆射手^ 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:数据结构 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进…

UML——类图详解

目录 1. 前言 2. 类图概述 3. 类图表示法 3.1 类的表示方式 3.2 类与类之间关系的表示方式 (1)继承(泛化)关系 (2)实现关系 (3)依赖关系 (4)一般关联关系 (5)聚合关系 (6)组合关系 1. 前言 UML全称(Unified Modeling Language),译为统一建模语言&#x…

Android自定义ListView单击事件失效的解决方法

因为自带的listView不能满足项目需求,通过实现自己的Adapter去继承ArrayAdapter 来实现自定义ListView的Item项目。 出现点击ListView的每一项都不会执行setOnItemClickListener 里面的onItemClick 方法。 原因是item里面存在一些子控件,默认点击获取的…

使用 PhpMyAdmin 安装 LAMP 服务器

使用 PhpMyAdmin 安装 LAMP 服务器非常简单。按照下面所示的步骤,我们将拥有一个完全可运行的 LAMP 服务器(Linux、Apache、MySQL/MariaDB 和 PHP)。 什么是 LAMP 服务器? LAMP 代表 Linux、Apache、MySQL 和 PHP。它们共同提供…

如何在PostgreSQL中实现分布式事务,特别是在多节点集群环境中?

文章目录 解决方案:使用Citus实现分布式事务步骤一:安装和配置Citus步骤二:定义分布式表和分布键步骤三:执行分布式事务示例代码 总结 在PostgreSQL中实现分布式事务,特别是在多节点集群环境中,是一个复杂但…