Apache kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,是消息中间件的一种,用于构建实时数据管道和流应用程序。
Kafka官网:http://kafka.apache.org/
安装环境:
Kafka集群环境搭建,依赖于zookeeper环境,前面已经搭建好了。
可以参考之前的
Zookeeper 集群安装_俗尘某某的博客-CSDN博客
kafka集群
主机
|
IP
|
SoftWare
|
Port
|
OS
|
node1
|
192.168.230.128
|
kafka_2.13-3.3.1
|
9092
|
Centos 7
|
node2
|
192.168.230.129
|
kafka_2.13-3.3.1
|
9092
|
Centos 7
|
node3
|
192.168.230.130
|
kafka_2.13-3.3.1
|
9092
|
Centos 7
|
一、集群部署
1、下载kafka并解压(3台都执行)
tar -xf kafka_2.13-3.3.1.tgz -C /usr/local/
2、编辑配置文件
vim /usr/local/kafka_2.13-3.3.1/config/server.properties
# 每个broker在集群中的唯一标识,不能重复
broker.id=1
# 端口
port=9092
listeners=PLAINTEXT://192.168.230.128:9092 # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". advertised.listeners=PLAINTEXT://192.168.230.128:9092
# broker处理消息的线程数
num.network.threads=3
# broker处理磁盘io的线程数
num.io.threads=8
# socket发送数据缓冲区
socket.send.buffer.bytes=102400
# socket接收数据缓冲区
socket.receive.buffer.bytes=102400
# socket接收请求最大值
socket.request.max.bytes=104857600
# kafka数据日志文件存放位置
log.dirs=/usr/local/kafka_2.13-3.3.1/kafka-logs
# topic默认的分区数
num.partitions=1
# 恢复线程数
num.recovery.threads.per.data.dir=1
# 默认副本数
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 消息日志最大存储时间,这里是7天
log.retention.hours=168
# 每个日志分段文件大小,这里是1g
log.segment.bytes=1073741824
# 消息日志文件大小检查间隔时间
log.retention.check.interval.ms=300000
# zookeeper集群地址
zookeeper.connect=192.168.230.128:2181,192.168.230.129:2181,192.168.230.130:2181
#zookeeper.connect=zknode1:2181,zknode2:2181,zknode3:2181
# zookeeper连接超时时间
zookeeper.connection.timeout.ms=6000
# 推迟初始消费者再平衡时间
group.initial.rebalance.delay.ms=0
特此说明:如果需要修改为主机名连接则需修改三台主机的hosts文件设置节点名称
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
broker.id 默认是0,node1配置为1,node2配置为2,node3配置为3
host.name 新增这个字段,node1配置为192.168.10.201,node2配置为192.168.10.202,node3配置为192.168.10.203
log.dirs 默认路径为/tmp/kafka-logs,修改为/usr/local/kafka_2.13-3.3.1/kafka-logs
zookeeper.connect 默认为localhost:2181,修改为zookeeper集群的地址192.168.230.128:2181,192.168.230.129:2181,192.168.230.130:2181
其他保持默认配置
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
3、启动kafka
(3个节点都启动)
#到bin目录下执行
cd /usr/local/kafka_2.13-3.3.1/bin/
#后台启动加参数-daemon ./kafka-server-start.sh -daemon ../config/server.properties
4、停止kafka
./kafka-server-stop.sh //不带任何参数即可
# kafka集群部署成功后,会在zookeeper集群里面写入数据,通过zkCli.sh或者zkui可以看见kafka生成的数据:
如果加了环境变量,可以执行下面命令:
kafka-server-start.sh /usr/local/kafka_2.13-3.3.1/config/server.properties
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
5. 配置环境变量 三台节点都配置/etc/profile文件
# 配置Kafka环境变量
export KAFKA_HOME=/usr/local/kafka_2.13-3.3.1
export PATH=$PATH:$KAFKA_HOME/bin
#让新环境变量生效
source /etc/profile
6. 配置防火墙
firewall-cmd --zone=public --add-port=2181/tcp --permanent #zk端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --reload
7. 创建主题
./kafka-topics.sh --create --bootstrap-server zknode1:9092 --replication-factor 1 --partitions 1 --topic iot
8. 查看主题
./kafka-topics.sh --bootstrap-server 192.168.230.128:9092 --list
9. 删除主题
./kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】
10. 订阅主题
./kafka-topics.sh --bootstrap-server zknode1:9092 --describe --topic iot
#./kafka-topics.sh --bootstrap-server 192.168.230.128:9092 --describe --topic iot
11. 生产消息
./kafka-console-producer.sh --bootstrap-server zknode1:9092 -topic iot
#./kafka-console-producer.sh --bootstrap-server 192.168.230.128:9092 --topic iot
12. 消费消息
./kafka-console-consumer.sh --bootstrap-server zknode2:9092 --topic iot
#./kafka-console-consumer.sh --bootstrap-server 192.168.230.129:9092 --topic iot
#后续持续完善中