Apache zookeeper kafka 开启SASL安全认证

news2025/1/11 16:49:27

背景:我之前安装的kafka没有开启安全鉴权,在没有任何凭证的情况下都可以访问kafka。搜了一圈资料,发现有关于sasl、acl相关的,准备试试。

简介

Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可以水平扩展和具有高吞吐量特性而被广泛使用,并与多类开源分布式处理系统进行集成使用。

  Kafka作为一款开源的、轻量级的、分布式、可分区和具备复制备份的、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。与传统消息系统相比,Kafka能够更好的处理活跃的流数据,让数据在各个子系统中高性能、低延迟地不停流转。

自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka集群的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此方案主要介绍SASL方式。

SASL验证分类
验证方式    kafka版本    特点
SASL/PLAIN    0.10.0.0    不能动态添加用户
SASL/SCRAM    0.10.2.0    可以动态添加用户
SASL/Kerberos    0.9.0.0    需要独立部署验证服务
SASL/oauthbearer    2.0.0    需要自己实现接口,实现token的创建和验证,需要额外的oauth服务

使用SSL加密在代理和客户端之间,代理之间或代理和工具之间传输的数据

SCRAM认证配置的优点:

  如果使用PLAIN认证有个问题,就是不能动态新增用户,每次添加用户后,需要重启正在运行的Kafka集群才能生效。

  因此,在生产环境中,这种认证方式不符合实际业务场景,不利于后期扩展。然而使用SCRAM认证,可以动态新增用户,添加用户后,可以不用重启正在运行的Kafka集群即可进行鉴权。所以生产环境推荐使用SCRAM+PLAIN搭配的认证方案。

配置zookeeper集群启用SASL

1. 配置zookeeper,启用sasl认证,cat zoo.cfg查看到如下内容:

tickTime=2000
initLimit=1
syncLimit=5
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort=2181
admin.serverPort=8888
maxClientCnxns=3000
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=localhost:2888:3888
4lw.commands.whitelist=conf,stat,srvr,mntr.envi
#zk SASL
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true

2. 配置zookeeper JAAS

cat zk_jaas.conf文件内容如下,如果没有改文件则使用vi命令编辑


Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin123"
    user_kafka="kafka123";
};
  1. 注意:admin用户 是zk 集群之间使用的。kafka用户 是 broker 与 zk 之间使用的。

3. 修改zkEnv.sh

将上一步添加的 jaas 配置文件添加到zookeeper的环境变量中,zkEnv.sh文件最后添加一行:
 
vim zkEnv.sh
 
ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."
 
# 添加如下 新增变量SERVER_JVMFLAGS:
 
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=../conf/zk_jaas.conf"

配置kafka sasl动态认证

SASL/SCRAM认证是把凭证(credential)存储在Zookeeper,使用kafka-configs.sh在Zookeeper中创建凭据。对于每个SCRAM机制,必须添加具有机制名称的配置来创建凭证,所以在启动Kafka broker之前需要创建代理间通信的凭据。

  这里配置的 Kafka和生产者/消费者之间 采用SASL/PLAIN和SASL/SCRAM两种方式共同完成认证,授权使用ACL方式。PLAIN方式的用户是在jaas文件中写死的,不能动态的添加;SCRAM支持动态的添加用户。

1. 创建用户

配置SASL/SCRAM认证的第一步,是配置可以连接到kafka集群的用户。本案例创建了3个用户:admin,producer,consumer。kafka_server_admin用户用于broker之间的认证通信,producer用户用于生产者连接kafka,consumer用户用于消费者连接kafka 。

./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name admin
 
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name producer
 
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name consumer

2. 查看创建的用户信息

kafka-configs 脚本是用来设置主题级别参数的。其实,它的功能还有很多。比如在这个例子中,我们使用它来创建 SASL/SCRAM 认证中的用户信息。可以使用下列命令来查看刚才创建的用户数据。

./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users 
 
#(可以单独指定某个用户 --entity-name producer,如下)
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

ZK客户端命令行查看:
 
./zkCli.sh  -server localhost:2181
 
ls /config/users

3. 配置kafka jaas文件

配置了用户之后,我们需要为 Broker 创建一个对应的 JAAS 文件。在实际场景中,需要为每台单独的物理 Broker 机器都创建一份 JAAS 文件。

Kafka 的 jaas认证配置文件,配置的是登录类,超管密码和管理的帐号密码列表

vim kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username ="admin"
    password="admin123"
    user_admin="admin123"
    user_producer="producer123"
    user_consumer="consumer123";
};
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin123"
    user_producer="producer123"
    user_consumer="consumer123";
};
Client {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="kafka"
    password="kafka123";
};

KafkaServer中usename配置的是kafka服务端使用的账号和密码,后面的user_xxx事预设的普通帐号认证信息。
 
中间部分配置的是PLAIN认证方式的账户和密码,其中producer1是账户名,producer123是密码。
Client配置了broker到Zookeeper的连接用户名密码,这里要和前面zookeeper配置中的zk_jaas.conf.conf 中 user_kafka 的账号和密码相同。
 
关于这个文件内容,需要注意以下两点:
 
1)不要忘记最后一行和倒数第二行结尾处的分号;
 
2)JAAS 文件中不需要任何空格键。

4. kafka 配置文件启用SASL认证

Kafka 服务配置文件 server.propertis,配置认证协议及认证实现类
cat server.properties其它内容都注释掉,然后追加如下内容:

broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
auto.create.topics.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

Host.name=43.138.0.199

5. kafka 启动脚本添加认证文件路径的环境变量

Kafka 安全认证可以直接通过环境变量 -Djava.security.auth.login.config 设置,修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行,增加一个参数指向 jaas 配置文件的绝对路径

vi kafka-server-start.sh

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/lighthouse/kafka_2.12-2.2.1/config/kafka_server_jaas.conf  kafka.Kafka "$@"

 6.  kafka客户端配置

1) 配置consumer.properties和producer.properties,都要加入以下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512

2) 生产者配置

使用kafka-console-producer.sh脚本测试生产者,由于开启安全认证和授权,此时使用console-producer脚本来尝试发送消息,那么消息会发送失败,原因是没有指定合法的认证用户,因此客户端需要做相应的配置,需要创建一个名为producer.conf的配置文件给producer程序使用。

config目录下创建一个producer.conf的文件,cat producer.conf文件内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
mLoginModule required username="producer" password="producer123";

注意:Topic设置写权限

3) 消费者配置

使用kafka-console-consumer.sh脚本测试生产者,由于开启安全认证和授权,因此客户端需要做相应的配置。需要为 consumer 用户创建consumer.conf给消费者程序,同时设置对topic的读权限。

config目录下创建一个consumer.conf的文件,cat consumer.conf文件内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
mLoginModule required username="consumer" password="consumer123";

注意:Topic设置读权限。

4) 在生产者和消费者启动脚本中引入JAAS文件

vim bin/kafka-console-producer.sh
 
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
 
# 添加这行
export KAFKA_OPTS="-Djava.security.auth.login.config=../config/kafka_server_jaas.conf"
 
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
 
 
# vim bin/kafka-console-consumer.sh
 
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
 
# 添加这行
export KAFKA_OPTS="-Djava.security.auth.login.config=../config/kafka_server_jaas.conf"
 
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

启动kafka

nohup kafka-server-start.sh /path-to-kafka/config/server.properties &

我自己写了一个脚本,同时启动zookeeper和kafka

文件名称叫startup.sh内容如下:

cd /home/lighthouse/zk-3.4.14/zookeeper-3.4.14/bin
./zkServer.sh start
cd /home/lighthouse/kafka_2.12-2.2.1/
nohup bin/kafka-server-start.sh config/server.properties > output.txt &

zookeeper没有启动成功,我找下原因。把整个过程重新整理了一遍,发现zookeeper、kafka启动成功了。

检查验证

./zkCli.sh  -server localhost:2181

ls /brokers/ids

1. 生产者测试

1) 创建一个测试主题test_topic

./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test_topic

2)查看创建的Topic

./kafka-topics.sh  --list --zookeeper localhost:2181 test_topic

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

注:kafka开启认证后,当生产者往Topic写数据需要为主题配置权限(write),即对生产者赋予写的权限。

这里使用producer用户认证授权,通过ACL为producer 用户分配操作test_topic权限:

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic 'test_topic'

注意allow-后面不要换行(会报错),注意整段命令要放在一行上。

  • 启动生产者发送消息

特别注意:在生产者生产消息之前如果不设置生产者用户的ACL权限会报错:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --producer.config ../config/producer.conf

./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --producer.config /home/lighthouse/kafka_2.12-2.2.1/config/producer.conf 

使用相对路径和绝对路径都报错了,错误相同

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:299)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Login module control flag not specified in JAAS config
        at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:110)
        at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:90)
        at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
        at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:439)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:420)
        ... 3 more

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1521288.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL基础架构

文章目录 MySQL基础架构一、连接器 - 建立连接&#xff0c;权限认证二、查缓存 - 提高效率三、分析器 - 做什么四、优化器 - 怎么做五、执行器 - 执行语句六、存储引擎1、存储引擎的概述2、存储引擎的对比3、存储引擎的命令4、存储引擎的选择 MySQL基础架构 大体来说&#xff…

力扣-20. 有效的括号(回顾知识哈希表,栈)

给定一个只包括 ‘(’&#xff0c;‘)’&#xff0c;‘{’&#xff0c;‘}’&#xff0c;‘[’&#xff0c;‘]’ 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括号必须以正确的顺序闭合。 每个右括号都有…

HTML万字学习总结

html文本标签特殊符号图片音频与视频超链接表单列表表格语义标签(布局) html文本标签 标签简介<html></html>根目录<head></head>规定文档相关的配置信息&#xff08;元数据<body></body>元素表示文档的内容<meta></meta>表示…

“技多不压身”是什么意思?看完这篇文章你会明白:有了手艺,走遍天下都不怕!

“技多不压身”是什么意思&#xff1f;看完这篇文章你会明白&#xff1a;有了手艺&#xff0c;走遍天下都不怕&#xff01; 咱们的老祖宗流传一句话&#xff1a;“一招鲜&#xff0c;吃遍天。”这话说得直白&#xff0c;却道出了学一门手艺或技术对于人生的重要性。“李秘书讲…

Linux 学习笔记(16)

十六、 计划任务 在很多时候为了自动化管理系统&#xff0c;我们都会用到计划任务&#xff0c;比如关机&#xff0c;管理&#xff0c;备份之类的操作&#xff0c;我 们都可以使用计划任务来完成&#xff0c;这样可以是管理员的工作量大大降低&#xff0c;而且可靠度更好。 l…

电商经济和实体经济,哪个更有发展力?这篇文章讲的很详细!

我是电商珠珠 三年的冲击&#xff0c;使实体经济走向衰败&#xff0c;以至于到今天很多实体店仍无法正常营业&#xff0c;面临赔本的惨状。花大好几十万去做这件事&#xff0c;结果分币未挣。近年来&#xff0c;电商虽然发展的很快&#xff0c;但是平台众多&#xff0c;规则也…

“入站营销“VS“即时流量变现“客户该进谁家门?

在不断变化的市场环境下&#xff0c;两种主导战略“入站营销 ”和 “即时流量变现”决定了企业接触受众的方式。了解这些方法之间的根本区别对于企业制定有效的营销战略至关重要。 受众体的差别线索质量 “即时流量变现”&#xff0c;顾名思义&#xff0c;短时间&#xff0c;…

Leetcode - 周赛388

目录 一&#xff0c;3074. 重新分装苹果 二&#xff0c;3075. 幸福值最大化的选择方案 三&#xff0c;3076. 数组中的最短非公共子字符串 四&#xff0c;3077. K 个不相交子数组的最大能量值 一&#xff0c;3074. 重新分装苹果 本题是一道阅读理解题&#xff0c;就是将数组a…

springboot“力炫”健身馆网站

摘要 随着网络科技的不断发展以及人们经济水平的逐步提高&#xff0c;网络技术如今已成为人们生活中不可缺少的一部分&#xff0c;而信息管理系统是通过计算机技术&#xff0c;针对用户需求开发与设计&#xff0c;该技术尤其在各行业领域发挥了巨大的作用&#xff0c;有效地促…

7.1strcmp

strcmp 函数原型 extern int strcmp(const char *s1,const char *s2);规则 当s1<s2时&#xff0c;返回为负数&#xff1b; 当s1s2时&#xff0c;返回值0&#xff1b; 当s1>s2时&#xff0c;返回正数。 即&#xff1a;两个字符串自左向右逐个字符相比&#xff08;按ASCI…

springboot项目自定义切面增强方法功能(springboot记录日志)

说明 背景&#xff1a;记录系统接口日志入库&#xff0c;包含接口方法、入参、回参、响应时间、操作人、操作时间等信息。 方案&#xff1a;添加自定义切面处理 一、自定义切面注解 package com.gstanzer.supervise.annotation;import com.gstanzer.supervise.enums.Busine…

【BFS】走迷宫问题——acwing844

问题描述 给定一个 nm 的二维整数数组&#xff0c;用来表示一个迷宫&#xff0c;数组中只包含 0 或 1&#xff0c;其中 0表示可以走的路&#xff0c;1 表示不可通过的墙壁。 最初&#xff0c;有一个人位于左上角 (1,1) 处&#xff0c;已知该人每次可以向上、下、左、右任意一…

Ubuntu20下C/C++编程开启TCP KeepAlive

1、在linux下&#xff0c;测试tcp保活&#xff0c;可以使用tcp自带keepalive功能。 2、几个重要参数&#xff1a; tcp_keepalive_time&#xff1a;对端在指定时间内没有数据传输&#xff0c;则向对端发送一个keepalive packet&#xff0c;单位&#xff1a;秒 tcp_keep…

学习开发小程序的起航日记

2024年3月16日 不知不觉中三月份还只剩了一半的光景&#xff0c;我想写的内容还很多没有写&#xff0c;或者更应该说&#xff0c;是想积累的还有很多。现在最应该去完善Java的内容&#xff0c;可还是想先等等。想等搞清楚小程序部分&#xff0c;想等积累完小程序的内容。 这几…

unraid docker.img扩容

unraid 弹Docker image disk utilization of 99%&#xff0c;容器下载/更新失败 我的版本是6.11.5&#xff0c;docker.img满了导致容器不能更新&#xff0c;遇到同样问题的可以先用docker命令清除一下仓库(当然不一定能清理出来&#xff0c;我已经清理过只清理出来1G多点&…

黑马微服务p30踩坑

报错详情 : orderservice开不起来 : 发生报错 : 然后检查了以下端口啥的 &#xff0c;配置啥的都是没有问题的 ; 解决办法 : 1 . 修改nacos1,2,3中的端口&#xff0c;将conf 中 cluster.conf中 的 127.0.0.1 全部改成自己本机的真实ipv4地址; 本机真实ipv4地址查看 :…

Java学习笔记(14)

常用API Java已经写好的各种功能的java类 Math Final修饰&#xff0c;不能被继承 因为是静态static的&#xff0c;所以使用方法不用创建对象&#xff0c;使用里面的方法直接 math.方法名 就行 常用方法 Abs,ceil,floor,round,max,minm,pow,sqrt,cbrt,random Abs要注意参数的…

数据结构:树和二叉树

树的概念 1.树是一种非线性的数据结构。它是由n个有限节点的集合。 2.树分为根节点和子树。根节点没有前驱节点。 3.树的子树是由一个个子树组成&#xff0c;它们可以看作一个个集合。每个集合下面又有集合。 因此&#xff0c;树是递归定义的。 树形结构中&#xff0c;子树…

Leetcode刷题-(21~25)-Java

算法是码农的基本功&#xff0c;也是各个大厂必考察的重点&#xff0c;让我们一起坚持写算法题吧。 遇事不决&#xff0c;可问春风&#xff0c;春风不语&#xff0c;即是本心。 我们在我们能力范围内&#xff0c;做好我们该做的事&#xff0c;然后相信一切都事最好的安排就可…

软件应用,物流运单填写模板,物流打印快递单教程,货运单怎么打印视频教程

软件应用&#xff0c;物流运单填写模板&#xff0c;物流打印快递单教程&#xff0c;货运单怎么打印视频教程 一、前言 以下软件操作教程以 佳易王物流货运管理系统软件V17.2为例说明 件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 1、物流管理系统打印&a…