Kafka-服务端-网络层-源码流程

news2025/1/10 23:44:45

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {
       def startup() {
       	socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startupProcessors = false)
                /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, ...)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)
          }
     }

SocketServer

  def startup(startupProcessors: Boolean = true) {
    this.synchronized {
      ...
      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
      if (startupProcessors) {
        startProcessors()
      }
    }
      
  private def createAcceptorAndProcessors(processorsPerListener: Int,
                                          endpoints: Seq[EndPoint]): Unit = synchronized {
	...
    endpoints.foreach { endpoint =>
	  ...
      val acceptor = new Acceptor(endpoint, ...)
      addProcessors(acceptor, endpoint, processorsPerListener)
      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
      acceptor.awaitStartup()
      acceptors.put(endpoint, acceptor)
    }
  }

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  val processor = synchronized {
                    currentProcessor = currentProcessor % processors.size
                    processors(currentProcessor)
                  }
                  accept(key, processor)
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread, mod(numProcessors) will be done later
                currentProcessor = currentProcessor + 1
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
      shutdownComplete()
    }
  }

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // 在新分到的客户端连接上注册OP_READ事件
          configureNewConnections()
          // 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送
          processNewResponses()
          // selector轮询各种事件,读取请求或者发送响应
          poll()
          // 封装selector.completedReceives中的请求,放入requestQueue
          processCompletedReceives()
          // 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)
          processCompletedSends()
          processDisconnected()
        } catch {
          ...
        }
      }
    } finally {
      ...
    }
  }

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {
  ...
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(..., requestChannel, apis, time)
    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
  }
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {
  ...
  def run() {
    while (!stopped) {

      val req = requestChannel.receiveRequest(300)

      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            apis.handle(request)
          } catch {
            case e: FatalExitError =>
              shutdownComplete.countDown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1884311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

易校网校园综合跑腿小程序源码修复运营版

简介&#xff1a; 易校网校园综合跑腿小程序源码修复运营版&#xff0c;带服务端客户端前端文档说明。 源码安装方法&#xff1a; 需要准备小程序服务号 服务器 备案域名 校园网跑腿小程序源码需要准备 1.小程序 2.服务器&#xff08;推荐配置2h4g3m&#xff09; 3.域名…

安卓实现微信聊天气泡

一搜没一个能用的&#xff0c;我来&#xff1a; 布局文件&#xff1a; <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schemas.android.com/apk/res/android"xml…

第十二章 路由器静态路由配置

实验目标 掌握静态路由的配置方法和技巧&#xff1b; 掌握通过静态路由方式实现网络的连通性&#xff1b; 熟悉广域网线缆的链接方式&#xff1b; 实验背景 学校有新旧两个校区&#xff0c; 每个校区是一个独立的局域网&#xff0c; 为了使新旧校区能够正常相互…

CTFHUB-SSRF-上传文件

通过file协议访问flag.php文件内容 ?urlfile:///var/www/html/flag.php 右键查看页面源代码&#xff0c;发现需要从内部上传一个文件这样才能正常获取到flag ?urlhttp://127.0.0.1/flag.php 发现无提交按钮&#xff0c;构造一个 <input type"submit" name&qu…

HarmonyOS开发探索:使用Snapshot Insight分析ArkTS内存问题

识别内存问题 当怀疑应用存在内存问题的时候&#xff0c;首先使用DevEco Profiler的Allocation Insight来度量内存在问题场景下的大小变化以及整体趋势&#xff0c;初步定界问题出现的位置&#xff08;Native Heap/ArkTS Heap/dev等&#xff09;。 在初步识别内存问题出现的位置…

阿里云物联网应用层开发:第二部分,云产品流转

文章目录 哔哩哔哩视频教程1、云产品流转概述2、我们需要创建多少个云产品流转&#xff1f;3、阿里云物联网平台产品云流转实现3-1 创建数据源3-2 创建数据目的3-2 创建解析器,并关联数据、编写脚本 哔哩哔哩视频教程 【阿里云物联网综合开发&#xff0c;STM32ESP8266微信小程…

2024年道路运输安全员(企业管理人员)备考题库资料。

46.危险货物道路运输随车携带的单据&#xff0c;下列选项不属于的是&#xff08;&#xff09;。 A.道路运输危险货物安全卡 B.运单或者电子运单 C.道路危险货物运输从业资格证 D.车辆检测报告 答案&#xff1a;D 47.危险货物运输驾驶人员在24小时内实际驾驶车辆时间累计不…

opengl箱子的显示

VS环境配置&#xff1a; /JMC /ifcOutput "Debug\" /GS /analyze- /W3 /Zc:wchar_t /I"D:\Template\glfwtemplate\glfwtemplate\assimp" /I"D:\Template\glfwtemplate\glfwtemplate\glm" /I"D:\Template\glfwtemplate\glfwtemplate\LearnOp…

接口测试流程及测试点!

一、什么时候开展接口测试 1.项目处于开发阶段&#xff0c;前后端联调接口是否请求的通&#xff1f;&#xff08;对应数据库增删改查&#xff09;--开发自测 2.有接口需求文档&#xff0c;开发已完成联调&#xff08;可以转测&#xff09;&#xff0c;功能测试展开之前 3.专…

ctfshow-web入门-命令执行(web75-web77)

目录 1、web75 2、web76 3、web77 1、web75 使用 glob 协议绕过 open_basedir&#xff0c;读取根目录下的文件&#xff0c;payload&#xff1a; c?><?php $anew DirectoryIterator("glob:///*"); foreach($a as $f) {echo($f->__toString(). ); } ex…

如何用大模型RAG做医疗问答系统

代码参考 https://github.com/honeyandme/RAGQnASystemhttps://github.com/LongxingTan/open-retrievals TLDR if 疾病症状 in entities and 疾病 not in entities:sql_q "match (a:疾病)-[r:疾病的症状]->(b:疾病症状 {名称:%s}) return a.名称" % (entitie…

Cosine 余弦相似度并行计算的数学原理与Python实现

背景 Cosine 我在LLM与RAG系列课程已经讲了很多次了&#xff0c;这里不在熬述&#xff0c;它在LLM分析中&#xff0c;尤其是在语义相似度的计算中至关重要&#xff0c;在dot attention机制中&#xff0c;也会看到他的身影。这里讲的是纯数学上的运算与python是如何运用相关库进…

鸿蒙开发Ability Kit(程序访问控制):【向用户申请单次授权】

申请使用受限权限 受限开放的权限通常是不允许三方应用申请的。当应用在申请权限来访问必要的资源时&#xff0c;发现部分权限的等级比应用APL等级高&#xff0c;开发者可以选择通过ACL方式来解决等级不匹配的问题&#xff0c;从而使用受限权限。 举例说明&#xff0c;如果应…

代码随想录算法训练营第55天(py)| 单调栈 | 42. 接雨水*、84.柱状图中最大的矩形

42. 接雨水* 力扣链接 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 思路1 暴力 按列来计算。每一列雨水的高度&#xff0c;取决于&#xff0c;该列 左侧最高的柱子和右侧最高的柱子中&#xff0c;…

【记录】IDEA2023的激活与安装

前言&#xff1a; 记录IDEA2023的激活与安装 第一步&#xff1a;官网下载安装包&#xff1a; 下载地址&#xff1a;https://www.jetbrains.com/idea/download/other.html 这个最好选择2023版本&#xff0c;用着很nice。 安装步骤就不详解了&#xff0c;无脑下一步就可以了…

09 - matlab m_map地学绘图工具基础函数 - 绘制区域填充、伪彩色、加载图像和绘制浮雕效果的有关函数

09 - matlab m_map地学绘图工具基础函数 - 绘制区域填充、伪彩色、加载图像和绘制浮雕效果的有关函数 0. 引言1. 关于m_pcolor2. 关于m_image3. 关于m_shadedrelief4. 关于m_hatch5. 结语 0. 引言 本篇介绍下m_map中区域填充函数&#xff08;m_hatch&#xff09;、绘制伪彩色图…

C++多态~~的两个特殊情况

目录 1.多态的概念 2.简单认识 &#xff08;1&#xff09;一个案例 &#xff08;2&#xff09;多态的两个满足条件 &#xff08;3&#xff09;虚函数的重写 &#xff08;4&#xff09;两个特殊情况 1.多态的概念 &#xff08;1&#xff09;多态就是多种形态&#xff1b; …

某腾X滑块验证码

⚠️前言⚠️ 本文仅用于学术交流。 学习探讨逆向知识,欢迎私信共享学习心得。 如有侵权,联系博主删除。 请勿商用,否则后果自负。 网址 aHR0cHM6Ly9jbG91ZC50ZW5jZW50LmNvbS9wcm9kdWN0L2NhcHRjaGE= 1. 先整体分析一下 1_1. 验证码信息下发接口 cap_union_prehandle ua:…

AI绘画工具Midjourney:和Discord互相成就

前言 提到文生图&#xff0c;很多人都会想到植根于根植于Discord社区的Midjourney&#xff0c;本篇文章就基于作者的使用体验思考&#xff0c;并结合了Discord来对Midjourney进行探讨&#xff0c;感兴趣的朋友一起来看看吧。 如果要说现在最火的文生图&#xff0c;不得不说到Mi…

openmetadata1.3.1 自定义连接器 开发教程

openmetadata自定义连接器开发教程 一、开发通用自定义连接器教程 官网教程链接&#xff1a; 1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors 2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector &#xff08;一&…