kafka生产端之架构及工作原理

news2025/2/11 0:22:26

文章目录

  • 整体架构
  • 元数据更新

整体架构

消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图所示。

在这里插入图片描述

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0方法调用要么被阻塞,要么抛出异常,这个取决于参数max.b1ock.ms的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧漆。与此同时,将较小的ProducerRecord拼漆成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。

ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque>的保存形式转变成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这单需要做一个应用逻辑层面到网络IO层面的转换。

在转换成<Node,ListProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

元数据更新

上面提及的InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
在这里插入图片描述

我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。

在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在 Windows 上使用 ZIP 包安装 MySQL 的详细步骤

以下是使用官方 ZIP 包在 Windows 上安装 MySQL 的详细步骤&#xff0c;确保能通过 mysql -uroot -p 成功连接。 步骤 1&#xff1a;下载 MySQL ZIP 包 访问 MySQL 官方下载页面&#xff1a; https://dev.mysql.com/downloads/mysql/选择 Windows (x86, 64-bit), ZIP Archive&…

记录 | WPF创建和基本的页面布局

目录 前言一、创建新项目注意注意点1注意点2 解决方案名称和项目名称 二、布局2.1 Grid2.1.1 RowDefinitions 行分割2.1.2 Row & Column 行列定位区分 2.1.3 ColumnDefinitions 列分割 2.2 StackPanel2.2.1 Orientation 修改方向 三、模板水平布局【Grid中套StackPanel】中…

SpringCloud - Nacos注册/配置中心

前言 该博客为Nacos学习笔记&#xff0c;主要目的是为了帮助后期快速复习使用 学习视频&#xff1a;7小快速通关SpringCloud 辅助文档&#xff1a;SpringCloud快速通关 源码地址&#xff1a;cloud-demo 一、简介 Nacos官网&#xff1a;https://nacos.io/docs/next/quickstar…

C++ 继承(1)

1.继承概念 我们平时有时候在写多个有内容重复的类的时候会很麻烦 比如我要写Student Teacher Staff 这三个类 里面都要包含 sex name age成员变量 唯一不同的可能有一个成员变量 但是这三个成员变量我要写三遍 太麻烦了 有没有好的方式呢&#xff1f; 有的 就是继承…

【C语言】传值调用与传址调用详解

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C语言 文章目录 &#x1f4af;前言&#x1f4af;传值调用1. 什么是传值调用&#xff1f;2. 示例代码&#xff1a;传值调用失败的情况执行结果&#xff1a; 3. 为什么传值调用无法修改外部变量&#xff1f; &#x1f4…

蓝桥杯C语言组:图论问题

蓝桥杯C语言组图论问题研究 摘要 图论是计算机科学中的一个重要分支&#xff0c;在蓝桥杯C语言组竞赛中&#xff0c;图论问题频繁出现&#xff0c;对参赛选手的算法设计和编程能力提出了较高要求。本文系统地介绍了图论的基本概念、常见算法及其在蓝桥杯C语言组中的应用&#…

windows通过网络向Ubuntu发送文件/目录

由于最近要使用树莓派进行一些代码练习&#xff0c;但是好多东西都在windows里或虚拟机上&#xff0c;就想将文件传输到树莓派上&#xff0c;但试了发现u盘不能简单传送&#xff0c;就在网络上找到了通过windows 的scp命令传送 前提是树莓派先开启ssh服务&#xff0c;且Window…

Unity抖音云启动测试:如何用cmd命令行启动exe

相关资料&#xff1a;弹幕云启动&#xff08;原“玩法云启动能力”&#xff09;_直播小玩法_抖音开放平台 1&#xff0c;操作方法 在做云启动的时候&#xff0c;接完发现需要命令行模拟云环境测试启动&#xff0c;所以研究了下。 首先进入cmd命令&#xff0c;CD进入对应包的文件…

ZU47DR 100G光纤 高性能板卡

简介 2347DR是一款最大可提供8路ADC接收和8路DAC发射通道的高性能板卡。板卡选用高性价比的Xilinx的Zynq UltraScale RFSoC系列中XCZU47DR-FFVE1156作为处理芯片&#xff08;管脚可以兼容XCZU48DR-FFVE1156&#xff0c;主要差别在有无FEC&#xff08;信道纠错编解码&#xff0…

【算法】动态规划专题⑥ —— 完全背包问题 python

目录 前置知识进入正题模板 前置知识 【算法】动态规划专题⑤ —— 0-1背包问题 滚动数组优化 完全背包问题是动态规划中的一种经典问题&#xff0c;它与0-1背包问题相似&#xff0c;但有一个关键的区别&#xff1a;在完全背包问题中&#xff0c;每种物品都有无限的数量可用。…

C#中深度解析BinaryFormatter序列化生成的二进制文件

C#中深度解析BinaryFormatter序列化生成的二进制文件 BinaryFormatter序列化时,对象必须有 可序列化特性[Serializable] 一.新建窗体测试程序BinaryDeepAnalysisDemo,将默认的Form1重命名为FormBinaryDeepAnalysis 二.新建测试类Test Test.cs源程序如下: using System; us…

51单片机之引脚图(详解)

8051单片机引脚分类与功能笔记 1. 电源引脚 VCC&#xff08;第40脚&#xff09;&#xff1a;接入5V电源&#xff0c;为单片机提供工作电压。GND&#xff08;第20脚&#xff09;&#xff1a;接地端&#xff0c;确保电路的电位参考点。 2.时钟引脚 XTAL1&#xff08;第19脚&a…

jupyterLab插件开发

jupyter lab安装、配置&#xff1a; jupyter lab安装、配置教程_容器里装jupyterlab-CSDN博客 『Linux笔记』服务器搭建神器JupyterLab_linux_布衣小张-腾讯云开发者社区 Jupyter Lab | 安装、配置、插件推荐、多用户使用教程-腾讯云开发者社区-腾讯云 jupyterLab插件开发教…

配置#include “nlohmann/json.hpp“,用于处理json文件

#include “nlohmann/json.hpp” // 需要安装 nlohmann/json.hpp 头文件 using json = nlohmann::json; 下载链接:https://github.com/nlohmann/json/tree/develop 1.下载并解压:首先,需要从nlohmann/json的GitHub仓库下载源代码,并解压得到的文件。 地址: nlohmann/json…

MATLAB | 基于Theil-Sen斜率和Mann-Kendall检验的栅格数据趋势分析

最近看到一些博主分享关于 SenMK 检验的代码&#xff0c;对于新手来说可能有点复杂。我们编写了一段 MATLAB 代码&#xff0c;能够一次性解决这些问题&#xff0c;简化操作流程。我们还准备了几个关于趋势检验的空间分布图&#xff0c;供大家参考。 一、Sens Slope和Mann-Kenda…

Windows 系统下使用 Ollama 离线部署 DeepSeek - R1 模型指南

引言 随着人工智能技术的飞速发展&#xff0c;各类大语言模型层出不穷。DeepSeek - R1 凭借其出色的语言理解和生成能力&#xff0c;受到了广泛关注。而 Ollama 作为一款便捷的模型管理和部署工具&#xff0c;能够帮助我们轻松地在本地环境中部署和使用模型。本文将详细介绍如…

Docker、Ollama、Dify 及 DeepSeek 安装配置与搭建企业级本地私有化知识库实践

在现代企业中&#xff0c;管理和快速访问知识库是提升工作效率、促进创新的关键。为了满足这些需求&#xff0c;企业越来越倾向于构建本地私有化的知识库系统&#xff0c;这样可以更好地保护企业数据的安全性和隐私性。本文将介绍如何利用 **Docker**、**Ollama**、**Dify** 和…

【漫话机器学习系列】087.常见的神经网络最优化算法(Common Optimizers Of Neural Nets)

常见的神经网络优化算法 1. 引言 在深度学习中&#xff0c;优化算法&#xff08;Optimizers&#xff09;用于更新神经网络的权重&#xff0c;以最小化损失函数&#xff08;Loss Function&#xff09;。一个高效的优化算法可以加速训练过程&#xff0c;并提高模型的性能和稳定…

【JVM详解四】执行引擎

一、概述 Java程序运行时&#xff0c;JVM会加载.class字节码文件&#xff0c;但是字节码并不能直接运行在操作系统之上&#xff0c;而JVM中的执行引擎就是负责将字节码转化为对应平台的机器码让CPU运行的组件。 执行引擎是JVM核心的组成部分之一。可以把JVM架构分成三部分&am…

route 与 router 之间的差别

简述&#xff1a; router&#xff1a;主要用于处理一些动作&#xff0c; route&#xff1a;主要获得或处理一些数据&#xff0c;比如地址、参数等 例&#xff1a; videoInfo1.vue&#xff1a; <template><div class"video-info"><h3>二级组件…