kafka 依赖于 Zookeeper
1. Zookeeper 本地模式安装
修改配置文件
解压后的目录中的 conf 路径下,将文件 zoo_sample.cfg 修改为 zoo.cfg。
mv zoo_sample.cfg zoo.cfg
打开 zoo.cfg 文件,修改 dataDir 路径。
dataDir 路径 默认在 /tmp 下,这个目录是 Linux 系统的临时目录,会定期自动删除
vim /home/apache-zookeeper-3.7.1-bin/conf/zoo.cfg
更改以下参数
dataDir=/home/apache-zookeeper-3.7.1-bin/zkData
在相应目录下 创建文件夹
mkdir zkData
zoo.cfg 配置文件说明
- 通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒。
tickTime=2000
- Leader和Follower初始连接时能容忍的最多心跳数,单位次(即tickTime的数量)。
initLimit=10
- Leader和Follower连接之后,通信时能容忍的最多心跳数,单位次。时间如果超过syncLimit * tickTime,Leader认为Follwer挂掉,从服务器列表中删除Follwer。
syncLimit=5
- 保存Zookeeper中的数据的目录。
dataDir=/home/apache-zookeeper-3.7.1-bin/zkData
- 客户端连接端口,通常不做修改。
clientPort=2181
Zookeeper 启动
- 启动服务端
bin/zkServer.sh start
可以通过 jps 查看进程 QuorumPeerMain
就是 Zookeeper 服务端的进程。查看状态 bin/zkServer.sh status
- 启动客户端
bin/zkCli.sh
- 退出客户端
quit
- 停止 Zookeeper 服务端
bin/zkServer.sh stop
参考: https://blog.csdn.net/qq_39516106/article/details/119796922
2. kafka 安装
配置文件
进入目录 /config/
vim server.properties
更改以下参数
log.dirs=/home/kafka_2.12-3.4.1/kafka-logs
host.name=192.168.78.142
socket.request.max.bytes=1258291200
server.properties
配置说明
#####System######
#唯一标识在集群中的ID,要求是正数。
broker.id=0
#服务端口,默认9092
port=9092
#监听地址,不设为所有地址
host.name=debugo01
#socket的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=1048576
#socket的接收缓冲区 (SO_RCVBUF)
socket.receive.buffer.bytes=1048576
#socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
socket.request.max.bytes = 104857600
#####Topic ######
#####ZooKeeper ######
#Zookeeper quorum设置。如果有多个使用逗号分割
zookeeper.connect=debugo01:2181,debugo02,debugo03
#连接zk的超时时间
zookeeper.connection.timeout.ms=1000000
#ZooKeeper集群中leader和follower之间的同步实际
zookeeper.sync.time.ms = 2000
####Log #########
#日志存放目录,多个目录使用逗号分割
log.dirs=/var/log/kafka
其他参数 参考 https://zhuanlan.zhihu.com/p/103915575
启动 kafka
启动 kafka之前需要先启动 Zookeeper
- 启动kafka服务端命令
bin/kafka-server-start.sh config/server.properties
不要用浏览器访问 9092端口(不是这么用的, 会报错 InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600))
- 创建生产者 topic 和 消费者 topic
- 在一个终端执行创建生产者: (推消息到wd_test)
进入kafka目录 bin
./kafka-console-producer.sh --broker-list 192.168.23.31:9092 --topic wd_test #(注:wd_test你要建立的topic名)
- 在另一个终端执行创建消费者:(从wd_test上消费消息)
进入kafka目录 bin
./kafka-console-consumer.sh --bootstrap-server 192.168.23.31:9092 --topic wd_test #消费wd_test的topic消
参考链接:https://blog.csdn.net/weixin_42109071/article/details/107564094
可以通过jps 查看 kafka和zookeeper运行情况