Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

news2025/2/27 5:31:20

Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。

Kafka 生产者(Producer)

1 发送消息到 Kafka

Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生产者示例代码:

// 示例代码:创建 Kafka 生产者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(properties);

// 发送消息到主题 "my-topic"
producer.send(new ProducerRecord<>("my-topic", "key", "Hello, Kafka!"));

// 关闭生产者
producer.close();

2 生产者参数配置

了解如何配置生产者参数是保障生产者性能和可靠性的关键。示例代码:

// 示例代码:配置 Kafka 生产者参数
properties.put("acks", "all");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);

Kafka 消费者(Consumer)

1 从 Kafka 消费消息

Kafka 消费者负责从指定的主题订阅消息并进行处理。以下是一个简单的消费者示例代码:

// 示例代码:创建 Kafka 消费者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(properties);

// 订阅主题 "my-topic"
consumer.subscribe(Collections.singletonList("my-topic"));

// 消费消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}

// 关闭消费者
consumer.close();

2 消费者组和 Offset

了解消费者组和 Offset 的概念对于实现可伸缩的消息处理系统至关重要。示例代码:

// 示例代码:创建消费者组
properties.put("group.id", "my-group");

// 获取消费者组的当前 Offset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

消费者的 Exactly Once 语义

Kafka 提供了强大的消息传递保证,包括至多一次和精确一次。了解如何配置消费者以实现 Exactly Once 语义:

// 示例代码:设置消费者的消息传递语义
properties.put("isolation.level", "read_committed");

扩展话题:生产者和消费者的高级用法

除了基本的消息发送和接收之外,Kafka 生产者和消费者还支持一系列高级用法,可以更灵活地满足各种复杂场景的需求。

1 生产者的事务支持

Kafka 从版本0.11开始引入了事务支持,使得生产者可以实现原子操作,确保消息的可靠性。

// 示例代码:使用 Kafka 事务
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    producer.send(new ProducerRecord<>("my-other-topic", "key", "value"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
} catch (KafkaException e) {
    producer.close();
    throw e;
}

2 消费者的多线程处理

在高吞吐量的场景下,多线程消费消息是提高效率的重要手段。消费者可以通过多线程同时处理多个分区的消息。

// 示例代码:多线程消费者
properties.put("max.poll.records", 500);
properties.put("max.poll.interval.ms", 300000);

Consumer<String, String> consumer = new KafkaConsumer<>(properties);

// 订阅主题 "my-topic"
consumer.subscribe(Collections.singletonList("my-topic"));

// 多线程消费消息
int numberOfThreads = 5;
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        executor.submit(() -> processRecord(record));
    }
}

// 关闭消费者
consumer.close();
executor.shutdown();

3 自定义序列化和反序列化

Kafka 默认提供了一些基本的序列化和反序列化器,但你也可以根据需求自定义实现。这在处理复杂数据结构时非常有用。

// 示例代码:自定义序列化器
public class CustomSerializer implements Serializer<MyObject> {
    @Override
    public byte[] serialize(String topic, MyObject data) {
        // 实现自定义序列化逻辑
    }
}

最佳实践和注意事项

在使用 Kafka 生产者和消费者时,需要注意一些最佳实践:

  • 配置合理的参数: 生产者和消费者的性能和行为受到各种参数的影响,需要根据实际场景进行合理配置。

  • 避免阻塞: 长时间的阻塞可能影响整体性能,需要确保消费者在处理消息时是高效而迅速的。

  • 处理异常和错误: 生产者和消费者在运行中可能会遇到各种异常和错误,需要实现适当的异常处理逻辑以确保系统的稳定性。

总结

Apache Kafka 架构中的生产者和消费者是构建实时数据流系统的关键组件,本文深入剖析了它们的工作原理、核心概念以及高级用法。对于生产者而言,不仅介绍了基本的消息发送,还详细探讨了参数配置和事务支持,使得开发者能更好地利用其强大功能。消费者部分不仅涵盖了消息的接收和消费,还深入讨论了消费者组、Offset、以及如何实现 Exactly Once 语义。文章进一步扩展到高级话题,包括生产者的事务支持、消费者的多线程处理和自定义序列化,使大家能够灵活应对不同的业务需求。

最后,本文总结了最佳实践和注意事项,强调了合理配置参数、避免阻塞、处理异常等方面的重要性。通过深刻理解这些核心组件,以及在实践中的灵活应用,开发者能够更好地构建高效、可靠的实时数据流系统。生产者和消费者作为 Kafka 生态系统的基石,为处理大规模、高并发的数据流提供了强大的工具。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1281700.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RPG项目01_脚本代码

基于“RPG项目01_场景及人物动画管理器”&#xff0c;我们创建一个XML文档 在资源文件夹下创建一个文件夹&#xff0c; 命名为Xml 将Xnl文档拖拽至文件夹中&#xff0c; 再在文件夹的Manager下新建脚本LoadManager 写代码&#xff1a; using System.Collections; using System…

基于CNN对彩色图像数据集CIFAR-10实现图像分类--keras框架实现

项目地址&#xff08;kaggle&#xff09;&#xff1a;基于CNN对彩色图像数据集CIFAR-10实现图像分类--keras | Kaggle 项目地址&#xff08;Colab&#xff09;&#xff1a;https://colab.research.google.com/drive/1gjzglPBfQKuhfyT3RlltCLUPgfccT_G9 导入依赖 在tensorflow…

若依的基本使用

演示使用网址:若依管理系统 网站:RuoYi 若依官方网站 |后台管理系统|权限管理系统|快速开发框架|企业管理系统|开源框架|微服务框架|前后端分离框架|开源后台系统|RuoYi|RuoYi-Vue|RuoYi-Cloud|RuoYi框架|RuoYi开源|RuoYi视频|若依视频|RuoYi开发文档|若依开发文档|Java开源框架…

绝地求生在steam叫什么?

绝地求生在Steam的全名是《PlayerUnknowns Battlegrounds》&#xff0c;简称为PUBG。作为一款风靡全球的多人在线游戏&#xff0c;PUBG于2017年3月23日正式上线Steam平台&#xff0c;并迅速成为一部热门游戏。 PUBG以生存竞技为核心玩法&#xff0c;玩家将被投放到一个辽阔的荒…

基于 ESP32 的带触摸显示屏的 RFID 读取器

如何设计一款基于 ESP32 且具有 ILI9341 触摸屏显示屏且适合壁挂式安装的美观 RFID 读取器。 本项目中用到的东西 硬件组件 ESP32 开发套件 C 1 AZ-Touch ESP 套件 1 RFID-RC522 IC卡读写器 1 ​编辑 电线、绕包线 1 详细设计流程 …

结构体实现位段

一.什么是位段 位段的声明和结构是类似的&#xff0c;有两个不同&#xff1a; 位段的成员必须是 int、unsigned int 或 signed int &#xff0c;在C99中位段成员的类型也可以 选择其他类型。 位段的成员名后边有⼀个冒号和⼀个数字 struct A {int a : 5;int b : 4;int c : 2…

SpringBoot的配置加载优先级

目录 一、背景分析 二、学习资源 三、具体使用 四、一些小技巧 方式一 方式二 一、背景分析 SpringBoot项目在打包之后&#xff0c;其配置文件就在jar包内&#xff0c;如果没有<配置文件优先级>这个机制&#xff0c;那么项目打成jar包之后&#xff0c;如果启动项目…

avue-crud中时间范围选择默认应该是0点却变成了12点

文章目录 一、问题二、解决三、最后 一、问题 在avue-crud中时间范围选择&#xff0c;正常默认应该是0点&#xff0c;但是不知道怎么的了&#xff0c;选完之后就是一直是12点。具体问题如下动图所示&#xff1a; <template><avue-crud :option"option" /&g…

数据结构入门————树(C语言/零基础/小白/新手+模拟实现+例题讲解)

目录 1. 树的概念及其结构 1.1 树的概念&#xff1a; 1.2 树的相关概念&#xff1a; 1.3 树的表示方法&#xff1a; ​编辑 1.4 树的应用&#xff1a; 2. 二叉树的概念及其结构 2.1 概念: 2.2 特点&#xff1a; 2.3 特殊二叉树&#xff1a; 2.4 二叉树的性质&#xf…

shell编程-awk命令详解(超详细)

文章目录 前言一、awk命令介绍1. awk命令简介2. awk命令的基本语法3. 常用的awk命令选项4. 常用的awk内置变量 二、awk命令示例用法1. 打印整行2. 打印特定字段3. 根据条件筛选行4. 自定义分隔符5. 从文件中读取awk脚本 总结 前言 awk命令是一种强大的文本处理工具&#xff0c…

Qt OpenCV 学习(一):环境搭建

对应版本 Qt 5.15.2OpenCV 3.4.9MinGW 8.1.0 32-bit 1. OpenCV 下载 确保安装 Qt 时勾选了 MinGW 编译器 本文使用 MinGW 编译好的 OpenCV 库&#xff0c;无需自行编译 确保下载的 MinGW 和上述安装 Qt 时勾选的 MinGW 编译器位数一致&#xff0c;此处均为 x86/32-bit下载地址…

LongAddr

目录 1. 引言 2. AtomicInteger的局限性 3. AtomicInteger与LongAdder 的性能差异 4.LongAdder 的结构 LongAddr架构 Striped64中重要的属性 Striped64中一些变量或者方法的定义 Cell类 5. 分散热点的原理 具体流程图 6. 在实际项目中的应用 7. 总结 1. 引言 在这一…

【C++练级之路】【Lv.2】类和对象(上)(类的定义,访问限定符,类的作用域,类的实例化,类的对象大小,this指针)

目录 一、面向过程和面向对象初步认识二、类的引入三、类的定义四、类的访问限定符及封装4.1 访问限定符4.2 封装 五、类的作用域六、类的实例化七、类的对象大小的计算7.1 类对象的存储方式猜测7.2 如何计算类对象的大小 八、类成员函数的this指针8.1 this指针的引出8.2 this指…

课题学习(十四)----三轴加速度计+三轴陀螺仪传感器-ICM20602

本篇博客对ICM20602芯片进行学习&#xff0c;目的是后续设计一个电路板&#xff0c;采集ICM20602的数据&#xff0c;通过这些数据对各种姿态解算的方法进行仿真学习。 一、 ICM20602介绍 1.1 初识芯片 3轴陀螺仪&#xff1a;可编程全刻度范围(FSR)为250 dps&#xff0c;500 d…

Proteus仿真--基于ADC0832设计的两路电压表

本文介绍基于ADC0832实现的双路电压表采集设计&#xff08;完整仿真源文件及代码见文末链接&#xff09; 仿真图如下 采集芯片选用ADC0832&#xff0c;电压显示在LCD1602液晶显示屏上 仿真运行视频 Proteus仿真--基于ADC0832设计的两路电压表 附完整Proteus仿真资料代码资料…

自动驾驶学习笔记(十四)——感知算法

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo Beta宣讲和线下沙龙》免费报名—>传送门 文章目录 前言 感知算法 开发过程 测试和评价 前言…

【Linux】cp 命令使用

cp 命令 cp&#xff08;英文全拼&#xff1a;copy file&#xff09;命令主要用于复制文件或目录。 著者 由Torbjorn Granlund、David MacKenzie和Jim Meyering撰写。 语法 cp [选项]... [-T] 源文件 目标文件或&#xff1a;cp [选项]... 源文件... 目录或&#xff1a;cp [选…

【Docker】容器数据持久化及容器互联

一、Docker容器的数据管理 1.1、什么是数据卷 数据卷是经过特殊设计的目录&#xff0c;可以绕过联合文件系统&#xff08;UFS&#xff09;&#xff0c;为一个或者多个容器提供访问&#xff0c;数据卷设计的目的&#xff0c;在于数据的永久存储&#xff0c;它完全独立于容器的…

深入理解GMP模型

1、GMP模型的设计思想 1&#xff09;、GMP模型 GMP分别代表&#xff1a; G&#xff1a;goroutine&#xff0c;Go协程&#xff0c;是参与调度与执行的最小单位M&#xff1a;machine&#xff0c;系统级线程P&#xff1a;processor&#xff0c;包含了运行goroutine的资源&#…

canvas绘制单个圆和同心圆

代码实现&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdev…