目录
一. 前言
二. 版本下载
2.1. 版本说明
三. 快速启动
3.1. 下载解压
3.2. 启动服务
3.3. 创建一个主题(Topic)
3.4. 发送消息
3.5. 消费消息
3.6. 使用 Kafka Connect 来导入/导出数据
3.7. 使用 Kafka Stream 来处理数据
3.8. 停止 Kafka
四. 以系统服务方式启动 Kafka
4.1. Linux 系统
4.1.1. 创建服务文件
4.1.2. 启动服务
4.2. Windows 系统
4.2.1. 准备工作
4.2.2. 注册服务
4.2.3. 编辑注册表
4.2.4. 启动并测试
4.3. 常用命令
一. 前言
在前文《Kafka 入门介绍》中,主要介绍了 Kafka 的由来、使用场景、相关的重要术语等,本文就着重介绍下Kafka 的下载、安装和启动,以便为接下使用它做好准备。
二. 版本下载
登录 Apache kafka 官方下载,https://kafka.apache.org/downloads.html
推荐下载 Scala 2.13 版本的。
2.1. 版本说明
比如我们下载的包是这样:kafka_2.13-3.6.1.tgz,2.13 是 Scala 编译器的版本,后面的 3.6.1 才是 Kafka 的版本。
- 0.8版本以后才加入副本机制;新老 API 的主要区别在于连接 Kafka 的时候是配置 ZK 地址还是 Broker 的地址,新版 API 都是配置 Broker 的地址。
- 0.8.2.2这个版本比较稳定,其中的老版本 consumer API 也比较稳定,但是不要用新版本producer API,BUG 比较多。
- 0.9 版本的中的新版本的 producer API 比较稳定,但是新版本的 consumer API 不稳定。
- 0.10 加入了 Kafka Streams,但不要在生产中使用,不过该版本的消息引擎功能没问题。建议使用 0.10.2.2,该版本的新版 Consumer API 和 Producer API 都比较稳定。
- 0.11.0.0 在 Producer API 中加入了幂等性以及事务,另外就是对 Kafka 消息格式做了重构。增加了幂等性和事务才能使 Kafka 在流处理方面更加得心应手,但是稳定性不好。所以可以使用 0.11.0.3 这个版本。
- 1.0和2.0主要是针对 Kafka Streams 做了各种改进。在消息引擎方面没有重大变化。
- 3.x 版本去掉了对 ZK 的依赖,弃用 Java8,但依然可以使用,弃用 Scala2.12 版本,但依然可以使用。
三. 快速启动
3.1. 下载解压
$ tar -xzf kafka_2.13-3.6.1.tgz
$ cd kafka_2.13-3.6.1
3.2. 启动服务
注意:你的本地环境必须安装有
Java 8+
。如何配置请参见《JDK 环境变量设置》。
运行 Kafka 需要使用 Zookeeper,所以你需要先启动 Zookeeper,如果你没有 Zookeeper,你可以使用 Kafka 自带打包和配置好的 Zookeeper。
# 注意:Apache Kafka2.8版本之后可以不需要使用ZooKeeper,内测中,文章末尾有体验的安装方式。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
...
打开另一个命令终端启动 Kafka 服务:
$ bin/kafka-server-start.sh config/server.properties &
一旦所有服务成功启动,那 Kafka 已经可以使用了。
3.3. 创建一个主题(Topic)
创建一个名为 test 的 Topic,只有一个分区和一个备份:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建好之后,可以通过运行以下命令,查看已创建的 Topic 信息:
$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
或者,除了手工创建 Topic 外,你也可以配置你的 Broker,当发布一个不存在的 Topic 时自动创建Topic,点击这里查看如何配置自动创建topic时设置默认的分区和副本数自动创建 Topic 时设置默认的分区和副本数。
3.4. 发送消息
Kafka 提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给 Kafka 集群。每一行是一条消息。
运行 Producer(生产者),然后在控制台输入几条消息到服务器:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
3.5. 消费消息
Kafka 也提供了一个消费消息的命令行工具,将存储的信息输出来,新打开一个命令控制台,输入:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
可以使用
Ctrl-C
停止消费者客户端。
3.6. 使用 Kafka Connect 来导入/导出数据
你可能在现有的系统中拥有大量的数据,如关系型数据库或传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect 允许你不断地从外部系统提取数据到 Kafka,反之亦然。用 Kafka 整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。
查看 Kafka Connect 部分,了解更多关于如何将你的数据持续导入和导出 Kafka。
3.7. 使用 Kafka Stream 来处理数据
一旦你的数据存储在 Kafka 中,你就可以用 Kafka Streams 客户端库来处理这些数据,该库适用于 Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与Kafka 服务器端集群技术的优势相结合,使这些应用程序具有可扩展性、弹性、容错性和分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
为了给你一个初步的体验,这里是如何实现流行的 WordCount 算法的:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka 流演示例子和应用开发教程展示了如何从头到尾编码和运行这样一个流式应用。
3.8. 停止 Kafka
现在你已经完成了快速入门,可以随时卸载 Kafka 环境了,或者继续玩下去。
- 使用 Ctrl-C 停止生产者和消费者客户端。
- 使用 Ctrl-C 停止 Kafka Broker。
- 最后,用 Ctrl-C 停止 ZooKeeper。
如果你还想删除你的本地 Kafka 环境的数据,包括你创建的消息,运行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
四. 以系统服务方式启动 Kafka
4.1. Linux 系统
4.1.1. 创建服务文件
创建 /usr/lib/systemd/system/zookeeper.service 并写入:
[Unit]
Requires=network.target
After=network.target
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target
创建 /usr/lib/systemd/system/kafka.service 并写入:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target
注意:示例的 Kafka 安装地址在 /usr/local/kafka。
4.1.2. 启动服务
重载系统服务并启动:
systemctl daemon-reload
systemctl enable zookeeper && systemctl enable kafka
systemctl start zookeeper && systemctl start kafka
systemctl status zookeeper && systemctl status kafka
4.2. Windows 系统
4.2.1. 准备工作
下载 Kafka:https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
解压 Kafka 至 D:\bigdata\kafka_2.13-3.6.1
下载 instsrv.exe / srvany.exe
将 instsrv.exe / srvany.exe 拷贝至 D:\bigdata\kafka_2.13-3.6.1\bin\windows
4.2.2. 注册服务
以管理员身份打开 cmd:
cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
instsrv KafkaService D:\bigdata\kafka_2.13-3.6.1\bin\windows\srvany.exe
卸载服务:
instsrv KafkaService remove
或
sc delete KafkaService
4.2.3. 编辑注册表
打开注册表(运行窗口中输入 regedit):
- 定位到以下路径:HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\KafkaService;
- 如果该服务名下没有 Parameters 项目,则对服务名称项目右击新建项,名称为 Parameters;
- 定位到 Parameters 项,新建以下几个字符串值:
- 名称 Application 值为你要作为服务运行的 BAT 文件地址:D:\bigdata\kafka_2.13-3.6.1\bin\windows\kafka-server-start.bat;
- 名称 AppDirectory 值为你要作为服务运行的 BAT 文件所在文件夹路径:D:\bigdata\kafka_2.13-3.6.1\bin\windows;
- 名称 AppParameters 值为你要作为服务运行的 BAT 文件启动所需要的参数:D:\bigdata\kafka_2.13-3.6.1\config\server.properties。
4.2.4. 启动并测试
做完以上3步,启动服务即可。
测试 Kafka:
1. 创建 Topic
打开一个命令行窗口,进入到目录 D:\bigdata\kafka_2.13-3.6.1\bin\windows:
cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2. 创建生产者 Producer
打开一个命令行窗口,进入到目录 D:\bigdata\kafka_2.13-3.6.1\bin\windows:
cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test
3. 创建消费者
打开一个命令行窗口,进入到目录 D:\bigdata\kafka_2.13-3.6.1\bin\windows:
cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
在生产者命令行窗口内随便输入一段字符,然后回车,你应该能看到同样的消息出现在消费者的命令行窗口内,如果在消费者端能看到你推送的消息,那么你已经成功的安装了 Kafka。
4.3. 常用命令
1. 列举 Topic:
kafka-topics.bat --list --zookeeper localhost:2181
2. 描述 Topic:
kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]
3. 从头消费消息:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic [Topic Name] --from-beginning
4. 删除 Topic:
kafka-run-class.bat kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181