Kafka快速入门指南
- 微信公众号:阿俊的学习记录空间
- 小红书:ArnoZhang
- wordpress:arnozhang1994
- 博客园:arnozhang
- CSDN:ArnoZhang1994
第一步:获取Kafka
下载2.13-3.8.0版本的Kafka版本并解压:
$ tar -xzf kafka_2.13-3.8.0.tgz
$ cd kafka_2.13-3.8.0
第二步:启动Kafka环境
注意:你的本地环境必须安装Java 8及以上版本。
Apache Kafka可以通过KRaft模式或ZooKeeper模式启动。请按照以下其中一个配置启动Kafka,不要同时使用两者。
使用KRaft启动Kafka
Kafka可以通过KRaft模式启动,使用本地脚本和下载的文件或Docker镜像。选择以下其中一种方式启动Kafka服务:
使用已下载的文件
- 生成集群UUID
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
- 格式化日志目录
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
- 启动Kafka服务器
$ bin/kafka-server-start.sh config/kraft/server.properties
使用ZooKeeper启动Kafka
按照以下顺序启动所有服务:
- 启动ZooKeeper服务:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
- 打开另一个终端,启动Kafka代理服务:
$ bin/kafka-server-start.sh config/server.properties
所有服务成功启动后,你将拥有一个基本的Kafka环境,准备使用。
使用基于JVM的Apache Kafka Docker镜像
- 获取Docker镜像:
$ docker pull apache/kafka:3.8.0
- 启动Kafka Docker容器:
$ docker run -p 9092:9092 apache/kafka:3.8.0
使用基于GraalVM的原生Apache Kafka Docker镜像
- 获取Docker镜像:
$ docker pull apache/kafka-native:3.8.0
- 启动Kafka Docker容器:
$ docker run -p 9092:9092 apache/kafka-native:3.8.0
Kafka服务器成功启动后,你将拥有一个基础Kafka环境,准备使用。
第三步:创建用于存储事件的主题
Kafka是一个分布式事件流平台,它可以在多台机器上读取、写入、存储和处理事件(文档中也称为记录或消息)。在编写事件之前,必须创建一个主题。打开另一个终端,运行以下命令:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
你可以使用以下命令来查看新主题的分区数量等详细信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
第四步:向主题中写入事件
运行控制台生产者客户端,将一些事件写入主题。每输入一行数据,就会将其作为独立事件写入主题:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
随时可以按Ctrl-C停止生产者客户端。
第五步:读取事件
打开另一个终端,运行控制台消费者客户端来读取刚创建的事件:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
你可以按Ctrl-C随时停止消费者客户端。
第六步:使用Kafka Connect进行数据导入/导出
Kafka Connect可以让你持续从外部系统中摄取数据到Kafka,反之亦然。在此快速入门中,我们将演示如何使用简单的连接器将数据从文件导入Kafka主题,并将数据从Kafka主题导出到文件。
编辑 config/connect-standalone.properties
文件,添加以下配置:
$ echo "plugin.path=libs/connect-file-3.8.0.jar" >> config/connect-standalone.properties
然后创建一些测试数据:
$ echo -e "foo\nbar" > test.txt
接着,运行两个连接器,启动Kafka Connect:
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
第七步:使用Kafka Streams处理事件
Kafka Streams允许你在Java/Scala中实现实时应用程序和微服务,处理存储在Kafka中的数据。例如,实现WordCount算法:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
第八步:终止Kafka环境
你可以按Ctrl-C停止生产者和消费者客户端,Kafka代理和ZooKeeper服务。若要删除所有数据:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs