Kafka之生产者

news2024/11/25 16:39:30

本章内容将整理下Kafka体系结构中的生产者相关的一些知识。

1. 生产者客户端

  • 生产者客户端在Kafka的发展历程当中一共有两个重大版本:

    • 一个是基于Scala语言开发的版本,称为Old Producer或Scala版的生产者客户端。
    • 一个是Kafka0.9.x版本之后以Java语言开发的版本,称为New Producer或Java版的生产者客户端。
  • 生产者客户端的3个必要参数配置:

    • bootstrap.servers:

      用来指定生产者客户端连接Kafka集群中的broker清单吗,格式为:host1:port1,host2:port2, …。

      没有必要将Kafka集群中的所有broker都列入到清单中,因为生产者客户端可以通过一个broker查询到其他broker。

      虽然清单中可以只配置一个broker,但是为了以防该broker宕机后导致生产者客户端无法正常发送消息,所以建议至少配置2个或2个以上的broker。

    • key.serializer 和 value.serializer:

      broker只接收字节数组(Byte[])形式的消息。所以,发送消息的时候需要将消息的key和value都转换为字节数组。而key.serializer 和 value.serializer就是分别指定了key和value的序列化器。

  • 消息发送

    • 消息发送有三种模式:

      • 发完即忘(fire-and-forget)

        只管往Kafka发送消息但是不管消息是否正确到达。

      • 同步(sync)

      • 异步(async)

        这里觉得值得说的是异步回调函数的调用循序问题,如果两个消息R1、R2都是发给了同一个分区,且R1先于R2,那么生产者客户端就能保证R1对应的回调函数优先于R2对应的回调函数被调用,也就是说回调函数的调用可以保证分区有序性。

    • 消息通过send()方法发送到broker的过程中,还需要经过拦截器、序列化器、分区器的操作:

      • 拦截器

        拦截器按着生产者客户端和消费者客户端分为:生产者拦截器和消费者拦截器。

        生产者拦截器可以使得我们有能力在消息真正发送出去之前做一些额外处理,对于消息来说生产者拦截器不是必需的

      • 序列化器

        上文中讲道key.serializer和value.serializer的时候,有提到序列化器。这里提到是生产者客户端的序列化器,它将消息转换为字节数组以便后续通过网络传输给Kafka。

        与生产者客户端序列化器对应的是消费者客户端的反序列化器,即将字节数组转换为消息对象。

        生产者的序列化器与对应的消费者的反序列化器必须保持一一对应加粗样式,否则无法正确的解析消息。

        对于消息来说序列化器是必需的

      • 分区器

        分区器的作用是给消息分配分区

        默认分区器在key为null的情况下,会轮询的方式将消息发送给主题下各个可用的分区。如果key不为null,则会根据key来计算出该消息将要发往的分区的分区号,该分区可以是主题下的任意分区。

        分区器不是每次都是必需的。需要根据实际的情况来判断,如果已指定了消息的分区号(即消息ProducerRecord中指定了partition字段,partition代表的就是所要发往的分区的分区号),那么就不需要分区器来为消息分配分区了。

2. 原理分析

首先我们先来看一张生产者客户端的整体架构图:
在这里插入图片描述
从整体架构图中我们可以得知以下一些信息:

  1. 生产者客户端是通过两个线程来协调工作的:主线程和Sender线程。

    • 主线程:用来创建消息,并使消息经过拦截器、序列化器、分区器的处理后,存储到消息累加器(RecordAccumulator,也称为消息收集器)中。
    • Sender线程:用来从消息累加器中获取消息并将消息发送给Kafka。
  2. 消息累加器的空间不是无限的,我们可以通过生产者客户端的配置参数buffer.memory来进行配置,默认情况下是32M。

  3. 消息写入消息累加器的速度如果远远超过消息发送到Kafka的速度,就会造成消息积压,从而导致消息累加器的空间被占满,即出现生产者客户端空间不足的情况。此时KafkaProducer的send()方法要么阻塞,要么抛出异常,这取决于生产者客户端的一个配置参数max.block.ms(最大阻塞超时时间),默认为60秒,即先阻塞,当阻塞时间超过了max.block.ms则抛异常。

  4. 双端队列(Deque):在消息累加器中有1个或多个双端队列,每个双端队列对应一个分区,双端队列中有若干ProducerBatch,ProducerBatch中存放的是若干个ProducerRecord。

    我们可以将ProducerRecord理解为一条消息,而ProducerBatch则是一个消息批次。

    消息被写入消息累加器的时候,会根据消息所属的分区找到对应的双端队列,然后取出该双端队列队尾的ProducerBatch并将消息写入到这里。

  5. Sender线程是从双端队列的对头开始获取消息的。

  6. BufferPool:消息在发送给Kafka之前需要一块内存空间来进行存储,我们姑且叫消息内存。生产者客户端是通过java.io.ByteBuffer类来对消息内存进行创建和释放的。

    消息内存频繁的创建和释放也是比较消耗新能的,为了减少这部分开销,消息累加器中提供BufferPool,在BufferPool中缓存了指定大小的ByteBuffer,这些ByteBuffer是可以进行复用(免去了再创建和释放同等大小的ByteBuffer开销)。

    注意这里提到的是“指定大小”,例如:只允许16KB大小的ByteBuffer能缓存在BufferPool中,那么大小不是16KB的ByteBuffer就不允许进入到BufferPool中,也就无法进行的复用了。

    我们可以通过生产者客户端的参数batch.size(默认16KB)来设置这个“指定大小”。

  7. 生产者客户端的参数batch.size对ProducerBatch的大小也有很大的影响。当消息要写入消息累加器的时候,首先根据消息的分区获取对应的双端队列(如果没有则新建),然后获取处于双端队列队尾的ProducerBatch(如果没有则新建),尝试将消息写入到该ProducerBatch中,如果能写入则写入(即ProducerBatch空间还够),如果不能,则需要新建一个ProducerBatch。当我们要新建ProducerBatch的时候,首先比较下消息的大小是否超过batch.size,如果没超过,则基于batch.size大小创建一个新的ProducerBatch(此内存空间将会被复用,即被BufferPool缓存了);如果超过了,则基于消息的大小去创建一个新的ProducerBatch(此内存空间将不会被复用,即没有缓存到BufferPool中)。

  8. 基于以上出现的一些概念,分区、双端队列、ProducerBatch和ProducrRecord,我整理了一下它们之间的关系:
    在这里插入图片描述

  9. 消息写入到消息累加器的过程如下:
    在这里插入图片描述

上一篇:Kafka之基本概念
下一篇:Kafka之消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2207686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《深度学习》OpenCV 光流估计 原理、案例解析

目录 一、光流估计 1、什么是光流估计 2、原理 3、光流估计算法 1)基于局部方法 2)和基于全局方法 4、光流估计的前提 1)亮度恒定 2)小运动 3)空间一致 二、案例实现 1、读取视频 2、特征检测 3、处理每…

Python | Leetcode Python题解之第474题一和零

题目: 题解: class Solution:def findMaxForm(self, strs: List[str], m: int, n: int) -> int:count10 []for s in strs:count10.append([0,0])for c in s:if c 0: count10[-1][0]1else: count10[-1][1]1dp [[0]*(n1) for _ in range(m1)]for i …

十一、数据库的设计规范

文章目录 1. 为什么需要数据库设计2. 范式2.1 范式介绍2.2 范式都包括哪些2.3 键和相关属性的概念2.4 第一范式(1st NF)2.5 第二范式(2nd NF)2.6 第三范式(3rd NF)2.7 小结3. 反范式化3.1 概述3.2 应用举例3.3 反范式的新问题3.4 反范式的使用场景3.4.1 增加冗余字段的建议3.…

windows系统更新升级node指定版本【避坑篇!!!亲测有效】(附带各版本node下载链接)一定看到最后!不用删旧版!

Node.js 是一个开源、跨平台的 JavaScript 运行时环境,广泛应用于服务器端和网络应用的开发。随着 Node.js 版本的不断更新,我们可能需要升级到特定版本以满足项目需求或修复安全漏洞。又或者是学习开发另外一个新项目,新项目对Node版本要求更…

上交大全华班复现o1旅程式学习下的深思考

因篇幅限制不重复原研究内容,建议访问原技术报告链接精读,这里主要向大伙表示我对上交大本此研究所涉三方面的价值认同及更进一步的延展思考。 价值认同: ① 深刻洞察:系统性研究并阐释旅程式学习; ② 行业促进&…

SQL Injection | MySQL 数据库概述

关注这个漏洞的其他相关笔记:SQL 注入漏洞 - 学习手册-CSDN博客 0x01:MySQL 数据库简介 MySQL 是一个流行的关系型数据库管理系统(RDBMS),它基于 SQL (Structured Query Language)进行操作。My…

Django项目的创建及说明(详细图解版)

Django项目的创建及说明 1、安装Django2、创建项目2.1、利用终端创建项目2.2、利用Pycharm企业版创建项目 3、默认文件介绍 1、安装Django 在终端输入下述命令行。 pip install django安装成功后执行如下命令查看Django是否安装好,若正确显示出Django版本号则安装…

[实时计算flink]应用场景

本文将以部门场景和技术领域场景为例,为您介绍实时计算Flink版的大数据是实时化场景。 背景信息 作为流式计算引擎,Flink可以广泛应用于实时数据处理领域,例如ECS在线服务日志,IoT场景下传感器数据等。同时Flink还能订阅云上数据…

进程的那些事--进程间的通信(重点说明管道和共享内存)

目录 前言 一、初始进程间通信 二、管道 1.匿名管道 2.命名管道 三、共享内存 四、消息队列(了解) 五、信号量(了解) 前言 提示:这里可以添加本文要记录的大概内容: 进程是一个能够独立运行&#…

什么情况下数据库和缓存不一致?

首先,在非并发的场景中,出现不一致的问题大家都能比较容易的理解,因为缓存的操作和数据库的操作是存在一定的时间差的。而生两个操作是没办法保证原子些的,也就是说,是有可能一个操作功,一个操作失败的。所…

C语言-数据结构 折半查找

在折半查找中,刚开始学可能会在下标处产生困惑,例如奇数个长度的数组怎么处理,偶数个长度的数组怎么处理,不需要修改代码吗?并且下标我从1开始算和0开始算影响代码吗?其实都可以用一样的代码,产…

【含文档】基于Springboot+Vue的失物招领系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定…

如何替换OCP节点(一):使用oat | OceanBase应用实践

前言: OceanBase Cloud Platform(简称OCP),是 OceanBase数据库的专属企业级数据库管理平台。 在实际生产环境中,OCP的安装通常是第一步,先搭建OCP平台,进而依赖OCP来创建、管理和监控我们的生…

docker升级mysql

一、首选备份原数据库所有数据 二、在Docker中查看正在运行的MySQL容器名称,可以使用以下命令: docker ps --filter "namemysql" 三、查看当前docker中正在运行mysql的版本 docker exec -it qgz-mysql mysql -V 可以看到当前运行的版本是8.…

数据传输——差错控制

一、检错纠错 1、通信链路不是完全理想的,在传输的过程中可能会产生比特差错。 2、误码率:传输错误的比特占所传输比特总数的比率。 3、检错:能自动发现差错。 4、纠错:不仅能发现差错而且能自动纠正差错。 5、码字(codeword…

Selenium打开外部应用程序的弹窗处理

问题 selenium自动化操作页面跳转到外部应用程序进行下载等操作,各种窗口处理方式无法解决 原因 该窗口属于浏览器窗口,与访问页面无关(已经脱离页面操作层面) 解决 selenium启动浏览器时,对浏览器进行相关窗口设…

Elasticsearch的安装与配置

注意:elasticsearch 禁止安装在/root路径下! 1、创建用户组 groupadd elastic 2、创建用户 useradd es -d /home/es -g elastic echo es | passwd es --stdin 3、给新创建的用户进行授权 chown -R es:elastic /home/es chmod -R 775 /home/es 4…

sklearn机器学习实战——支持向量机四种核函数分类任务全过程(附完整代码和结果图)

sklearn机器学习实战——支持向量机四种核函数分类任务全过程(附完整代码和结果图) 关于作者 作者:小白熊 作者简介:精通python、matlab、c#语言,擅长机器学习,深度学习,机器视觉,目…

Nginx反向代理配置与负载均衡配置

简介:整理自黑马程序员苍穹外卖的第11节 nginx是什么? nginx的好处 nginx反向代理配置方式 nginx负载均衡的配置方式 nginx负责均衡策略

等保2.0测评 — WebSphere 中间件

查看版本信息: 登录websphere管理平台首页就能看到版本信息 可以进入\usr\IBM\WebSphere\AppServer\bin 下执行./versionInfo.sh查看版本 一、身份鉴别 a)应对登录的用户进行身份标识和鉴别,身份标识具有唯一性,身份鉴别信息具有…