Kafka简要介绍与快速入门示例

news2025/4/3 5:46:07

1、什么是Kafka?

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

2、主流消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,吞吐量比RocketMQ和Kafka要低了一个数量级万级,吞吐量同ActiveMQ10万级,可以支撑高吞吐量10万级别,高吞吐量。适合日志采集,实时计算等场景
topic数量对吞吐量的影响topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topictopic从几十个到几百个的时候,吞吐量会「大幅度下降」所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
时效性ms级微秒级,这是rabbitmq的一大特点,延迟是最低的ms级延迟在ms级以内
可用性高,基于主从架构实现高可用性同ActiveMQ非常高,分布式架构非常高,同样也是分布式式
消息可靠性有较低的概率丢失数据经过参数优化配置,可以做到0丢失同RocketMQ一样也可以做到消息零丢失
功能支持MQ领域的功能极其完备基于erlang开发,所以并发能力很强,性能极其好,延时很低MQ功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
优劣总结

「ActiveMQ」

  • 优点:非常成熟,功能强大,在业内大量的公司以及项目中都有应用
  • 缺点:偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对 ActiveMQ 5.x维护越来越少,几个月才发布一个版本。较少在大规模吞吐的场景中使用。

「RabbitMQ」

  • 优点:erlang语言开发,性能极其好,延时很低。吞吐量到万级,MQ功能比较完备。而且开源提供的管理界面非常棒,用起来很好用。社区相对比较活跃,几乎每个月都发布几个版本分。在国内一些互联网公司近几年用rabbitmq也比较多一些。
  • 缺点:RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。

「RocketMQ」

  • 优点:接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都不错的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。
  • 缺点:社区活跃度相对较为一般,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险。

「Kafka」

  • 优点:就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。
  • 缺点:有可能消息重复消费。对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。

从上面的总结我们知道,Kafka可以用于较简单的消息队列(如果对你来说足够使用)。并且较要求较高的吞吐,那么Kafka是你最合适的选择。

3、kafka集成java

导入kafka依赖

<!-- kafka依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.6</version>
</dependency>

配置kafka

## kafka ##
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer:  org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    topic:
      demo_test  #主题

生产者

@Component
@Slf4j
public class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Value("${spring.kafka.topic}")
    private String topic;

    /**
     * 发送kafka消息
     *
     * @param jsonString
     */
    public void send(String jsonString) {
        ListenableFuture future = kafkaTemplate.send(topic, jsonString);
        future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));
    }
}

消费者

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "demo_test")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());
        System.out.println("收到消息:"+record.value());
    }


}

单元测试

@Test
public void testDemo() throws InterruptedException {
    for (int i = 0; i <= 10; i++) {
        log.info("start send");
        kafkaProducer.send("我是刘列广"+i);
        log.info("end send");
        // 休眠10秒,为了使监听器有足够的时间监听到topic的数据
        Thread.sleep(10);
    }
}

Kafka面试题

1、Kafka的设计时什么样的呢

Kafka将消息以topic为单位进行归纳

将向Kafka topic发布消息的程序成为producers.

将预订topics并消费消息的程序成为consumer.

Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.

producers通过网络将消息发送到Kafka集群,集群向消费者提供消息

2、数据传输的事物定义有哪三种?

数据传输的事务定义通常有以下三种级别:

(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输

(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.

(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的

3、Kafka判断一个节点是否还活着有那两个条件?

(1)节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接

(2)如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久

4、producer是否直接将数据发送到broker的leader(主节点)?

producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发,为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了

5、Kafa consumer是否可以消费指定分区消息?

Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的

6、Kafka消息是采用Pull模式,还是Push模式?

Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:

producer将消息推送到broker,consumer从broker拉取消息

一些消息系统比如Scribe和Apache Flume

采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式

Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略

Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞直到新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发

7、Kafka存储在硬盘上的消息格式是什么?

消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。

·消息长度: 4 bytes (value: 1+4+n)

·版本号: 1 byte

·CRC校验码: 4 bytes

·具体的消息: n bytes

8、Kafka高效文件存储设计特点:

(1).Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。

(2).通过索引信息可以快速定位message和确定response的最大大小。

(3).通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。

(4).通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

9、Kafka 与传统消息系统之间有三个关键区别

(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留

(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性

(3).Kafka 支持实时的流式处理

10、Kafka创建Topic时如何将分区放置到不同的Broker中

·副本因子不能大于 Broker 的个数;

·第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;

·其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;

·剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的

11、Kafka新建的分区会在哪个目录下创建

在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。

当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。

如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。

但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。

12、partition的数据如何保存到硬盘

topic中的多个partition以文件夹的形式保存到broker,每个分区序号从0递增,

且消息有序

Partition文件下有多个segment(xxx.index,xxx.log)

segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为1g

如果大小大于1g时,会滚动一个新的segment并且以上一个segment最后一条消息的偏移量命名

13、kafka的ack机制

request.required.acks有三个值 0 1 -1

0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据

1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失

-1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢失

14、Kafka的消费者如何消费数据

消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置

等到下次消费时,他会接着上次位置继续消费

15、消费者负载均衡策略

一个消费者组中的一个分片对应一个消费者成员,他能保证每个消费者成员都能访问,如果组中成员太多会有空闲的成员

16、kafaka生产数据时数据的分组策略

生产者决定数据产生到集群的哪个partition中

每一条消息都是以(key,value)格式

Key是由生产者发送数据传入

所以生产者(key)决定了数据产生到集群的哪个partition

17、数据有序

一个消费者组里它的内部是有序的

消费者组与消费者组之间是无序的

18、在kafka里面怎么解决信息的丢失?

  1. 设置合适的ack参数:在producer端设置ack参数,可以控制消息的确认方式。如果设置为"all",则只有当消息被成功写入所有replica后才会返回ack信号,这样可以保证消息不会丢失。如果设置为"1",则只有当消息被写入leader时才会返回ack信号,这可能会存在数据丢失的风险。
  2. 配置重试机制:在producer端配置重试机制,当消息写入失败时可以自动重试,提高消息的可靠性。但是需要注意,重试可能会增加消息的延迟,需要根据实际业务场景进行权衡。
  3. 设置合理的副本因子:通过增加副本因子来增加数据的冗余度,从而在某些节点出现问题时能够保证数据的可靠性和可用性。
  4. 定期清理过期数据:Kafka会根据配置的策略定期清理过期数据,以避免磁盘空间不足导致的数据丢失问题。
  5. 使用Kafka的消费者组:通过将消费者组中的多个消费者组成一个逻辑上的整体,可以实现负载均衡和容错的效果。当某个消费者出现故障时,其他消费者可以继续消费数据,保证数据的可靠性和可用性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2326322.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用 Chrome devTools Source Override 实现JS逆向破解案例

之前讲解 Chrome 一大强势技术 override 时&#xff0c;给的案例貌似没有给大家留下多深的印象 浏览器本地替换&#xff08;local overrides&#xff09;快速定位前端样式问题的案例详解&#xff08;也是hook js的手段&#xff09;_浏览器的 overrides 替换功能-CSDN博客 其实…

容器C++ ——STL常用容器

string容器 string构造函数 #include<iostream> using namespace std; #include<string.h> void test01() {string s1;//默认构造const char* str "hello world";string s2(str);//传入char*cout << "s2" << s2 << endl;s…

npu踩坑记录

之前使用qwen系列模型在ascend 910a卡进行了一些生成任务, 贴出踩坑过程也许对遇到类似问题的同学有帮助: ) 目录 千问 qwq32环境配置 代码部署 生成内容清洗 已生成内容清洗 生成过程优化 Failed to initialize the HCCP process问题 assistant 的历史回答丢失 推理执…

Linux信号——信号的产生(1)

注&#xff1a;信号vs信号量&#xff1a;两者没有任何关系&#xff01; 信号是什么&#xff1f; Linux系统提供的&#xff0c;让用户&#xff08;进程&#xff09;给其他进程发送异步信息的一种方式。 进程看待信号的方式&#xff1a; 1.信号在没有发生的时候&#xff0c;进…

【机器学习】——机器学习思考总结

摘要 这篇文章深入探讨了机器学习中的数据相关问题&#xff0c;重点分析了神经网络&#xff08;DNN&#xff09;的学习机制&#xff0c;包括层级特征提取、非线性激活函数、反向传播和梯度下降等关键机制。同时&#xff0c;文章还讨论了数据集大小的标准、机器学习训练数据量的…

JMeter进行分布式压测

从机&#xff1a; 1、确认防火墙是否关闭&#xff1b; 2、打开网络设置&#xff0c;关闭多余端口&#xff1b;&#xff08;避免远程访问不到&#xff09; 3、打开JMeter/bin 目录底下的jmeter.properties&#xff1b; remove_hosts设置当前访问地址&#xff0c;192.XXXXX&…

快速入手-基于Django-rest-framework的第三方认证插件(SimpleJWT)权限认证扩展返回用户等其他信息(十一)

1、修改serializer.py&#xff0c;增加自定义类 # 自定义用户登录token等返回信息 class MyTokenObtainPair(TokenObtainPairView): def post(self, request, *args, **kwargs): serializer self.get_serializer(datarequest.data) try: serializer.is_valid(raise_exceptio…

关于IP免实名的那些事

IP技术已成为个人与企业保护隐私、提升网络效率的重要工具。其核心原理是通过中介服务器转发用户请求&#xff0c;隐藏真实IP地址&#xff0c;从而实现匿名访问、突破地域限制等目标。而“免实名”代理IP的出现&#xff0c;进一步简化了使用流程&#xff0c;用户无需提交身份信…

【SQL性能优化】预编译SQL:从注入防御到性能飞跃

&#x1f525; 开篇&#xff1a;直面SQL的"阿喀琉斯之踵" 假设你正在开发电商系统&#x1f6d2;&#xff0c;当用户搜索商品时&#xff1a; -- 普通SQL拼接&#xff08;危险&#xff01;&#xff09; String sql "SELECT * FROM products WHERE name "…

SQL Server从安装到入门一文掌握应用能力。

本篇文章主要讲解,SQL Server的安装教程及入门使用的基础知识,通过本篇文章你可以快速掌握SQL Server的建库、建表、增加、查询、删除、修改等基本数据库操作能力。 作者:任聪聪 日期:2025年3月31日 一、SQL Server 介绍: SQL Server 是微软旗下的一款主流且优质的数据库…

力扣HOT100之矩阵:54. 螺旋矩阵

这道题之前在代码随想录里刷过类似的&#xff0c;还有印象&#xff0c;我就按照当初代码随想录的思路做了一下&#xff0c;结果怎么都做不对&#xff0c;因为按照代码随想录的边界条件设置&#xff0c;当行数和列数都为奇数时&#xff0c;最后一个元素无法被添加到数组中&#…

5.1 WPF路由事件以及文本样式

一、路由事件 WPF中存在一种路由事件&#xff08;routed event&#xff09;&#xff0c;该事件将发送到包含该控件所在层次的所有控件&#xff0c;如果不希望继续向更高的方向传递&#xff0c;只要设置e.Handled true即可。 这种从本控件-->父控件->父的父控件的事件&am…

Python数据可视化-第1章-数据可视化与matplotlib

环境 开发工具 VSCode库的版本 numpy1.26.4 matplotlib3.10.1 ipympl0.9.7教材 本书为《Python数据可视化》一书的配套内容&#xff0c;本章为第1章 数据可视化与matplotlib 本文主要介绍了什么是数据集可视化&#xff0c;数据可视化的目的&#xff0c;常见的数据可视化方式…

Flutter敏感词过滤实战:基于AC自动机的高效解决方案

Flutter敏感词过滤实战&#xff1a;基于AC自动机的高效解决方案 在社交、直播、论坛等UGC场景中&#xff0c;敏感词过滤是保障平台安全的关键防线。本文将深入解析基于AC自动机的Flutter敏感词过滤实现方案&#xff0c;通过原理剖析实战代码性能对比&#xff0c;带你打造毫秒级…

Odoo/OpenERP 和 psql 命令行的快速参考总结

Odoo/OpenERP 和 psql 命令行的快速参考总结 psql 命令行选项 选项意义-a从脚本中响应所有输入-A取消表数据输出的对齐模式-c <查询>仅运行一个简单的查询&#xff0c;然后退出-d <数据库名>指定连接的数据库名&#xff08;默认为当前登录用户名&#xff09;-e回显…

Vue中使用antd-table组件时,树形表格展开配置不生效-defaultExpandedRowKeys-默认展开配置不生效

defaultExpandedRowKeys属性 defaultExpandAllRows这个属性仅仅是用来设置默认值的,只在第一次渲染的时候起作用,后续再去改变,无法实现响应式 解决方案一 a-table表格添加key属性,当每次获取值时,动态改变key,以达到重新渲染的效果 <a-table:key="tableKey"…

VRRP交换机三层架构综合实验

题目要求&#xff1a; 1&#xff0c;内网Ip地址使用172.16.0.0/16分配 说明可以划分多个子网&#xff0c;图中有2个VLAN&#xff0c;可以根据VLAN划分 2&#xff0c;sw1和SW2之间互为备份 互为备份通常通过VRRP&#xff08;虚拟路由冗余协议&#xff09;来实现。VRRP会在两个…

基于卷积神经网络的眼疾识别系统,resnet50,efficentnet(pytorch框架,python代码)

更多图像分类、图像识别、目标检测、图像分割等项目可从主页查看 功能演示&#xff1a; 眼疾识别系统resnet50&#xff0c;efficentnet&#xff0c;卷积神经网络&#xff08;pytorch框架&#xff0c;python代码&#xff09;_哔哩哔哩_bilibili &#xff08;一&#xff09;简介…

基于srpingboot智慧校园管理服务平台的设计与实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论…

【力扣hot100题】(026)合并两个有序链表

可以创建一个新链表记录答案&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *…