Apache Kafka (KRaft 集群)
Apache Kafka 是一个基于 TCP 的分布式流处理平台,提供高吞吐量、低延迟的消息传递和处理能力,用于构建实时数据管道和流应用程序。其底层通信依赖于 TCP Socket,但 Kafka 封装了许多高级特性,使其更加适合构建复杂的数据管道和流处理系统。
Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群,本文讲解 Kafka KRaft 模式集群搭建。
本文使用三台服务器部署集群,192.168.3.191
、192.168.3.192
、192.168.3.193
。
kafka部署(Ubuntu)
三台服务器都需进行部署
更新包列表和安装依赖
Kafka依赖于Java,因此首先需要安装Java环境。
sudo apt update
sudo apt install openjdk-11-jdk -y
开放端口
kafka 需要开放 9092 端口和 9093 端口,3台机器上都需开放 9092 和 9093 端口。
- 开放9092 端口
sudo firewall-cmd --zone=public --add-port=9092/tcp --permanent
- 开放9093 端口
sudo firewall-cmd --zone=public --add-port=9093/tcp --permanent
- 更新防火墙规则(无需断开连接,动态添加规则)
sudo firewall-cmd --reload
- 查看开放端口
sudo firewall-cmd --zone=public --list-ports
下载和解压Kafka
访问Apache Kafka下载页面,获取最新版本的下载链接。
注意需要使用预编译的Kafka二进制文件,而不是源文件(src)
本文我们下载的是kafka-3.6.2版本。
wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
tar -xzf kafka_2.12-3.6.2.tgz
sudo mv kafka_2.12-3.6.2 /usr/local/kafka
配置kafka
进入配置目录:
cd /usr/local/kafka/config/kraft
编辑配置文件:
sudo vi server.properties
配置说明:
node.id
:kafka的broker节点id。
controller.quorum.voters
:配置的是 kafka 集群中的节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址。
advertised.listeners
:节点自己的监听地址。
1717119355943)
创建 KRaft 集群
生成集群id,在任意一个节点上执行就行,笔者使用 192.168.3.191 节点:
/usr/local/kafka/bin/kafka-storage.sh random-uuid
保存生成的字符串。
然后分别在3台机器上执行下面命令:
为方便执行命令,先回到 kafka安装目录
cd /usr/local/kafka
再执行命令,完成集群元数据配置
bin/kafka-storage.sh format -t [字符串] -c config/kraft/server.properties
启停Kafka
为方便执行命令,先回到 kafka安装目录:
cd /usr/local/kafka
启动kafka服务:
sudo bin/kafka-server-start.sh -daemon config/kraft/server.properties
验证Kafka是否启动成功,使用以下命令检查Kafka broker是否在监听端口9092:
netstat -tulnp | grep 9092
输出示例:
tcp6 0 0 :::9092 :::* LISTEN 1234/java
如果没有看到类似的输出,可能是Kafka没有正确启动或端口配置有误。
启动失败时可尝试的解决方案和注意事项:
- 方案:在server.properties 配置文件里面 找到 log.dirs 配置的路径,将该路径下的文件全部删除,重新创建KRaft集群,最后重启kafka。
- 注意:在server.properties 配置文件里面 找到 node.id 选项,确保每个节点的id不冲突。
- 注意:Controller中指定的那些节点,都要集群元数据配置后,再逐一启动kafka服务,否则启动失败。
停止kafka服务:
sudo bin/kafka-server-stop.sh
通信测试
可以通过 kafka-console-producer.sh
和 kafka-console-consumer.sh
两个kafka自带脚本进行测试。
创建测试主题:在任意一个服务器上创建一次,即可同步到集群
sudo /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.3.191:9092,192.168.3.192:9092,192.168.3.193:9092 --replication-factor 3 --partitions 1 --topic test-topic
列出主题:在集群上任意服务器上都可查看
sudo /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.3.191:9092,192.168.3.192:9092,192.168.3.193:9092
应当看到test-topic
,表示Kafka主题已经创建成功了。
生产消息: 使用 kafka-console-producer.sh
向 test-topic
发送消息。
sudo /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.3.191:9092,192.168.3.192:9092,192.168.3.193:9092 --topic test-topic
运行上述命令后,命令行会等待输入。在这里输入一些消息并按下回车发送,例如:
Hello, Kafka!
This is a test message.
消费消息: 使用 kafka-console-consumer.sh
从 test-topic
消费消息,验证是否能够收到刚才发送的消息。
sudo /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.191:9092,192.168.3.192:9092,192.168.3.193:9092 --topic test-topic --from-beginning
如果 Kafka 集群搭建成功并且生产和消费消息正常,会在控制台上看到刚才发送的消息。