熟悉Kafka组成模块、Kafka消息提交的方式及优缺点

news2025/1/9 20:22:55

1. Kafka概念

1.1 Kafka组成模块

Kafka其实是一款基于发布与订阅模式的消息系统,如果按常理来设计,大家是不是把消息发送者的消息直接发送给消息消费者?但Kafka并不是这么设计的,Kafka消息的生产者会对消息进行分类,再发送给中间的消息服务系统,而消息消费者通过订阅某分类的消息去接受特定类型的消息。

其实这么设计的目的也是为了满足大量业务消息的接入,要是单一的消息发送和接收,那开个进程的管道通信就可以了。另外如果大家对设计模式的发布/订阅模式熟悉的话,对Kafka的设计理念会更容易理解。

总的来说,Kafka由五大模块组成,大家要理解好这些模块的功能作用:消息生产者、消息消费者、Broker、主题Topic、分区Partition

(1)消息生产者

消息生产者是消息的创造者,每发送一条消息都会发送到特定的主题上去。

(2)消息消费者

消息生产者和消费者都是Kafka的客户端,消息消费者顾名思义作为消息的读取者、消费者。同时Kafka很灵活的一点是,一个消费者可以订阅多个主题,而且一个主题消息也可被不同消息分组的多个消费者处理。这就给我们变化多端的业务设计带来了众多可能性了,方便大家自由发挥。

(3)Broker

孤零零部署在Linux的Kafka服务器被称为Broker,也就是我上文提到的中间的消息服务系统,大家不要小瞧他,单台Broker可以轻松处理每秒百万级的消息量。Broker日常工作内容就是接收消息生产者的消息,为每条消息设置偏移量,最后提交到磁盘进行持久化保存。

(4)主题Topic

上文我们知道Kafka的消息是有分类的,而分类的标识就是主题Topic。大家可以看下具体代码落地会更容易理解,消息生产者Producer发送给clock-topic主题,消息消费者监听消费clock-topic主题下的消息。

// 消息生产者
public class Producer implements ApplicationRunner {
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        RBlockingQueue<Clock> blockingFairQueue = redissonClient.getBlockingQueue("delay_queue");

        while (true) {
            Clock clock = blockingFairQueue.take();
            kafkaTemplate.send("clock-topic", "key", clock.toString());
            log.info("time out: {} , clock created: {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), clock.getTime());
        }
    }
}
    // 消息消费者
    @KafkaListener(topics = "clock-topic", groupId = "kafka-group")
    public void listener(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("listener get message: " + record.value());
        ack.acknowledge();
    }
(5)分区Partition

每一个主题下的消息都需要提交到Broker的磁盘里,假如我们搭建了三个Broker节点组成的Kafka集群,一般情况下同一个主题下的消息会被分到三个分区进行存储。说到这,由于顺序发送的消息是存储在不同分区中,我们无法保证消息被按顺序消费,只能保证同一个分区下的消息被顺序消费.

1.2 分区

消费分区的作用主要就是为了提高Kafka处理消息的吞吐量,谁叫Kafka设计之初就是作为一款高吞吐量、高可用、可扩展的应用程序。

假如一个topic下有N个分区、N个消费者,每个分区会发送消息给对应的一个消费者,这样N个消费者就可以负载均衡地处理消息。

同时消息生产者会发送消息给不同分区,每个分区又是属于不同的Broker,这让Broker集群平坦压力,大大提高了Kafka的吞吐量。

大家还需要注意一点,如果一个主题下消费者的数量超过分区的数量,超过数量的消费者是会被闲置的,一般N个分区最多搭配N个消费者。

1.3 异步回调

当我们调用send()异步发送消息时,可以指定一个回调函数,该函数会等Broker服务器响应时触发。如下源码所示,我们可以为响应参数ListenableFuture添加一个回调函数实现callback

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }

    public interface ListenableFuture<T> extends Future<T> {
       void addCallback(ListenableFutureCallback<? super T> callback);
    }

那这个回调函数有什么作用?我们一般用来进行异常日志的记录

Kafka的异步提交消息相比同步提交来说不需要在Broker响应前阻塞线程,这也在一定程度提高了消息的处理速度。但异步提交我们是不知道消息的消费情况的,此时就可以通过Kafka提供的回调函数来告知程序异常情况,从而方便程序进行日志记录。

2. 消费者消息提交

2.1 提交消息的方式

手动提交和自动提交是Kafka两种客户端的偏移量提交方式,提交方式的配置选项是enable.auto.commit,默认情况下该选项为ture。

偏移量提交是什么?大家可以理解为消费者通知当前最新的读取位置给到分区,也就是告诉分区哪些消息已消费了。

如果enable.auto.commit为true代表提交方式为自动提交,默认为5秒的提交时间间隔。每过5秒,消费者客户端就会自动提交最大偏移量。

如果enable.auto.commit为false代表提交方式为手动提交,我们需要让消费者客户端消费程序执行后提交当前的最大偏移量。

2.2 提交方式的优缺点

(1)自动提交

自动提交比较方便,我们甚至都不需要配置提交方式,不过可能会导致消息丢失或重复消费。

如果刚好到了5秒的时间间隔自动提交了最大偏移量,此时正在执行消息程序的消费者客户端崩溃了,就会导致消息丢失

如果成功消费了消息,下一秒消费者应该自动提交,但如果此时消费者客户端奔溃,就会导致其他分区的消费者重复消费

(1)手动提交

手动提交需要消费者客户端在消费消息后手动提交消息,手动提交的方式又分为同步提交、异步提交。

手动提交是同步提交的话,在Broker对请求做出回应之前,客户端会一直阻塞,这样的话限制应用程序的吞吐量

手动提交是异步提交的话,不会有吞吐量的问题。不过消费者客户端发送给Broker偏移量之后,不会管Broker有没有收到消息。这种情况就要采用上文我提到的消息生产者异步回调来进行日志记录,有了日志记录方便后续bug排查,工作效率妥妥的高😏。

 你好,我是胡广。 致力于为帮助兄弟们的学习方式、面试困难、入职经验少走弯路而写博客 🌹🌹🌹 坚持每天两篇高质量文章输出,加油!!!🤩

 如果本篇文章帮到了你 不妨点个赞吧~ 我会很高兴的 😄 (^ ~ ^) 。想看更多 那就点个关注     吧 我会尽力带来有趣的内容 。

 😎感兴趣的可以先收藏起来,还有大家在毕设选题,项目以及论文编写等相关问题都可以      给我留言咨询,希望帮助更多的人

更多专栏:
📊 Java设计模式宝典:从入门到精通(持续更新)

📝 Java基础知识:GoGoGo(持续更新)

⚽ Java面试宝典:从入门到精通(持续更新)

🌟 程序员的那些事~(乐一乐)

🤩 Redis知识、及面试(持续更新)

🚀 Kafka知识文章专栏(持续更新)

🎨 Nginx知识讲解专栏(持续更新)

📡 未完待续。。。

🎯 未完待续。。。

🔍 未完待续。。。

感谢订阅专栏 三连文章

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2116183.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LVI-SAM】激光雷达点云处理点云帧投影LIO-SAM 之ImageProjection实现细节

【LVI-SAM】激光雷达点云处理点云帧投影LIO-SAM 之ImageProjection实现细节 1. ImageProjection激光雷达点云预处理算法1.0 总结&#xff1a;1.1 功能概述&#xff1a;1.2 算法流程&#xff1a; 2. ImageProjection激光雷达点云预处理算法数学推倒3. ImageProjection激光雷达点…

安卓玩机工具------小米工具箱扩展工具 小米机型功能拓展

小米工具箱扩展版 小米工具箱扩展版 iO_Box_Mi_Ext是由晨钟酱开发的一款适用于小米&#xff08;MIUI&#xff09;、多亲&#xff08;2、2Pro&#xff09;、多看&#xff08;多看电纸书&#xff09;的多功能工具箱。该工具所有功能均可以免root实现&#xff0c;使用前&…

图解TCP三次握手|深度解析|为什么是三次

写在前面 这篇文章我们来讲解析 TCP三次握手。 TCP 报文段 传输控制块TCB&#xff1a;存储了每一个连接中的一些重要信息。比如TCP连接表&#xff0c;指向发送和接收缓冲的指针&#xff0c;指向重传队列的指针&#xff0c;当前的发送和接收序列等等。 我们再来看一下TCP报…

[高级人工智能 开放性调研] 近两年来[2022~2024]人工智能应用进展重要案例介绍

文章目录 [高级人工智能 开放性调研] 近两年来[2022-2024]人工智能应用进展重要案例介绍写在前面1. AIGC1.1 LLM | 大语言模型问答系统式的生成式AI文档解读——KimiChat代码生成——Cursor 1.2 AI绘画\视频生成 | Stable Diffusion | OpenAI SoraStable DiffusionOpenAI Sora …

模拟网络丢包常用方法以及工具

文章目录 背景常用方法代码实现使用方法测试代码 使用网络流量控制工具 常用工具Clumsy 背景 在软件开发过程中&#xff0c;经常需要模拟不同的网络环境来测试应用在不同条件下的表现。 这些模拟可以采用多种方式进行&#xff0c;包括在代码中实现随机丢包、随机延时、乱序&am…

《JavaEE进阶》----12.<SpringIOCDI【扫描路径+DI详解+经典面试题+总结】>

本篇博客主要讲解 扫描路径 DI详解&#xff1a;三种注入方式及优缺点 经典面试题 总结 五、环境扫描路径 虽然我们没有告诉Spring扫描路径是什么&#xff0c;但是有一些注解已经告诉Spring扫描路径是什么了 如启动类注解SpringBootApplication。 里面有一个注解是componentS…

【Leetcode152】乘积最大子数组(动态规划)

文章目录 一、题目二、思路三、代码 一、题目 二、思路 &#xff08;0&#xff09;读懂题意&#xff1a;题目的“连续”是指位置的连续&#xff0c;而不是说数字的连续&#xff0c;这是个大坑。 &#xff08;1&#xff09;确定状态&#xff1a;定义两个状态来记录当前子数组的…

Windows本地制作nginx证书

OpenSSL 是一个用于生成和管理 SSL/TLS 证书的工具。下载并安装 OpenSSL Select Additional Tasks页面勾选 The OpenSSL binaries (/bin) directory 将OpenSSL的bin目录配置到path中 开命令提示符&#xff08;cmd&#xff09;或 PowerShell。运行以下命令生成一个新的私钥和自…

哈希表的封装和位图

文章目录 2 封装2.1 基础框架2.2 迭代器(1)2.3 迭代器(2) 3. 位图3.1 问题引入3.2 左移和右移&#xff1f;3.3 位图的实现3.4 位图的题目3.5 位图的应用 2 封装 2.1 基础框架 文章 有了前面map和set封装的经验&#xff0c;容易写出下面的代码 // UnorderedSet.h #pragma on…

WireShark抓包软件介绍和安装

文章目录 一、WireShark软件介绍1. **概述**2. **主要功能**3. **使用场景**4. **安装和使用**5. **优点和限制**6. **结论** 二、WireShark的安装三、WireShark的基本使用1. **混杂模式&#xff08;Promiscuous Mode&#xff09;****概述****工作原理****应用场景****启用方式…

STM32F407VET6开发板RT-Thread memheap 内存堆的适配

相关文章 STM32F407VET6开发板RT-Thread的移植适配 STM32F407VET6开发板RT-Thread MSH 串口的适配 环境 STM32F407VET6 开发板&#xff08;魔女&#xff09;&#xff0c;http://www.stm32er.com/ Keil MDK5&#xff0c;版本 5.36 memheap 内存堆 RT-Thread 支持 memheap …

数据结构基础讲解(二)——线性表之单链表专项练习

本文数据结构讲解参考书目&#xff1a; 通过网盘分享的文件&#xff1a;数据结构 C语言版.pdf 链接: https://pan.baidu.com/s/159y_QTbXqpMhNCNP_Fls9g?pwdze8e 提取码: ze8e 上一节我讲了线性表中顺序表的定义以及常用的算法&#xff0c;那么这节我将继续讲解顺序表中的链式…

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们…

数据结构基础详解(C语言): 树与二叉树的应用_哈夫曼树与哈夫曼曼编码_并查集_二叉排序树_平衡二叉树

文章目录 树与二叉树的应用1.哈夫曼树与哈夫曼曼编码1.1 带权路径长度1.2 哈夫曼树1.2.1 哈夫曼树的构造1.3 哈夫曼编码 2.并查集2.1 并查集的三要素2.1.1 并查集的逻辑结构2.1.2 并查集的存储结构 2.2 并查集的优化2.2.1 初步优化&#xff08;并操作优化&#xff09;2.2.2 终极…

flink wordcount

Maven配置pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/P…

mybatis-plus使用@EnumValue搭配shardingsphere报错“getObject with type”

目录 一、背景二、修改方案三、如何让修改的TypeHandler生效1、在TableField中配置TypeHandler2、考虑直接在TypeHandlerRegistry注册该枚举的handler为自定义的handler处理类。3、不止重写MybatisEnumTypeHandler&#xff0c;还重写CompositeEnumTypeHandler类3.1、修改Compos…

【WPF】桌面程序开发之xaml页面主题和样式详解

使用Visual Studio开发工具&#xff0c;我们可以编写在Windows系统上运行的桌面应用程序。其中&#xff0c;WPF&#xff08;Windows Presentation Foundation&#xff09;项目是一种常见的选择。然而&#xff0c;对于初学者来说&#xff0c;WPF项目中xaml页面的布局设计可能是一…

Bat的退役前

我们很讨厌bat 语法这版的命令形式后缀尽管古老&#xff0c;可是在涉及细微VS 项目op 时候&#xff0c;它起到了不可忽视且非它不行的效应 我们不想替历史背上厚重的学习包袱&#xff0c;可是我们能忽视BAT 吗 如若进入到 无window时代&#xff0c;我们几乎得全然依仗BAT专家。…

35天学习小结

距离上次纪念日&#xff0c;已经过去了35天咯 算算也有5周了&#xff0c;在这一个月里&#xff0c;收获的也挺多&#xff0c;在这个过程中认识的大佬也是越来越多了hh 学到的东西&#xff0c;其实也没有很多&#xff0c;这个暑假多多少少还是有遗憾的~ 第一周 学习了一些有…

【计算机组成原理】详细解读带符号整数的原码表示法

带符号整数的表示——原码 导读一、有符号整数的存储结构二、有符号整数的表现形式三、原码3.1 原码与真值之间的转换3.2 原码的运算3.3 原码的优缺点 结语 导读 大家好&#xff0c;很高兴又和大家见面啦&#xff01;&#xff01;&#xff01; 在上一篇内容中我们介绍了无符号…