1.下载Kafka
有2个下载网站都可以:
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.4.0/kafka_2.13-3.4.0.tgz
https://kafka.apache.org/downloads
下载完后解压缩:
[root@test ~]# tar -xzf kafka-3.4.0-src.tgz
[root@test ~]# cd kafka_2.12-3.4.0/
[root@test kafka_2.12-3.4.0]#
2.启动KAFKA
前提:启动kafka之前,先要启动zookeeper,启动zookeeper前,先要保证本地环境必须安装Java 8+
[root@test kafka_2.12-3.4.0]# bin/zookeeper-server-start.sh config/zookeeper.properties &
[root@test ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 1484292 root 122u IPv6 9693917 0t0 TCP *:eforward (LISTEN)
[root@test ~]# jps
1484292 QuorumPeerMain
1484944 Jps
[root@test ~]#
可以看到 zk 已经监听再2181端口,可以看到启动后有个 QuorumPeerMain,它就是 Zookeeper 集群的启动入口类,是用来加载配置启动QuorumPeer 线程
3.启动 Kafka
如果已经启动了Zookeeper服务,则直接运行下面的命令来启动或者停止Kafka服务
#启动 kafka 服务命令:
bin/kafka-server-start.sh config/server.properties &
#停止 kafka 服务则运行下面命令:
bin/kafka-server-stop.sh config/server.properties
[root@test config]# lsof -i:9092 # 查看9092端口是否被监听,来验证kafka是否正常启动
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 1485365 root 128u IPv6 9701151 0t0 TCP *:XmlIpcRegSvc (LISTEN)
java 1485365 root 141u IPv6 9701162 0t0 TCP test:57344->test:XmlIpcRegSvc (ESTABLISHED)
java 1485365 root 142u IPv6 9701163 0t0 TCP test:XmlIpcRegSvc->test:57344 (ESTABLISHED)
[root@test config]#
查看kafka参数:
[root@test config]# pwd
/root/kafka_2.12-3.4.0/config
[root@test config]# cd /root/kafka_2.12-3.4.0/config
[root@test config]# cat server.properties |grep '^[a-z]'
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
[root@test config]#
4.示例测试
首先创建一个名为 message 的 test1,只使用单个分区和一个副本。
4.1.创建一个名为test1的Topic 命令:
kafka3.0之前命令:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test1
kafka3.0后命令:
bin/kafka-topics.sh --create --topic test1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
4.2.查看Topic命令:
kafka3.0之前命令:
bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka3.0后命令:
bin/kafka-topics.sh --describe --topic test1 --bootstrap-server localhost:9092
Topic: test1 TopicId: fmqvg2PKTOKG_G1cQcLiIw PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
4.3.发送消息命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>
>
>s
>ll
>1
>2
>3
>4
>5
>hello kafka
>
4.4.打开另一个终端进行消费:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
s
ll
1
2
3
4
5
hello kafka
删除本地 Kafka 环境数据
rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs
至此,简单安装完成。