本文主要是关于Kafka的命令详解,每个命令都进行了非常详细的注释,帮助大家能更好的理解这些命令背后的含义,从底层去理解,如果大家喜欢,请多多点赞关注,欢迎评论!
为大家推荐几篇比较好的Kafka文章:
Kafka分布式集群部署实战:跨越理论,直击生产环境部署难题与解决方案,性能调优、监控与管理策略大揭秘,轻松上手分布式消息中间件-CSDN博客
Apache Kafka是一个开源的分布式消息系统,也是一个分布式流式计算平台。尽管它在流式计算方面有着强大功能,但在实际应用中,Kafka更多地被用作分布式消息队列。以下是对Kafka的详细解析:
1、启动Kafka
进入Kafka目录下
bin/kafka-server-start.sh -daemon config/server.properties
-
bin/kafka-server-start.sh
:这是 Kafka 提供的启动 Kafka 服务器的脚本文件。它位于 Kafka 安装目录的bin
子目录下。这个脚本负责启动 Kafka 服务器进程。 -
-daemon
:这个参数并不是 Kafka 官方文档中直接提到的标准参数。然而,在一些系统中,-daemon
参数可能用于指示脚本以守护进程(daemon)模式运行。守护进程是在后台运行的进程,它独立于启动它的终端或会话。如果 Kafka 脚本或系统环境以某种方式支持-daemon
参数,那么使用此参数可能会使 Kafka 服务器在后台运行,并且不会阻塞启动它的终端或脚本。 -
config/server.properties
:这是 Kafka 服务器的配置文件路径。这个文件包含了 Kafka 服务器的各种配置参数,如监听端口、日志目录、ZooKeeper 连接信息等。
然而,需要注意的是,Kafka 官方通常推荐使用 nohup
、screen
、tmux
或系统服务管理工具(如 systemd
、upstart
、supervisord
等)来在后台运行 Kafka 服务器。这些工具提供了更可靠和灵活的方式来管理后台进程。
如果你在使用 -daemon
参数时遇到问题,或者你的 Kafka 安装版本不支持这个参数,你可以考虑使用以下替代方案之一来在后台启动 Kafka 服务器:
nohup
nohup bin/kafka-server-start.sh config/server.properties &
这将使 Kafka 服务器在后台运行,并且即使关闭终端或会话,它也会继续运行。
screen
或 tmux
这些工具允许你在单个终端会话中启动多个“窗口”或“会话”,并且即使你断开连接,这些会话也会继续运行。
2、停止Kafka
bin/kafka-server-stop.sh stop
bin/
:这是 Kafka 安装目录中包含可执行脚本和程序的子目录。kafka-server-stop.sh
:这是 Kafka 提供的用于停止 Kafka 服务器的脚本文件。它的主要作用是向 Kafka 服务器进程发送一个停止信号,通常是通过查找 Kafka 服务器进程的 PID(进程ID)并发送一个SIGTERM
或SIGKILL
信号来实现的。stop
:这个参数在这个上下文中可能是不必要的,因为kafka-server-stop.sh
脚本的主要目的就是为了停止 Kafka 服务器。然而,有些脚本可能支持额外的参数来提供额外的控制或行为,但这通常取决于 Kafka 的具体版本和安装环境。
脚本会执行以下操作:
-
查找 Kafka 服务器进程:脚本会尝试定位 Kafka 服务器进程的 PID。这通常是通过查找与 Kafka 服务器相关联的日志文件、锁文件或使用进程列表命令(如
ps
)并匹配 Kafka 进程的名称或命令行参数来实现的。 -
发送停止信号:一旦找到 Kafka 服务器进程的 PID,脚本就会向该进程发送一个停止信号。默认情况下,这通常是
SIGTERM
信号,它允许 Kafka 服务器优雅地关闭,包括完成正在进行的请求、将日志数据刷新到磁盘等。然而,如果 Kafka 服务器没有响应SIGTERM
信号,或者你需要立即停止它,脚本可能会发送SIGKILL
信号。 -
等待进程退出(可选):有些版本的
kafka-server-stop.sh
脚本可能会等待 Kafka 服务器进程完全退出后才返回。这有助于确保 Kafka 服务器已经真正停止运行。
注意事项
- 如果你发现
stop
参数在你的环境中不起作用,或者导致错误,尝试只运行bin/kafka-server-stop.sh
而不带任何参数。 - 在执行停止命令之前,请确保没有任何重要的操作正在进行中,因为 Kafka 服务器的停止可能会导致正在进行的操作中断。
- 如果 Kafka 服务器进程无法停止,你可能需要手动查找并杀死该进程。这可以通过使用
kill
命令和 PID 来完成,但请谨慎操作,以免意外杀死其他重要进程。
总之,bin/kafka-server-stop.sh stop
命令(或简化后的 bin/kafka-server-stop.sh
)是停止 Kafka 服务器的标准方式,但请注意 stop
参数可能不是必需的,具体取决于你的 Kafka 版本和安装环境。
3、创建Topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
bin/kafka-topics.sh
:- 这是 Kafka 安装目录(
bin
子目录)下的一个脚本文件,用于管理 Kafka 主题。你可以使用它来创建、列出、描述、修改或删除主题。
- 这是 Kafka 安装目录(
--zookeeper hadoop102:2181
:- 这个参数指定了 ZooKeeper 服务器的地址和端口。ZooKeeper 是 Kafka 用来存储集群元数据的,包括主题的分区信息、消费者组和集群配置等。在这个例子中,ZooKeeper 运行在
hadoop102
这台机器上,监听的端口是2181
。
- 这个参数指定了 ZooKeeper 服务器的地址和端口。ZooKeeper 是 Kafka 用来存储集群元数据的,包括主题的分区信息、消费者组和集群配置等。在这个例子中,ZooKeeper 运行在
--create
:- 这个参数告诉
kafka-topics.sh
脚本要执行的操作是创建一个新的主题。
- 这个参数告诉
--replication-factor 3
:- 这个参数设置了主题的副本因子(replication factor)。副本因子是指每个分区(partition)在 Kafka 集群中有多少个副本。在这个例子中,每个分区都将有 3 个副本。这有助于提高数据的可靠性和容错性。注意,副本因子不能超过 Kafka 集群中 broker(服务器)的数量。
--partitions 1
:- 这个参数指定了主题应该有多少个分区(partition)。分区是 Kafka 中并行处理的基本单位。在这个例子中,主题
first
将只有一个分区。虽然只有一个分区限制了主题的并行处理能力,但它对于测试或低吞吐量场景可能是足够的。
- 这个参数指定了主题应该有多少个分区(partition)。分区是 Kafka 中并行处理的基本单位。在这个例子中,主题
--topic first
:- 这个参数指定了要创建的主题的名称。在这个例子中,新主题的名称是
first
。
- 这个参数指定了要创建的主题的名称。在这个例子中,新主题的名称是
综上所述,这条命令的作用是在 Kafka 集群中创建一个名为 first
的新主题,该主题有 1 个分区,每个分区有 3 个副本,并且 Kafka 集群的元数据信息存储在 ZooKeeper 服务器 hadoop102:2181
上。
执行这条命令后,Kafka 集群将配置并启动所需的资源来支持这个新主题,包括在集群中分配分区和副本的存储位置。一旦主题创建成功,你就可以开始向它发送消息,并使用 Kafka 消费者从它读取消息了。
4、查看当前服务器中的所有 topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
bin/kafka-topics.sh
:- 这是 Kafka 安装目录(
bin
子目录)下的一个脚本文件,用于执行与 Kafka 主题相关的各种管理任务。你可以通过这个脚本来创建、列出、描述、修改或删除 Kafka 主题。
- 这是 Kafka 安装目录(
--zookeeper hadoop102:2181
:- 这个参数指定了 ZooKeeper 服务器的地址和端口。ZooKeeper 是 Kafka 集群中用于存储集群元数据的系统,包括主题信息、分区信息、消费者组信息等。在这个例子中,ZooKeeper 运行在
hadoop102
这台机器上,监听的端口是2181
。通过连接到这个 ZooKeeper 实例,kafka-topics.sh
脚本能够获取 Kafka 集群的元数据信息。
- 这个参数指定了 ZooKeeper 服务器的地址和端口。ZooKeeper 是 Kafka 集群中用于存储集群元数据的系统,包括主题信息、分区信息、消费者组信息等。在这个例子中,ZooKeeper 运行在
--list
:- 这个参数告诉
kafka-topics.sh
脚本要执行的操作是列出 Kafka 集群中当前存在的所有主题。执行这个命令后,脚本将连接到指定的 ZooKeeper 服务器,查询并返回集群中所有主题的列表。
- 这个参数告诉
综上所述,这条命令的作用是连接到 ZooKeeper 服务器 hadoop102:2181
,并列出 Kafka 集群中当前存在的所有主题。这对于查看集群中当前有哪些主题非常有用,尤其是在进行故障排查、资源规划或主题管理时。
5、删除Topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
bin/kafka-topics.sh
:- 正如之前所述,这是 Kafka 安装目录(
bin
子目录)下的一个脚本文件,用于执行与 Kafka 主题相关的各种管理任务。
- 正如之前所述,这是 Kafka 安装目录(
--zookeeper hadoop102:2181
:- 这个参数指定了 ZooKeeper 服务器的地址和端口。ZooKeeper 是 Kafka 集群中用于存储集群元数据的系统,包括主题信息、分区信息、消费者组信息等。通过连接到这个 ZooKeeper 实例,
kafka-topics.sh
脚本能够获取 Kafka 集群的元数据信息,并执行相应的管理操作。
- 这个参数指定了 ZooKeeper 服务器的地址和端口。ZooKeeper 是 Kafka 集群中用于存储集群元数据的系统,包括主题信息、分区信息、消费者组信息等。通过连接到这个 ZooKeeper 实例,
--delete
:- 这个参数告诉
kafka-topics.sh
脚本要执行的操作是删除一个主题。需要注意的是,Kafka 的默认配置(从 0.11.0.0 版本开始)是禁止通过kafka-topics.sh
脚本删除主题的,因为这可能会导致数据丢失。如果你想要能够删除主题,你需要在 Kafka 服务器的配置文件中(通常是server.properties
)设置delete.topic.enable=true
。
- 这个参数告诉
--topic first
:- 这个参数指定了要删除的主题的名称。在这个例子中,要删除的主题是
first
。
- 这个参数指定了要删除的主题的名称。在这个例子中,要删除的主题是
综上所述,这条命令的作用是连接到 ZooKeeper 服务器 hadoop102:2181
,并请求 Kafka 集群删除名为 first
的主题。但是,请记住以下几点:
- 确保 Kafka 服务器的配置文件中已经启用了主题删除功能(
delete.topic.enable=true
)。 - 删除主题是一个敏感操作,因为它会永久移除与该主题相关的所有数据。在执行此操作之前,请确保你不再需要这些数据。
- Kafka 集群中的每个 broker 都会处理删除主题的请求,并移除与该主题相关的本地数据和元数据。但是,如果 Kafka 集群中的某些 broker 不可用或未能成功处理删除请求,那么这些 broker 上可能仍然会保留该主题的残留数据。因此,在删除主题后,你可能需要手动清理这些残留数据(尽管在大多数情况下,Kafka 会自动处理这些问题)。
最后,需要注意的是,从 Kafka 2.4.0 版本开始,Kafka 引入了一个更安全的方式来删除主题,即使用 AdminClient API(通过 Kafka 命令行工具或编程方式)。这种方法提供了更多的控制和反馈,使得删除主题的过程更加可靠和透明。然而,kafka-topics.sh
脚本仍然是一种快速且方便的方式来删除主题(前提是已经启用了删除功能)。
6、发送消息
bin/kafka-console-producer.sh --broker list hadoop102:9092 --topic first
bin/kafka-console-producer.sh
:- 这是 Kafka 安装目录(
bin
子目录)下的一个脚本文件,用于启动一个 Kafka 控制台生产者(console producer)。控制台生产者允许用户从命令行向 Kafka 主题发送消息。
- 这是 Kafka 安装目录(
--broker-list hadoop102:9092
:- 这个参数指定了 Kafka 集群中一个或多个 broker(服务器)的地址和端口。在这个例子中,它指向了
hadoop102
这台机器上的 Kafka broker,监听的端口是9092
。需要注意的是,如果你有一个包含多个 broker 的 Kafka 集群,并且你想要提高生产者的容错性和负载均衡能力,你可以在这个参数中列出所有 broker 的地址和端口,用逗号分隔(例如:--broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092
)。
- 这个参数指定了 Kafka 集群中一个或多个 broker(服务器)的地址和端口。在这个例子中,它指向了
--topic first
:- 这个参数指定了生产者要发送消息的目标主题名称。在这个例子中,目标主题是
first
。
- 这个参数指定了生产者要发送消息的目标主题名称。在这个例子中,目标主题是
综上所述,这条命令的作用是启动一个 Kafka 控制台生产者,它连接到 Kafka 集群中位于 hadoop102:9092
的 broker,并向名为 first
的主题发送消息。用户可以在命令行中输入消息,每条消息按下回车键后都会被发送到 Kafka 主题中。这对于测试 Kafka 生产者、验证 Kafka 集群是否正常运行或进行简单的数据生产非常有用。
需要注意的是,这个命令会启动一个交互式会话,直到用户显式地终止它(例如,通过按下 Ctrl+C)。在会话期间,用户可以随时输入消息并发送到 Kafka 主题。
7、消费消息
老式方法(不推荐)
bin/kafka-console-consumer.sh \ --zookeeper hadoop102:2181 --topic first
-
bin/kafka-console-consumer.sh
:这是 Kafka 安装目录(bin
子目录)下的一个脚本文件,用于启动一个 Kafka 控制台消费者(console consumer)。控制台消费者允许用户从命令行读取 Kafka 主题中的消息。 -
--zookeeper hadoop102:2181
:这个参数指定了 ZooKeeper 服务器的地址和端口。在较旧版本的 Kafka 中,消费者通过 ZooKeeper 来发现 Kafka broker 的信息。但是,随着 Kafka 的发展,这种方式已经被认为是不够高效和现代的,因为它增加了 ZooKeeper 的负载,并且不是 Kafka 消费者 API 的直接用法。 -
--topic first
:这个参数指定了消费者要读取消息的主题名称。在这个例子中,消费者将读取名为first
的主题中的消息。
新版方法(推荐)
bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --topic first
-
bin/kafka-console-consumer.sh
:这是 Kafka 安装目录(bin
子目录)下的一个脚本文件,用于启动 Kafka 的控制台消费者程序。控制台消费者提供了一个简单的命令行界面,允许用户实时查看从 Kafka 主题中接收到的消息。 -
--bootstrap-server hadoop102:9092
:这个参数指定了 Kafka 集群中的一个或多个 broker(服务器)的地址和端口,用于建立初始连接。在这个例子中,它指向了hadoop102
这台机器上的 Kafka broker,该 broker 监听的端口是9092
。控制台消费者将使用这个地址来连接到 Kafka 集群,并从那里获取主题的信息和消息。如果 Kafka 集群中有多个 broker,可以在这个参数中列出它们,用逗号分隔,但通常只需要提供一个或多个可用于发现集群中其他 broker 的地址即可。 -
--topic first
:这个参数指定了消费者要读取消息的目标主题名称。在这个例子中,目标主题是first
。控制台消费者将连接到 Kafka 集群,并订阅该主题,然后不断从该主题中读取消息,并将它们输出到控制台。
注意事项
-
默认情况下,控制台消费者会读取主题中从最新消息开始的部分(即消费者启动后发送到主题的消息)。如果你想要从主题的起始位置开始读取消息,可以添加
--from-beginning
参数。 -
如果没有指定消费者组(通过
--group
参数),Kafka 会为控制台消费者自动分配一个唯一的消费者组 ID。在生产环境中,通常建议显式指定消费者组,以便能够控制和管理消费者实例的行为。 -
消费者将不断地从 Kafka 主题中读取消息,直到它被显式地终止(例如,通过按下 Ctrl+C)。
-
如果 Kafka 集群中有多个分区和多个副本,控制台消费者将只连接到它所能发现的分区领导者(leader),并从那里读取消息。Kafka 的内部机制确保了消息的可靠传递和容错性。
-
这条命令是在 Kafka 0.10.x 及更高版本中推荐使用的,因为它避免了直接通过 ZooKeeper 与 Kafka 集群进行交互,从而减少了 ZooKeeper 的负载,并提高了整体的性能和可靠性。
读取全部消息
bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --from-beginning --topic first
-
bin/kafka-console-consumer.sh
:这是 Kafka 安装目录(bin
子目录)下的一个脚本文件,用于启动 Kafka 的控制台消费者程序。控制台消费者提供了一个简单的命令行界面,允许用户实时查看从 Kafka 主题中接收到的消息。 -
--bootstrap-server hadoop102:9092
:这个参数指定了 Kafka 集群中用于建立初始连接的 broker(服务器)的地址和端口。在这个例子中,它指向了hadoop102
这台机器上的 Kafka broker,该 broker 监听的端口是9092
。控制台消费者将使用这个地址来连接到 Kafka 集群,并从那里获取主题的信息和消息。 -
--from-beginning
:这个参数指示控制台消费者从主题的起始位置(即最早的消息)开始读取消息。默认情况下,如果不指定这个参数,消费者将从它连接时主题的当前偏移量(offset)开始读取消息,这通常是主题中最新的消息。使用--from-beginning
参数可以确保消费者能够读取到主题中的所有历史消息。 -
--topic first
:这个参数指定了消费者要读取消息的目标主题名称。在这个例子中,目标是名为first
的主题。控制台消费者将连接到 Kafka 集群,并订阅该主题,然后从该主题的起始位置开始不断读取消息,并将它们输出到控制台。
注意事项
-
当使用
--from-beginning
参数时,如果主题中包含大量的历史消息,消费者可能需要一些时间才能追上最新的消息。这取决于消费者的处理能力、Kafka 集群的性能以及网络条件。 -
如果 Kafka 集群中有多个分区,控制台消费者将并行地从所有分区的起始位置开始读取消息。每个分区都会独立地维护其消息的顺序和偏移量。
-
这条命令是在 Kafka 0.10.x 及更高版本中推荐使用的,因为它避免了直接通过 ZooKeeper 与 Kafka 集群进行交互,从而减少了 ZooKeeper 的负载,并提高了整体的性能和可靠性。
-
在生产环境中,控制台消费者通常用于调试和监控目的,而不是作为长期运行的生产级消费者。对于生产级应用,建议使用 Kafka 客户端库(如 Kafka Java API)来编写自定义的消费者应用。
6、查看某个 Topic 的详情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
-
bin/kafka-topics.sh
:这是 Kafka 安装目录(bin
子目录)下的一个脚本文件,用于管理和查询 Kafka 主题。通过这个脚本,用户可以创建、列出、修改或删除 Kafka 主题,以及查询主题的详细信息。 -
--zookeeper hadoop102:2181
:这个参数指定了 ZooKeeper 服务器的地址和端口。在 Kafka 0.10.x 之前的版本中,Kafka 客户端和工具(如kafka-topics.sh
)经常需要通过 ZooKeeper 来与 Kafka 集群交互,因为 ZooKeeper 被用作 Kafka 集群的元数据存储和协调服务。在这个例子中,它指向了hadoop102
这台机器上的 ZooKeeper 服务器,该服务器监听的端口是2181
。 -
--describe
:这个参数告诉kafka-topics.sh
脚本要执行的操作是描述(describe)一个或多个主题的详细信息。这包括主题的分区数、每个分区的副本因子、每个副本所在的 broker 的 ID、每个分区的 leader 副本和 ISR(In-Sync Replicas)列表等。 -
--topic first
:这个参数指定了要查询详细信息的主题名称。在这个例子中,要查询的主题是first
。
输出示例
执行这个命令后,你将会看到类似下面的输出(输出内容会根据实际的 Kafka 集群和主题配置而有所不同):
Topic:first PartitionCount:3 ReplicationFactor:2 Configs:
Topic: first Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
这个输出显示了 first
主题的详细信息,包括:
- PartitionCount:主题的分区数(在这个例子中是 3)。
- ReplicationFactor:每个分区的副本因子(在这个例子中是 2),即每个分区有多少个副本。
- Configs:与主题相关的配置(在这个例子中没有显示额外的配置)。
接下来,对于每个分区,输出还显示了:
- Leader:当前作为该分区 leader 的 broker 的 ID。
- Replicas:该分区所有副本所在的 broker 的 ID 列表(按照分配的顺序)。
- Isr:当前与该分区保持同步的副本(ISR)所在的 broker 的 ID 列表。ISR 是指那些能够跟上 leader 副本的副本集合。
注意
- 尽管
--zookeeper
参数在较新版本的 Kafka 工具中可能不再是首选(因为 Kafka 客户端现在更倾向于直接与 Kafka broker 通信),但在某些管理和查询操作中,它仍然是有用的,特别是当你需要快速获取关于 Kafka 集群和主题的元数据时。 - 随着 Kafka 的发展,官方文档和最佳实践可能会发生变化,因此建议查阅最新的 Kafka 文档以获取最准确的信息。
7、修改分区数
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
-
bin/kafka-topics.sh
:这是 Kafka 安装目录(bin
子目录)下的一个脚本文件,用于管理和查询 Kafka 主题。通过这个脚本,用户可以创建、列出、修改或删除 Kafka 主题。 -
--zookeeper hadoop102:2181
:这个参数指定了 ZooKeeper 服务器的地址和端口。在 Kafka 0.10.x 之前的版本中,Kafka 客户端和工具经常需要通过 ZooKeeper 来与 Kafka 集群交互,因为 ZooKeeper 被用作 Kafka 集群的元数据存储和协调服务。然而,从 Kafka 0.11.0.0 开始,Kafka 引入了对 Kafka AdminClient API 的支持,允许用户直接与 Kafka broker 交互以执行管理操作,而无需通过 ZooKeeper。不过,kafka-topics.sh
脚本仍然支持通过 ZooKeeper 进行操作,特别是在需要修改主题配置或元数据的场景中。在这个例子中,它指向了hadoop102
这台机器上的 ZooKeeper 服务器,该服务器监听的端口是2181
。 -
--alter
:这个参数告诉kafka-topics.sh
脚本要执行的操作是修改(alter)一个已存在的主题。 -
--topic first
:这个参数指定了要修改的主题名称。在这个例子中,要修改的主题是first
。 -
--partitions 6
:这个参数指定了要将主题的分区数修改为 6。请注意,增加主题的分区数是一个“扩展”操作,它不会删除现有的数据,而是将现有的数据分布到更多的分区中(在某些情况下,这可能需要重新平衡分区的数据)。然而,减少主题的分区数则是一个风险较高的操作,因为它可能会导致数据丢失(Kafka 不支持直接减少分区数)。
注意事项
-
在执行这个命令之前,请确保 Kafka 集群中的 broker 能够处理额外的分区。这包括有足够的磁盘空间来存储新的分区数据,以及足够的处理能力来管理更多的分区。
-
增加分区数通常是为了提高主题的吞吐量或并行处理能力。然而,它并不会自动改善现有数据的读取性能,因为 Kafka 消费者通常需要根据分区来并行读取数据。
-
如果 Kafka 集群配置了自动主题创建(即
auto.create.topics.enable
设置为true
),则当生产者向不存在的主题发送消息时,Kafka 会自动创建该主题,并使用默认的分区数和副本因子。但是,一旦主题被创建,就不能直接通过自动创建机制来修改其分区数。相反,需要使用kafka-topics.sh
脚本或 Kafka AdminClient API 来手动修改。 -
随着 Kafka 的发展,官方可能会引入更高级的工具或 API 来管理 Kafka 主题和元数据。因此,建议查阅最新的 Kafka 文档以获取最准确的信息和最佳实践。