Kafka-服务端-网络层

news2024/11/15 9:18:02

Reactor模式

Kafka网络层采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程应该了解JavaNIO提供了实现Reactor模式的API。常见的单线程Java NIO的编程模式如图所示。

在这里插入图片描述
为了满足高并发的需求,也为了充分利用服务器的资源,服务端需要使用多线程来执行业务逻辑。我们对上述架构稍作调整,将网络读写的逻辑与业务处理的逻辑进行拆分,让其由不同的线程池来处理,从而实现多线程处理。设计架构如图所示。

在这里插入图片描述
通过将网络处理与业务逻辑进行切分后实现了上述设计,此设计中读取、写入、业务处理都实现了多线程处理,不再存在性能瓶颈。

但是,如果同一时间出现大量I/O事件,单个Selector就可能在分发事件时阻塞(或延时)而成为瓶颈。

我们可以将上述设计中单独的Selector对象扩展成多个,让它们监听不同的I/O事件,这样就可以避免单个Selector带来的瓶颈问题。设计如图所示。

在这里插入图片描述

SocketServer

Kafka的网络层是采用多线程、多个Selector的设计实现的。核心类是SocketServer,其中包含一个Acceptor用于接受并处理所有的新连接,每个Acceptor对应多个Processor线程,每个Processor线程拥有自己的Selector,主要用于从连接中读取请求和写回响应。每个Acceptor对应多个Handler线程,主要用于处理请求并将产生响应返回给Processor线程。Processor线程与Handler线程之间通过RequestChannel进行通信。整个网络层的结构如图所示。

在这里插入图片描述
下面介绍SocketServer的具体实现。首先来看SocketServer依赖的组件,如图所示。

在这里插入图片描述

AbstractServerThread

Acceptor和Processor都继承了AbstractServerThread,如图所示,AbstractServerThread是实现了Runnable接口的抽象类。在AbstractServerThread中为Acceptor和Processor提供了一些启动关闭相关的控制类方法。

在这里插入图片描述

Acceptor

Acceptor的主要功能是接收客户端建立连接的请求,创建Socket连接并分配给Processor处理。

Acceptor中有两个比较重要的字段:一个是Java NIO Selector,注意不要与前面介绍的KSelector混淆;二是用于接收客户端请求的ServerSocketChannel对象。在创建Acceptor时会初始化上面两个字段,同时还会创建并启动其管理的Processors线程。

Acceptor.accept()方法实现了对OP_ACCEPT事件的处理,它会创建SocketChannel并将其交给Processoraccept方法处理,同时还会增加ConnectionQuotas中记录的连接数。

Processor

Processor主要用于完成读取请求和写回响应的操作,Processor不参与具体业务逻辑的处理。Processor的核心字段如下所述,在创建Processor对象时会初始化这些字段。

  • newConnections:ConcurrentLinkedQueue[SocketChannel]类型,其中保存了由此Processor处理的新建的SocketChannel。
  • inflightResponses:保存未发送的响应。有读者可能会将inflightResponses与客户端的InFlightRequests进行类比,但也要注意其区别,客户端并不会对服务端发送的响应消息再次发送确认,所以infightResponse中的响应会在发送成功后移除,而InFlightRequests中的请求是在收到响应后才移除。
  • selector:KSelector类型,负责管理网络连接。
  • requestChannel:Processor与Handler线程之间传递数据的队列。

在Acceptoraccept方法中创建的SocketChannel会通过Processor.accept方法交给Processor进行处理。

Processoraccpet方法接收到一个新的SocketChannel时会先将其放入newConnections队列中,然后会唤醒Processor线程来处理newConnections队列。

注意,newConnections队列由Acceptor线程和Processor线程并发操作,所以选择线程安全的ConcurrentLinkedQueue。

在Processor.run()方法中实现了从网络连接上读写数据的功能。run()方法的流程如图所示。

在这里插入图片描述
如果Response是SendAction类型,表示该Response需要发送给客户端,则查找对应的KafkaChannel,为其注册OP_WRITE事件,并将KafkaChannel.send字段指向待发送的Response对象。

同时还会将Response从responseQueue队列中移出,放入infightResponses中。如果关心OP_WRITE事件的取消时机,可以回顾KafkaChannel.send方法,即发送完一个完整的响应后,会取消此连接注册的OP_WRITE事件。

如果Response是NoOpAction类型,表示此连接暂无响应需要发送,则为KafkaChannel注册OP_READ,允许其继续读取请求。

如果Response是CloseConnectionAction类型,则关闭对应的连接。

RequestChannel

Processor线程与Handler线程之间传递数据是通过RequestChannel完成的。

在RequestChannel中包含了一个requestQueue队列和多个responseQueues队列,每个Processor线程对应一个responseQueue。

Processor线程将读取到的请求存入requestQueue中,Handler线程从requestQueue队列中取出请求进行处理;Handler线程处理请求产生的响应会存放到Processor对应的responseQueue中,Processor线程从其对应的responseQueue中取出响应并发送给客户端。RequestChannel的结构如图所示。

在这里插入图片描述
在RequestChannel中保存的是RequestChannel.Request和RequestChannel.Response两个类的对象。

RequestChannel.Request会对请求进行解析,形成requestld(请求类型 ID)、header(请求头)、body(请求体)等字段,供Handler线程使用,并提供了一些记录操作时间的字段供监控程序使用。

RequestChannel.Response需要注意其responseAction字段,有SendAction、NoOpAction、CloseConnectionAction三种 类 型。

当请求放入RequestChannel.requestQueue之后,会有多个Handler线程并发处理从其中取出请求处理,那如何保证客户端请求的顺序性呢?在Processorrun方法,其中有多处注册/取消OP_READ事件以及注册/取消OP_WRITE事件的操作,通过这些操作的组合可以保证每个连接上只有一个请求和一个对应的响应,从而实现请求的顺序性。

现在回头来总结一个请求数据从生产者发送到服务端的流转过程,如图所示。

在这里插入图片描述
KafkaProducer线程创建ProducerRecord后,会将其缓存进RecordAccumulator。

Sender线程从RecordAccumulator中获取缓存的消息,放入KafkaChannel.send字段中等待发送,同时放入InFlightRequests队列中等待响应。

之后,客户端会通过KSelector将请求发送出去。

在服务端,Processor线程使用KSelector读取请求并暂存到stageReceives队列中,KSelector.poll方法结束后,请求被移转移到completeReceives队列中。之后,Processor将请求进行一些解析操作后,放入RequestChannel.requestQueue队列。

Handler线程会从RequestChannel.requestQueue队列中取出请求进行处理,将处理之后生成的响应放入RequestChannel.responseQueue队列。

Processor线程从其对应的RequestChannel.responseQueue队列中取出响应并放入inflightResponses队列中缓存,当响应发送出去之后会将其从inflightResponse中删除。生产者读取响应的过程与服务端读取请求的过程类似,主要的区别是生产者需要对InFlightRequest中的请求进行确认。

Kafka网络层的设计原理和实现就介绍到这里了。在高性能的分布式框架中经常采用这种Reactor模式的设计,例如,HDFS RPC框架的服务端、ZooKeeper等。也有实现了Reactor模式的框架,例如,Netty和Mina。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1403486.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Wimdows如何修改自己权限不够的文件

使用管理员身份运行cmd cd 文件目录 cd C:\Windows\System32\drivers\etc 打开文件 notepad 文件名 进行修改,保存就能成功!

基于springboot+vue的教师工作量管理系统(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

部署Filebeat+Kafka+ELK 集群

目录 Kafka 概述 为什么需要消息队列(MQ) 使用消息队列的好处 消息队列的两种模式 Kafka 定义 Kafka 简介 Kafka 的特性 Kafka 系统架构 在zookeeper集群的基础上部署 kafka 集群 部署zookeeper集群 部署kafka集群 下载安装包 安装 Kafka Ka…

Leetcode—22.括号生成【中等】

2023每日刷题&#xff08;七十九&#xff09; Leetcode—22.括号生成 算法思想 实现代码 class Solution { public:vector<string> generateParenthesis(int n) {vector<string> ans;int m n * 2;string path(m, 0);function<void(int, int)> dfs [&…

LabVIEW电火花线切割放电点位置

介绍了一个电火花线切割放电点位置分布评价系统&#xff0c;特别是在系统组成、硬件选择和LabVIEW软件应用方面。 本系统由两个主要部分组成&#xff1a;硬件和软件。硬件部分包括电流传感器、高速数据采集卡、开关电源、电阻和导线。软件部分则由LabVIEW编程环境构成&#xf…

数学建模学习笔记||层次分析法

评价类问题 解决评价类问题首先需要想到一下三个问题 我们评价的目标是什么我们为了达到这个目标有哪几种可行方案评价的准则或者说指标是什么 对于以上三个问题&#xff0c;我们可以根据题目中的背景材料&#xff0c;常识以及网上收集到的参考资料进行结合&#xff0c;从而筛…

java多线程(线程池)

1、创建一个可缓存线程池&#xff0c;如果线程池长度超过处理需要&#xff0c;可灵活回收空闲线程&#xff0c;若无可回收&#xff0c;则新建线程。 public static void main(String[] args) {ExecutorService cachedThreadPool Executors.newCachedThreadPool();for (int i …

基于Altium Designer 10设计双层印刷电路板的详细步骤

基于Altium Designer 10设计双层印刷电路板的详细步骤 一、基于Altium Designer 10设计双层印刷电路板总纲二、、基于Altium Designer 10设计双层印刷电路原理图三、制作集成库(包括原理图、PCB封装库、PCB 3D库)1、新建集成库2、新建原理图库3、绘制原理图库(1)、手工绘制…

02_Collection

文章目录 集合Java的集合类 Collectioniterator方法Collection的遍历 集合 在Java中&#xff0c;指的就是存放数据的容器&#xff0c;是一个载体&#xff0c;可以一次容纳多个对象。 解决Bug的两种方法&#xff1a; 打印 System.out.println();log.info(); debug 检查数据 …

CentOs7 安装Mysql(5.7和8.0版本)密码修改跳过 超详细教程

CSDN 成就一亿技术人&#xff01; 今天出一期Centos下安装Mysql&#xff08;详细教程&#xff09;包括数据库密码跳过修改 CSDN 成就一亿技术人&#xff01; 目录 1.获取安装包 2.安装程序 安装下载的rpm包 查看安装包 修改5.7版本&#xff08;重要&#xff09; 安装M…

【RHCE服务搭建实验】之DNS

目录 一、DNS简介二、安装DNS 一、DNS简介 域名系统&#xff08;DNS&#xff09;是一个分层的分布式数据库。它存储用于将Internet主机名映射到IP地址&#xff08;反之亦然&#xff09;的信息、邮件路由信息以及Internet应用程序使用的其他数据。 客户端通过调用解析器库在DNS…

股权众筹模式介绍(上)

众筹&#xff0c;是指个人或小企业通过互联网向大众筹集资金的一种项目融资方式&#xff0c;根据众筹的筹集目的和回报方式&#xff0c;可以分为债权众筹、回报众筹、股权众筹和捐赠众筹四大类。本文重点介绍我国股权众筹的几种模式。 一、众筹当事方 一个众筹项目&#xff0…

【ARMv8M Cortex-M33 系列 7 -- RA4M2 移植 RT-Thread 问题总结】

请阅读【嵌入式开发学习必备专栏 】 文章目录 问题小结栈未对齐 经过几天的调试&#xff0c;成功将rt-thead 移植到 RA4M2&#xff08;Cortex-M33 核&#xff09;上&#xff0c;thread 和 shell 命令已经都成功支持。 问题小结 在完成 rt-thread 代码 Makefile 编译系统搭建…

【LeetCode: 12. 整数转罗马数字 + 模拟 + 有序表】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

WampServer

开发笔记 推荐链接php无法保存SESSION问题部署SSL时候产生的问题 推荐链接 链接目录 php无法保存SESSION问题 php.ini文件和phpForApache.ini 文件 里面都有 对路径的控制&#xff0c;相关路径问题可能也需要进行修改&#xff0c;打开文件搜索wamp64或wamp 就可以看到了&…

PowerShell install 一键部署grafana

grafana 前言 Grafana 是一款开源的数据可视化和监控仪表盘工具。它提供了丰富的数据查询、可视化和报警功能,可用于实时监控、数据分析和故障排除等领域。 通过 Grafana,您可以连接到各种不同的数据源,包括时序数据库(如 Prometheus、InfluxDB)和关系型数据库(如 MySQ…

透明拼接屏生产商:如何选择合格供应商

随着透明拼接屏市场的不断扩大&#xff0c;越来越多的生产商加入其中。对于需求方而言&#xff0c;选择一家合格的生产商至关重要。本文将围绕如何选择透明拼接屏生产商展开讨论&#xff0c;同时结合对尼伽OLED显示屏的了解&#xff0c;为您推荐这一领域的优质供应商。 一、透明…

springboot102基于web的音乐网站

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的基于web的音乐网站 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 …

网络防御保护1

网络防御保护 第一章 网络安全概述 网络安全&#xff08;Cyber Security&#xff09;是指网络系统的硬件、软件及其系统中的数据受到保护&#xff0c;不因偶然的或者恶意的原因而遭受到破坏、更改、泄露&#xff0c;系统连续可靠正常地运行&#xff0c;网络服务不中断 随着数…

vue3前端开发,如何引入element-plus前端框架及配置参数

vue3前端开发,如何引入element-plus前端框架及配置参数&#xff01;这是一个简单的教程&#xff0c;帮助大家快速在自己的项目中引入element-plus框架。 主要是介绍的引入流程和参数的配置情况。 如图&#xff0c;这个就是elment-plus前端框架里面的一个主按钮展示。表示我们配…