Kafka 消费者 API 指南:深入探讨消费者的实现与最佳实践

news2025/1/13 13:47:51

Kafka 消费者 API 是连接应用程序与 Kafka 集群之间的关键接口,用于从 Kafka 主题中拉取消息并进行处理。本篇文章将深入探讨 Kafka 消费者 API 的核心概念、用法,以及一些最佳实践,帮助你构建高效、可靠的消息消费系统。

1. Kafka 消费者 API 概览

Kafka 消费者 API 允许应用程序从 Kafka 集群中的指定主题订阅消息,并以流式的方式进行消费。消费者 API 提供了丰富的配置选项和强大的消息处理功能,使得开发者能够根据实际需求进行高度定制。

1.1 引入依赖

首先,确保项目中引入了 Kafka 相关的依赖,例如 Maven 中的:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version> <!-- 替换为你的 Kafka 版本 -->
</dependency>

1.2 创建消费者实例

使用 Kafka 消费者 API 首先需要创建一个消费者实例。以下是一个简单的示例:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumer {

    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        // 拉取消息并处理
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 处理消息逻辑
            records.forEach(record -> {
                System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
            });
        }
    }
}

2. 消息的订阅与拉取

2.1 订阅主题

使用 subscribe 方法订阅一个或多个主题,使消费者能够从这些主题中拉取消息。

consumer.subscribe(Collections.singletonList("my-topic"));

2.2 拉取消息

通过 poll 方法拉取消息,该方法返回一个包含消费记录的 ConsumerRecords 对象。

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

3. 消费者组与分区分配

3.1 消费者组

Kafka 消费者可以组成一个消费者组,共同消费一个主题。消费者组能够实现负载均衡和故障恢复。

props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

3.2 分区分配

消费者组内的每个消费者会被分配一个或多个分区,以实现消息的并行处理。

consumer.subscribe(Collections.singletonList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

4. 消息处理与提交

4.1 处理消息

通过遍历 ConsumerRecords 对象,可以处理拉取到的每条消息。

records.forEach(record -> {
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});

4.2 手动提交偏移量

消费者可以选择手动提交偏移量,确保消息被成功处理。

consumer.commitSync();

5. 消费者的配置选项

Kafka 消费者 API 同样提供了众多配置选项,根据实际需求进行灵活定制。以下是一些常用的配置选项:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 更多配置项...

6. 消费者的事务支持

Kafka 消费者 API 也支持事务,确保消息的一致性。以下是事务的基本用法:

consumer.subscribe(Collections.singletonList("my-topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        consumer.beginTransaction();
        records.forEach(record -> {
            // 处理消息逻辑
            System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
        });
        consumer.commitTransaction();
    }
} finally {
    consumer.close();
}

7. 性能调优和最佳实践

7.1 提高并行性

通过增加消费者实例的数量,可以提高消息的并行处理能力。

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟

7.2 手动管理偏移量

在某些场景下,手动管理偏移量能够更精细地控制消息的处理逻辑。

consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> {
        // 处理消息逻辑
        System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
    });
    consumer.commitAsync();
}

总结

通过本文的介绍,对 Kafka 消费者 API 有了更深入的了解。从创建消费者实例、消息的订阅与拉取,再到消费者组与分区分配、消息处理与提交,这些都是构建高效、可靠 Kafka 消费者系统的核心知识点。在实际应用中,根据业务需求和性能期望,结合消费者 API 的灵活配置,可以更好地发挥 Kafka 在消息消费领域的优势。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1288004.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[CAD]接下来导出一张高清大图

选择输出-范围&#xff0c;点击右侧绿色画框&#xff0c;划区一个范围 点击输出区域并设置右侧选项。 下图&#xff0c;大大大 页面设置替代-大大大 输出即可&#xff0c;可以说是非常的清晰了

HttpRunner4 Python版(十二)自动化测试平台 实战开发接入案例 技术实现 功能逻辑大致梳理 实行方案初稿

前言 通过之前的文档相信你对HttpRunner 4.x Python版本以后有较为深入的理解和认识了,本文主要讲解 动化测试平台 实战开发接入案例 技术实现 功能逻辑大致梳理 实行方案初稿,后续具体案例需要根据自身项目组的功能去具体实现,并在日常维护工作中逐步完善并增加其健壮性。 …

使用pyscenedetect进行视频场景切割

1. 简介 在视频剪辑有转场一词&#xff1a;一个视频场景转换到另一个视频场景&#xff0c;场景与场景之间的过渡或转换&#xff0c;就叫做转场。 本篇介绍一个强大的开源工具PySceneDetect&#xff0c;它是一款基于opencv的视频场景切换检测和分析工具&#xff0c;项目地址: h…

Azure Machine Learning - 使用 Azure OpenAI 服务生成文本

使用 Azure OpenAI 服务生成文本 关注TechLead&#xff0c;分享AI全维度知识。作者拥有10年互联网服务架构、AI产品研发经验、团队管理经验&#xff0c;同济本复旦硕&#xff0c;复旦机器人智能实验室成员&#xff0c;阿里云认证的资深架构师&#xff0c;项目管理专业人士&…

DataGrip连接虚拟机上Docker部署的Mysql出错解决

1.1 首先判断CentOS的防火墙&#xff0c;如果开启就关闭 //查看防火墙状态 systemctl status firewalld //关闭防火墙systemctl stop firewalld.service//关闭防火墙开机自启systemctl disable firewalld.service而后可以打开DataGrip连接了&#xff0c;如果连接不上执行如下…

Selenium+Unittest+HTMLTestRunner框架更改为Selenium+Pytest+Allure(一)

背景&#xff1a;之前的框架&#xff0c;Selenium是3.x版本&#xff0c;现在更新到4.15版本后&#xff0c;一些写法如find_element_by_xxx 不再支持&#xff0c;改为find_element(By.xxx)的方式&#xff0c;同时由于Unittest不如Pytest在执行方面灵活&#xff08;比如只执行冒烟…

【开源】基于Vue和SpringBoot的开放实验室管理系统

项目编号&#xff1a; S 013 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S013&#xff0c;文末获取源码。} 项目编号&#xff1a;S013&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实…

26、卷积 - 实际上是一个特征提取器

矩阵乘法的本质是特征的融合&#xff0c;卷积算法的本质是特征的提取。 回想一下之前所有介绍卷积的时候&#xff0c;描述了一种卷积运算的场景&#xff0c;那就是一个窗口在图片上滑动&#xff0c;窗口中的数值是卷积核的参数&#xff0c;也就是权值。 卷积的计算本质是乘累…

[组合数学]LeetCode:2954:统计感冒序列的数目

作者推荐 [二分查找]LeetCode2040:两个有序数组的第 K 小乘积 题目 给你一个整数 n 和一个下标从 0 开始的整数数组 sick &#xff0c;数组按 升序 排序。 有 n 位小朋友站成一排&#xff0c;按顺序编号为 0 到 n - 1 。数组 sick 包含一开始得了感冒的小朋友的位置。如果位…

DS图应用--最短路径

Description 给出一个图的邻接矩阵&#xff0c;再给出指定顶点v0&#xff0c;求顶点v0到其他顶点的最短路径 Input 第一行输入t&#xff0c;表示有t个测试实例 第二行输入n&#xff0c;表示第1个图有n个结点 第三行起&#xff0c;每行输入邻接矩阵的一行&#xff0c;以此类…

Apollo新版本Beta技术沙龙

有幸参加Apollo开发者社区于12月2日举办的Apollo新版本(8.0)的技术沙龙会&#xff0c;地址在首钢园百度Apollo Park。由于去的比较早&#xff0c;先参观了一下这面的一些产品&#xff0c;还有专门的讲解&#xff0c;主要讲了一下百度无人驾驶的发展历程和历代产品。我对下面几个…

第3章 接入网

文章目录 3.1.1 接入网的定义与接口3.1.2 接入网的功能结构 3.1 接入网概述 3.1.1 接入网的定义与接口 电信网按网络功能分&#xff0c;分为&#xff1a;接入网、交换网和传输网。交换网和传输网合在一起称为核心网。 接入网&#xff08;Access Network&#xff0c;AN&am…

在AWS Lambda上部署标准FFmpeg工具——Docker方案

大纲 1 确定Lambda运行时环境1.1 Lambda系统、镜像、内核版本1.2 运行时1.2.1 Python1.2.2 Java 2 启动EC23 编写调用FFmpeg的代码4 生成docker镜像4.1 安装和启动Docker服务4.2 编写Dockerfile脚本4.3 生成镜像 5 推送镜像5.1 创建存储库5.2 给EC2赋予角色5.2.1 创建策略5.2.2…

[足式机器人]Part2 Dr. CAN学习笔记-数学基础Ch0-3线性化Linearization

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记-数学基础Ch0-3线性化Linearization 1. 线性系统 Linear System 与 叠加原理 Superposition2. 线性化&#xff1a;Taylor Series3. Summary 1. 线性系统 Linear System 与 叠加原理 Superposition…

Linux基础命令(测试相关)

软件测试相关linux基础命令笔记 操作系统 常见Linux&#xff1a; Redhat系列&#xff1a;RHSL、Centos、FedoraDebian系列&#xff1a;Debian、Ubuntu以上操作系统都是在原生Linux系统上&#xff0c;增加了一些软件或功能。linux的文件及路径特点 Linux没有盘符的概念&#xf…

LeetCode-478. 在圆内随机生成点【几何 数学 拒绝采样 随机化】

LeetCode-478. 在圆内随机生成点【几何 数学 拒绝采样 随机化】 题目描述&#xff1a;解题思路一&#xff1a;一个最简单的方法就是在一个正方形内生成随机采样的点&#xff0c;然后拒绝不在内切圆中的采样点。解题思路二&#xff1a;具体思想是先生成一个0到r的随机数len&…

css处理 纯英文数据不换行问题 - word-break、word-wrap

问题图 解决 添加 css 样式 word-break: break-all;补充 还有一个 word-wrap 样式&#xff0c;可以看下 参考 &#xff1a; word-wrap: normal 只在允许的断字点换行&#xff08;浏览器保持默认处理&#xff09;。word-wrap: break-word 在长单词或 URL 地址内部进行换行。

智能优化算法应用:基于闪电连接过程算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于闪电连接过程算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于闪电连接过程算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.闪电连接过程算法4.实验参数设定5.算…

Leetcode—205.同构字符串【简单】

2023每日刷题&#xff08;五十&#xff09; Leetcode—205.同构字符串 算法思想 参考自k神思路 实现代码 class Solution { public:unordered_map<char, char> s2t, t2s;bool isIsomorphic(string s, string t) {int n s.size();for(int i 0; i < n; i) {char …

uniapp横向滚动示例

目录 插件市场案例最后 插件市场 地址 案例 地址 最后 感觉文章好的话记得点个心心和关注和收藏&#xff0c;有错的地方麻烦指正一下&#xff0c;如果需要转载,请标明出处&#xff0c;多谢&#xff01;&#xff01;&#xff01;