大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡

news2024/12/23 1:07:22

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:

  • 消费组测试,消费者变动对消费的影响
  • 消费者的心跳机制
  • 消费者的相关配置参数

在这里插入图片描述

主题和分区

  • Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库
  • Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍
  • ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。
  • Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。
    在这里插入图片描述

反序列化

  • Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
  • 消费者的反序列化器包括Key和Value。

自定义反序列化

如果要实现自定义的反序列化器,需要实现 Deserializer 接口:

public class UserDeserializer implements Deserializer<User> {


    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocate(data.length);
        buffer.put(data);
        buffer.flip();
        int userId = buffer.getInt();
        int usernameLen = buffer.getInt();
        String username = new String(data, 8, usernameLen);
        int passwordLen = buffer.getInt();
        String password = new String(data, 8 + usernameLen, passwordLen);
        int age = buffer.getInt();
        User user = new User();
        user.setUserId(userId);
        user.setUsername(username);
        user.setPassword(password);
        user.setAge(age);
        return user;
    }

    @Override
    public User deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}

消费者拦截器

消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。
消费端定义消息拦截器,要实现 ConsumerInterceptor接口:

  • 一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等
  • 该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
  • ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。
  • ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("=== 消费者拦截器 01 onConsume ===");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("=== 消费者拦截器 01 onCommit ===");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("消费者设置的参数");
        configs.forEach((k, v) -> {
            System.out.println(k + ", " + v);
        });
    }
}

位移提交

相关概念

  • Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
  • Consumer 需要为分配给它的每个分区提交各自的位移数据
  • 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
  • 位移提交:自动提交和手动提交
  • 位移提交:同步提交和异步提交

自动提交

Kafka Consumer后台提交

  • 开启自动提交 enable.auto.commit=true
  • 配置启动提交间隔:auto.commit.interval.ms,默认是5秒

位移顺序

自动提交位移的顺序:

  • 配置 enable.auto.commit=true
  • Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的
  • 因此自动提交不会出现消息丢失,但是会重复消费

重复消费

重复消费的场景:

  • Consumer设置5秒提交offset
  • 假设提交offset后3秒发生了Rebalance
  • Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费
  • 因为Rebalance发生前3秒的内的提交就丢失了

异步提交

  • 使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset
  • 该方法为同步操作 等待直到 offset 被成功提交才返回
  • 手动同步提交可以控制offset提交的时机和频率

位移管理

Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

重平衡

重平衡可以说是Kafka中诟病最厉害的一部分。
重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。

重平衡的出发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
  • 主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡
  • 订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡

为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。

避免重平衡

要完全避免重平衡做不到,但是要尽量避免重平衡。
在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。

  • session.timeout.ms 规定超时时间是多久
  • heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源
  • max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。

这里给出一些推荐参数的配置:

  • session.timeout.ms 设置为6秒
  • heaertbeat.interval.ms 设置2秒
  • max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1970738.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git的一些简单使用

下列内容适用于git初学者&#xff0c;从创建本地git仓库到提交的一个基本过程1. 1.创建git仓库 在想创建git仓库的路径下打开git bash&#xff0c;输入以下命令行创建仓库&#xff08;一般来说&#xff0c;我觉得直接在code workspace得地方创建git仓库就可以了&#xff0c;这…

acme.sh生成https证书

前言 SSL 价格并不便宜, 本节介绍如何使用 acme.sh 生成免费的 SSL 证书 证书生成原理 CA && Let’s Encrypt 证书颁发机构&#xff08;CA&#xff0c;Certificate Authority&#xff09;是一个负责颁发数字证书的实体。数字证书用于在互联网上验证实体的身份&…

注册或购买的谷歌账号的辅助邮箱是否需要设置?有什么用?设置的要点是什么?

今天早上&#xff0c;有个朋友联系到GG账号服务&#xff0c;问我谷歌账号辅助邮箱怎么用。说实在的这个问题有点抽象&#xff0c;哈哈。 然后我详细了解了一下&#xff0c;原来是这样的&#xff1a; 他的谷歌账号提示异常&#xff08;这个时候账号肯定是被停用了的&#xff09…

【Linux应用编程】Day12线程

线程 与进程类似&#xff0c;线程是允许应用程序并发执行多个任务的一种机制&#xff0c;线程参与系统调度&#xff1b; 事实上&#xff0c;系统调度的最小单元是线程、而并非进程。 ⚫ 线程的基本概念&#xff0c;线程 VS 进程&#xff1b; ⚫ 线程标识&#xff1b; ⚫ 线…

电脑上有什么好用的记笔记软件吗?试试这3款笔记软件,功能丰富又实用

笔记软件千千万&#xff0c;日常使用方便最关键&#xff01;&#xff01; 推荐3个各有亮点的笔记软件&#xff0c;不止是记笔记这么简单&#xff1a; 1、FlowUs 推荐指数&#xff1a;☆☆☆☆☆ 关键词&#xff1a;文档笔记软件 下载链接>>flowus.cn FlowUs是一款在…

ADI - 通过5 V至24 V输入提供双极性、双向DC-DC流入和流出电流

大部分电子系统都依赖于正电压轨或负电压轨&#xff0c;但是有些应用要求单电压轨同时为正负电压轨。在这种情况下&#xff0c;正电源或负电源由同一端子提供&#xff0c;也就是说&#xff0c;电源的输出电压可以在整个电压范围内调节&#xff0c;并且可以平稳转换极性。例如&a…

【mars3d】实现线面内插值计算效果

面插值计算效果展示&#xff1a; &#xff08;离屏渲染方式&#xff09;面插值效果展示&#xff1a; 面内插值计算插点效果展示&#xff1a; 线插值效果展示&#xff1a; &#xff08;离屏渲染方式&#xff09;高密度线内插值计算效果展示&#xff1a; 相关代码&#xff1a; i…

docker二进制包部署(带arm版自动部署包)

文章目录 1.概述2.Docker二进制包下载3.安装脚本制作4.安装5.卸载6.注意事项7.分享一个arm版自动部署安装包8.懒人 X86 版安装包 1.概述 最近需要在Linux上部署docker&#xff0c;于是自己做了一个自动部署包。脚本的写法不区分X86或arm&#xff0c;通用的。 2.Docker二进制包…

网络安全和数据安全到底有什么区别?(非常详细)零基础入门到精通,收藏这一篇就够了

随着信息技术的迅猛发展&#xff0c;网络安全和数据安全已经成为当今社会不可忽视的重要议题。两者在保障信息系统安全、防范数据泄露和保障用户权益方面起着至关重要的作用。然而&#xff0c;尽管网络安全与数据安全在某些方面有着密切的联系&#xff0c;但它们在定义、目标和…

“八股文”:程序员的福音还是梦魇?

——一场关于面试题的“代码战争” 在程序员的世界里&#xff0c;“八股文”这个词儿可谓是“如雷贯耳”。不&#xff0c;咱们可不是说古代科举考试中的那种八股文&#xff0c;而是指程序员面试中的那些固定套路的题目。如今&#xff0c;各大中小企业在招聘程序员时&#xff0…

11.2.0.4 ADG故障 LGWR (ospid: 30945):terminating the instance due to error 4021

11.2.0.4 ADG无法连接&#xff0c;查看数据库为关闭状态&#xff0c;重新启动实例&#xff0c;应用日志后即可正常同步数据并打开到只读模式。 查看alert日志发现有以下报错&#xff1a; 0RA-04021:timeout occurred while waiting to lock obiectLGWR (ospid: 30945):termi…

矩阵、向量、张量 一文彻底理清!

矩阵&#xff1a;可理解为二维数组、二维张量 向量Vector&#xff1a;是只有一列的矩阵 张量&#xff1a;是矩阵向任意维度的推广。 机器学习经常会用到张量做变换&#xff0c;所以下文重点介绍张量。 可以通过.ndim查看numpy数据的张量维度。张量的维度&#xff08;dimens…

【熊猫派对】

游戏简介 熊猫派对是一款滑稽打闹游戏&#xff0c;玩法容易上手简单&#xff0c;游戏中玩家将操控自己的熊猫人&#xff0c;与其他对手对战&#xff0c;重拳、飞脚甚至还有各种各样的武器都可用来击败你的对手。 游戏特色 1、滑稽角色 网络超火的滑稽角色&#xff0c;从表情包…

JNI原理是什么?JNI在DDS binding JAVA中/DDS移植android平台中有什么作用?

1 JNI是什么2 如何在JAVA中调用C/C方法&#xff08;通过JNI调用的demo&#xff09;java中声明一个本地native方法生成JNI头文件Java native方法转换成C的规则与语法说明C实现的native方法本地实现以及.o .dll库的生成查看hello.dll库中的函数运行一下HelloJNI JNI在DDS移植andr…

微信小程序 - 自定义计数器

微信小程序通过自定义组件&#xff0c;实现计数器值的增加、减少、清零、最大最小值限定、禁用等操作。通过按钮事件触发方式&#xff0c;更新计数器的值&#xff0c;并修改相关联的其它变量。通过提升用户体验&#xff0c;对计数器进行优化设计&#xff0c;使用户操作更加便捷…

PHP教育培训小程序系统源码

&#x1f680;【学习新纪元】解锁教育培训小程序的无限可能✨ &#x1f4da; 引言&#xff1a;教育培训新风尚&#xff0c;小程序来引领&#xff01; Hey小伙伴们&#xff0c;是不是还在为找不到合适的学习资源而烦恼&#xff1f;或是厌倦了传统教育模式的单调&#xff1f;今…

Monaco 使用 SignatureHelpProvider

Monaco 中 SignatureHelpProvider 是方法提示说明&#xff0c;当敲入方法名时&#xff0c;系统会提示方法名称和对应的参数信息。效果如下&#xff1a; 通过 registerSignatureHelpProvider 实现 SignatureHelpProvider 处理函数。 实现 signatureHelpTriggerCharacters 和 pro…

我们如何优化 Elasticsearch Serverless 中的刷新成本

作者&#xff1a;来自 Elastic Francisco Fernndez Castao, Henning Andersen 最近&#xff0c;我们推出了 Elastic Cloud Serverless 产品&#xff0c;旨在提供在云中运行搜索工作负载的无缝体验。为了推出该产品&#xff0c;我们重新设计了 Elasticsearch&#xff0c;将存储与…

深入了解下 Markdown 的原理

前面讲了 Markdown 的基本语法&#xff0c;常见的 Markdown 编辑器&#xff0c;在继续讲解其他知识之前&#xff0c;有必要稍微深入了解一下 Markdown 与 HTML 的关系。 ‍ ‍ HTML 简介 什么是 HTML&#xff1f;其实它也是标记语言的一种&#xff0c;但是比 Markdown 更重…

Java面试题--JVM大厂篇之深入分析Parallel GC:从原理到优化

目录 引言: 正文&#xff1a; 1. Parallel GC原理解析 2. Parallel GC关键参数配置 3. 常见调优场景与技巧 4. 监控与日志分析 结束语&#xff1a; 引言: 在Java应用程序中&#xff0c;垃圾回收&#xff08;Garbage Collection, GC&#xff09;扮演着至关重要的角色。对…