Kafka核心参数与使用02

news2025/1/11 13:35:59

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
        new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 业务处理
    }
    // 手动提交 offset
    consumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    @RestController
    public class KafkaProducerController {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        @GetMapping("/kafka/send/{message}")
        public void sendMessage(@PathVariable("message") String msg) {
            kafkaTemplate.send("topic1", msg);
        }
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    @Component
    public class KafkaConsumerListener {
        @KafkaListener(topics = {"topic1"})
        public void onMessage(ConsumerRecord<?, ?> record) {
            System.out.println("消费内容:" + record.value());
        }
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2274932.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ElasticSearch 认识和安装ES

文章目录 一、为什么学ElasticSearch?1.ElasticSearch 简介2.ElasticSearch 与传统数据库的对比3.ElasticSearch 应用场景4.ElasticSearch 技术特点5.ElasticSearch 市场表现6.ElasticSearch 的发展 二、认识和安装ES1.认识 Elasticsearch&#xff08;简称 ES&#xff09;2.El…

mysql和redis的最大连接数

平时我们要评估mysql和redis的最大连接数&#xff0c;可以选择好环境&#xff08;比如4核8G&#xff09;,定好压测方法&#xff08;没有索引的mysql单表&#xff0c;redis单key&#xff09;进行压测&#xff0c;评估其最大并发量。 也可以查看各大云厂商的规格进行评估。 mys…

2025年中科院分区大类划分公布!新增8155本

2025年中科院分区表变更情况 扩大收录范围 2025年的期刊分区表在原有的自然科学&#xff08;SCIE&#xff09;、社会科学&#xff08;SSCI&#xff09;和人文科学&#xff08;AHCI&#xff09;的基础上&#xff0c;增加了ESCI期刊的收录&#xff0c;并根据这些期刊的数据进行…

机器人避障不再“智障”:HEIGHT——拥挤复杂环境下机器人导航的新架构

导读&#xff1a; 由于环境中静态障碍物和动态障碍物的约束&#xff0c;机器人在密集且交互复杂的人群中导航&#xff0c;往往面临碰撞与延迟等安全与效率问题。举个简单的例子&#xff0c;商城和车站中的送餐机器人往往在人流量较大时就会停在原地无法运作&#xff0c;因为它不…

Spring Boot教程之五十二:CrudRepository 和 JpaRepository 之间的区别

Spring Boot – CrudRepository 和 JpaRepository 之间的区别 Spring Boot建立在 Spring 之上&#xff0c;包含 Spring 的所有功能。由于其快速的生产就绪环境&#xff0c;使开发人员能够直接专注于逻辑&#xff0c;而不必费力配置和设置&#xff0c;因此如今它正成为开发人员…

加速物联网HMI革命,基于TouchGFX的高效GUI显示方案

TouchGFX 是一款针对 STM32 微控制器优化的先进免费图形软件框架。 TouchGFX 利用 STM32 图形功能和架构&#xff0c;通过创建令人惊叹的类似智能手机的图形用户界面&#xff0c;加速了物联网 HMI 革命。 TouchGFX 框架包括 TouchGFX Designer (TouchGFXDesigner)&#xff08;…

Java-数据结构-栈与队列(StackQueue)

一、栈(Stack) ① 栈的概念 栈是一种特殊的线性表&#xff0c;它只允许固定一端进行"插入元素"和"删除元素"的操作&#xff0c;这固定的一端被称作"栈顶"&#xff0c;对应的另一端就被称做"栈底"。 &#x1f4da; 栈中的元素遵循后…

案例研究:UML用例图中的结账系统

在软件工程和系统分析中&#xff0c;统一建模语言&#xff08;UML&#xff09;用例图是一种强有力的工具&#xff0c;用于描述系统与其用户之间的交互。本文将通过一个具体的案例研究&#xff0c;详细解释UML用例图的关键概念&#xff0c;并说明其在设计结账系统中的应用。 用…

【动态规划篇】欣赏概率论与镜像法融合下,别出心裁探索解答括号序列问题

本篇鸡汤&#xff1a;没有人能替你承受痛苦&#xff0c;也没有人能拿走你的坚强. 欢迎拜访&#xff1a;羑悻的小杀马特.-CSDN博客 本篇主题&#xff1a;带你解答洛谷的括号序列问题&#xff08;绝对巧解&#xff09; 制作日期&#xff1a;2025.01.10 隶属专栏&#xff1a;C/C题…

点击底部的 tabBar 属于 wx.switchTab 跳转方式,目标页面的 onLoad 不会触发(除非是第一次加载)

文章目录 1. tabBar 的跳转方式2. tabBar 跳转的特点3. 你的配置分析4. 生命周期触发情况5. 总结 很多人不明白什么是第一次加载&#xff0c;两种情况讨论&#xff0c;第一种情况假设我是开发者&#xff0c;第一次加载就是指点击微信开发者工具上边的编译按钮&#xff0c;每点击…

CUDA、CUDNN以及tensorRT的版本对应关系

各版本的对应除了可以看文件的命名上还可以查看TensorRT的Release日志&#xff1a; Release Notes :: NVIDIA Deep Learning TensorRT Documentation 这个是官方测试TensorRT的Release日志&#xff0c;里面指明了当前测试的TensorRT版本是在哪个CUDNN等库下的测试结果&#x…

设计模式(观察者模式)

设计模式&#xff08;观察者模式&#xff09; 第三章 设计模式之观察者模式 观察者模式介绍 观察者模式&#xff08;Observer Design Pattern&#xff09; 也被称为发布订阅模式 。模式定义&#xff1a;在对象之间定义一个一对多的依赖&#xff0c;当一个对象状态改变的时候…

Helm部署activemq

1.helm create activemq 创建helm文件目录 2.修改values.yaml 修改image和port 3. helm template activemq 渲染并输出 4. helm install activemq activemq/ -n chemical-park // 安装 5.启动成功

Untiy中如何嵌入前端页面,从而播放推流视频?

最近工作中频繁用到unity,虽然楼主之前也搞过一些UNTY游戏开发项目&#xff0c;但对于视频这块还是不太了解&#xff0c;所以我们采用的方案是在Unity里寻找一个插件来播放推流视频。经过我的一番寻找&#xff0c;发现了这款Vuplex 3D WebView&#xff0c;它可以完美的打通Unit…

rabbitmq的三个交换机及简单使用

提前说一下&#xff0c;创建队列&#xff0c;交换机&#xff0c;绑定交换机和队列都是在生产者。消费者只负责监听就行了&#xff0c;不用配其他的。 完成这个场景需要两个服务哦。 1直连交换机-生产者的代码。 在配置类中创建队列&#xff0c;交换机&#xff0c;绑定交换机…

Excel 技巧07 - 如何计算到两个日期之间的工作日数?(★)如何排除节假日计算两个日期之间的工作日数?

本文讲了如何在Excel中计算两个日期之间的工作日数&#xff0c;以及如何排除节假日计算两个日期之间的工作日数。 1&#xff0c;如何计算到两个日期之间的工作日数&#xff1f; 其实就是利用 NETWORKDAYS.INTL 函数 - weekend: 1 - 星期六&#xff0c;星期日 2&#xff0c;如…

初学stm32 --- DAC模数转换器工作原理

目录 什么是DAC&#xff1f; DAC的特性参数 STM32各系列DAC的主要特性 DAC框图简介&#xff08;F1/F4/F7&#xff09; 参考电压/模拟部分电压 触发源 关闭触发时(TEN0)的转换时序图 DMA请求 DAC输出电压 什么是DAC&#xff1f; DAC&#xff0c;全称&#xff1a;Digital…

从预训练的BERT中提取Embedding

文章目录 背景前置准备思路利用Transformer 库实现 背景 假设要执行一项情感分析任务&#xff0c;样本数据如下 可以看到几个句子及其对应的标签&#xff0c;其中1表示正面情绪&#xff0c;0表示负面情绪。我们可以利用给定的数据集训练一个分类器&#xff0c;对句子所表达的…

从CentOS到龙蜥:企业级Linux迁移实践记录(系统安装)

引言&#xff1a; 随着CentOS项目宣布停止维护CentOS 8并转向CentOS Stream&#xff0c;许多企业和组织面临着寻找可靠替代方案的挑战。在这个背景下&#xff0c;龙蜥操作系统&#xff08;OpenAnolis&#xff09;作为一个稳定、高性能且完全兼容的企业级Linux发行版&#xff0…

车联网安全--TLS握手过程详解

目录 1. TLS协议概述 2. 为什么要握手 2.1 Hello 2.2 协商 2.3 同意 3.总共握了几次手&#xff1f; 1. TLS协议概述 车内各ECU间基于CAN的安全通讯--SecOC&#xff0c;想必现目前多数通信工程师们都已经搞的差不多了&#xff08;不要再问FvM了&#xff09;&#xff1b;…