【kafka实战】05 Kafka消费者消费消息过程源码剖析

news2025/2/10 22:21:53

1. 概述

Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理等。本文将深入剖析Kafka消费者消费消息的源码,并结合相关原理图进行讲解。

以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简单的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。

1.1 消费者代码使用示例

  • 已经安装并启动了 Kafka 集群。
  • 你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

示例代码

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // 自动提交偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // 键和值的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建 Kafka 消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        try {
            // 订阅主题
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            // 持续消费消息
            while (true) {
                // 从 Kafka 拉取消息
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭消费者
            consumer.close();
        }
    }
}

代码解释

  1. 配置消费者属性

    • BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
    • GROUP_ID_CONFIG:指定消费者所属的消费组。
    • ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
    • AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
    • KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。
  2. 创建 Kafka 消费者实例:使用配置好的属性创建 KafkaConsumer 实例。

  3. 订阅主题:使用 subscribe 方法订阅指定的主题。

  4. 持续消费消息:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。

  5. 关闭消费者:在消费完成后,使用 close 方法关闭消费者。

注意事项

  • 确保 Kafka 集群的地址和主题名称正确。
  • 如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync()commitAsync() 方法手动提交偏移量。

2. Kafka消费者消费消息的核心流程

Kafka消费者消费消息的核心流程可以分为以下几个步骤:

  1. 消费者组协调:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
  2. 分区分配:组协调器为消费者分配分区。
  3. 消息拉取:消费者从分配的分区中拉取消息。
  4. 消息处理:消费者处理拉取到的消息。
  5. 提交偏移量:消费者提交已处理消息的偏移量。

下面我们将结合源码详细分析每个步骤。
在这里插入图片描述

3. 源码剖析


关键组件说明

  1. ConsumerCoordinator

    • 负责消费者组的协调和分区分配。
    • 管理消费者的心跳和重平衡。
  2. Fetcher

    • 负责从 Kafka Broker 拉取消息。
    • 管理拉取请求和响应的处理。
  3. SubscriptionState

    • 管理消费者订阅的主题和分区。
    • 记录消费者的消费偏移量。
  4. PartitionAssignor

    • 负责分区分配策略的实现。
  5. OffsetCommitCallback

    • 处理偏移量提交的回调逻辑。

3.1 消费者组协调

消费者在启动时,首先需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。

// org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // 1. 订阅主题
    this.subscriptions.subscribe(new HashSet<>(topics), listener);
    
    // 2. 加入消费者组
    coordinator.subscribe(subscriptions);
}

subscribe方法中,消费者首先订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。

3.2 分区分配

组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配策略由PartitionAssignor决定,Kafka提供了多种内置的分区分配策略,如RangeAssignorRoundRobinAssignor等。

// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
private void ensurePartitionAssignment() {
    // 1. 获取分区分配结果
    Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());
    
    // 2. 更新消费者的分区分配
    subscriptions.assignFromSubscribed(assignments.get(consumerId));
}

ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。

3.3 消息拉取

消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {
    // 1. 拉取消息
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
    
    // 2. 返回拉取到的消息
    return new ConsumerRecords<>(records);
}

poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。

3.4 消息处理

消费者在拉取到消息后,会对消息进行处理。消息处理的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处理等。

// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {
    // 1. 拉取消息
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
    
    // 2. 处理消息
    for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
        for (ConsumerRecord<K, V> record : entry.getValue()) {
            // 用户自定义的消息处理逻辑
            processRecord(record);
        }
    }
    
    // 3. 返回拉取到的消息
    return new ConsumerRecords<>(records);
}

poll方法中,消费者通过processRecord方法处理每条消息。processRecord方法的具体实现由用户自定义。

3.5 提交偏移量

消费者在处理完消息后,需要提交已处理消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交策略,如自动提交、同步提交、异步提交等。

// org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
public void commitSync() {
    // 1. 提交偏移量
    coordinator.commitOffsetsSync(subscriptions.allConsumed());
}

commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交成功。

4. 原理图

以下是Kafka消费者消费消息的核心流程示意图:

+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|  消费者组协调      | ----> |     分区分配       | ----> |     消息拉取       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
                                                                      |
                                                                      v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|     消息处理       | <---- |     提交偏移量     | <---- |     网络传输       |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+

5. 总结

Kafka消费者消费消息的过程涉及多个步骤,包括消费者组的协调、分区分配、消息拉取、消息处理和偏移量提交。通过源码剖析,我们可以更深入地理解Kafka消费者的工作原理。希望本文能够帮助你更好地理解Kafka消费者的内部机制。

6. 参考

  • Kafka官方文档
  • Kafka源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296002.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[EAI-033] SFT 记忆,RL 泛化,LLM和VLM的消融研究

Paper Card 论文标题&#xff1a;SFT Memorizes, RL Generalizes: A Comparative Study of Foundation Model Post-training 论文作者&#xff1a;Tianzhe Chu, Yuexiang Zhai, Jihan Yang, Shengbang Tong, Saining Xie, Dale Schuurmans, Quoc V. Le, Sergey Levine, Yi Ma 论…

算法与数据结构(字符串相乘)

题目 思路 这道题我们可以使用竖式乘法&#xff0c;从右往左遍历每个乘数&#xff0c;将其相乘&#xff0c;并且把乘完的数记录在nums数组中&#xff0c;然后再进行进位运算&#xff0c;将同一列的数进行相加&#xff0c;进位。 解题过程 首先求出两个数组的长度&#xff0c;…

DeepSeek从入门到精通:全面掌握AI大模型的核心能力

文章目录 一、DeepSeek是什么&#xff1f;性能对齐OpenAI-o1正式版 二、Deepseek可以做什么&#xff1f;能力图谱文本生成自然语言理解与分析编程与代码相关常规绘图 三、如何使用DeepSeek&#xff1f;四、DeepSeek从入门到精通推理模型推理大模型非推理大模型 快思慢想&#x…

【异常解决】在idea中提示 hutool 提示 HttpResponse used withoud try-with-resources statement

博主介绍&#xff1a;✌全网粉丝22W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…

【Uniapp-Vue3】UniCloud云数据库获取指定字段的数据

使用where方法可以获取指定的字段&#xff1a; let db uniCloud.database(); db.collection("数据表").where({字段名1:数据, 字段名2:数据}).get({getOne:true}) 如果我们不在get中添加{getOne:true}&#xff0c;在只获取到一个数据res.result.data将会是一个数组&…

信息科技伦理与道德3-2:智能决策

2.2 智能推荐 推荐算法介绍 推荐系统&#xff1a;猜你喜欢 https://blog.csdn.net/search_129_hr/article/details/120468187 推荐系统–矩阵分解 https://blog.csdn.net/search_129_hr/article/details/121598087 案例一&#xff1a;YouTube推荐算法向儿童推荐不适宜视频 …

Visual Studio 2022 中使用 Google Test

要在 Visual Studio 2022 中使用 Google Test (gtest)&#xff0c;可以按照以下步骤进行&#xff1a; 安装 Google Test&#xff1a;确保你已经安装了 Google Test。如果没有安装&#xff0c;可以通过 Visual Studio Installer 安装。在安装程序中&#xff0c;找到并选择 Googl…

WGCLOUD监控系统部署教程

官网地址&#xff1a;下载WGCLOUD安装包 - WGCLOUD官网 第一步、环境配置 #安装jdk 1、安装 EPEL 仓库&#xff1a; sudo yum install -y epel-release 2、安装 OpenJDK 11&#xff1a; sudo yum install java-11-openjdk-devel 3、如果成功&#xff0c;你可以通过运行 java …

协议-WebRTC-HLS

是什么&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09; 实现 Web 浏览器和移动应用程序之间通过互联网直接进行实时通信。允许点对点音频、视频和数据共享&#xff0c;而无需任何插件或其他软件。WebRTC 广泛用于构建视频会议、语音通话、直播、在线游…

MySQL系列之数据类型(String)

导览 前言一、字符串类型知多少 1. 类型说明2. 字符和字节的转换 二、字符串类型的异同 1. CHAR & VARCHAR2. BINARY & VARBINARY3. BLOB & TEXT4. ENUM & SET 结语精彩回放 前言 MySQL数据类型第三弹闪亮登场&#xff0c;欢迎关注O。 本篇博主开始谈谈MySQ…

【C++高并发服务器WebServer】-15:poll、epoll详解及实现

本文目录 一、poll二、epoll2.1 相对poll和select的优点2.2 epoll的api2.3 epoll的demo实现2.5 epoll的工作模式 一、poll poll是对select的一个改进&#xff0c;我们先来看看select的缺点。 我们来看看poll的实现。 struct pollfd {int fd; /* 委托内核检测的文件描述符 */s…

git提交到GitHub问题汇总

1.main->master git默认主分支是maser&#xff0c;如果是按照这个分支名push&#xff0c;GitHub会出现两个branch&#xff0c;与预期不符 解决方案&#xff1a;更改原始主分支名为main git config --global init.defaultBranch main2.git&#xff1a;OpenSSL SSL_read: SS…

CNN-GRU卷积神经网络门控循环单元多变量多步预测,光伏功率预测(Matlab完整源码和数据)

代码地址&#xff1a;CNN-GRU卷积神经网络门控循环单元多变量多步预测&#xff0c;光伏功率预测&#xff08;Matlab完整源码和数据) CNN-GRU卷积神经网络门控循环单元多变量多步预测&#xff0c;光伏功率预测 一、引言 1.1、研究背景和意义 随着全球能源危机和环境问题的日…

编译原理面试问答

编译原理面试拷打 1.编译原理的基本概念 编译原理是研究如何将高级程序语言转换为计算机可执行代码的理论与技术&#xff0c;其核心目标是实现高效、正确的代码翻译。 **编译器&#xff1a;**将源代码转化为目标代码&#xff08;机器码、字节码等&#xff09;。一次翻译整个程…

LIMO:上海交大的工作 “少即是多” LLM 推理

25年2月来自上海交大、SII 和 GAIR 的论文“LIMO: Less is More for Reasoning”。 一个挑战是在大语言模型&#xff08;LLM&#xff09;中的复杂推理。虽然传统观点认为复杂的推理任务需要大量的训练数据&#xff08;通常超过 100,000 个示例&#xff09;&#xff0c;但本文展…

Ollama 部署本地大语言模型

一、下载安装ollama 1.百度 ollama Ollama 2.点击下载 可以复制下载链接&#xff0c;使用下载器下载。 3.双击安装 默认安装目录&#xff1a;C:\Users\用户名\AppData\Local\Programs\Ollama 二、更改模型下载目录 0.默认下载目录 (跳过) 之前没下载过模型&#xff0c;不…

pytest-xdist 进行多进程并发测试!

在软件开发过程中&#xff0c;测试是确保代码质量和可靠性的关键步骤。随着项目规模的扩大和复杂性的增加&#xff0c;测试用例的执行效率变得尤为重要。为了加速测试过程&#xff0c;特别是对于一些可以并行执行的测试用 例&#xff0c;pytest-xdist 提供了一种强大的工具&…

24.ppt:小李-图书策划方案【1】

目录 NO1234​ NO5678​ NO1234 新建PPT两种方式&#x1f447;docx中视图→导航窗格→标题1/2/3ppt新建幻灯片→从大纲→重置开始→版式设计→主题插入→表格 NO5678 SmartArt演示方案&#xff1a;幻灯片放映→自定义幻灯片放映→新建→选中添加

模型 替身决策

系列文章分享模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。替身决策&#xff0c;换位思考&#xff0c;多角度决策。 1 替身决策模型的应用 1.1 替身决策模型在面试中的应用-小李的求职面试 小李是一名应届毕业生&#xff0c;正在积极寻找工作机会。在面试过程中…

ESP32S3读取数字麦克风INMP441的音频数据

ESP32S3 与 INMP441 麦克风模块的集成通常涉及使用 I2S 接口进行数字音频数据的传输。INMP441 是一款高性能的数字麦克风&#xff0c;它通过 I2S 接口输出音频数据。在 Arduino 环境中&#xff0c;ESP32S3 的开发通常使用 ESP-IDF&#xff08;Espressif IoT Development Framew…