文章目录
- kafka安装
- 1 上传安装包
- 2 解压安装包
- 3 创建logs文件夹
- 4 修改配置文件
- 5 分发kafka
- 6 启动kafka
- kafka使用
- 1 启动kafka
- 2 关闭kafka
- 3 查看topic
- 4 创建topic,名称为test
- 5 删除名称为test的topic
- 6 向topic发送数据
- 7 从topic里消费数据
kafka安装
kafka安装前需要确认zookeeper已安装
1 上传安装包
将安装包kafka_2.11-2.4.1.tgz上传至/opt/install_packages
2 解压安装包
tar -zxvf /opt/install_packages/kafka_2.11-2.4.1.tgz -C /opt/softs/
3 创建logs文件夹
cd /opt/softs/kafka_2.11-2.4.1
mkdir logs
4 修改配置文件
cd config/
##更名配置文件
mv server.properties server.properties_bak
vim server.properties
输入如下内容
#broker 的全局唯一编号,不能重复(根据实际修改)
broker.id=1
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径(根据实际修改)
log.dirs=/opt/softs/kafka_2.11-2.4.1/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
log.retention.hours=168
#配置连接Zookeeper集群地址(根据实际修改)
zookeeper.connect=bigdata003:2181,bigdata004:2181,bigdata005:2181
5 分发kafka
##在bigdata003上执行
scp -r /opt/softs/kafka_2.11-2.4.1/ root@bigdata004:/opt/softs/
scp -r /opt/softs/kafka_2.11-2.4.1/ root@bigdata005:/opt/softs/
##在bigdata004上修改server.properties,将broker.id修改为2
##在bigdata005上修改server.properties,将broker.id修改为3
6 启动kafka
## 依次在bigdata003,bigdata004,bigdata005节点上启动kafka
cd /opt/softs/kafka_2.11-2.4.1/bin
./kafka-server-start.sh ../config/server.properties &
kafka使用
1 启动kafka
--在安装kafka集群的每台节点上(bigdata03,bigdata04,bigdata05)分别启动kafka
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行启动命令
./kafka-server-start.sh ../config/server.properties &
2 关闭kafka
--在安装kafka集群的每台节点上(bigdata03,bigdata04,bigdata05)分别关闭kafka
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行关闭命令
./kafka-server-stop.sh stop
3 查看topic
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行查看topic命令
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --list
4 创建topic,名称为test
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行创建topic的命令
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --create --replication-factor 3 --partitions 1 --topic test
选项说明:
--topic 定义topic名称
--replication-factor 定义topic的副本数,副本的概率和hdfs上的文件副本一致,相同的数据存储在不同的节点上一共多少份
--partitions 定义topic的分区数
5 删除名称为test的topic
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行删除topic的命令
-- 删除topic需要在kafka目录下config目录中的server.properties中配置delete.topic.enable=true,如果不配置而去删除topic只会对topic进行标记删除
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --delete --topic test
6 向topic发送数据
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行发送消息的命令
./kafka-console-producer.sh --broker-list bigdata03:9092,bigdata04:9092,bigdata05:9092 --topic test
选项说明
--broker-list 定义kafka集群的节点
7 从topic里消费数据
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行消费的命令
./kafka-console-consumer.sh --bootstrap-server bigdata03:9092,bigdata04:9092,bigdata05:9092 --from-beginning --topic test
选项说明
--bootstrap-server 定义kafka集群的节点
--from-beginning 定义消费方式为从头消费