前言
1、前面自己写了一篇关于各个环境各个模式的安装的文章,大家可以去看看 kafka各种环境安装(window,linux,docker,k8s),包含KRaft模式
2、使用版本 kafka_2.13-3.4.1
3、kafka验证方式,有两大类如下,文档内容在 kafka官方文档的 第七节 security,强烈建议大家去看下,不想看英文的可以翻译中文后看
- SSL ( 官方3.4.x SSL 文档链接)
- SASL ( 官方3.4.x SASL 文档链接)
4、而SASL 又细分如下 4 小类,这四种都可以使用
类型 | 说明 | 官方文档链接 |
---|---|---|
SASL/GSSAPI (Kerberos) | 使用的Kerberos认证,可以集成目录服务,比如AD。从Kafka0.9版本开始支持 | Kerberos |
SASL/PLAIN | 使用简单用户名和密码形式。从Kafka0.10版本开始支持,不支持动态增加账户和密码 | SASL/PLAIN |
SASL/SCRAM-SHA-256 | 主要解决PLAIN动态更新问题以及安全机制,从Kafka0.10.2开始支持 | SCRAM |
SASL/SCRAM-SHA-512 | 主要解决PLAIN动态更新问题以及安全机制,从Kafka0.10.2开始支持 | SCRAM |
SASL/OAUTHBEARER | 基于OAuth 2认证框架,从Kafka2.0版本开始支持 | OAUTHBEARER |
5、在后面指定java 实现的时候,可以去源码里面找对应的,如下
一、Linux 环境
1、采用 SASL/PLAIN
模式授权
2、在SASL/PLAIN
模式中 Kafka的SASL_SSL和SASL_PLAINTEXT是两种不同的安全协议,用于保护Kafka集群中的通信。它们提供了不同级别的安全性和身份验证选项:
-
SASL_SSL (Simple Authentication and Security Layer over SSL/TLS):这是Kafka的高度安全的传输协议。它结合了SSL/TLS(用于加密通信)和SASL(用于身份验证)来提供强大的安全性。使用SASL_SSL,Kafka客户端和服务器之间的通信将是加密的,并且需要经过身份验证才能建立连接。常见的身份验证机制包括GSSAPI(Kerberos)、PLAIN(用户名和密码)等。SASL_SSL是Kafka中最安全的选项,适用于敏感数据和合规性要求高的场景。
-
SASL_PLAINTEXT (Simple Authentication and Security Layer over plaintext):这是Kafka的另一种SASL支持方式,但不涉及加密。使用SASL_PLAINTEXT,身份验证是必需的,但通信不加密。这意味着数据在传输过程中是以明文形式传输的,因此对于保护数据隐私要求较低的场景或在内部网络中使用时,可以选择此选项。常见的身份验证机制也包括PLAIN(用户名和密码)等。
通常,SASL_SSL是更安全的选项,因为它不仅提供身份验证,还提供数据的加密,从而更好地保护了数据的隐私和完整性。但是,它的配置相对复杂,可能需要设置SSL/TLS证书和密钥以及身份验证机制。SASL_PLAINTEXT相对来说更容易配置,但数据在传输过程中不加密,可能不适用于对数据隐私有更高要求的场景。
你的选择应该根据你的具体安全需求来决定。在需要高度安全性的生产环境中,通常会选择SASL_SSL,而在开发和测试环境中,SASL_PLAINTEXT可能更为方便。无论选择哪种方式,都需要谨慎配置和管理Kafka的安全设置,以确保系统的安全性。
所以下面文章中的 SASL_PLAINTEXT 可以替换为 SASL_SSL,相应的配置可以改成如下(可以看官网的 SASL/PLAIN)这一节,采用的就是这种
3、下载后解压
tar -xzf kafka_2.13-3.4.1.tgz
cd kafka_2.13-3.4.1
1.1、Kafka with KRaft 单节点 授权配置
1.1.1、服务端
1.1.1.1、编写服务端授权文件
1、编写授权文件 kafka_server_jaas.conf
,此配置定义了两个用户(admin 和 client )。代理使用 KafkaServer 部分中的属性用户名和密码来启动与其他代理的连接。在此示例中,admin 是代理间通信的用户。属性集 user_用户名定义
是连接到代理的所有用户的密码,代理验证所有客户端连接。
# 因为我这里是使用 kraft 模式启动,所以,就把服务端的配置都放在这里了
cd /opt/kafka/kafka_2.13-3.4.1/config/kraft
# 创建文件内容如下
vim kafka_server_jaas.conf
### 末尾 分号一定不能忘记
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_client="client-secret";
};
1.1.1.2、编写服务端启动脚本
1、复制kafka服务端启动脚本
cd /opt/kafka/kafka_2.13-3.4.1/bin/
cp kafka-server-start.sh kafka-server-start-sasl.sh
2、修改我们copy的启动脚本,将我们前面将要创建的配置文件(kafka_jaas.conf
),给指定进去
cd /opt/kafka/kafka_2.13-3.4.1/bin
vim kafka-server-start-sasl.sh
# 将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-3.4.1/config/kraft/kafka_server_jaas.conf"
1.1.1.3、修改服务端 配置文件 server.properties
1、我这边是启动的 kraft 模式,所以我就修改对应的 kraft 目录下的配置文件即可
# 进入kraft/config目录
cd /opt/kafka/kafka_2.13-3.4.1/config/kraft
# copy并编辑server.properties文件
cp server.properties server-sasl.properties
# 修改
vim server-sasl.properties
# 修改以下内容
###
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://192.168.173.129:9092
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
###
1.1.2、客户端
1.1.2.1、编写客户端授权文件
# 因为我这里是使用 kraft 模式启动,所以,就把客户端的配置都放在这里了
cd /opt/kafka/kafka_2.13-3.4.1/config/kraft
# 创建文件内容如下
vim kafka_client_jaas.conf
### 末尾 分号一定不能忘记
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="client"
password="client-secret";
};
1.1.2.2、编写消费者启动脚本
cd /opt/kafka/kafka_2.13-3.4.1/bin
# copy 并修改
cp kafka-console-consumer.sh kafka-console-consumer-sasl.sh
# 修改,指定我们前面写的客户端配置文件
vim kafka-console-consumer-sasl.sh
# ★ 将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为:
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-3.4.1/config/kraft/kafka_client_jaas.conf"
1.1.2.3、编写消费者启动脚本的配置文件 consumer.properties
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.4.1/config
# copy编辑consumer-sasl.properties内容
cp consumer.properties consumer-sasl.properties
vim consumer-sasl.properties
###
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
###
1.1.2.4、编写生产者启动脚本
cd /opt/kafka/kafka_2.13-3.4.1/bin
# copy 并修改
cp kafka-console-producer.sh kafka-console-producer-sasl.sh
# 修改,指定我们前面写的客户端配置文件
vim kafka-console-producer-sasl.sh
# ★ 将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为:
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-3.4.1/config/kraft/kafka_client_jaas.conf"
1.1.2.3、编写生产者启动脚本的配置文件 producer.properties
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.4.1/config
# copy编辑consumer-sasl.properties内容
cp producer.properties producer-sasl.properties
vim producer-sasl.properties
###
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
###
1.1.1.5、修改通用命令脚本的配置文件
1、什么是通用命令脚本,比如说,创建topic的脚本,它链接kafka也是需要认证的,所以,我们为这一类没有指定配置的脚本,创建一个通用的
# 进入kafka/config目录
cd /opt/kafka/kafka_2.13-3.4.1/config/kraft
# 创建command_config文件
touch command_config
# 编辑command_config内容
vim command_config
### 千万注意 最后的分号 不能忘记了
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";
###
1.1.3、启动
1、完成上面的配置,那么我们就可以启动了,就是正常的 kraft 模式启动流程。
1.1.3.1、生成集群 UUID
# 进入到文件夹
cd /opt/kafka/kafka_2.13-3.4.1
# 创建 集群id
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# 查看集群id是多少
echo $KAFKA_CLUSTER_ID
1.1.3.2、格式化日志目录
1、使用上面的 KAFKA_CLUSTER_ID
参数,默认存储目录是/tmp/kraft-combined-logs
,你可以修改配置文件的值
注意这里使用的是 config/kraft/sasl.properties, 你可以点进去看下配置,可以看到,当前的这个配置的角色是 broker,controller了,就不再需要zookeeper了.
cd /opt/kafka/kafka_2.13-3.4.1
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-sasl.properties
2、可以看到了 /tmp/kraft-combined-logs
文件夹也存在了
1.1.3.3、启动kafka 服务
1、为了方便观察启动状态,这里就直接前台启动了
记得这里一定要用我们修改过后的脚本来启动
kafka-server-start-sasl.sh
,这个脚本里面指定认证文件
cd /opt/kafka/kafka_2.13-3.4.1
# 启动
bin/kafka-server-start-sasl.sh config/kraft/server-sasl.properties
# 后台启动 如下 加上 -daemon 即可
#bin/kafka-server-start-sasl.sh -daemon config/kraft/server-sasl.properties
1.1.4、链接测试
1.1.3.1、不使用认证的方式脚本命令行(服务端会提示无法链接)
1、我们先不使用认证授权的文件链接试一下,会发现下面这三个都是无法访问的,可以看到对应的服务端输出的日志
2、通用脚本
# 进入目录
cd /opt/kafka/kafka_2.13-3.4.1
# 查看当前服务器中的所有 topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
3、消费者脚本
# 进入目录
cd /opt/kafka/kafka_2.13-3.4.1
# 消费者链接
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
4、生产者脚本
# 进入目录
cd /opt/kafka/kafka_2.13-3.4.1
# 生产者链接
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
5、springBoot 项目,具体配置这里就不再细说了,链接之后,只要对kafak执行操作,就会如下错误,超时
1.1.3.2、使用认证的方式脚本命令行(链接成功)
1、通用脚本,第一开始因为这里还未创建过topic ,所以没有数据,后面可以再运行一下。
# 进入目录
cd /opt/kafka/kafka_2.13-3.4.1
# 查看当前服务器中的所有 topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config /opt/kafka/kafka_2.13-3.4.1/config/kraft/command_config
# 创建topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test --command-config /opt/kafka/kafka_2.13-3.4.1/config/kraft/command_config
3、消费者脚本,执行完成后,页面会等待队列消息
# 进入目录
cd /opt/kafka/kafka_2.13-3.4.1
# 消费者链接 --consumer.config 指定消费者配置文件
bin/kafka-console-consumer-sasl.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --consumer.config config/consumer-sasl.properties
4、生产者脚本
# 进入目录
cd /opt/kafka/kafka_2.13-3.4.1
# 生产者链接 --producer.config 指定消费者配置文件
bin/kafka-console-producer-sasl.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config config/producer-sasl.properties
此刻,消费者控制台也收到消息了
5、springBoot 项目,增加账号密码,如下,后面的分号一定不能忘记,加上如下配置之后就可以了
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";'
结尾
1、kafka配置个账号密码… 确实有些麻烦
参考文章:
-
Authentication using SASL/PLAIN
-
【保姆式通关宝典】使用Kraft快速搭建Kafka集群(含服务鉴权)
-
kafka服务端设置用户和密码登录及springboot访问实现
-
Kafka配置用户名密码访问