kafka与zookeeper的SSL认证教程

news2025/1/17 15:42:30

作者 乐维社区(forum.lwops.cn)许远
在构建现代的分布式系统时,确保数据传输的安全性至关重要。Apache Kafka 和 Zookeeper 作为流行的分布式消息队列和协调服务,提供了SSL(Secure Sockets Layer)认证机制,以增强数据传输过程中的安全性。
本文将详细介绍从生成SSL证书到配置服务端和客户端的全过程,确保数据在传输过程中得到充分的保护。

一、配置Kafka账号密码:
1、首先,需要修改kafka配置文件:vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties

broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://10.176.31.137:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
#需要开启设置超级管理员,设置visitor用户为超级管理员
super.users=User:visitor

2、其次,为server创建登录验证文件,可以根据自己爱好命名文件,如vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf ,文件内容如下

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“visitor”
password=“qaz@123”
user_visitor=“qaz@123”;
};

3、然后修改kafka安装目录vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh,在文件最上面添加变量

export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
在这里插入图片描述
4、接下来为consumer和producer创建登录验证文件,可以根据爱好命名文件,如kafka_client_jaas.conf,文件内容如下(如果是程序访问,如springboot访问,可以不配置)
vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“visitor”
password=“qaz@123”;
};

5、在consumer.properties和producer.properties里分别加上如下配置:
vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties
vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

6、修改kafka安装目录bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh,在文件最上面添加变量
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"在这里插入图片描述
7、分别启动zookeeper和kafka,至此服务端kafka用户登录验证配置完成(先关闭kafka后关闭zookeeper)

关闭服务kafka
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

启动服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

关闭服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
启动服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

8、创建及查看主题

/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list 10.176.31.137:9092 --topic cmdb --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
在这里插入图片描述
接收消息

/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 10.176.31.137:9092 --topic cmdb --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN在这里插入图片描述

二、zk和kafka配置ssal账号密码:

  1. Zookeeper 配置 SASL
    1.1 新建 zoo_jaas.conf 文件
    zoo_jaas.conf 文件名、文件所在路径没有特殊要求,一般放置在${ZOOKEEPER_HOME}/conf目录下vim /asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“admin”
password=“admin@12”
user_kafka=“kafka@123”;
};

    Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
    Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名

1.2 配置 /asop/zk/zookeeper-3.4.13/conf/zoo.cfg 文件
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true

zookeeper.sasl.client 设置为 true,开启客户端身份验证,否则zoo_jaas.conf中配置的用户名将不起作用,客户端仍然可以无 jaas 文件连接,只是带有 WARNNING 而已

1.3 导入依赖包
因为使用的权限验证类为:org.apache.kafka.common.security.plain.PlainLoginModule,所以需要 kafka 相关 jar 包,新建文件夹 zk_sasl_lib,如下:从kafka/lib目录下复制以下几个jar包到zookeeper的lib和新建的zk_sasl_lib目录下:

kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
slf4j-log4j12-1.7.28.jar
snappy-java-1.1.7.3.jar

mkdir /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib

chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/
chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/

1.4 修改 zkEnv.sh 文件/asop/zk/zookeeper-3.4.13/bin/zkEnv.sh
修改前:如果没有就直接加

export SERVER_JVMFLAGS=“-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS”

修改后:

for jar in /asop/zk/zookeeper-3.4.13/zk_sasl_lib/*.jar;
do
CLASSPATH=“ j a r : jar: jar:CLASSPATH”
done

export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf "

重启 Zookeeper 服务即可

关闭服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
启动服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

  1. Kakfa 配置 SASL
    2.1 新建 kafka_server_jaas.conf 文件
    kafka_server_jaas.conf 文件名和存放路径没有要求,一般放置在${KAFKA_HOME}/config目录下/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“visitor”
password=“qaz@123”
user_visitor=“qaz@123”;
};
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“kafka”
password=“kafka@123”;
};

    KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上

KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致
KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据
Client.username、Client.password 填写 Zookeeper 中注册的账号密码,用于 broker 与 zk 的通信(若 zk 没有配置 SASL 可以忽略、若 zookeeper.sasl.client 为 false 也可以忽略只是带有,日志如下)

[2021-06-29 17:14:30,204] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named ‘Client’ was found in specified JAAS configuration file: ‘/Users/wjun/env/kafka/config/kafka_server_jaas.conf’. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

2.2 修改 server.properties 文件
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.168.157.198:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
#需要开启设置超级管理员,设置visitor用户为超级管理员
super.users=User:visitor

其中 localhost 需要修改成 IP地址

super.users 配置超级用户,该用户不受之后的 ACL 配置影响

2.3 修改启动脚本
修改 kafka-server-start.sh 文件,使之加载到 kafka_server_jaas.conf 文件/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh

修改前:

if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
export KAFKA_HEAP_OPTS=“-Xmx1G -Xms1G”
fi

修改后:
(先在首行加这一行,如果有了就不用加了)export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
export KAFKA_HEAP_OPTS=“-Xmx1G -Xms1G -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf”
fi

设置zookeeper的ACL规则
/asop/zk/zookeeper-3.4.13/bin/zkCli.sh #进入zk的命令行模式

addauth digest admin:admin@12 #切换登陆用户(超级管理员是在zk的配置文件/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf里面)

setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa #(设置可以登陆的IP和用户账号密码,admin是上面的zk的配置文件里面定义的管理员,Kafka用户是/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf文件里面的定义的kafka连接zk的用户 (Client下面的))

addauth digest kafka:kafka@123 #再切换为kafka用户再设置一次acl
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa

注意:如果要加白名单IP或者用户要在原来的基础上加,不然会覆盖
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa,auth:admin:admin@12:cdrwa,ip:1.1.1.1

需要恢复权限,不设置acl的话就运行
setAcl / world:anyone:cdrwa

重启 kafka 服务即可

关闭服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

启动服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties

至此,完成kafka与zookeeper配置ssl认证。更多运维技巧欢迎关注乐维社区,更多运维问题也欢迎留言提问。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1920370.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在 PostgreSQL 里如何实现数据的缓存失效策略的优化?

文章目录 《在 PostgreSQL 中优化数据缓存失效策略》一、理解 PostgreSQL 中的数据缓存二、常见的数据缓存失效策略三、优化数据缓存失效策略的方法(一)合理调整共享缓冲区大小(二)使用 PostgreSQL 的缓存统计信息(三&…

SSE(Server-Send-Event)服务端推送数据技术

SSE(Server-Send-Event)服务端推送数据技术 大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。 客户端轮询更新数据。服务端与客户端建立 Socket 连接双向通信服务端与客户建立 SSE 连接单向通信 几种方案的比较&…

实变函数精解【3】

文章目录 点集求导集 闭集参考文献 点集 求导集 例1 E { 1 / n 1 / m : n , m ∈ N } 1. lim ⁡ n → ∞ ( 1 / n 1 / m ) 1 / m 2. lim ⁡ n , m → ∞ ( 1 / n 1 / m ) 0 3. E ′ { 0 , 1 , 1 / 2 , 1 / 3 , . . . . } E\{1/n1/m:n,m \in N\} \\1.\lim_{n \rightar…

Spark SQL 概述

Spark SQL 概述 Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 …

i18n、L10n、G11N 和 T9N 的含义

注:机翻,未校对。 Looking into localization for the first time can be terrifying, if only due to all of the abbreviations. But the meaning of i18n, L10n, G11N, and T9N, are all very easy to understand. 第一次研究本地化可能会很可怕&…

Git 删除包含敏感数据的历史记录及敏感文件

环境 Windows 10 Git 2.41.0 首先备份你需要删除的文件(如果还需要的话),因为命令会将本地也删除将项目中修改的内容撤回或直接提交到仓库中(有修改内容无法提交) 会提示Cannot rewrite branches: You have unstaged …

给后台写了一个优雅的自定义风格的数据日志上报页面

highlight: atelier-cave-dark 查看后台数据日志是非常常见的场景,经常看到后台的小伙伴从服务器日志复制一段json数据字符串,然后找一个JSON工具网页打开,在线JSON格式化校验。有的时候,一些业务需要展示mqtt或者socket的实时信息展示,如果不做任何修改直接展示一串字符…

音频语言学习领域数据集现状、分类及评估

Audio Language Learning (Audio-Text Learning) 是一个新兴的研究领域,专注于处理、理解和描述声音。它的发展动力是机器学习技术的进步以及越来越多地将声音与其相应的文本描述相结合的数据集的可用性。 Audio Language Models (ALMs) 是这个领域的关键技术&#…

零基础学python(二)

1. 字典 字典的创建 最常见的创建方式: d {"k1": "v1", "k2": "v2"} 再向字典中添加一对键值: d["k3"] "v3" 字典还可以用dict()创建,这种方式中,键是不加引…

嵌入式工程师从0开始,到底该学什么,怎么学?

作为嵌入式工程师,从零开始学习需要掌握以下几个关键方面。我收集归类了一份嵌入式学习包,对于新手而言简直不要太棒,里面包括了新手各个时期的学习方向编程教学、问题视频讲解、毕设800套和语言类教学,敲个22就可以免费获得。 基…

WPS打开PDF文件的目录

WPS打开PDF文件的目录 其实WPS中PDF文件并没有像Word那样标准的目录,但是倒是有书签,和目录一个效果 点击左上角书签选项,或者使用Alt Shift 1快捷键即可

java解决实例问题--拿硬币堆

题目🎊 编程梦想家(大学生版)-CSDN博客 桌上有 n 堆力扣币,每堆的数量保存在数组 coins 中。我们每次可以选择任意一堆,拿走其中的一枚或者两枚,求拿完所有力扣币的最少次数。 ❤ 这个问题实际上是一个贪…

【简历】西安某211大学研究生:Java简历面试通过率低

注:为保证用户信息安全,姓名和学校等信息已经进行同层次变更,内容部分细节也进行了部分隐藏 简历说明 这个同学是211研究生的一份Java简历,这个简历版面没有问题,但是因为主项目重复度过大,所以导致这个简历的简历通过率会大大降低,面试通过…

《Windows API每日一练》9.2.1 菜单

■和菜单有关的概念 窗口的菜单栏紧挨着标题栏下面显示。这个菜单栏有时叫作程序的“主菜单”或“顶级菜单“(top-level menu)。顶级菜单中的菜单项通常会激活下拉菜单(drop-downmenu),也 叫“弹出菜单”(…

头歌资源库(25)地图着色

一、 问题描述 任何平面区域图都可以用四种颜色着色,使相邻区域颜色互异。这就是四色定理。要求给定区域图,排出全部可能的着色方案。例如,区域图如下图所示: 要求用四种颜色着色。 则输入: 10 4 (分别表示…

什么是敏捷本地化

快速、敏捷的多语言产品和服务交付正逐渐成为众多行业的常态。在这种情况下,重点从传统的期望(即在合理的时间框架内翻译大量内容)转变为翻译工作量非常大的小片段,通常在2-3到12-24小时之间,通常在周末或假期。 Logr…

如何做好漏洞扫描工作提高网络安全

在数字化浪潮席卷全球的今天,企业数字化转型已成为提升竞争力、实现可持续发展的关键路径。然而,这一转型过程并非坦途,其中网络安全问题如同暗礁般潜伏,稍有不慎便可能引发数据泄露、服务中断乃至品牌信誉受损等严重后果。因此&a…

usbserver工程师手记(三)手工开通 OTP功能

1、设定密钥,用户自行选择一个密钥,以下以密钥为 EAZAYOKNGETBOPC5 为例说明 2、usb server 配置otp 密钥,目前还没有UI 界面开通,后续版本会支持从管理界面开通 curl -X POST -H Content-Type: application/json -H Accept: app…

mysql高可用解决方案:MHA原理及实现

MHA:Master High Availability。对主节点进行监控,可实现自动故障转移至其它从节点;通过提升某一从节点为新的主节点,基于主从复制实现,还需要客户端配合实现,目前MHA主要支持一主多从的架构,要…

应力平衡方程的推导

应力平衡方程的推导 对于一点,已知其应力状态有: σ x , τ x y , τ x z \sigma_x,\tau_{xy},\tau_{xz} σx​,τxy​,τxz​ 则其附近点的应力状态为: σ x ∂ σ x ∂ x d x , τ x y ∂ τ x y ∂ x d x , τ x z ∂ τ x z ∂ x d …