消息中间件 Kafka 快速入门与实战

news2024/9/28 13:09:38

1、概述

最近感觉上班实在是太无聊,打算给大家分享一下Kafka的使用,本篇文章首先给大家分享三种方式搭建Kafka环境,接着给大家介绍kafka核心的基础概念以及Java API的使用,最后分享一个SpringBoot的集成案例,希望对大家有所帮助

2、环境搭建

2.1、安装包安装

关于环境搭建这块我们先来通过手动下载安装包的方式来完成,首先下载安装包,这是我使用的环境是CentOS7。

下载的地址 :

https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz

可以使用wget 工具下载 

下载完成后我们解压,这里我是用的路径是 /usr/local 解压后如下图所示:

来到bin路径下,我们需要先启动zookeeper 然后再启动kafka,相关命令如下

# 启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties &

# 启动 kafka
./kafka-server-start.sh ../config/server.properties &
启动后 我们可以查看进程

kafka 已经启动成功了

2.2、Kraft 方式启动 kafka

上面我们使用zookeeper的方式运行了Kafka,kafka从2.8的版本就引入了KRaft模式,主要是用来取代zookeeper来管理元数据,下面我们来试试使用KRaft的方式来启动kafka

# 停掉kafka
./kafka-server-stop.sh ../config/server.properties

# 停掉zookeeper
./zookeeper-server-stop.sh ../config/zookeeper.properties

# 生成Cluster UUID
./kafka-storage.sh random-uuid

这里我们能需要记录下 生成的这个uuid值

# 格式化日志目录
./kafka-storage.sh format -t y__hXaR2QVKaehk5FiNLfQ -c ../config/kraft/server.properties

接着启动kafka

# 启动Kafka
./kafka-server-start.sh ../config/kraft/server.properties &

2.3、 Docker 安装

相信大家一定喜欢这种方式,直接使用 Docker 一把梭哈

# 拉取Kafka镜像
docker pull apache/kafka:3.7.0

# 启动Kafka容器
docker run -p 9092:9092 apache/kafka:3.7.0

# 复制出来一份配置文件
docker cp 7434ce960297:/etc/kafka/docker/server.properties /opt/docker/kafka

然后我们需要修改配置文件

修改完成后我们使用以下命令启动kafka

docker run -d  \
-v /opt/docker/kafka:/mnt/shared/config  \
-v /opt/data/kafka-data:/mnt/kafka-data  \
-p 9092:9092  \
--name kafka-container  \
apache/kafka:3.7.0

完成之后我们可以使用客户端工具连接试试 

 至此我们的环境搭建完成了。

3、基础概念

3.1、Topic & event 简述

在kafka中 消息(event) 被组织并持久地存储在Topic中。非常简单地说,Topic 类似于文件系统中的一个文件夹,而事件就是该文件夹中的文件。这既是官方给出的Topic和event的定义

https://kafka.apache.org/37/documentation.html#introduction

接下来我们来创建一个主题,首先我们来到 kafka 安装目录的 bin 目录下

# 创建一个名为 tianlongbabu的主题 
./kafka-topics.sh --create --topic tianlongbabu --bootstrap-server 192.168.200.100:9092

# 列出所有的主题(在主机上操作可以使用 localhost)
./kafka-topics.sh --list --bootstrap-server localhost:9092

 创建好了之后 我们可以回到客户端工具查看

同样的 我们可以直接在这个客户端上删除这个 topic 

3.2、生产消息和消费消息

我们接下来看看 怎么往主题中写入事件(消息)

# 在主机上 指定topic  连接到kafka服务端
./kafka-console-producer.sh --topic tianlongbabu  \
--bootstrap-server 192.168.200.100:9092

连接上之后 就可以发送事件了

同样的我们新开一个终端  使用 kafka-console-consumer.sh 这个脚本 就可以消费topic中的数据了

## 主机上操作  可以使用localhost 
## --from-beginning 表示从kafka最早的消息开始消费

./kafka-console-consumer.sh --topic tianlongbabu  \
--from-beginning --bootstrap-server localhost:9092

我们连接上之后 就会打印出刚刚发送的事件(消息)了。

3.3、关于事件的组成

看到这里 相信大家对事件有了一定的了解,关于事件这里给出一段官方文档上的原文

我们 从这段话中至少可以知道

1、event 在文档中也被称为记录(record)或消息(message) 

2、 当你向Kafka读写数据时,采用的是事件形式

3、概念上,一个事件包含键(key)、值(value)、时间戳(timestamp),以及可选的元数据(metadata)。

所以一个事件可以设置一个key, 那么你可能会问key 是干什么用的呢,作用是什么呢?这个会在后面的编码的章节中给出解释,大家先知道有这个概念就行了

3.4、关于 Partition (分区)

先来给出一张图,出自官方文档的主要术语解释的章节

从这张图中我们可以看到,topic 中存在多个 partition(分区),也就是说事件其实都是被保存在topic中的分区里,并且一个topic 可以创建多个分区。

消息在分区中以追加的方式存储,每条消息都有一个唯一的偏移量(offset),表示它在分区中的位置。

那么分区的作用是什么呢,我自己根据文档上描述主要总结了4点

1、分区允许多个消费者并行消费同一主题中的消息,不同的消费者可以消费不同的分区,从而提高系统的吞吐量

2、Kafka会根据配置将消息均匀地分布到各个分区,确保负载在多个消费者之间均匀分配

3、在同一分区内,消息的顺序是有保障的。这意味着对于同一键的所有消息都会被发送到同一分区,从而保持顺序

4、通过增加分区数,可以扩展主题的并发能力和存储能力。Kafka支持动态增加分区,但请注意,这可能影响到现有数据的顺序。

这几点大家先了解即可。

4、入门程序开发 

4.1、新建项目引入依赖

新建项目,添加 kafka 客户端依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.0</version>
        </dependency>

4.2、编写生产者

我们先来创建消息生产者的代码

public class KafkaProducerTest {

    public static void main(String[] args) {
        // Kafka配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.100:9092"); // Kafka服务器地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "tianlongbabu"; // 主题名称
        String key = "gaibang"; // 消息键
        String value = "乔峰"; // 消息值

        // 发送消息
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, (RecordMetadata metadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println("Sent message with key: " + key + ", value: " + value + ", to partition: " + metadata.partition() + ", offset: " + metadata.offset());
            }
        });

        // 关闭生产者
        producer.close();
    }
}

 上述代码的写法  在org.apache.kafka.clients.producer.KafkaProducer 类的注释中有详细的说明

这个简单的入门案例中用到了前面给大家介绍的key(事件键)了,看了这个案例大家应该能够明白是什么意思了吧。这个时候 我们运行main方法 就能把测试数据写入到 topic 中了 

4.3、编写消费者

相关代码如下:

public static void main(String[] args) {
        // Kafka配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.100:9092"); // Kafka服务器地址
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建Kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "tianlongbabu"; // 主题名称

        // 订阅主题
        consumer.subscribe(Collections.singletonList(topic));

        // 不断消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumed message with key: %s, value: %s, from partition: %d, offset: %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }

        // 关闭消费者(通常不会到达这里)
        // consumer.close();
    }

消费者的代码里我们需要指定一个消费组 id ,当存在多个消费者服务的时候 需要为他们各自的消费组id。

我们运行main方法即可收到刚才发送的消息了。

到这里一个简单的生产-消费的案例已经结束了,相信你对Kafka也有了一个初步的认知。

4.4、关于消费组ID

上述入门程序的消费者程序中有一个消费组id 。在 Kafka 中,消费组 ID(Consumer Group ID)是一个重要的概念,用于标识一组协同工作的消费者。

简单来说就是:

1、对于一个特定的主题,如果多个消费者属于同一个消费组,那么主题中的每条消息只会被该消费组内的一个消费者消费。
2、如果消费者属于不同的消费组,则它们可以独立消费同一主题的消息,并且不会互相影响

5、SpringBoot集成

5.1、相关依赖

SpringBoot版本:

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.2.9</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

项目相关依赖: 

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Spring Boot Starter Web (optional, if you want to create a REST API) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Add other dependencies as needed -->
</dependencies>

5.2、配置详解

我们在applicatio.properties  添加以下配置

spring.kafka.bootstrap-servers=192.168.200.100:9092
## 消费组
spring.kafka.consumer.group-id=my-group
## 从最早的消息开始消费
spring.kafka.consumer.auto-offset-reset=earliest

5.3、编写生产者

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

5.4、编写消费者

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "tianlongbabu", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
    }
}

5.5、测试

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaProducer.sendMessage("tianlongbabu", message);
    }
}

启动类

@SpringBootApplication
public class BootKafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(BootKafkaApplication.class, args);
	}

}

启动服务,由于我们配置的是earliest所以会打印出 topic中之前的消息

我们可以继续使用postman发送消息 ,然后观察控制台的输出

 

好了一个简单的消息生产和消费的流程就是这样了

6、Kafka 的应用

今天这篇文章主要给大家介绍了Kafka的一些基础知识,并且实现了SpringBoot快速集成的案例,大家后续可以使用上述案例作为模板添加自己的业务代码。后续我也会分享一些 Kafka工程化落地的实战案例在我的微信公众号(代码洁癖症患者)上,感兴趣的小伙伴可以去查阅

最后我们聊聊Kafka的应用场景

首先 Kafka 可以作为高吞吐量的消息队列,支持异步通信。生产者将消息发送到主题,消费者从主题中读取消息,适合实现解耦的微服务架构。

其次 Kafka 可以收集各个服务或应用的日志信息,并将其集中存储,以便后续的分析和监控。

在大数据领域 使用 Kafka 结合流处理框架(如 Apache Flink、Apache Spark Streaming),可以实时处理和分析数据流,适用于实时监控、欺诈检测等场景。

在数据集成领域 Kafka 可以作为不同数据源之间的数据桥梁,实现数据的实时传输和整合,常用于 ETL(提取、转换、加载)流程。

这里仅仅列出几个比较常用的场景,还有很多领域都可以见到Kafka的身影。

好了,纸上得来终觉浅,大家赶紧动手试试吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2173588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Arthas sc(查看JVM已加载的类信息 )

文章目录 二、命令列表2.2 class/classloader相关命令2.2.5 sc&#xff08;查看JVM已加载的类信息 &#xff09;举例1&#xff1a;模糊搜索&#xff0c;xx包下所有的类举例2&#xff1a;打印类的详细信息举例3&#xff1a;打印出类的Field信息 二、命令列表 2.2 class/classlo…

计算机网络的整体认识---网络协议,网络传输过程

计算机网络背景 网络发展 独立模式: 计算机之间相互独立; 网络互联: 多台计算机连接在一起, 完成数据共享; 局域网LAN: 计算机数量更多了, 通过交换机和路由器连接在一起; 广域网WAN: 将远隔千里的计算机都连在一起;所谓 "局域网" 和 "广域网" 只是一个相…

(最新已验证)stm32 + 新版 onenet +dht11+esp8266/01s + mqtt物联网上报温湿度和控制单片机(保姆级教程)

物联网实践教程&#xff1a;微信小程序结合OneNET平台MQTT实现STM32单片机远程智能控制 远程上报和接收数据——汇总 前言 之前在学校获得了一个新玩意&#xff1a;ESP-01sWIFI模块&#xff0c;去搜了一下这个小东西很有玩点&#xff0c;远程控制LED啥的&#xff0c;然后我就想…

Arthas classloader (查看 classloader 的继承树,urls,类加载信息)

文章目录 二、命令列表2.2 class/classloader相关命令2.2.4 classloader &#xff08;查看 classloader 的继承树&#xff0c;urls&#xff0c;类加载信息&#xff09;举例1&#xff1a;按类加载类型查看统计信息举例2&#xff1a;按类加载实例查看统计信息举例3&#xff1a;查…

k8s搭建一主三从的mysql8集群---无坑

一&#xff0c;环境准备 1.1 k8s集群服务器 ip角色系统主机名cpumem192.168.40.129mastercentos7.9k8smaster48192.168.40.130node1centos7.9k8snode148192.168.40.131node2centos7.9k8snode248192.168.40.132node3centos7.9k8snode348 k8s集群操作请参考《K8s安装部署&…

如何调整云桌面安装的虚拟机分辨率?

如何调整云桌面安装的虚拟机分辨率&#xff1f; 1. 编辑GRUB配置文件2. 修改分辨率3. 更新GRUB4. 重启虚拟机 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在云桌面环境中&#xff0c;虚拟机分辨率过低且无法调整时&#xff0c;可以通过以…

UCS512DHN DMX512差分并联协议LED驱动IC 舞动灯光的魔法芯片

UCS512DHN产品概述&#xff1a; UCS512DHN是DMX512差分并联协议LED驱动芯片&#xff0c;可选择1/2/3/4通道高精度恒流输出&#xff0c;灰度达65536 级。UCS512DHN为带散热片封装的大电流输出版本。UCS512DHN有PWM反极性输出功能&#xff0c;此功能适合外挂三极 管&#xff0c;…

认识Hash表+Hash函数的设计+Hash冲突的处理+Hash表的实现+Java中的equals与hashCode

一、Hash表 1、定义&#xff1a;Hash表是一种特殊的数组 2、Hash函数 &#xff08;1&#xff09;设计原则 &#xff08;2&#xff09;作用 &#xff08;3&#xff09;应用 &#xff08;4&#xff09;Hash冲突&#xff1a; 二、Hash函数的设计 1、解决Hash索引分布不均匀…

tomcat安装与部署

一、基础准备 1. 节点规划 IP 主机名 节点 192.168.200.70 tomcat Tomcat 2. 环境准备 准备一台虚拟机&#xff0c;镜像为CentOS-7-x86_64&#xff0c;下载两个软件包&#xff0c;apache-tomcat-9.0.95.tar.gz&#xff1b;zrlog WAR包。 二、安装Tomcat 1.基础环境配…

跳表的理解以及使用

文章目录 背景数组-链表优化链表随机访问的方法 介绍跳表的理解层数随机为什么随机可以保证效率实现细节 跳表与二分查找跳表与红黑数跳表与HASH 使用实现随机层数的实现跳表实现以及测试 背景 数组-链表 数组优点 随机访问速度较快&#xff08;基于下标访问&#xff09;。 实…

OpenCV视频I/O(5)视频采集类VideoCapture之从视频流中获取下一帧的函数grab()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 从视频文件或捕获设备中抓取下一帧。 grab() 函数是 OpenCV 中 VideoCapture 类的一个成员函数&#xff0c;用于从视频流中获取下一帧而不立即检…

基于SpringBoot的学生宿舍管理系统【附源码】

基于SpringBoot的高校社团管理系统&#xff08;源码L文说明文档&#xff09; 4 系统设计 一个成功设计的系统在内容上必定是丰富的&#xff0c;在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值&#xff0c;吸引更多的访问者访问系统&#xf…

相关数据库类型介绍

数据库类型可以根据不同的维度进行分类&#xff0c;但最常见的分类方式是将其分为关系型数据库&#xff08;Relational Databases&#xff09;和非关系型数据库&#xff08;Non-Relational Databases&#xff09;&#xff0c;也称为NoSQL数据库。下面我将详细介绍这两种类型的数…

[Linux] Linux操作系统 进程的优先级 环境变量

标题&#xff1a;[Linux] Linux操作系统 进程的优先级 个人主页水墨不写bug &#xff08;图片来源于网络&#xff09; 目录 一、进程优先级 1.PRI and NI 2.PRI vs NI 的补充理解 二、命令行参数和环境变量 1. 命令行参数 2.环境变量 I&#xff0c;环境变量是内…

AI大模型算法工程师就业宝典—— 高薪入职攻略与转行秘籍!

从ChatGPT到新近的GPT-4&#xff0c;GPT模型的发展表明&#xff0c;AI正在向着“类⼈化”⽅向迅速发展。 GPT-4具备深度阅读和识图能⼒&#xff0c;能够出⾊地通过专业考试并完成复杂指令&#xff0c;向⼈类引以为傲的“创造⼒”发起挑战。 现有的就业结构即将发⽣重⼤变化&a…

【CSS Tricks】深入聊聊前端编写css的方法论

目录 引言BEM 规范OOCSS 规范结构与样式分离容器与内容分离 SMACSS 规范ITCSS 规范设置层工具层通用层元素层对象层组件层微调层由此分层后的项目代码结构也会相应做修改&#xff0c;主要有两种形式&#xff1a;文件夹形式文件名形式引用方式按照层级顺序引用 ACSS 规范总结 引…

U盘打开提示要格式化:深度剖析、恢复策略与预防指南

U盘打开提示要格式化现象阐述 在日常的数字生活中&#xff0c;U盘作为便携式存储设备的代表&#xff0c;扮演着不可或缺的角色。然而&#xff0c;不少用户都曾遭遇过这样一个令人头疼的问题&#xff1a;当满怀期待地插入U盘&#xff0c;准备访问其中存储的数据时&#xff0c;系…

21天全面掌握:小白如何高效学习AI绘画SD和MJ,StableDiffusion零基础入门到精通教程!快速学习AI绘画指南!

‍‍大家好&#xff0c;我是画画的小强。 今天给大家分享一些我长期以来总结的AI绘画教程和各种AI绘画工具、模型插件&#xff0c;还包含有视频教程 AI工具&#xff0c;免费送&#x1f447;&#x1f447;‍‍ 这份完整版的AI绘画全套学习资料已经上传CSDN&#xff0c;朋友们如…

怎么通过AI大模型开发一个网站?

目录 一、提示词与AI输出 二、网站效果 以前不会代码开发&#xff0c;写网站是不可能的事情&#xff0c;现在有了AI&#xff0c;一切都有了可能。以下是我通过通义千问大模型开发的简单网站。 一、提示词与AI输出 提示词1 你是python程序员&#xff0c;我有一个大的需求&am…

使用代理IP数据采集都需要注意那些?

“在当今大数据时代&#xff0c;数据采集成为了企业决策和个人研究的重要依据。然而频繁访问目标网站往往会引发IP被封锁的风险&#xff0c;这时使用代理IP就显得尤为重要。但代理IP的使用并非毫无风险&#xff0c;以下是使用代理IP进行数据采集时需要注意的几个关键事项。” 一…