【kafka】十五、kafka消费者API

news2025/2/25 4:39:02

kafka消费者API

Consumer消费数据时的可靠性是很容易保证的,因为数据在kafka中是持久化的,故不用担心数据丢失的问题。

由于consumer在消费过程中可能会出现断电宕机的等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后可以继续消费。

所以,offset的维护是consumer消费数据必须考虑的问题。

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

1.自动提交offset

KafkaConsumer:创建一个kafka消费者对象,用来消费数据

ConsumerConfig:获取所需的一系列配置参数

ConsumerRecord:每条数据都要封装成ConsumerRecord对象

public class MyConsumer {

    public static void main(String[] args) {
        //创建配置信息
        Properties properties = new Properties();

        //配置信息赋值
        //连接kafka集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
        //开启自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自动提交offset的时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //key, value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("bigdata"));
        //循环不断拉取数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

通过生产者生产消息,之后在控制台可以看到:

image-20220302222101359

如果启动消费者之后,控制台一直在kafka的日志,可以在resources目录下新创建logback.xml文件,添加下面的代码,更改日志级别:

<logger name="org.apache.kafka.clients.consumer" level="off" />

2.重置offset

//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group1");

//重置消费者的offset,默认是latest
/**
  * 重新消费一个主题的数据需要满足条件:更换一个新的消费者组(或者offset过期),且配置auto.offset.reset=earliest
  * 配置earliest不等于offset就是0,因为之前的数据可能会被删除,offset就不是从0开始的
  */
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

3.手动提交offset

自动提交虽然十分便利,但是由于是基于时间提交的,开发人员难以把握offset提交的时机,配置时间过长容易造成服务等待时间太久,配置时间过短又可能会出现服务异常但offset又成功提交了。因此kafka提供了手动提交offset的API。

如果关闭自动提交offset,在消费者服务启动期间,消费暂时是正常的,消费者每次消费之后offset会更新到服务内存中,但是并没有通知kafka同步更新最新的offset,当重启消费者之后,会从kafka中获取在kafka最新的offset进行消费,这样就会造成重复消费

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交offset的两种方法:commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由于不可控因素,会出现提交失败),而commitAsync则没有失败重试机制,也有可能提交失败。

3.1 同步提交offset

同步提交有offset重试机制,会更加可靠

public class CustomConsumer {

    public static void main(String[] args) {
        //创建配置信息
        Properties properties = new Properties();

        //连接kafka
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
        //关闭自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //key, value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("bigdata"));

        //拉取数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }

            //同步提交 当前线程会阻塞直到offset提交成功
            consumer.commitSync();
        }

    }
}

如果没有consumer.commitSync(),生产者生产消息后,消费者消费完成后不会通知kafka同步更新offset,当重启消费者服务,会从kafka端的offset重新消费数据,会重复消费

3.2异步提交offset

虽然同步提交会更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响,所以在更多的情况下会选择异步提交offset

public class CustomConsumer {

    public static void main(String[] args) {
        //创建配置信息
        Properties properties = new Properties();

        //连接kafka
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
        //关闭自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //key, value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("bigdata"));

        //拉取数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }

            //同步提交 当前线程会阻塞直到offset提交成功
            //consumer.commitSync();

            //异步提交
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.out.println("提交失败:" + offsets);
                    }
                }
            });
        }

    }
}

无论同步提交还是异步提交offset,都有可能会造成数据的丢失或者重复消费。先提交offset后消费,可能造成数据的丢失;先消费后提交offset,可能造成数据重复消费

4.自定义存储offset

待补充…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/40551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

visual studio 2019 + Qt 开发,使用visual leak detector检测内存泄漏

选择了在vs2019上开发Qt, 遇到了内存泄露问题。还好vs上有方便的visual leak detector&#xff08;vld&#xff09;检测工具。 虽然官网上只支持到vs2015, 但vs2019上也能用。 具体参考这位博主的文章&#xff1a;https://blog.csdn.net/qq_22108657/article/details/1208843…

Redis数据库安装(Windows)

目录 一、下载Windows安装包 二、启动Redis 1.在终端中启动 2.使用start.bat文件启动 3.添加服务启动 三、安装Redis可视化管理工具 1.安装Redis图形客户端 2.连接数据库 一、下载Windows安装包 下载地址&#xff1a;Releases tporadowski/redis GitHub 选择下载相…

单链表的递归详解 (leetcode习题+ C++实现)

文章目录合并两个有序链表翻转链表链表中移除节点合并两个有序链表 传送门&#xff1a; https://leetcode.cn/problems/merge-two-sorted-lists/description/ 题目要求&#xff1b; 给你两个有序的链表&#xff0c;将这两个链表按照从小到大的关系&#xff0c;合并两个链表为…

Mybatis快速入门

Mybatis安装与配置 Mybatis概述 Mybatis本质上是一个别人写好的框架&#xff0c;用于简化 JDBC 开发&#xff0c;通过Mybatis框架&#xff0c;可以极大的降低JDBC的开发难度。 官方文档&#xff1a;https://mybatis.org/mybatis-3/zh/index.html Mybatis快速入门 需求&…

MySQL进阶实战10,MySQL全文索引

一、全文索引 全文索引的目的是 通过关键字的匹配进行查询过滤&#xff0c;基于相似度的查询&#xff0c;而不是精确查询。 全文索引利用分词技术分析出文字中某关键字的频率和重要性&#xff0c;并按照一定的算法智能的筛选出我们想要的结果。 全文索引一般用于字符串中某关…

tomcat服务器安装及配置教程(保姆级教程)

Tomcat安装教程 &#xff08;以tomcat-9.0.62为例&#xff1a;&#xff09; 1.下载安装包 可以从官网下载安装包&#xff1a; &#xff08;1&#xff09;从官网下载 输入网址进入官网 选择版本10&#xff0c;版本9&#xff0c;或者版本8&#xff0c;都可以&#xff0c;这里…

掘金热榜首推!阿里内部都在用的Java后端面试笔记,囊括99%的主流技术

纵观今年的技术招聘市场&#xff0c; Java依旧是当仁不让的霸主 &#xff01;即便遭受 Go等新兴语言不断冲击&#xff0c;依旧岿然不动。究其原因&#xff1a; Java有着极其成熟的生态&#xff0c;这个不用我多说&#xff1b;Java在 运维、可观测性、可监 控性方面都有着非常优…

Spring Boot JPA 本机查询示例

在本教程中&#xff0c;您将了解如何在 Spring 引导中使用 Spring Data JPA 本机查询示例&#xff08;带参数&#xff09;。我将向您展示&#xff1a; 将 Spring JPA 本机查询与Query注释一起使用的方法如何在 Spring 引导中执行 SQL 查询具有 WHERE 条件的 JPA 选择查询示例 …

动态SQL

动态SQL 可以根据具体的参数条件&#xff0c;来对SQL语句进行动态拼接。比如在以前的开发中&#xff0c;由于不确定查询参数是否存在&#xff0c;许多人会使用类似于where 1 1 来作为前缀&#xff0c;然后后面用AND 拼接要查询的参数&#xff0c;这样&#xff0c;就算要查询的…

MongoShake数据灾备与迁移

安装部署 解压 建议部署在离目标端近的地方&#xff0c;比如部署再目标端本地 tar -zxvf mongo-shake-v2.8.1.tgz配置 同构环境下主要参数 启动 执行下述命令启动同步任务&#xff0c;并打印日志信息&#xff0c;-verbose 0表示将日志打印到文件&#xff0c;在后台运行 …

【Linux从入门到放弃】Linux基本指令大全

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《Linux从入门到放弃》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; L…

黑苹果之微星(MSI)主板BIOS详细设置篇

很多童鞋安装黑苹果的时候会卡住&#xff0c;大部分原因是cfg lock 没有关闭&#xff0c;以及USB端口或SATA模式设置错误。 为了避免这些安装阶段报错的情况发生&#xff0c;今天给大家分享一下超详细的BIOS防踩坑设置指南--微星&#xff08;MSI&#xff09;主板BIOS篇&#xf…

springcloud总结篇

一.整体结构 springcloud总体架构 对比学习 二.具体 1.场景模拟 订单服务调用库存服务来更新数据库中的库存 2.springcloud问题解析 Eureka OpenFeign &#xff08;RibbonRestTemplate&#xff09; Hystrix Gateway config Bus 订单服务只知道库存服务的名称…

Python测试-unittest,2022-11-27

(2022.11.27 Sun) unittest是Python自带的单元测试框架。unittesthtml和pytestallure(测试报告)成为常用的自动测试和报告的框架组合。 unittest-archi-2022-11-23-2114.png 概念 test case测试用例&#xff1a;测试用例是测试的基本单元&#xff0c;用于测试一组特定输入的特…

OpenCV图像特征提取学习四,SIFT特征检测算法

一、SIFT特征检测概述 SIFT的全称是Scale Invariant Feature Transform&#xff0c;尺度不变特征变换&#xff0c;由加拿大教授David G.Lowe提出的。SIFT特征具有对旋转、尺度缩放、亮度变化等保持不变性&#xff0c;是一种非常稳定的局部特征。 1.1 SIFT算法具的特点 图像…

平衡搜索树——AVL树小记

文章目录二叉搜索树平衡搜索树AVL树定义AVL中平衡(Balance)因子的定义AVL树插入规则AVL树失衡情况左左失衡/右右失衡左右失衡RL失衡代码左旋-调整平衡插入调整平衡因子AVL树正确性的验证二叉搜索树 理想情况下&#xff0c;二叉搜索树的查找时间复杂度是0(log(n)) 但是&#xff…

Linux 进程概念 —— 初识操作系统(OS)

文章目录1. 概念2. 设计操作系统的目的3. 定位4. 如何理解管理5. 再谈操作系统&#x1f351; 硬件部分&#x1f351; 操作系统&#x1f351; 驱动程序&#x1f351; 用户部分&#x1f351; 系统调用接口&#x1f351; 用户接口操作6. 总结1. 概念 任何计算机系统都包含一个基本…

HTML5基础汇总

目录 一&#xff0c;html5文档头部 1.页面标题及字符集的收集 &#xff08;1&#xff09;.title标签 &#xff08;2&#xff09;.charset属性 2.元信息的设置 &#xff08;1&#xff09;.meta标签的作用 &#xff08;2&#xff09;.http-equiv/content &#xff08;2&am…

数据结构堆介绍,图文详解分析——Java/Kotlin双版本代码

堆介绍 堆是一种特殊的树结构。根据根节点的值与子节点值的大小关系&#xff0c;堆又分为最大堆和最小堆。 最大堆&#xff1a;每个节点的值总是大于或者等于其任意子节点的值。所以最大堆中根节点即为最大值。 最小堆&#xff1a;每个节点的值总是小于或者等于其任意子节点…

第六章课后题(LSTM | GRU)

目录习题6-3 当使用公式(6.50)作为循环神经网络得状态更新公式时&#xff0c;分析其可能存在梯度爆炸的原因并给出解决办法.习题6-4 推导LSTM网络中参数的梯度&#xff0c;并分析其避免梯度消失的效果​编辑习题6-5 推导GRU网络中参数的梯度&#xff0c;并分析其避免梯度消失的…