一、topic命令
下面Windows
命令需要把cmd
路径切换到bin/windows
下。
而Linux
命令只需要在控制台切换到bin
目录下即可。
下面都以Windows
下的操作为例,在Linux
下也是一样的。
1.1 查看主题命令的参数
kafka-topics.bat # Windows
kafka-topics.sh # Linux
输入以上命令就可以看到主题命令可以附加哪些参数来执行,参数有很多,这里归纳几个常用的:
参数 | 说明 |
---|---|
–bootstrap-server | 连接的 Kafka Broker 主机名称和端口号。 |
–topic | 操作的 topic 名称。 |
–create | 创建主题。 |
–delete | 删除主题。 |
–alter | 修改主题。 |
–list | 查看所有主题。 |
–describe | 查看主题详细描述。 |
–partitions <Integer: # of partitions> | 设置分区数。 |
–replication-factor <Integer: replication factor> | 设置分区副本。 |
–config <String: name=value> | 更新系统默认的配置。 |
1.2 查看当前服务器中的所有 topic
kafka-topics.bat --bootstrap-server localhost:9092 --list
Kafka默认在9092端口上运行
1.3 创建名为test的topic
kafka-topics.bat --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test
-
--partitions
:指定分区数,参数要根据broker
数和数据量决定,有几个broker
则可以指定几个分区 -
--replication-factor
: 指定副本数,也有根据有几个broker
来决定 -
--topic
:指定topic
名字
1.4 查看 test主题的详情
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test
这里主要注意后面第二排参数的意义即可。
Topic:test
:主体的名称是test
。
Partition
:分区编号是0,代表只有一个分区,编号从0开始
Leader
:每个分区多个副本的主节点编号,这个在集群中才会是其它值,这里是单机模式,值等于唯一副本编号
Replicas
:副本编号,也是从0开始。
1.5 修改分区数
kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic test --partitions 2
注意:分区数只能增加,不能减少
1.6 再次查看 test主题的详情
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test
1.7 删除 topic
kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic test
二、生产者命令
下面Windows
命令需要把cmd
路径切换到bin/windows
下。
2.1 查看生产者命令的参数
kafka-console-producer.bat
常用参数:
参数 | 说明 |
---|---|
–bootstrap-server | 连接的 Kafka Broker 主机名称和端口号。 |
–topic | 操作的 topic 名称。 |
2.2 发送消息
主要就是要说明要连接哪个Kafka
服务器或者集群。
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
此时光标处就可以输入数据,输入的数据会放入本机kafka
服务器test
这个主题当中。
三、消费者命令
3.1 查看消费者命令的参数
kafka-console-consuer.bat
常用参数:
参数 | 说明 |
---|---|
–bootstrap-server | 连接的 Kafka Broker 主机名称和端口号。 |
–topic | 操作的 topic 名称。 |
3.2 消费消息
1)消费 first 主题中的数据。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
2)把主题中所有的数据都读取出来(包括历史数据)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic test
四、使用示例
这些示例都基于上面的主题test
来进行操作。
4.1 生产者生产一条数据
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
此时输入一个hello
数据然后回车,队列中就加入了一条数据。
4.2 创建一个消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
发现此时并没有消费到刚才的数据。
4.3 生产者再生产一个数据
就是在刚才的光标下继续输入一个数据,再回车即可。
此时再看消费者,发现消费者已经消费到了Kafka
这条数据。
这时候就发现了一个问题,消费者只能消费消费者出现之后的数据,之前的历史数据则不能直接被消费。
4.4 把主题中所有的数据都读取出来
如果想要读取历史数据,可以选择把主题中的数据全部读取出来。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic test
执行此命令后,再来新的数据,也会被继续消费,就和普通的消费者一样。