Kafka之消费者客户端

news2024/11/23 21:55:25

1、历史上的二个版本

与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:

  • 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端
  • 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端

2、必要的参数配置

  • bootstrap.servers

    用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。

    不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。

    建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。

  • group.id

    消费者隶属消费组的名称。

  • key.deserializer 和 value.deserializer

    与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。

    用来将字节数组中的key和value反序列化还原为原来的对象格式。

3、订阅主题与分区

一个消费者可以订阅一个或多个主题。

Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。

这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。

其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。

综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
在这里插入图片描述

4、消费消息

消息消费一般有两种方式:

  • 推模式:服务器主动将消息推送给消费者。
  • 拉模式:消费者主动向服务器发起请求来来取信息。

Kafka采用的消息消费模式是拉模式。

在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。

5、位移提交

在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。

对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。

对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”

通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
在这里插入图片描述
通过上图我们可以了解到如下信息:

  1. 正在消费的消息下标为3。
  2. 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
  3. 对于分区来说,下标4则作为下一个消息要写入的位置。
  4. 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。

Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。

根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
在这里插入图片描述
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。

  • 消息丢失

    假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。

  • 重复消费

    假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。

通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。

6、指定位移消息

首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?

为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:

  • latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
  • earliest:意为消费者会从起始处也就是0开始消费。
  • none:直接抛出NoOffsetForPartitionException异常。

7、再均衡

所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。

再均衡的过程中,消费组内的消费者无法读取消息。

再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。

8、消费者拦截器

我们可以通过消费者拦截器在poll返回消息之前消费位移提交之后进行一些特定的处理。

9、多线程实现

为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。

有三种多线程的实现方式:

  1. 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图: 在这里插入图片描述
  2. 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
  3. 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图: 在这里插入图片描述
    在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
    在这里插入图片描述
    基于这种实现方式提供以下两种实现方案:
    • 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
    • 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图: 在这里插入图片描述
      窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。

上一篇:Kafka之消费组与消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2224071.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【力扣】GO解决子序列相关问题

文章目录 一、引言二、动态规划方法论深度提炼子序列问题的通用解法模式 三、通用方法论应用示例:最长递增子序列(LeetCode题目300)Go 语言代码实现 四、最长连续递增序列(LeetCode题目674)Go 语言代码实现 五、最长重…

ffmpeg视频滤镜:定向模糊-dblur

滤镜简述 dblur 官网链接 > https://ffmpeg.org/ffmpeg-filters.html#dblur 有一个模糊滤镜&#xff0c;我试了一下&#xff0c;没有感觉到它的特殊之处, 这里简单介绍一下。 滤镜使用 滤镜的参数 angle <float> ..FV.....T. set angle (from 0 t…

找不到包的老版本???scikit-learn,numpy,scipy等等!!

废话不多说 直接上链接了&#xff1a; https://pypi.tuna.tsinghua.edu.cn/simple/https://pypi.tuna.tsinghua.edu.cn/simple/https://pypi.tuna.tsinghua.edu.cn/simple/xxx/ 后面的这个xxx就是包的名字 大家需要什么包的版本&#xff0c;直接输进去就可以啦 举个栗子&#…

零基础Java第十期:类和对象(一)

目录 一、拜访对象村 1.1. 什么是面向对象 1.2. 面向对象与面向过程 二、类定义和使用 2.1. 类的定义格式 2.2. 类的定义练习 三、类的实例化 3.1. 什么是实例化 3.2. 类和对象的说明 四、this引用 4.1. 什么是this引用 4.2. this引用的特性 一、拜访对象村 在…

<项目代码>YOLOv8路面病害识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

STMicroelectronics意法半导体车规芯片系列--亿配芯城(ICgoodFind)

在汽车电子领域&#xff0c;意法半导体的车规级芯片系列一直备受瞩目。亿配芯城作为电子元器件领域的可靠供应商&#xff0c;为大家介绍意法半导体车规级芯片系列的卓越之处。 意法半导体在车规级芯片领域拥有深厚的技术积累和丰富的经验。 其车规级芯片涵盖了多个关键领域&am…

8.three.js相机详解

8.three.js相机详解 1、 认识相机 在Threejs中相机的表示是THREE.Camera&#xff0c;它是相机的抽象基类&#xff0c;其子类有两种相机&#xff0c;分别是正投影相机THREE.OrthographicCamera和透视投影相机THREE.PerspectiveCamera&#xff1a; 正投影和透视投影的区别是&am…

【Java】常用方法合集

以 DemoVo 为实体 import lombok.Data; import com.alibaba.excel.annotation.ExcelProperty; import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;Data ExcelIgnoreUnannotated public class ExportPromoteUnitResult {private String id;ExcelProperty(value &qu…

贪心算法记录 - 下

135. 分发糖果 困难 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xff0c…

一文搞懂指令周期,机器周期和时钟周期

如图&#xff1a; 指令周期 > 机器周期 > 时钟周期 指令周期&#xff1a;一个指令&#xff0c;从取值到执行的全部周期。一个指令执行过程包括取值&#xff0c;译码和执行阶段。 机器周期&#xff1a;,取指、间址、执行和中断等 时钟周期&#xff1a;时钟频率的倒数&am…

什么样的JSON编辑器才好用

简介 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人阅读和编写&#xff0c;同时也便于机器解析和生成。随着互联网和应用程序的快速发展&#xff0c;JSON已经成为数据传输和存储的主要格式之一。在处理和编辑JSON数据…

python查询并安装项目所依赖的所有包

引言 如果需要进行代码的移植&#xff0c;肯定少不了在另一台pc或者服务器上进行环境的搭建&#xff0c;那么首先是要知道在已有的工程的代码中用到了哪些包&#xff0c;此时&#xff0c;如果是用人工去一个一个的代码文件中去查看调用了哪些包&#xff0c;这个工作甚是繁琐。…

推荐一款三维数值建模软件:3DEC

3DEC是一种用于土壤、岩石、地下水、结构支撑和砖石等高级岩土工程分析的三维数值建模软件。该软件的数值公式基于离散元法(DEM)进行不连续建模。UDEC是它的二维版本。不连续材料是一组离散的块。不连续性充当着块之间的边界条件。允许块的大位移和旋转。常见的结构可以直接从地…

HTML5教程(一)- 网页与开发工具

1. 什么是网页 网页 基于浏览器阅读的应用程序&#xff0c;是数据&#xff08;文本、图像、视频、声音、链接等&#xff09;展示的载体常见的是以 .html 或 .htm 结尾的文件 网站 使用 HTML 等制作的用于展示特定内容相关的网页集合。 2. 网页的组成 浏览器 代替用户向服务…

Cout输出应用举例

Cout输出应用 在main.cpp里输入程序如下&#xff1a; #include <iostream> //使能cin(),cout(); #include <stdlib.h> //使能exit(); #include <sstream> #include <iomanip> //使能setbase(),setfill(),setw(),setprecision(),setiosflags()和res…

根据用户选择的行和列数据构造数据结构(跨行跨列)

方案一 这段代码的功能是根据用户选择的行和列数据&#xff0c;生成一个适合复制粘贴的字符串表格。代码会先按列的 id 从小到大排序&#xff0c;再根据行列的选择关系将数据按顺序填入表格&#xff0c;每行之间使用换行符分隔&#xff0c;每列之间使用制表符分隔。如果某一行…

【汇编语言】第一个程序(一)—— 一个源程序从写出到执行的过程

文章目录 前言1. 第一步&#xff1a;编写汇编源程序2. 第二步&#xff1a;对源程序进行编译连接3. 第三步&#xff1a;执行可执行文件中的程序结语 前言 &#x1f4cc; 汇编语言是很多相关课程&#xff08;如数据结构、操作系统、微机原理&#xff09;的重要基础。但仅仅从课程…

多元线性回归【正规方程/sklearn】

多元线性回归【正规方程/sklearn】 1. 基本概念1.1 线性回归1.2 一元简单线性回归1.3 最优解1.4 多元线性回归 2. 正规方程求最优解2.1 线性回归的损失函数&#xff08;最小二乘法&#xff09;2.2 推导正规方程2.3 正规方程练习2.4 使用sklearn计算多元线性方程2.5 凸函数 3. 线…

比例数据可视化(Python实现板块层级图绘制)——Instacart Market Basket Analysis

【实验名称】 实验一&#xff1a;绘制板块层级图 【实验目的】 1. 掌握数据文件读取 2. 掌握数据处理的方法 3. 实现板块层级图的绘制 【数据介绍】Instacart Market Basket Analysis 1. 数据说明 数据共有300 0000orders&#xff0c; 20 0000users&#xff0c; …

electron 打包

安装及配置 安装electron包以及electron-builder打包工具 # 安装 electron cnpm install --save-dev electron # 安装打包工具 cnpm install electron-builder -D 参考的package.json文件 其中description和author为必填项目 {"name": "appfile",&qu…