Kafka(三)Producer第二篇

news2024/9/29 7:19:36

一,生产者架构


生产者客户端由两个线程协调运行,分别为主线程和Sender线程(发送线程)。
  • 主线程:KafkaProducer创建消息,通过拦截器、序列化器和分区器之后缓存到消息收集器RecordAccumulator中;
  • Sender线程:从RecordAccumulator中获取消息并发送到Kafka集群;
1,RecordAccumulator
  • RecordAccumulator用来缓存消息以便Sender 线程批量发送,进而减少网络传输的资源消耗;
  • 消息会被追加到RecordAccumulator的某个双端队列中, 每个partition都维护了一个双端队列;
  • 双端队列内容是Deque<ProducerBatch>,ProducerBatch包含一至多个ProducerRecord;
2,主线程写入消息到RecordAccumulator
消息在发送之前会缓存在java.io.ByteBuffer的内存区域。RecordAccumulator的内部有一个BufferPool,用来实现ByteBuffer的复用,以实现缓存的高效利用。 BufferPool只对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个大小由batch.size参数指定;
消息流入RecordAccumulator过程:
1,先寻找与partition对应的双端队列(如果没有则新建);
2,从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建);
3,判断ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则创建一个新的ProducerBatch;
4,新建ProducerBatch时,评估这条消息的大小是否超过batch.size参数的大小:
    a,如果不超过,那么就以 batch.size 的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;
    b,如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用
3,Sender线程读取RecordAccumulator并发送
1. Sender从RecordAccumulator中获取消息,会将原本<分区,Deque<ProducerBatch>>的形式转变成<Node,List<ProducerBatch>,其中Node表示Kafka集群的broker节点
2. Sender将消息进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node;
3,Sender线程发送Request之前,请求还会保存到InFlightRequests(保存的形式为 Map<NodeId,Deque<Request>>,缓存了已经发出去但还没有收到响应的请求)中;
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。负载最小是通过每个Node在InFlightRequests中 还未确认的请求决定的,未确认的请求越多则认为负载越大。
选择leastLoadedNode发送请求可以使它能够尽快发出。

二,元数据的更新


元数据是指Kafka集群的元数据,包括主题、分区、副本分布、哪些副本在AR、ISR等集合、集群中有哪些节点、控制器节点又是哪一个等等。
元数据的更新操作是由Sender线程发起的,对客户端的外部使用者不可见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1908976.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java面试八股之MySQL如何使用explain优化SQL和索引

MySQL如何使用explain优化SQL和索引 在MySQL中,EXPLAIN是一个非常有用的工具,用于分析和优化SQL查询。它可以帮助你理解查询执行计划,包括如何使用索引、表的连接方式、是否使用了临时表或文件排序等。以下是一些使用EXPLAIN来优化SQL查询和…

VBA经典应用69例应用5:使用VBA拆分窗格

《VBA经典应用69例》(版权10178981),是我推出的第九套教程,教程是专门针对初级、中级学员在学习VBA过程中可能遇到的案例展开,这套教程案例众多,紧贴“实战”,并做“战术总结”,以便…

卷积神经网络可视化的探索

文章目录 训练LeNet模型下载FashionMNIST数据训练保存模型 卷积神经网络可视化加载模型一个测试图像不同层对图像处理的可视化第一个卷积层的处理第二个卷积层的处理 卷积神经网络是利用图像空间结构的一种深度学习网络架构,图像在经过卷积层、激活层、池化层、全连…

防火墙详解(USG6000V)

0、防火墙组网模式 防火墙能够工作在三种模式下分别是路由模式、透明模式、旁路检测模式、混合模式 0.1、路由模式 路由模式:防火墙全部以第三层对外连接,即接口具有IP 地址。一般都用在防火墙是边界的场景下 防火墙需要的部署/配置: 接…

自动连点鼠标器是什么?好用的鼠标连点器分享

你听说过自动鼠标连点器吗?自动连点鼠标器是一种软件工具,用于自动模拟鼠标点击操作。这种工具可以设置为在特定位置和时间间隔内自动点击鼠标,减轻手动点击的负担。自动连点器通常可以在很多生活场景中帮助我们节省时间成本,今天…

OS Copilot测评

1.按照第一步管理重置密码时报错了,搞不懂为啥?本来应该跳转到给的那个实例的,我的没跳过去 2.下一步重置密码的很丝滑没问题 3安全组新增入库22没问题 很方便清晰 4.AccessKey 还能进行预警提示 5.远程连接,网速还是很快,一点没卡,下载很棒 6.替换的时候我没有替换<>括…

Python | Leetcode Python题解之第224题基本计算器

题目&#xff1a; 题解&#xff1a; class Solution:def calculate(self, s: str) -> int:ops [1]sign 1ret 0n len(s)i 0while i < n:if s[i] :i 1elif s[i] :sign ops[-1]i 1elif s[i] -:sign -ops[-1]i 1elif s[i] (:ops.append(sign)i 1elif s[i] …

嵌入式Linux系统编程 — 7.4 fork、vfork函数创建子进程

目录 1 父进程与子进程概念 2 fork创建子进程 3 系统调用 vfork()函数 4 vfork与 fork函数如何选择 1 父进程与子进程概念 进程与子进程是操作系统中的一个基本概念&#xff0c;用于描述进程之间的层级关系。下面是对这一概念的简要说明&#xff1a; 父进程&#xff1a;在…

线程池的合理使用

线程池的合理使用 一、简介二、为什么要使用线程池三、核心参数四、如何合理配置线程参数1.1 corePoolSize && maximumPoolSize1.2 Handler 拒绝策略1.2.1AbortPolicy&#xff1a;优势&#xff1a;劣势&#xff1a; 1.2.2 DiscardPolicy&#xff1a;优势&#xff1a;劣…

3Python的Pandas:数据选取

1.数据选取操作 1.1. 选取单列 df[Q1]df[Q2]1.2. 选取多列 df[[team,Q1]]df.loc[:,[team,Q1]]1.3.选择行 使用指定索引选择 df[df.indexAck]选择前n行 df[0:3]df.iloc[:10,:]1.4. 前n行&#xff0c;每隔m选择一个 df[0:10:3]1.5. 条件选择 df[df.Q1>90]df[(df.teamC…

linxu驱动入门基础课一(GPIO控制LED灯)基于RK3568

虽然GPIO控制LED 是最简单的linux驱动&#xff0c;但是是初学者入门必须跨过的门槛&#xff0c;里面很多基础知识点&#xff0c;有GPIO的控制原理&#xff0c;字符设备驱动&#xff0c;设备树&#xff0c;gpio和pinctrl子系统&#xff0c;内核模块原理等等&#xff0c;这些知识…

APP明暗主题设置

1.preference.xml 增加一个暗色主题 SwitchPreference 2.每个 Activity 的 setContentView 前面增加 setTheme SharedPreferences sharedPreferences PreferenceManager.getDefaultSharedPreferences(this); if (sharedPreferences.getBoolean("switch_dark_theme"…

Windows下编译OpenSSL静态库

目录 1. 版本与下载地址 2. 下载与安装VS2015 3. 下载与安装Perl 4. 测试ActivePerl是否安装正确 5. 下载OpenSSL 6. 编译32位OpenSSL静态库 6.1 解压openssl-1.0.2l.tar.gz 6.2 打开VS2015 x86本机工具命令提示符 6.3 输入命令进入到openssl的目录中 6.4 执行配置命…

加密与安全_密钥体系的三个核心目标之不可否认性解决方案

文章目录 Pre概述不可否认性数字签名&#xff08;Digital Signature&#xff09;证书是什么证书使用流程 PKICA证书层级多级证书证书链是如何完成认证的&#xff1f; 其他疑问1. Alice能直接获取Bob的公钥&#xff0c;是否还需要证书&#xff1f;2. 为什么即使能直接获取公钥也…

世界人工智能大会 | 江行智能大模型解决方案入选“AI赋能新型工业化创新应用优秀案例”

日前&#xff0c;2024世界人工智能大会暨人工智能全球治理高级别会议在上海启幕。本次大会主题为“以共商促共享&#xff0c;以善治促善智”&#xff0c;汇聚了上千位全球科技、产业界领军人物&#xff0c;共同探讨大模型、数据、新型工业化等人工智能深度发展时代下的热点话题…

CLIP编码器调用时刚开始正常,然后输出全部变为NaN

碰到了这个问题&#xff1a;输入是正常的&#xff0c;输出全是NaN 网上办法不多&#xff0c;找了半天终于看到问题所在&#xff0c;但是没有说在哪里改的&#xff0c;故记录一下。 改一下模型精度就正常了&#xff0c;默认的是fp16&#xff0c;改为fp32即可 具体步骤如下&…

渲染引擎之ECS架构介绍

1.什么是ECS&#xff1f; 我们先简单地介绍一下什么是ECS&#xff1a; E -- Entity 实体&#xff0c;本质上是存放组件的容器C -- Component 组件&#xff0c;引擎所需的所有数据结构S -- System 系统&#xff0c;根据组件数据处理逻辑状态的管理器 ECS全称Entity-Component-…

免费压缩pdf文件大小软件收费吗?pdf如何压缩文件大小?12款压缩应用推荐!

在数字化时代&#xff0c;PDF文件因其跨平台、格式统一的特点而广受欢迎。然而&#xff0c;随着文件内容的增加&#xff0c;PDF文件的大小也逐渐增大&#xff0c;给存储和传输带来了诸多不便。因此&#xff0c;寻找一款合适的PDF压缩软件成为了许多用户的需求。本文将详细介绍1…

阿里开源语音理解和语音生成大模型FunAudioLLM

近年来&#xff0c;人工智能&#xff08;AI&#xff09;的进步极大地改变了人类与机器的互动方式&#xff0c;例如GPT-4o和Gemin-1.5等。这种转变在语音处理领域尤为明显&#xff0c;其中高精度的语音识别、情绪识别和语音生成等能力为更直观、更类人的交互铺平了道路。阿里开源…