Kafka学习笔记(高级篇)

news2024/11/24 9:02:17

目录

高级功能

高效读写

涉及技术

ZooKeeper

自定义拦截器

监控

延迟消费

一些改进手段


高级功能

高效读写

涉及技术

  • 高吞吐量:Kafka 每秒可以处理数百万消息。这是因为 Kafka 消息的处理是以批处理(Batching)的方式来完成的,生产者可以将多个消息一起发送到 Kafka 集群,以减少网络开销以及加速处理速度。
  • 低延迟:Kafka 利用磁盘存储加缓存,可以在微秒级别内完成消息处理。Kafka 具有高效的消息传递能力,也可以在微秒级别内完成消息处理。这是由于 Kafka 的消息存储设计是基于磁盘的,但同时消息缓存也是放在内存里的。也就是说,在处理消息时,Kafka 集群会先将消息写入到磁盘中进行持久化存储,并且在内存中缓存一份消息以便进行更快的消息传递和读取。
  • 分布式架构:Kafka 采用分布式的架构设计,可以通过水平扩展增加集群规模和负载容量。集群中的每个节点都可以独立完成消息处理和写入,可以有效地提高整个系统的吞吐量。
  • 高可靠性:Kafka 在存储消息时,使用了多副本机制,可以保证消息的高可靠性。当消息发送失败或者其中一个节点失效时,可以通过复制副本来实现自动故障转移,以确保消息的可靠性、可用性与一致性。
  • 顺序写:Kafka 内部的消息存储结构是一个连续的、顺序写入的日志文件(Log File)集合,也称“分区”(Partition)。分区中的每一条消息都被分配一个唯一的偏移量(Offset),并且保留在磁盘上直到被消费。通过这种消息存储方式,Kafka 可以实现高效的顺序写入操作。因为 Kafka 可以将流式的消息按顺序追加到 Log 文件的末尾,这避免了随机写入所产生的磁盘寻址和寻道时间,从而大大提高了写入性能,并降低了延迟。此外,由于只有新的消息会追加到 Log 文件中,而没有数据被修改或删除,因此,读取数据时,Kafka 也可以通过顺序扫描磁盘获取最新的消息,这样也大大提高了读取数据的效率。
  • 数据压缩:Kafka 提供了数据压缩功能,可以将传输的消息进行压缩和解压,减少了磁盘和网络带宽的使用。
  • 零拷贝:Kafka 零拷贝技术可以避免在传输数据时进行数据缓冲和复制,从而减少了 CPU 和内存的使用,提高了性能。

ZooKeeper

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。

Partition的Leader的选举过程

Partition的Leader选举流程

自定义拦截器

拦截器原理

Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,Interceptor实现的接口为ProducerInterceptor,主要有四个方法:

  • configure(Map<String, ?> configs):获取配置信息和初始化数据时调用
  • onSend(ProducerRecord record):该方法封装在KafkaProducer.send()方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。
  • onAcknowledgement(RecordMetadata metadata, Exception exception):onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close():关闭inteceptor,主要用于执行资源清理工作。

Inteceptor可能被运行到多个线程中,在具体使用时需要自行确保线程安全,另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。

自定义加入时间戳拦截器

/**
 * @author caoduanxi
 * @Date 2021/1/13 14:15
 * @Motto Keep thinking, keep coding!
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                "TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());
    }
    // 其余方法省略
}

自定义消息发送统计拦截器

/**
 * @author caoduanxi
 * @Date 2021/1/13 14:18
 * @Motto Keep thinking, keep coding!
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 输出结果,结束输出
        System.out.println("Sent successful:" + successCounter);
        System.out.println("Sent failed:" + errorCounter);
    }
}

在CustomProducer中加入拦截器

// 加入拦截器
List<Object> interceptors = new ArrayList<>();
interceptors.add(TimeInterceptor.class);
interceptors.add(CounterInterceptor.class);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

注意:拦截器的close()方法是收尾的,一定要调用Producer.close()方法,否则拦截器的close()方法不会被调用。

监控

Eagle

Eagle是开源的额可视化和管理软件,允许查询、可视化、提醒和探索存储在任何地方的指标,简而言之,Eagle为您提供了将Kafka集群数据转换为漂亮的图形和可视化的工具。

实质: 一个运行在tomcat上的web应用

延迟消费

kafka目前默认可支持1h以内的延迟消费。

使用方式:consumer启动参数增加 --delay-time-seconds n 设置消费延迟时间,单位秒,默认不延迟消费。仅能拉取到消费延迟时间之前的消息。

注意:此参数默认限制最大值为3600s,超过限制可能导致consumer启动失败。如有调整最大延迟时间的需求,请联系李锦涛(KIM:lijintao)

注意:消息拉取可能有分钟级别的误差。

注意:由于目前每4kb数据构建一次时间索引,如果最后一批数据的size不够4kb,可能导致这些数据不能被延迟消费到。

一些改进手段

  • Rebalance优化
  • Federation架构应用
  • 存算分离等等

相关推荐文章:

Kafka学习笔记(基础篇)_Cat凯94的博客-CSDN博客

看完这篇Kafka,你也许就会了Kafka_心的步伐的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/745717.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python+unittest+requests+HTMLRunner搭建接口测试框架,执行用例请求多个不同请求方式的接口

问题描述&#xff1a; 搭建接口测试框架&#xff0c;执行用例请求多个不同请求方式的接口 实现步骤&#xff1a; ① 创建配置文件config.ini&#xff0c;写入部分公用参数&#xff0c;如接口的基本url、测试报告文件路径、测试数据文件路径等配置项 1 [DATABASE] 2 data_addre…

一文带你快速了解如何在Linux上部署项目

文章目录 前言一、手工部署项目1.在IDEA中开发SpringBoot项目并打成jar包2. 将jar包上传到Linux服务器3. 输入指令启动SpringBoot程序4. 检查防火墙&#xff0c;确保80端口&#xff08;项目端口&#xff09;对外开放&#xff0c;访问SpringBoot项目5. 在windows访问项目6. 改为…

Layui选项卡Tab:完美实现网页内容分类与导航

目录 什么是Layui选项卡&#xff1f; Layui选项卡的作用 实现步骤 1、编写公共jsp&#xff08;header.jsp&#xff09; 2、jsp界面&#xff08;main.jsp&#xff09; 3、JS代码&#xff08;main.js&#xff09; 4、PermissionDao类的修改 5、最终运行结果 什么是Layui选…

数据备份与还原,(mysqldump,source)索引(index),创建视图(view)

一、备份与还原 /***************************样例表***************************/ CREATE DATABASE booksDB; use booksDB;CREATE TABLE books (bk_id INT NOT NULL PRIMARY KEY,bk_title VARCHAR(50) NOT NULL,copyright YEAR NOT NULL ); INSERT INTO books VALUES (11078…

【i阿极送书——第五期】《Python机器学习:基于PyTorch和Scikit-Learn》

系列文章目录 作者&#xff1a;i阿极 作者简介&#xff1a;数据分析领域优质创作者、多项比赛获奖者&#xff1a;博主个人首页 &#x1f60a;&#x1f60a;&#x1f60a;如果觉得文章不错或能帮助到你学习&#xff0c;可以点赞&#x1f44d;收藏&#x1f4c1;评论&#x1f4d2;…

【CANN训练营】Atlas 200I DK A2开发板运行ChatYuan-large对话机器人应用

环境介绍 PC 操作系统&#xff1a;Ubuntu 22 内存&#xff1a;32GB Python&#xff1a;3.8 开发板 华为Atlas 200I DK A2 内存&#xff1a;4G NPU&#xff1a;昇腾310B 环境准备 只需要一台Linux 系统的PC机即可 Python版本需要3.7、3.8、3.9 准备CANN ToolKit 下载CANN T…

Centos6.5 用户权限例题

例题一&#xff1a;为网站管理员创建一个FTP帐户webmaster&#xff0c;将其加入到ftp组中&#xff0c;其登录的主目录为WEB站点的主目录/var/www/jnds.net&#xff0c;设置其为系统帐户&#xff0c;但却没有登录系统的权限&#xff0c;备注该用户为“FTP User” 解&#xff1a…

【RocketMQ】005-Docker 部署 RocketMQ

【RocketMQ】005-Docker 部署 RocketMQ 文章目录 【RocketMQ】005-Docker 部署 RocketMQ一、部署1、拉取镜像MQ 镜像可视化平台镜像 2、创建挂载目录创建 nameserver 挂载目录创建 broker 目录创建 broker 配置文件目录 3、编辑配置文件4、启动服务启动 nameserver启动 broker启…

pytorch保存、加载和解析模型权重

1、模型保存和加载 主要有两种情况&#xff1a;一是仅保存参数&#xff0c;二是保存参数及模型结构。 保存参数&#xff1a; torch.save(net.state_dict()) 加载参数&#xff08;加载参数前需要先实例化模型&#xff09;&#xff1a; param torch.load(param.pth) net.load_…

AWS 中文入门开发教学 47- S3 - 基本的使用

知识点 S3 - 基本的使用方法实战演习 创建存储桶 阻止所有公网访问: 打开版本控制、添加标签: KMS是收费的: 创建成功: 上传文件 选择存储类:

这是中国人工智能AI激情澎湃的一周

融资 贝联珠贯完成 5000 万元天使轮融资&#xff0c;业务涵盖 AI 型算力市场据投中网报道&#xff0c;近日&#xff0c;云资源管理服务提供商浙江贝联珠贯宣布完成 5000 万元天使轮融资&#xff0c;由元璟资本、红杉中国种子基金和舟轩股权投资。 盛大网络 CEO 陈天桥再投 1…

springboot就业信息管理系统

本次设计任务是要设计一个就业信息管理系统&#xff0c;通过这个系统能够满足就业信息管理功能。系统的主要功能包括&#xff1a;首页&#xff0c;个人中心&#xff0c;学生管理&#xff0c;导师管理&#xff0c;企业管理&#xff0c;招聘信息管理&#xff0c;应聘信息管理&…

DMA是一个超级简化版的cpu吗?

来自群友的讨论 我的理解是DMA某种程度相当于一个CPU是因为DMA拥有访问其他地址空间的权利。 从系统角度考虑&#xff0c;对整个系统的观测者一般CPU DSP GPU DMA是一个级别&#xff0c;其他都是slave。cache一致性POC是要保证所有观测者&#xff0c;包括DMA观测到相同数据。 …

【学习bubbliiiing代码-2】从txt中获取类别名称以及类别数量

本系列主要用于自我学习&#xff0c;参考的为bubbliiiing的代码 写一个优雅的&#xff1a;从txt文件中获得类别名与类别数的函数&#xff0c;如下&#xff1a; #---------------------------------------------------# # 获得类别名与类别数 #-----------------------------…

Python爬虫:利用JS逆向抓取携程网景点评论区图片的下载链接

Python爬虫:利用JS逆向抓取携程网景点评论区图片的下载链接 1. 前言2. 实现过程3. 运行结果 1. 前言 文章内容可能存在版权问题&#xff0c;为此&#xff0c;小编不提供相关实现代码&#xff0c;只是从js逆向说一说到底怎样实现这个的过程&#xff0c;希望能够帮助到那些正在做…

主动配电网故障恢复的重构与孤岛划分统一模型(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

SIP协议学习(2)

文章目录 注册&#xff08;REGISTER&#xff09;1、AOR和Contact区别2、注册概述3、注册与定位服务4、注册超时处理5、注册消息6、多Contact地址处理7、下期预告 注册&#xff08;REGISTER&#xff09; 1、AOR和Contact区别 在学习注册之前&#xff0c;首先区分一下AOR和Cont…

Threads上线5天用户增至1亿,Threads软件常见问题百问百答

7月10日&#xff0c;脸书&#xff08;Facebook&#xff09;母公司Meta旗下新应用程序Threads上线的第5天&#xff0c;其用户数量已经超过1亿。这一增长速度打破聊天机器人ChatGPT的纪录——推出两个月内活跃用户量才破亿。 Threads或成为史上用户数增长速度最快的消费者应用。 …

Mysql数据库基础和增删改查操作

目录 一、数据库基本概念 二、数据库类型和常用数据库 1.关系型数据库 2.非关系型数据库 三、数据库的数据类型 四、SQL语句 1.简介 2.分类 五、SQL语句的使用 1.数据库操作 &#xff08;1&#xff09;创建数据库 ​编辑 &#xff08;2&#xff09;查看数据库 &am…

mac MySQL修改密码

简介&#xff1a; MySQL是一种常用的关系型数据库管理系统。在某些情况下&#xff0c;您可能需要关闭MySQL服务或修改root密码。本文将向您展示如何执行这些操作的步骤。 步骤1&#xff1a;关闭MySQL服务 打开MySQL软件并关闭它。 或者使用以下命令关闭MySQL服务&#xff1a…