最全Kafka知识宝典之消费端深度剖析

news2025/1/11 19:58:39

一、Kafka消费者基本特性

消费者与消费者组的关系

消费者用一个消费者组名标记自己

一个发布在Topic上消息被分发给此消费者组中的一个消费者

  • 假如所有的消费者都在一个组中,那么这就变成了队列模型,即这些消费者只有一个消费者会收到消息
  • 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。每个消费者都会收到消息

上述特性总结为:Kafka的消息,只允许同一个消费者组里一个消费者消费。但是不同的消费者组之间是隔离的,互不影响的

消费者组是什么?

它是一个组,所以内部可以有多个消费者,这些消费者共用一个ID(叫做Group ID),一个组内的所有消费者共同协作,完成对订阅的topic的所有分区进行消费,其中一个topic中的一个分区只能由一个消费者消费

消费者组的特性

  • 一个消费者组可以有多个消费者。
  • Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。
  • 每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费,消费者组之间不影响。

消费者分配

Kafka是如何保证一条消息在同一个组内只会被一个消费者消费的?

Kafka是基于分区来分配给组内的消费者的,也就是说,基于分区到消费者有一个映射,这个分区内的消息都会被这个消费者收到,而其他消费者没有映射关系就不会被收到了。

这是组里只有一个消费者的情况,那么所有分区的消息都会与这个消费者建立映射关系

这是组里有多个消费者的情况,但是消费者数量会小于分区数量,那么一个消费者会接收来自多个分区的消息

这是组里有多个消费者的情况,但是消费者数量大于分区数量,那么会有消费空闲,收不到分区的消息

所以最好消费者数量小于等于分区的数量,不然会导致有些消费者永远收不到消息

分区消息分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费,Kafka提供了3种消费者分区分配策略:RangeAssignor、RoundRobinAssignor、StickyAssignor

RangeAssignor

对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。可以理解为平均分

计算规则 

比如上述7个分区,3个comsumer,则7/3=2,余1,这个表明如果3个消费线程均分7个分区还会多出1个分区,那么这个多出的额外分区就会给前面的消费线程处理,所以它会把第一个分区先给到consumer-1消费线程消费

配置方式

prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RangeAssignor");

RoundRobinAssignor

RoundRobinAssignor 采用轮询的方式分配分区。如果consuemrs订阅Topics都是相同的,那么partitions将会被均匀分配给每个consumer,最理想的状态是partitions数是consumers数的整数倍,这样每个consumer都有相同数量的partitions数。

计算规则 

类似于斗地主发牌,第一个分区给了第一个消费者,第二个分区就给第二个消费者,一次进行下去

配置方式

prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

StickyAssignor

StickyAssignor 是 Kafka 2.4.0 版本引入的一种新的分区分配策略。它的目标是在重新分配时尽可能保持现有的分配不变,以减少重新分配带来的影响。

计算规则

  • StickyAssignor 会在重新分配时尽量保持现有的分区分配不变。
  • 如果需要重新分配,它会尽量将分区分配给已经在消费该分区的消费者,或者分配给负载较轻的消费者。
  • 分配时,尽量使每个消费者的分区数量大致相等。

这里再举个例子

配置方式 

prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");

二、Kafka的消费安全问题

消费者线程安全问题

首先,kafka 的Java consumer是单线程的设计,准确来说是双线程,kafka新版本中KafkaConsumer变成了用户主线程和心跳线程的双线程设计

所谓用户主线程,就是你启动Consumer应用程序的main方法的那个线程,而心跳线程只负责定期发送心跳给对应的Broker,以标识消费者应用的存活性,引入心跳线程的目的还有一个:解耦真实的消息处理逻辑与消费者组成员存活性管理。

尽管多了一个心跳线程,但是实际的消息处理还是由主线程完成,所以我们还是可以认为KafkaConsumer是单线程设计的。

那为什么要采用单线程设计的思路呢?

  • 新版本Consumer设计了单线程+轮询的机制,这种设计能够较好的实现非阻塞式的消息获取。(因为一旦是多线程,必然会发送阻塞等待,所以这样读取消息确保是非阻塞的)
  • 单线程的设计能够简化Consumer端的设计,将处理消息的逻辑是否使用多线程的选择,由你来决定。(这里说的多线程是指,单线程依然是获取消息,这个消息要存下来,真正处理这个消息的handler可以提交给线程池去处理)
  • 不论使用那种编程语言,单线程的设计都比较容易实现,并且,单线程设计的Consumer更容易移植到其它语言上。

死信队列和重试队列

重试队列

        与此对应的还有一个“回退队列"的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障,实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

        重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到Broker与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。

死信队列

        当一条消息初次消费失败,消息队列 MQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 MQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,这种正常情况下无法被消费的消息称为死信消息,存储死信消息的特殊队列称为死信队列。

消息丢失和消息重复

在消费者端,消费了消息要提交一个东西叫做offset,就是消息偏移量(也叫位移),代表我现在已经消费到哪个位置的消息了。如果我们对于位移提交控制不好可能出现消息丢失以及消息消息重复的情况

重复消费

这种情况发生在消费了数据但没有及时提交offset

比如开启了自动offset提交,consumer默认5s提交一次offset,提交offset 2s之后consumer挂了,此时已经消费了2s的消息,但是因为没有触发5s时间间隔没有告诉kafka已经消费信息,此时再启动consumer broker还是记录的5s自动提交之前的offset 此时会造成消息的重复消费

消息丢失

这种情况发生消息还没真正消费完,就提交offset了

如果将offset设置为手动提交,当offset被提交时,数据还在内存中未处理,刚好消费者宕机,offset已经提交,数据未处理,此时就算再启动consumer也消费不到之前的数据了,导致了数据漏消费

如果想要consumer精准一次消费,需要kafka消息的消费过程和提交offset变成原子操作,此时需要我们将kafka的offset持久化到其他支持事务的中间件(比如MySOL)

消息堆积

1、如果是kafka消费能力不足,考虑增加topic分区数,并且同时增加消费者组的消费者数量,因为一个partition只能被CG(消费者组)中的一个consumer消费,所以partition和consumer必须同时增加

2、如果是下游数据处理不及时,可以提高每次拉取的数量。因为批次拉取数据过少,会使得处理数据小于生产的数据

配置

fetch.max.bytes消费者获取服务器端一批消息最大的字节数,默认50M
max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条

三、消费者offset(位移)管理

消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息,在kafka中,这个位置信息有个专门的术语:位移(offset)

位移类型

有两种位移

1、分区位移

生产者向分区写入消息,每条消息在分区中的位置信息由一个叫offset的数据来表示,假设一个生产者向一个空分区写入了10 条消息,那么这 10 条消息的位移依次是 0、1、…、9;

2、消费位移

注意,这和上面所说的消息在分区上的位移完全不是一个概念,上面的“位移“表示的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了,而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器。

假设一个分区中有 10条消息,位移分别是0到9,某个 Consumer 应用已消费了5条消息,这就说明该 Consumer 消费了位移为0到4的5条消息,此时 Consumer 的位移是5,指向了下一条消息的位移。

至于为什么要有消费位移,很好理解,当Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍,就好像书签一样,需要书签你才可以快速找到你上次读书的位置。

位移信息存放在哪? 

Kafka0.9之后kafka将offset维护在了系统topic __consumer_offsets 中,该主题有50个partition,采用K-V方式存储数据,key=groupId+topic+partition号,value即使当前的ofset值,每隔一段时间,kafka内部会对这个topic进行压缩compact操作,保留最新的offset。

位移提交方式

自动提交

Kafka 消费者在后台定期自动提交偏移量。所以有两个配置

  • enable.auto.commit:设置为 true 以启用自动提交,默认值为 true
  • auto.commit.interval.ms:指定自动提交的间隔时间,默认值为 5000 毫秒(5 秒)。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "true");  // 启用自动提交
props.put("auto.commit.interval.ms", "5000");  // 设置自动提交间隔时间为 5 秒
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

手动提交

消费者在处理完消息后显式地提交偏移量。需要enable.auto.commit设置为 false 以禁用自动提交。

消费者在处理完消息后,显式调用 commitSync() 或 commitAsync() 方法来提交偏移量。
commitSync() 是同步提交,会阻塞直到提交成功或抛出异常。
commitAsync() 是异步提交,不会阻塞,可以提供回调函数来处理提交结果。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false");  // 禁用自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        // 同步提交偏移量
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // 处理提交失败的情况
    e.printStackTrace();
} finally {
    consumer.close();
}

这个代码示例中,commitSync()是在consumer.poll()得到了大量的records之后,只要进行了xommit,那就是对这poll下来的所有records进行提交位移

考虑一种情况,如果在循环执行处理单条record中,发生了死循环或者出现了异常,但是之前的record又被处理过了,就会导致前面的这些record没被提交位移

所以有没有对单条record的commit呢?当然有,这种做法称为逐条提交

consumer.commitSync(
    Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()), 
        new OffsetAndMetadata(record.offset() + 1)//当前record已处理,所以提交的offset应该是下一条record,需要+1
    )
);

四、分区再均衡

什么是分区再均衡?

        一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息,当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取,在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。

        分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡,再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为,在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用,另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

再均衡的过程

        只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息,消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳,如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

总结:什么时候会触发分区再分配

  • Topic中添加一个新的分区,消费者将重新分配
  • 消费者关闭或者崩溃,消费者读取的分区将会分配给其他消费者
  • 消费者群组中添加新的消费者,将分区重新分配

五、Kafka存储结构

Kafka存储数据,是以分区为单位的,每个分区都有自己的log文件夹,下面的文件会分段(segment)存储

为什么分区太多的时候,Kafka性能会下降?

Kafka是在硬盘上顺序存取数据的,但是分区太多,造成写数据会东一个分区西一个分区的找,演变成随机存取了,所以导致kafka性能下降

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2233872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL之JDBC入门详解

01-JDBC入门 一、JDBC概念 jdbc : java database connection , java数据库连接 jdbc是sun公司定义的java程序访问数据库的规范。 二、JDBC操作需要6步 三、入门程序 1、使用eclipse打开一个新的工作空间 2、切换到java视图界面 3、创建java工程&#xff1a;01-jdbc-helloworl…

ctfshow——web(总结持续更新)

文章目录 1、基础知识部分2、php伪协议2.1 php://input协议2.2 data://text/plain协议 3、webshell连接工具3.1 蚁剑连接一句话木马 4、各个web中间件重要文件路径4.1 Nginx 5、sqlmap使用6、php特性6.1 md5加密漏洞6.2 php特殊符号 7、TOP 10漏洞7.1 SQL注入7.2 代码执行7.3 文…

数论——约数(完整版)

2、约数 一个数能够整除另一数&#xff0c;这个数就是另一数的约数。 如2&#xff0c;3&#xff0c;4&#xff0c;6都能整除12&#xff0c;因此2&#xff0c;3&#xff0c;4&#xff0c;6都是12的约数。也叫因数。 1、求一个数的所有约数——试除法 例题&#xff1a; 给定…

python: Parent-child form operations using ttkbootstrap

# encoding: utf-8 # 版權所有 2024 ©塗聚文有限公司 # 許可資訊查看&#xff1a;言語成了邀功的功臣&#xff0c;還需要行爲每日來值班嗎&#xff1f; # 描述&#xff1a; 主、子表單 窗體傳值 Parent-child form operations # Author : geovindu,Geovin Du 塗聚文. …

读书笔记#深入理解Java虚拟机(第三版)# Java内存模型与线程

深入理解Java虚拟机&#xff08;第三版&#xff09;# 高效并发 chap12 Java内存模型与线程 概述 在许多场景下&#xff0c;让计算机同时去做几件事情&#xff0c;不仅是因为计算机的运算能力强大了&#xff0c;还有一个很重要的原因是计算机的运算速度与它的存储和通信子系统的…

文心一言 VS 讯飞星火 VS chatgpt (383)-- 算法导论24.5 3题

三、对引理 24.10 的证明进行改善&#xff0c;使其可以处理最短路径权重为 ∞ ∞ ∞ 和 − ∞ -∞ −∞ 的情况。引理 24.10(三角不等式)的内容是&#xff1a;设 G ( V , E ) G(V,E) G(V,E) 为一个带权重的有向图&#xff0c;其权重函数由 w : E → R w:E→R w:E→R 给出&…

阿里云-部署CNI flannel集群网络

环境 1.一台阿里云作为k8s-master:8.130.XXX.231&#xff08;阿里云私有IP&#xff09; 2.Vmware 两个虚拟机分别作为 k8s-node1:192.168.40.131 k8s-node2:192.168.40.131 3.安装Docker 部署过程 k8s-master,k8s-node1,k8s-node2 初始操作 # 关闭防火墙 systemctl stop fi…

「C/C++」C++设计模式 之 抽象工厂模式(Abstract Factory)

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

免费在线绘图:创意与效率的结合

在数字化时代&#xff0c;绘图已成为各行业人士的基本技能。无论你是设计师、学生、创作者还是爱好者&#xff0c;免费的在线绘图软件都是释放创意和表达思想的理想选择。本文将介绍七款功能全面、免费的在线绘图软件&#xff0c;帮助你轻松实现创作愿景。只需网络连接&#xf…

【教程】Git 标准工作流

目录 前言建仓&#xff0c;拉仓&#xff0c;关联仓库修改代码更新本地仓库&#xff0c;并解决冲突提交代码&#xff0c;合入代码其他常用 Git 工作流删除本地仓库和远程仓库中的文件日志打印commit 相关 前言 Git 是日常开发中常用的版本控制工具&#xff0c;配合代码托管仓库…

基于springboot+vue实现的任务管理系统(源码+L文)4-103

第4章 系统设计 4.1 总体功能设计 员工&#xff0c;经理&#xff0c;管理员都需要登录才能进入任务管理系统&#xff0c;使用者登录时会在后台判断使用的权限类型&#xff0c;包括一般使用者和管理者,一般使用者为员工和经理&#xff0c;对员工只能提供任务信息显示查询&…

vue2中使用vue-awesome-swiper实现轮播

swiper官方文档&#xff1a;Swiper中文网-轮播图幻灯片js插件,H5页面前端开发 1.安装 注意&#xff1a;swiper和vue-awesome-swiper的版本一定一定一定要相对应&#xff0c;版本对应如下&#xff1a; Swiper 5-6 vue-awesome-swiper4.1.1(vue2) Swiper 4.x vue-awesome-swi…

Node.js 入门指南:从零开始构建全栈应用

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;node.js篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来node.js篇专栏内容:node.js-入门指南&#xff1a;从零开始构建全栈应用 前言 大家好&#xff0c;我是青山。作…

favicon是什么文件?如何制作网站ico图标?

一般我们做网站的话&#xff0c;都会制作一个独特的ico图标&#xff0c;命名为favicon.ico。这个ico图标一般会出现在浏览器网页标题前面。如下图红色箭头所示&#xff1a; 部分博客导航大全也会用到所收录网站的ico图标。比如boke123导航新收录的网站就不再使用网站首页缩略图…

“大跳水”的全新奥迪A3,精准狙击年轻人的心

文/王俣祺 导语&#xff1a;随着传统豪华品牌在国内市场的全面崩盘&#xff0c;奥迪再一次坐不住了。这次&#xff0c;奥迪“割肉”的目标瞄准了被称为“年轻人第一台豪车”的奥迪A3&#xff0c;这款车问世以来&#xff0c;就凭借出色的性能与品质收获了一大批年轻粉丝。如今&a…

两台手机如何提词呢,一台手机后置高清摄像一台手机前置提词+实时监测状态的解决方案来喽

拍视频只会用前置摄像头可不行啊&#xff0c; 后置高清才会更有流量&#xff0c; 你看哦&#xff0c;我用的是后置摄像头拍摄&#xff0c; 然后前面就用来提词&#xff0c; 它不光能提词&#xff0c; 和其他家不一样的是&#xff0c; 还能把后面手机画面投影到前面手机 这样呀&…

[SWPUCTF 2021 新生赛]easy_sql的write up

开启NSSCTF靶场&#xff0c;在浏览器中访问链接&#xff0c;看到让我们输入点什么还有标签页名字提示&#xff1a; "参数是wllm" 直接/?wllm1访问一下&#xff1a; 这里就直接用sqlmap直接爆破了&#xff1a; 查看数据库有哪些&#xff1a; python sqlmap.py -u …

Tr2 CYT2B75使用记录(二):GPIO、串口、CAN(FD)和busoff检测、看门狗和复位原因

目录 概述GPIO串口1.FIFO2.中断 CANFDRX Buffer and FIFO ElementTX Buffer Elementbusoff 检测 看门狗复位原因 待梳理 概述 GPIO 1.如何定位IO的作用 2.读取电平必须为输入模式 串口 熟悉手册串口特性如下&#xff1a; ■ 数据帧大小可从4位编程到16位 ■ STOP位的可编程…

Android13 系统/用户证书安装相关分析总结(二) 如何增加一个安装系统证书的接口

一、前言 接着上回说&#xff0c;最初是为了写一个SDK的接口&#xff0c;需求大致是增加证书安装卸载的接口&#xff08;系统、用户&#xff09;。于是了解了一下证书相关的处理逻辑&#xff0c;在了解了功能和流程之后&#xff0c;发现settings中支持安装的证书&#xff0c;只…

【Java语言】继承和多态(一)

继承 继承就是实现代码的复用&#xff1b;简而言之就是重复的代码作为父类&#xff08;基类或超类&#xff09;&#xff0c;而不同的可以作为子类&#xff08;派生类&#xff09;。如果子类想要继承父类的成员就一定需要extends进行修饰&#xff08;如&#xff1a;&#xff08;…