1.下载Kafka
有2个下载网站都可以:
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.4.0/kafka_2.13-3.4.0.tgz
 https://kafka.apache.org/downloads

 下载完后解压缩:
 [root@test ~]# tar -xzf kafka-3.4.0-src.tgz
 [root@test ~]# cd kafka_2.12-3.4.0/
 [root@test kafka_2.12-3.4.0]#
2.启动KAFKA
前提:启动kafka之前,先要启动zookeeper,启动zookeeper前,先要保证本地环境必须安装Java 8+
 [root@test kafka_2.12-3.4.0]# bin/zookeeper-server-start.sh config/zookeeper.properties &
 [root@test ~]# lsof -i:2181
 COMMAND     PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
 java    1484292 root  122u  IPv6 9693917      0t0  TCP *:eforward (LISTEN)
 [root@test ~]# jps
 1484292 QuorumPeerMain
 1484944 Jps
 [root@test ~]#
可以看到 zk 已经监听再2181端口,可以看到启动后有个 QuorumPeerMain,它就是 Zookeeper 集群的启动入口类,是用来加载配置启动QuorumPeer 线程
3.启动 Kafka
如果已经启动了Zookeeper服务,则直接运行下面的命令来启动或者停止Kafka服务
 #启动 kafka 服务命令:
 bin/kafka-server-start.sh config/server.properties &
 #停止 kafka 服务则运行下面命令:
 bin/kafka-server-stop.sh config/server.properties
[root@test config]# lsof -i:9092     # 查看9092端口是否被监听,来验证kafka是否正常启动
 COMMAND     PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
 java    1485365 root  128u  IPv6 9701151      0t0  TCP *:XmlIpcRegSvc (LISTEN)
 java    1485365 root  141u  IPv6 9701162      0t0  TCP test:57344->test:XmlIpcRegSvc (ESTABLISHED)
 java    1485365 root  142u  IPv6 9701163      0t0  TCP test:XmlIpcRegSvc->test:57344 (ESTABLISHED)
 [root@test config]#
查看kafka参数:
[root@test config]# pwd
 /root/kafka_2.12-3.4.0/config
 [root@test config]# cd /root/kafka_2.12-3.4.0/config
 [root@test config]# cat server.properties |grep '^[a-z]'
 broker.id=0
 num.network.threads=3
 num.io.threads=8
 socket.send.buffer.bytes=102400
 socket.receive.buffer.bytes=102400
 socket.request.max.bytes=104857600
 log.dirs=/tmp/kafka-logs
 num.partitions=1
 num.recovery.threads.per.data.dir=1
 offsets.topic.replication.factor=1
 transaction.state.log.replication.factor=1
 transaction.state.log.min.isr=1
 log.retention.hours=168
 log.retention.check.interval.ms=300000
 zookeeper.connect=localhost:2181
 zookeeper.connection.timeout.ms=18000
 group.initial.rebalance.delay.ms=0
 [root@test config]#
4.示例测试
首先创建一个名为 message 的 test1,只使用单个分区和一个副本。
4.1.创建一个名为test1的Topic 命令:
 kafka3.0之前命令:
 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test1
 kafka3.0后命令:
 bin/kafka-topics.sh --create --topic test1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
4.2.查看Topic命令:
 kafka3.0之前命令:
 bin/kafka-topics.sh --list --zookeeper localhost:2181
 kafka3.0后命令:
 bin/kafka-topics.sh --describe --topic test1 --bootstrap-server localhost:9092
 Topic: test1    TopicId: fmqvg2PKTOKG_G1cQcLiIw    PartitionCount: 3    ReplicationFactor: 1    Configs:
     Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
     Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
     Topic: test1    Partition: 2    Leader: 0    Replicas: 0    Isr: 0
4.3.发送消息命令:
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
 >
 >
 >s
 >ll
 >1
 >2
 >3
 >4
 >5
 >hello kafka
 >
 4.4.打开另一个终端进行消费:
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
 s
 ll
 1
 2
 3
 4
 5
 hello kafka
删除本地 Kafka 环境数据
rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs
至此,简单安装完成。












![[Datawhale][CS224W]图机器学习(四)](https://img-blog.csdnimg.cn/img_convert/752329d821f32e26640657e79137d495.png)






