Apache Kafka - 重识消费者

news2024/11/24 2:36:29

文章目录

  • 概述
    • Kafka消费者的工作原理
    • Kafka消费者的配置
    • Kafka消费者的实现
      • 高级API
      • 低级API
  • 导图
    • 总结

在这里插入图片描述


概述

Kafka是一个分布式的消息队列系统,它的出现解决了传统消息队列系统的吞吐量瓶颈问题。

Kafka的高吞吐量、低延迟和可扩展性使得它成为了很多公司的首选消息队列系统。

在Kafka中,消息被分成了不同的主题(Topic),每个主题又被分成了不同的分区(Partition)。

生产者(Producer)将消息发送到指定的主题中,而消费者(Consumer)则从指定的主题中读取消息。

接下来我们将介绍Kafka消费者相关的知识。

Kafka消费者的工作原理

Kafka消费者从指定的主题中读取消息,消费者组(Consumer Group)则是一组消费者的集合,它们共同消费一个或多个主题。在一个消费者组中,每个消费者都会独立地读取主题中的消息。当一个主题有多个分区时,每个消费者会读取其中的一个或多个分区。消费者组中的消费者可以动态地加入或退出,这样就可以实现消费者的动态扩展。

Kafka消费者通过轮询(Polling)方式从Kafka Broker中读取消息。当一个消费者从Broker中读取到一条消息后,它会将该消息的偏移量(Offset)保存在Zookeeper或Kafka内部主题中。消费者组中的消费者会协调并平衡分区的分配,保证每个消费者读取的分区数量尽可能均衡。

Kafka消费者的配置

  1. bootstrap.servers

    该参数用于指定Kafka集群中的broker地址,多个地址以逗号分隔。消费者会从这些broker中获取到集群的元数据信息,以便进行后续的操作。

  2. group.id

    该参数用于指定消费者所属的消费组,同一消费组内的消费者共同消费一个主题的消息。如果不指定该参数,则会自动生成一个随机的group.id。

  3. enable.auto.commit

    该参数用于指定是否启用自动提交offset。如果设置为true,则消费者会在消费消息后自动提交offset;如果设置为false,则需要手动提交offset。

  4. auto.commit.interval.ms

    该参数用于指定自动提交offset的时间间隔,单位为毫秒。只有当enable.auto.commit设置为true时,该参数才会生效。

  5. session.timeout.ms

    该参数用于指定消费者与broker之间的会话超时时间,单位为毫秒。如果消费者在该时间内没有发送心跳包,则会被认为已经失效,broker会将其从消费组中移除。

  6. max.poll.records

    该参数用于指定每次拉取消息的最大条数。如果一次拉取的消息数量超过了该参数指定的值,则消费者需要等待下一次拉取消息。

  7. auto.offset.reset

    该参数用于指定当消费者第一次加入消费组或者offset失效时,从哪个位置开始消费。可选值为latest和earliest,分别表示从最新的消息和最早的消息开始消费。

  8. max.poll.interval.ms

    该参数用于指定两次poll操作之间的最大时间间隔,单位为毫秒。如果消费者在该时间内没有进行poll操作,则被认为已经失效,broker会将其从消费组中移除。

  9. fetch.min.bytes

    该参数用于指定每次拉取消息的最小字节数。如果一次拉取的消息数量不足该参数指定的字节数,则消费者需要等待下一次拉取消息。

  10. fetch.max.wait.ms

    该参数用于指定拉取消息的最大等待时间,单位为毫秒。如果在该时间内没有获取到足够的消息,则返回已经获取到的消息。


Kafka消费者的实现

Kafka消费者的实现可以使用Kafka提供的高级API或者低级API。高级API封装了低级API,提供了更加简洁、易用的接口。下面分别介绍一下这两种API的使用方法。

高级API

使用高级API可以更加方便地实现Kafka消费者。下面是一个使用高级API实现Kafka消费者的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

在上面的代码中,我们首先创建了一个Properties对象,用于存储Kafka消费者的配置信息。然后创建了一个KafkaConsumer对象,并指定了要消费的主题。最后使用poll方法从Broker中读取消息,并对每条消息进行处理。

低级API

使用低级API可以更加灵活地实现Kafka消费者。下面是一个使用低级API实现Kafka消费者的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "fal	VCC se");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        consumer.commitSync(Collections.singletonMap(record.topicPartition(), new OffsetAndMetadata(record.offset() + 1)));
    }
}

在上面的代码中,我们首先创建了一个Properties对象,用于存储Kafka消费者的配置信息。然后创建了一个KafkaConsumer对象,并指定了要消费的主题。最后使用poll方法从Broker中读取消息,并对每条消息进行处理。在处理完每条消息后,我们使用commitSync方法手动提交偏移量。


导图

在这里插入图片描述

总结

Kafka消费者是Kafka消息队列系统中的重要组成部分,它能够从指定的主题中读取消息,并进行相应的处理。在使用Kafka消费者时,需要注意消费者组ID、自动提交偏移量、偏移量重置策略以及消息处理方式等配置信息。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/549719.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++基础语法——模板

1. 泛型编程 在平常的编写中&#xff0c;对于一个实现固定作用的函数&#xff0c;如交换两变量的值的Swap函数&#xff0c;对于不同类型只能编写相对应的重载函数&#xff0c;即 void Swap(int& left, int& right) {int temp left;left right;right temp; }void S…

二叉树的认识(二)

既然要认识二叉树&#xff0c;自然要知道二叉树的基本操作。首先最基本的是要知道二叉树的遍历&#xff0c;所谓遍历(Traversal)是指沿着某条搜索路线&#xff0c;依次对树中每个结点均做一次且仅做一次访问。访问结点所做的操作依赖于具体的应用问题(比如&#xff1a;打印节点…

postgresql|数据库|postgresql-12的内置插件pg_stat_statements的启用和使用

前言&#xff1a; 插件就是原软件的扩展功能。postgresql有非常多的各种各样的插件&#xff0c;当然了&#xff0c;插件不安装对于我们使用数据库并没有什么太多的影响&#xff0c;可能只是不舒服一些而已&#xff0c;但有一些插件我们如果有安装&#xff0c;那么&#xff0c;对…

chatgpt赋能Python-pythonfrom

PythonFrom是什么&#xff1f; PythonFrom 是一种基于 Python 语言的开源数据采集与清洗框架&#xff0c;它提供了现代化的数据处理流程&#xff0c;非常适合于爬虫、数据挖掘和机器学习等应用场景。 特点 1. 简单易学 PythonFrom 采用了类似于 SQL 的语法结构&#xff0c;…

Spark Json系列UDF 姿势大全

主要基于jsonpath GitHub - yangyongyongyong/sparkThomasUDF at dev 解决的痛点 每次修改都要写udf函数 重复劳动,所以这里把json中常见的修改和读取都封装起来 场景案例 读取value 数组类型结果 读取value string类型结果 jsonArray 新增 元素 jsonObject 新增/更新 kv对 …

Pycharm安装教程,附详细图解

简介 PyCharm是一款Python IDE&#xff0c;其带有一整套可以帮助用户在使用Python语言开发时提高其效率的工具&#xff0c;比如&#xff0c; 调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制等等。此外&#xff0c;该IDE提供了一些高级功能&a…

Android RecyclerView实现侧滑删除

距上次写博客有半年多了&#xff0c;回忆起来都觉得不可思议&#xff0c;中间也想憋俩大招&#xff0c;总是被耽误&#xff0c;这俩月忙完之后&#xff0c;终于空下来了&#xff0c;恰好新项目我和UI俩人商量一下&#xff0c;用MD来实现app。中间有个需求是RecyclerView中侧滑显…

node + alipay-sdk 沙箱环境简单测试电脑网站支付

正式上线需要上传营业执照&#xff0c;不知道怎么去申请一个。。。。。 使用沙箱测试&#xff0c;首先前往支付宝开放平台控制台可看到左下方的沙箱测试链接&#xff1a; 然后设置接口加签方式&#xff0c;选择系统默认密钥&#xff1a; 系统默认密钥 -> 公钥模式 -> 查看…

将小米SoundMove 无缝接入 ChatGPT

将小米SoundMove 无缝接入 ChatGPT 本教程内容参考 Github 地址(可选)部署查看小米 SoundMove 信息的环境(可选)查看小米 SoundMove 的信息以容器方式部署程序到小米万兆路由器实际效果有待改善点 本教程内容 1 是记录了将小米 SoundMove 接入 ChatGPT 的操作步骤。 2 是将小米…

Chrome Performance 页面性能分析

Chrome Performance 页面性能分析 背景介绍 性能优化是前端开发一个非常重要的组成部分&#xff0c;如何更好地进行网络传输&#xff0c;如何优化浏览器渲染过程&#xff0c;来定位项目中存在的问题。Chrome DevTools给我们提供了2种常用方式 Audits和Performance&#xff0c…

Flink学习——基本转换算子

目录 一、filter算子 二、map算子 三、聚合算子 1.keyBy——按键分区 2.简单聚合 (1)min&#xff1a;在输入流上&#xff0c;对指定的字段求最小值 (2)minBy&#xff1a;返回包含字段最小值的整条数据 (3)max&#xff1a;在输入流上&#xff0c;对指定的字段求最大值 …

【Navicat 连接MySQL时出现错误1251:客户端不支持服务器请求的身份验证协议;请考虑升级MySQL客户端】

使用Navicat连接时报1251错误&#xff0c;如下图&#xff1a; 原因 MySQL8.0后的版本加密规则是“caching_sha2_password”&#xff0c;而 MySQL8.0之前的版本加密规则是“mysql_native_password” 解决办法 更改加密规则&#xff0c;将MySQL用户登录密码加密规则还原成“…

Python:常见的面试题和答案

1. 什么是Python&#xff1f; 答&#xff1a;Python是一种高级编程语言&#xff0c;被广泛应用于Web开发、数据分析、人工智能等领域。 2. Python的优点是什么&#xff1f; Python的优点包括&#xff1a; 简单易学&#xff1a;Python语法简单&#xff0c;易于上手&#xff…

chatgpt赋能Python-pythonfly

PythonFly介绍 PythonFly是一个功能丰富的Python Web框架&#xff0c;它提供了快速开发Web应用的工具和方法。PythonFly可以轻易扩展、分布式部署和最小化代码重复。PythonFly利用Python的清晰和简单的语法&#xff0c;让Web应用程序更容易阅读和维护。 PythonFly的特点 快速…

CSS图像填充文字(镂空文字效果 / 文字镂空效果)

先展示一下最终效果&#xff1a; 开始做 1. 搭建基本代码结构 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>CSS图像填充文字&#xff08;镂空文字效果&#xff09;</title></head><body><div cl…

20230520查找中国移动的APP在RK3566下调用UVC摄像头出错

20230520查找中国移动的APP在RK3566下调用UVC摄像头出错 2023/5/20 23:34 SDK&#xff1a;Android12RK3566平台 android12 UVC camera 没插摄像头&#xff0c;但是/dev/video0-13标号被占用&#xff0c;是啥原因导致的 板子上也没有摄像头 【板子没有接CSI/MIPI接口的I2C通道…

操作系统(持续更新)

操作系统的定义 操作系统&#xff08;operating system&#xff0c;OS&#xff09;是配置在计算机硬件上的第一层软件&#xff0c;是对硬 件系统的首次扩充&#xff0c;其主要作用是管理硬件设备&#xff0c;提高它们的利用率和系统吞吐量&#xff0c;并为 用户和应用程序提供一…

Spring Cloud 和3种架构分析 以及微服务的详细分析和示意图

目录 SpringCloud & SpringCloud Alibaba架构介绍 Spring Cloud 基本介绍 官方文档 提出问题, 引出微服务 单机架构 - 示意图 动静分离架构&#xff1a;静态缓存 文件存储 解析 分布式架构&#xff1a;业务拆分负载均衡 解析 微服务架构&#xff1a;使用Spring Clo…

UE C++ Windows平台调用讯飞语音合成接口

UE C Windows平台调用讯飞语音合成接口 环境设置调用讯飞语音接口回放语音数据输出EXE 环境设置 下载讯飞语音合成的Windows平台的C版本SDK&#xff0c;包含lib库文件和dll动态链接库在UE工程下新建一个ThirdParty/msc目录&#xff0c;将lib库文件和dll动态链接库放入其中[PRO…

mybatis是如何集成到spring的之托管mapper接口

前言 mybatis集成到spring可以参考spring mvc集成mybatis进行数据库访问 &#xff0c;其中mybatis集成到spring最重要的两个配置分别是SqlSessionFactoryBean和MapperScannerConfigurer&#xff0c;如下所示&#xff1a; <!--mybatis sqlSeesionFactory配置--><bean…