Kafka Consumer开发

news2025/1/10 17:44:36

Kafka Consumer - 消费者

跟生产者一样,消费者也属于kafka的客户端,不过kafka消费者是从kafka读取数据的应用,侧重于读数据。一个或多个消费者订阅kafka集群中的topic,并从broker接收topic消息,从而进行业务处理。今天来学习下kafka consumer基本使用。

消费者example

组件版本

  • kafka_2.13-3.3.1
  • JDK17
  • apache-maven-3.6.0

Maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
 </dependency>

消费者代码

public static void main(String[] args){

        String topicName = "consumer-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_01");
        props.put("enable.auto.commit", true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, MessageDto> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        try {
            while (true) {
                ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> {
                    System.out.println("Message received " + record.value());
                });
            }
        }finally {
            consumer.close();
        }
    }

测试验证

  • 创建topic

    ./bin/kafka-topics.sh --create --topic consumer-topic --bootstrap-server localhost:9092
    
  • 启动生产者 - 这里使用kafka自带的生产者脚本进行测试

    ./bin/kafka-console-producer.sh --topic consumer-topic --bootstrap-server localhost:9092
    
  • 测试结果

    在这里插入图片描述

至此 一个简单的kafka消费者程序已经开发完成,代码不多,开发起来也快。但是关于kafka 消费者内部有很多的原理、细节需要去梳理,否则出现问题就会茫然失措,不知所以。

pull VS poll

上面的消费者程序有一个很核心的细节需要关注,即kafka 消费者以什么的方式对数据进行消费。对比其他传统的消息中间件,消息消费的方式主要有两种:

  • 推送模式 - broker 主动推送消息给消费者
  • 拉取模式- 消费者主动从broker拉取消息

kafka在设计之处,就考虑这个问题:消费者从broker拉取数据,还是broker主动推送数据给消费者。在这方面kafka采用更为传统的设计:消费者主动拉取,其优势如下:

  • 拉取模式 可以根据消费者自身的消费能力对数据处理。如生产者大量数据,消费者消费能力有限
  • 拉取模式 消费者可以根据实际情况对数据进行批量处理。推送模式很难做到这一点
  • broker被设计成无状态模式,broker不需要对记录每一个消费者的偏移量,由客户端自己控制 便于kafka集群扩展

消息传递语义

在介绍消息传递语义之前,首先要了解下kafka 消费者位置(也叫做偏移量)管理。

位移管理

kafka 消费者端需要为每个读取的topic 分区保存消费进度,即当前分区中消费者消费消息的最新位置。该位置也叫做偏移量- offset。消费者需要定期地想kafka提交自己的位置信息,实际上,偏移量通常是下一条待消费消息的位置。如下图

在这里插入图片描述

从kafka broker读取消息,开发者可以选择提交偏移量的时间,消费者默认自动提交偏移量,这可能会带来一些风险。

最多一次

在这种情况下,在调用poll()后,一旦收到消息批,就立即提交偏移量。如果后续处理失败(如业务处理过程中发生异常,数据只是被从Broker读取出来,并没有真正的处理),消息将丢失。它不会被再次读取,因为这些消息的偏移量已经提交。

在这里插入图片描述

  1. 批量拉取数据
  2. 消费者自动提交偏移量
  3. 对消息进行业务处理,如发送email,此时系统奔溃
  4. 系统重启后,从上次已提交的偏移量进行读取,在业务上造成消息丢失

至少一次

在至少一次语义定义中,broker消息的每一个消息都会被传递到消费者,但是可能会存在重复拉取的场景,从而导致消息被重复处理。跟最多一次提交位置偏移量的时机不同,至少一次在处理消息后提交偏移量。

因此需要确保消息处理的幂等性,如对数据进行插入、更新操作;防止重复消费导致数据出现错乱。

至少一次消息处理的流程大致如下:

  1. 批量拉取数据

  2. 此时消费者并不提交偏移量

  3. 对消息进行业务处理

    3.1 处理完成 提交偏移量 进行下一次拉取数据

    3.2 消息处理失败(此时可能有一部分数据处理完成,还有一部分数据尚未处理)

  4. 重启应用 拉取数据,又会拉取之前的数据 导致消息被重复处理

精确一次

有些场景不仅需要至少一次语义(保证数据不丢失),还需要精确一次语义。每条消息只投递一次,这需要消费者应用程序跟kafka相互配合、相互合作就可以实现精确一次语义

  • 使用kafka事务API实现精确一次语义
  • 对于消费者应用程序,要有效地实现一次,必须使用幂等性消费

位移配置

props.put("enable.auto.commit", true);

enable.auto.commit 参数默认值为true,kafka默认在后台线程中周期性的提交消费者偏移量

auto.commit.interval.ms默认为5秒,如果enable.auto.commit参数设置为true,即消费者5秒提交一次位移。

在至少一次、精确一次语义中 需要将该参数设置为false,由应用程序手动提交偏移量

//...
props.put("enable.auto.commit", false);
//...
while (true) {
  ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
  records.forEach(record -> {
    System.out.println("Message received " + record.value());
  });
  //提交偏移量
  consumer.commitSync();
}

根据不用的应用场景,kakfa提供了多个API让开发者对消费者位移进行手动管理

在这里插入图片描述

auto.offset.reset

指定消费者从broker拉取数据的位置,有以下几个选项可以配置

  • earliest - 从最开始进行消费
  • latest - 从最后消费的偏移量进行消费 默认值
  • none - 如果未找到使用者组的先前偏移量,则向使用者抛出异常

消费者组

props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_01");

在开发kafka消费者代码时,必须指定消费者组,否则会报错,那么该参数有什么作用呢。在回答这个问题之前,先假设两个应用场景

  • kafka中消息特别多,需要增加消费者加快消息处理的速度,避免出现消息堆积
  • 某一类消息特别重要,需要被多个应用程序同时消费 - 如购买商品的消息,需要被库存应用、积分应用同时消费

借用RocketMQ中的概念(个人觉得比较合适),以上两种应用场景叫做集群消费、广播消费

  • 集群消费 - 多个消费者共同消费某一个主题内的消息
  • 广播消费 - 每一个消息被多个消费者同时消费

kafka 内部以消费者组的方式实现以上两点要求

  • 同一个消费组的不同消费实例 共同消费topiic的消息
  • 同一个消息被不同的消费组同时消费

在开发代码时,只需要按需更改一下配置即可

props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_B");
props.put("client.id", "client_02");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/112956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一种嵌入式项目的参数保存方案

设计背景 嵌入式项目中&#xff0c;为了保证系统的正常运转&#xff0c;通常需要保存一部分数据至非易失存储设备如flash中。此处提供了一种通用的方案用于快速在项目中集成参数保存功能&#xff0c;该方案有以下几点特征&#xff1a; 接口简便&#xff0c;方便快速集成以及使用…

东北大学2023分布式操作系统实验

1.实验目的 建立伪分布式&#xff08;有条件的可以建立分布式环境&#xff09;的Hadoop环境&#xff0c;并成功运行示例程序。 2.Hadoop简介 2.1 Hadoop项目基础结构 在其核心&#xff0c;Hadoop主要有两个层次&#xff0c;即&#xff1a; 加工/计算层(MapReduce)存储层(Ha…

Python pandas有几千个库函数,你用过几个?(1)

对Python的 pandas 库所有的内置元类、函数、子模块等全部浏览一遍&#xff0c;然后挑选一些重点学习一下。我安装的库版本号为1.3.5&#xff0c;如下&#xff1a; >>> import pandas as pd >>> pd.__version__ 1.3.5 >>> print(pd.__doc__)pandas…

C++ STL vector list set map容器循环通过迭代器删除元素注意事项

先说说写这篇博客的原因吧&#xff0c;同事转部门了&#xff0c;把他手头的工作交接给了我。他以前维护的一个模块&#xff0c;会将外部输入的数据缓存起来分段处理&#xff0c;处理完了就会清除缓存数据&#xff0c;最近出现了一个bug&#xff0c;缓存数据一直不清除&#xff…

【SpringMVC】非注解的处理器映射器和适配器

项目目录 1.导入的依赖 pom.xml <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><…

【K3s】第2篇 一篇文章学习实践K3s部署安装

目录 1、docker安装 2、docker-compose安装 3、K3s安装 3.1 k3s与install.sh文件准备 3.2 k3s 安装步骤 4、查看k3s部署状态 1、docker安装 方式一 https://fanjufei.blog.csdn.net/article/details/123500511https://fanjufei.blog.csdn.net/article/details/123500511 …

12.24

接口测试 ​ <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width…

Unity3D异步加载场景SceneManager.LoadSceneAsync()卡住,并不异步,获取process直接到0.9的问题

问题阐述&#xff1a; 一般来说&#xff0c;在加载场景的时候&#xff0c;会因为所加载资源的大小、复杂度、电脑配置等因素导致加载过程耗费一定的时间。虽然这个加载时间是不可避免的&#xff0c;但是在这一小段卡着的时间里如果就这样卡着的话会大大降低玩家体验。 所以很多…

(matlab编程基础)数组的基本操作

目录 1、数组寻址 2、数组元素删除 3、数组查找和排序 &#xff08;1&#xff09;数组查找 &#xff08;2&#xff09;数组排序 4、数组运算 5、数组操作函数 数组操作主要从以下5部分进行介绍:数组寻址、数组元素的删除、数组查找和排序、数组运算和数组操作函数。 1、…

PS CS6视频剪辑基本技巧(五)添加logo、动画和画中画

系列讲座导读 PS CS6视频剪辑基本技巧&#xff08;一&#xff09;CS6可以实现的视频剪辑功能 PS CS6视频剪辑基本技巧&#xff08;二&#xff09;视频剪接和添加图片 PS CS6视频剪辑基本技巧&#xff08;三&#xff09;添加声音和字幕 PS CS6视频剪辑基本技巧&#xff08;四&am…

RMQ - ST表

RMQ - ST表 1、RMQ 简介 RMQ (Range Minimum / Maximum Query) 问题是指&#xff1a;对于长度为 nnn 的数列 AAA&#xff0c;回答若干询问 (A,i,j)(A, i, j)(A,i,j) (1≤i,j≤n)(1≤i,j≤n)(1≤i,j≤n)&#xff0c;返回数列 A 中区间在 [i,j][i,j][i,j] 中的最小 (大) 值所在…

Vue2.0全面教程

Vue2.0 学习视频地址 文章目录Vue2.01.vue 简介1.1.什么是vue1.2.vue的两个特性1.2.1.数据驱动视图1.2.2.双向数据绑定1.3.MVVM概述1.4.MVVM工作原理2.vue的基本使用2.1.基本使用步骤2.2.基本代码与MVVM的对应关系3.vue的调试工具3.1.安装vue-devtool调试工具3.2.配置Chrome浏…

Axios(三)

目录 1.Axios的默认配置 2.Axios创建Ajax实例对象发送请求 3.Axios拦截器 4.Axios取消请求 5.Axios文件结构说明 6.Axios创建过程 7.Axios对象创建过程模拟实现 8.Axios发送请求过程详解 9.模拟实现Axios发送请求 1.Axios的默认配置 <!doctype html> <html …

QT系列第7节 标准对话框使用

QT编程中对话框作为最常用的窗口经常被使用&#xff0c;本节介绍一些QT常用的标准对话框使用。 目录 1.选择单文件 2.选择多文件 3.选择目录 4.文件存储 5.选择颜色 6.选择字体 7.输入字符换/整数/浮点数/条目 8.消息对话框 9.进度对话框 10.向导对话框 1.选择单文件…

【圣诞节限定】今天教你如何用Html+JS+CSS绘制3D动画圣诞树

一、前言 应CSDN的邀请&#xff0c;这次给大家展示一波&#xff0c;如何用H5技术绘制3D圣诞树。 二、创意名 只采用简单的HtmlJSCSS 技术绘制。 三、效果展示 圣诞树修过如下&#xff1a; 四、编码实现 将源码复制保存到html中打开即可。 <!DOCTYPE html> <html lang…

ChatGPT与BimAnt的1小时对话实录【数字孪生】

本文为BimAnt和ChatGPT对数字孪生相关问题的解答&#xff0c;感觉这个AI真的已经“懂”了很多东西&#xff0c;让人恍惚间忘了是在和bot对话。 BimAnt&#xff1a;hello ChatGPT&#xff1a;Hello! How can I help you today? BimAnt&#xff1a;can you speak chinese&am…

鲸鱼优化算法及其在无线网络资源分配中的应用(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 鲸鱼优化算法&#xff08;Whale Optimization Algorithm&#xff09;是一种新兴的智能优化算法&#xff0c;在2016年提出。算法…

JWT渗透与攻防(二)

目录 前言 JWT漏洞演示之CTFhub&#xff08;一&#xff09; JWT漏洞演示之CTFhub&#xff08;二&#xff09; 前言 我们在之前的文章中已经讲解过了JWT漏洞相关的原理和利用&#xff0c;今天我们就通过这篇文章再来了解一下JWT的漏洞。 JWT漏洞演示之CTFhub&#xff08;一&…

Linux-信号

文章目录信号准备知识&#xff1a;信号产生的方式实验验证&#xff1a;9号信号是不可被捕捉&#xff08;自定义的&#xff09;信号处理&#xff1a;信号产生前&#xff1a;信号产生的方式&#xff1a;键盘实验显示&#xff1a;段错误&#xff08;野指针&#xff09;实验验证&am…

SSRF ME XCTF

题目 就是一个验证框和URL框&#xff0c;两个都必须有参数 解法 验证码 做一个粗略的脚本&#xff0c;一般验证码都是数字&#xff0c;所以直接开md5&#xff1a; def cmpcapt(substr):for i in range(1,100001):md5 hashlib.md5(str(i).encode(utf-8))hexmd5 md5.hexd…