下面聊聊Kafka常用命令
1、Topic管理命令
以topic:test_1为例
1.1、创建topic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test_1
参数说明:
- –bootstrap-server:用来指定Kafka服务的连接,若有该参数,则–zookeeper参数可以不需要;
- –zookeeper:废弃,通过zk的方式连接到Kafka集群;
- –replication-factor:用来指定副本数量,这里需要注意的是不能大于broker的数量,不提供的话使用集群默认值;
- –partitions:用来指定分区数量,当创建或修改topic时用来指定分区数,若创建时没提供该参数,则使用集群默认值,这里需要注意的是不能比之前的小,不然会报错;
1.2、topic扩容分区
将分区从3个扩容到4个
# kafka旧版本使用zk方式,不推荐使用
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_1 --replication-factor 3 --partitions 4
# kafka2.2版本以后使用broker_host:port方式,推荐使用,注意topic一旦创建,partition只能增加不能减少
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test_1 --replication-factor 3 --partitions 4
1.3、删除topic
# 指定topic进行删除
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test_1
# 支持以正则表达式匹配方式进行删除,比如删除以test_开头的topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "test_.*"
1.4、查看topic列表
# 查看所有topic列表
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --exclude-internal
# 查看正则匹配的topic列表
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --exclude-internal --topic "test_.*"
参数说明:
- –exclude-internal:排除kafka内部的topic,如_consumer_offsets-*
- –topic:支持正则匹配topic
1.5、查看topic详情
# 查看单个topic,排除内部topic
./bin/kafka-topics.sh --topic test_1 --bootstrap-server localhost:9092 --describe --exclude-internal
1.6、查看topic message数量
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_1 -time -1
参数说明:
- -time-1:表示要获取指定topic所有分区当前的最大位移;
2、Consumer管理命令
2.1、查看consumer group列表
./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
2.2、查看指定group.id的消费情况
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --describe
2.3、删除group
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --delete
2.4、重置offset
# 查看当前group是否为active状态,若是active状态则不能修改
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --describe
# 重置
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --reset-offsets -to-offset 100 --topic test_1 --execute
# 导出offset
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_1 --reset-offsets -to-offset 100 --topic test_1 --export > offset.log
3、Kafka操作命令
3.1、启动kafka
# 启动kafka命令
./bin/kafka-server-start.sh config/server.properties &
# 停止kafka服务
./bin/kafka-server-stop.sh config/server.properties &
3.2、发送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1
3.3、启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_1 --from-beginning