消息消费过程

news2025/1/12 12:06:41

前言

本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。

消费者与消费组

Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。

开发流程

  1. 配置consumer参数并创建consumer实例;
  2. 订阅主题;
  3. 拉取消息并消费;
  4. 提交消费偏移量;
  5. 关闭consumer;
class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap server
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        // group.id, 如果当前consumer需要加入到某个group中, 否则自成一个group
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
        // 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。
        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");

        // 自动提交offset设置, 样例中为手动提交
        // props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");
        // 自动提交offset的时间间隔
        // props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");

        // offset reset配置
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");
        // key和value的deserializer配置
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        boolean running = true;
        while(running) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record : records) {
                // 消费消息
            }
            // 消费成功提交offset
            consumer.commitSync();
        }

        consumer.close();
    }
}

主题与分区

每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。

例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。

反序列化

consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。

interface Deserializer {
    T deserialize(String topic, byte[] data);
    void close();
}

主题订阅

  1. 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
  2. 订阅多个特定主题, subscribe(collection);
  3. 订阅某种模式的主题, subscribe(pattern);
  4. 订阅某个主题的特定partition, assign(partition);
  5. 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;

消息获取

  1. 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
  2. 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
  3. 拉模式: client主动发起消息请求, 从server端拉取消息;
  4. 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)

内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。

位移提交

位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:
请添加图片描述

  1. 消息是按照partition存储的;
  2. 消息写入partition时, offset单调递增;
  3. 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;

因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;

提交方式说明优缺点
自动提交(默认方式) Kafka Client周期性地提交偏移量优点是简单, 确定是重复消费和丢失风险
手动提交由用户主动提交偏移量优点是可细粒度管理, 缺点是相对复杂

自动提交

自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。
在这里插入图片描述

  1. consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
  2. 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  3. 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  4. 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;

手动提交

也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |

指定offset

消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?

  1. 由参数auto.offset.reset设定默认行为
    | 参数值 | 行为 |
    |----|----|
    | earliest | 从分区第一条消息的offset开始 |
    | latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
    | none | 程序逻辑自定义, 如果未设置则抛出异常 |

  2. 程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。

  3. 由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。

再平衡

再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。

Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。

消费者拦截器

Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。

拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1226153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

腾讯云服务器公网带宽速度怎么样?上传下载实测!

腾讯云服务器公网带宽下载速度计算&#xff0c;1M公网带宽下载速度是128KB/秒&#xff0c;5M带宽下载速度是512KB/s&#xff0c;腾讯云10M带宽下载速度是1.25M/秒&#xff0c;腾讯云百科txybk.com来详细说下腾讯云服务器不同公网带宽实际下载速度以及对应的上传速度对照表&…

二十九、W5100S/W5500+RP2040树莓派Pico<Web socket Server>

文章目录 1 前言2 简介2 .1 什么是WebSocket协议&#xff1f;2.2 WebSocket协议工作原理2.3 WebSocket协议优点2.4 WebSocket应用场景 3 WIZnet以太网芯片4 WebSocket示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意事项6 相关链接…

鸿蒙:Harmony开发基础知识详解

一.概述 工欲善其事&#xff0c;必先利其器。 上一篇博文实现了一个"Hello Harmony"的Demo&#xff0c;今天这篇博文就以Demo "Hello Harmony" 为例&#xff0c;以官网开发文档为依据&#xff0c;从鸿蒙开发主要的几个方面入手&#xff0c;详细了解一下鸿…

macOS 后台项目已添加 “Google Updater添加了可在后台运行的项目。你可以在“登陆项”设置中管理

文章目录 Intro解决查看三个文件夹分析 & 操作确认结果是否生效 Intro 我的macbook上经常弹出这样的通知狂&#xff1a; macOS 后台项目已添加 “Google Updater添加了可在后台运行的项目。你可以在“登陆项”设置中管理 不胜其扰&#xff0c;终于决定禁用它。以下为方法…

算法设计与分析【期中+期末复习知识点总结】(持续更新)

第1章&#xff1a;算法概述 算法&#xff1a;具有输入、输出、确定性、有限性。 程序&#xff08;算法数据结构程序&#xff09;&#xff1a;具有输入、输出、确定性&#xff08;注意&#xff1a;程序可以不满足有限性&#xff0c;如操作系统是在无限循环中执行的程序&#x…

es的优势

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目…

教程:使用 Keras 优化神经网络

一、介绍 在 我 之前的文章中&#xff0c;我讨论了使用 TensorFlow 实现神经网络。继续有关神经网络库的系列文章&#xff0c;我决定重点介绍 Keras——据说是迄今为止最好的深度学习库。 我 从事深度学习已经有一段时间了&#xff0c;据我所知&#xff0c;处理…

橱柜的装修干货|板材、五金、高度、配色4个方面。福州中宅装饰,福州装修

引言 橱柜的装修干货。 橱柜是厨房的核心&#xff0c;一个好的橱柜能让厨房变得实用又美观。以下是关于橱柜装修的几个问题解答。 1. 橱柜的柜门常用的板材有哪些&#xff1f; 橱柜的柜门常用的板材有实木板、防火板、烤漆板、包复框、PVC板、膜压板等。不同板材有不同的特点…

庖丁解牛:NIO核心概念与机制详解 01

文章目录 Pre输入/输出Why NIO流与块的比较通道和缓冲区概述什么是缓冲区&#xff1f;缓冲区类型什么是通道&#xff1f;通道类型 NIO 中的读和写概述Demo : 从文件中读取1. 从FileInputStream中获取Channel2. 创建ByteBuffer缓冲区3. 将数据从Channle读取到Buffer中 Demo : 写…

五、hdfs常见权限问题

1、常见问题 2、案例 &#xff08;1&#xff09;问题 &#xff08;2&#xff09;hdfs的超级管理员 &#xff08;3&#xff09;原因 没有使用Hadoop用户对hdfs文件系统进行操作。 在Hadoop文件系统中&#xff0c;Hadoop用户相当于Linux系统中的root用户&#xff0c;是最高级别用…

win10电脑无法联网,设置IPv4,点击属性无法打开,闪退

win10设置IPv4&#xff0c;点击属性无法打开&#xff0c;闪退 问题:win10设置IPv4&#xff0c;点击属性无法打开&#xff0c;闪退 问题:win10设置IPv4&#xff0c;点击属性无法打开&#xff0c;闪退 第1步&#xff1a;用管理员打开cmd命令窗口&#xff0c;然后输入下面的命令&…

JVM的运行时数据区

Java虚拟机&#xff08;JVM&#xff09;的运行时数据区是程序在运行过程中使用的内存区域&#xff0c;主要包括以下几个部分&#xff1a; 程序计数器虚拟机栈本地方法栈堆方法区运行时常量池直接内存 不同的虚拟机实现可能会略有差异。这些区域协同工作&#xff0c;支持Java…

BUUCTF snake 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 下载附件&#xff0c;解压得到一张snake的图片。 密文&#xff1a; 这里有一张蛇的图片&#xff0c;本人害怕不敢放&#xff0c;想看自己下载附件解压。&#xff08;吐槽一下&#xff0c;我做这道题&#xff0c;全…

每天一点python——day71

#每天一点Python——71 #格式化字符串在Python中&#xff0c;你可以使用格式化字符串来动态地插入变量的值、表达式的结果等到字符串中。 如图&#xff1a;xxx部分需要不断变化&#xff0c;再和原文拼接 如上图所示这是一个类似于字符串拼接的操作。 因为字符串拼接操作会产生很…

【算法挨揍日记】day26——53. 最大子数组和、918. 环形子数组的最大和

53. 最大子数组和 53. 最大子数组和 题目描述&#xff1a; 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 解题思路&#xff1a; 状态…

公司会倒闭,但大模型肯定不会

咋玩抖音的我&#xff0c;前几天在抖音上发了一张图片&#xff0c;没想到竟然有1000多的播放量。 当然这个播放量不算高&#xff0c;甚至在抖音的体系里属于很低的&#xff0c;但是比我预料的可能只有个位数的播放量是高了不少。 这张图片是我用某国产 AI 软件生成的&#xff…

R语言和RStudio的下载安装(非常简便舒适)

目录 R语言和RStudio的关系R语言和Tableau下载R语言进入官网选择清华镜像源Download R for Windows选择base版本开始下载进行安装配置环境变量检查是否安装成功 下载RStudio进入官网点击下载进行安装检查是否安装成功打开选择R语言环境成功打开显示四个工作区 R语言和RStudio的…

CDN是什么,能起到什么作用

随着互联网的快速发展&#xff0c;用户对于快速、稳定、高效的互联网体验的需求日益增长。为了满足这一需求&#xff0c;内容分发网络&#xff08;CDN&#xff09;应运而生&#xff0c;并在近年来得到了广泛应用。CDN通过在全球范围内部署大量的服务器和网络节点&#xff0c;实…

2023 鹏程杯

前言 笔者没有参加此次比赛&#xff0c;由于团队后面会复现此次比赛&#xff0c;所以笔者在此进行复现记录。 silent 考点: 栈溢出 ret2csu 栈迁移 保护: 开了 Full RELRO 和 NX, 禁掉了 execve/execveat 系统调用 漏洞分析 一个裸的栈溢出, 但是没有输出函数可以泄漏 …

MySQL进阶_8.数据库其他调优策略

文章目录 第一节、数据库调优的步骤1.1、选择合适的DBMS1.2、优化表设计1.3、优化逻辑查询1.4、优化物理查询1.5、使用 Redis 或 Memcached 作为缓存1.6、库级优化 第二节、优化MySQL服务器第三节、优化数据库结构 第一节、数据库调优的步骤 1.1、选择合适的DBMS 如果对事务性…