安装Kafka(需要JDK和Zookeeper):
- 下载Kafka安装包,并解压至node01节点中的/opt/apps目录下。
- 修改配置文件。在server.properties配置文件中指定broker编号、Kafka运行日志存放的路径、指定Zookeeper地址和本地IP。
- 添加环境变量。在/etc/profile文件中添加Kafka环境变量。
- 分发文件。将安装目录kafka及环境配置文件profile分发至node02、node03上,并修改broker.id和host.name。
启动Kafka服务
- 分别在三个节点上启动Zookeeper服务
[root@node01 ~]# zkServer.sh start
[root@node02 ~]# zkServer.sh start
[root@node03 ~]# zkServer.sh start
2. 分别在三个节点上启动Kafka服务(启动三个broker)
# 进入kafka根目录,执行下面的命令
[root@node01 kafka]# ./bin/kafka-server-start.sh config/server.properties
[root@node02 kafka]# ./bin/kafka-server-start.sh config/server.properties
[root@node01 kafka]# ./bin/kafka-server-start.sh config/server.properties
# 复制当前会话,用jps查看每个节点上运行的进程
基于命令行方式使用Kafka
创建Topic
# 创建Topic的语法格式
bin/kafka-topic.sh --bootstrap-server node100:9092 --create --topic topic-name --partitions integer --replication-factor integer
(1) 创建Topic (2 partitions & 2 replication-factor),并查看详细情况
(2) 创建Topic (3 partitions & 2 replication-factor),并查看详细情况
向主题中发送消息数据
[root@node01 ~]# kafka-console-producer.sh --broker-list node01:9092 --topic zueb
消费主题中的消息
[root@node02 kafka]# kafka-console-consumer.sh --bootstrap-server node01:9092 --topic zueb --partition 0 --from-beginning
[root@node02 kafka]# kafka-console-consumer.sh --bootstrap-server node01:9092 --topic zueb --partition 1 --from-beginning
[root@node02 kafka]# kafka-console-consumer.sh --bootstrap-server node01:9092 --topic zueb --from-beginning
使用–list查看所有主题
[root@node02 kafka]# kafka-topics.sh --list --bootstrap-server node01:9092,node02:9092,node03:9092
使用–delete 删除某个主题
[root@node02 bin]#kafka-topics.sh --delete --bootstrap-server node01:9092,node02:9092,node03:9092 --topic zueb
注意:如果当前主题正在使用,则无法删除。