kafka动态认证 自定义认证 安全认证-亲测成功

news2025/1/25 4:28:19

kafka动态认证 自定义认证 安全认证-亲测成功

背景

Kafka默认是没有安全机制的,一直在裸奔。用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL),认证机制是SASL/PLAIN。

kafka下载安装

我这里用windows做的测试,部署到Linux上也是一样

官方下载地址:https://kafka.apache.org/downloads

我这里下载的kafka版本是:kafka_2.12-3.5.0.tgz

直接解压,如下图

在这里插入图片描述

启动zookeeper

这里的zookeeper配置其实没有做任何修改,zookeeper这里不做认证控制。

zookeeper配置文件在kafka_2.12-3.5.0\config\zookeeper.properties下,不用做任何修改

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=D:\kafka_2.12-3.5.0\zookeeper

# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

#authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
#requireClientAuthScheme=sasl
#jaasLoginRenew=3600000

进入kafka主目录,打开cmd

#启动zookeeper
bin\windows\zookeeper-server-start.bat  config\zookeeper.properties

在这里插入图片描述

zookeeper-server-start.bat 启动脚本

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

IF [%1] EQU [] (
	echo USAGE: %0 zookeeper.properties
	EXIT /B 1
)

rem set KAFKA_OPTS=-Djava.security.auth.login.config=D:\kafka_2.12-3.5.0\config\kafka_zookeeper_jaas.conf
SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
	
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
    set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
"%~dp0kafka-run-class.bat" org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal

在这里插入图片描述

kafka自定义认证配置

kafka的用户认证,是基于java的jaas。所以我们需要先添加jaas服务端的配置文件。

在kafka_2.12-3.5.0\config目录下新建kafka_jaas.conf 配置信息如下:

KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-liang"
        user_admin="admin-123456"
		user_liang="liang-123456";
};

注意最后一个属性后面需要加分号!配置是不难理解的,第一行指定PlainLoginModule,算是声明这是一个SASL/PLAIN的认证类型,如果是其他的,那么就需要reqired其他的类。username和password则是用于集群内部broker的认证用的。

这里会让人疑惑的,应该是user_admin和user_liang这两个属性了。这个其实是用来定义用户名和密码的,形式是这样:user_userName=password。所以这里其实是定义了用户admin和用户liang对应的密码。

这一点可以在源码的PlainServerCallbackHandler类中找到对应的信息,kafka源码中显示,对用户认证的时候,就会到jaas配置文件中,通过user_username属性获取对应username用户的密码,再进行校验。当然这样也导致了该配置文件只有重启才会生效,即无法动态添加用户。

写完配置后,需要在kafka的配置中添加jaas文件的路径。在kafka_2.12-3.5.0/bin/kafka-run-class.sh中,找到下面的配置,修改KAFKA_OPTS到配置信息。如下:

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (
	set KAFKA_OPTS=""
)

将上述到KAFKA_OPTS修改为:

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (
	set KAFKA_OPTS="-Djava.security.auth.login.config=D:\kafka_2.12-3.5.0\config\kafka_jaas.conf"
)

修改Kafka配置文件

配置文件在kafka_2.12-3.5.0\config\server.properties 主要增加如下配置

sasl.enabled.mechanisms = PLAIN
sasl.mechanism.inter.broker.protocol = PLAIN
security.inter.broker.protocol = SASL_PLAINTEXT
listeners = SASL_PLAINTEXT://localhost:9092

其中SASL_PLAINTEXT的意思,是明文传输的意思,如果是SSL,那么应该是SASL_SSL。

这样就算是配置好kafka broker了,接下来启动kafka,观察输出日志,没有错误一般就没问题了。

进入kafka主目录,另外打开一个cmd

#启动kafka
bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述
在这里插入图片描述

使用Kafka客户端工具Kafka Tool连接

此时就可以根据上面配置的用户admin和用户liang和相应的密码去连接了

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其他用户或错误的密码连接就会提示没有权限,用户或密码错误
在这里插入图片描述

动态认证

以上的配置方案除了没有使用SSL加密之外,还存在一个严重的缺陷:用户信息是通过静态配置文件的方式存储的,当对用户信息进行添加、删除和修改的时候都需要重启Kafka集群,而我们知道,作为消息中间件,Kafka的上下游与众多组件相连,重启可能造成数据丢失或重复,Kafka应当尽量避免重启。

如果要动态增加一个用户,得修改kafka_jaas.conf的配置,新增加一个用户,而且还得重启Kafka,这样显然不合适。

解决方案

还好,Kafka允许用户为SASL/PLAIN认证机制提供了自定义的回调函数,如果不希望采用静态配置文件存储用户认证信息的话,只需要编写一个实现了 AuthenticateCallbackHandler 接口的类,然后在配置文件中指明这个类即可,指明的方法为在Kafka配置文件中添加如下内容

listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.liang.kafka.auth.handler.MyPlainServerCallbackHandler

引入相关的maven依赖包,pom添加如下依赖包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.1</version>
        </dependency>
		
		 <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-cache</artifactId>
            <version>5.7.21</version>
        </dependency>

动态认证的完整代码如下

package com.liang.kafka.auth.handler;

import com.alibaba.druid.pool.DruidDataSource;
import com.liang.kafka.auth.util.DataSourceUtil;
import com.liang.kafka.auth.util.PasswordUtil;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.liang.kafka.auth.constants.Constants.*;

/**
 *  kafka自定义认证 sasl/plain二次开发
 *  liang
 */
public class MyPlainServerCallbackHandler implements AuthenticateCallbackHandler  {

    private static final Logger logger = LoggerFactory.getLogger(MyPlainServerCallbackHandler.class);

    /**
     * 数据源
     */
    private DruidDataSource dataSource = null;

    /**
     * 是否开启数据库验证开关
     */
    private boolean enableDbAuth;

    private static final String JAAS_USER_PREFIX = "user_";
    private List<AppConfigurationEntry> jaasConfigEntries;

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        //jaas配置信息,初始化一次,这就是为什么plain无法添加用户
        this.jaasConfigEntries = jaasConfigEntries;

        logger.info("==============configs:{}", JSON.toJSONString(configs));
        Object endbAuthObject = configs.get(ENABLE_DB_AUTH);
        if (Objects.isNull(endbAuthObject)) {
            logger.error("==============缺少开关配置 enable_db_auth!");
            enableDbAuth = Boolean.FALSE;
            return;
        }

        enableDbAuth = TRUE.equalsIgnoreCase(endbAuthObject.toString());
        if (!enableDbAuth) {
            return;
        }

        dataSource = DataSourceUtil.getInstance(configs);
    }

    //核心类,获取用户密码后,调用authenticate方法
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
                logger.info("===============认证 username:{},result:{}", username, authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    //用户密码是通过获取jaas文件的属性,属性名就是JAAS_USER_PREFIX变量当前缀+username
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null || password == null) {
            logger.error("===========用户名或密码为空!");
            return false;
        } else {
            //先读取配置文件里的用户验证
            String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
                    JAAS_USER_PREFIX + username,
                    PlainLoginModule.class.getName());
            logger.info("===============读取密码 username:{},pwd:{}", username, expectedPassword);
            boolean jaasUserBool = expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
            if (jaasUserBool) {
                return true;
            }
            //是否开启数据库验证
            if (enableDbAuth) {
                return dbAuthenticate(username, password);
            }
            return false;
        }
    }

    protected boolean dbAuthenticate(String usernameInfo, char[] passwordCharArray) throws IOException {
        String password = new String(passwordCharArray);
        logger.info("=====================begin dbAuthenticate usernameInfo:{},password:{}", usernameInfo, password);

  
        String username = usernameInfo;
        String userQuery = "select\n" +
                " u.username, u.password\n" +
                " from u_user u \n" +
                " where u.state='1' and u.username=?";
        Connection conn = null;
        try {
            conn = dataSource.getConnection();
            PreparedStatement statement = conn.prepareStatement(userQuery);
            statement.setString(1, username);

            ResultSet resultSet = statement.executeQuery();
            if (resultSet.next()) {
                String dbPassword = resultSet.getString("password");
                Boolean bl = PasswordUtil.matches(password, dbPassword);
                if (Boolean.TRUE.equals(bl)) {
                    logger.info("=====================密码验证成功username:{}", username);
                } else {
                    logger.error("=====================密码验证失败username:{}", usernameInfo);
                }
                return bl;
            } else {
                logger.error("=====================认证失败,username:{} 没有找到", usernameInfo);
                return false;
            }
        } catch (Exception e) {
            logger.error("=====================数据库查询用户异常{}", e);
            throw new RuntimeException(e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    @Override
    public void close() throws KafkaException {
        if (dataSource != null) {
            dataSource.close();
        }
    }

}

获取数据源代码

package com.liang.kafka.auth.util;

import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;


/**
 * @author liang
 * @desc 获取数据源
 */
public class DataSourceUtil {

    private static final Logger LOG = LoggerFactory.getLogger(DataSourceUtil.class);

    /**
     * 保证 instance 在所有线程中同步
     */
    private static volatile DruidDataSource dataSource = null;

    public static synchronized DruidDataSource getInstance(Map<String, ?> configs) {
        if (dataSource == null || dataSource.isClosed()) {
            dataSource = initDataSource(configs);
        }

        return dataSource;
    }

    private static final DruidDataSource initDataSource(final Map<String, ?> configs) {
        Properties properties = new Properties();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            if (entry.getKey().startsWith("druid.")) {
                String key = entry.getKey();
                String value = (String) entry.getValue();
                LOG.info("datasource connection config: {}:{}", key, value);
                properties.setProperty(key, value);
            }
        }
        dataSource = new DruidDataSource();
        dataSource.configFromPropety(properties);
        return dataSource;
    }

}

Kafka配置文件中添加数据源的相关配置

enable_db_auth = true
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.liang.kafka.auth.handler.MyPlainServerCallbackHandler
druid.name = mysql_db
druid.type = com.alibaba.druid.pool.DruidDataSource
druid.url = jdbc:mysql://127.0.0.1:3306/test?useSSL=FALSE&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
druid.username = root
druid.password = root
druid.filters = stat
druid.driverClassName = com.mysql.cj.jdbc.Driver
druid.initialSize = 5
druid.minIdle = 2
druid.maxActive = 50
druid.maxWait = 60000
druid.timeBetweenEvictionRunsMillis = 60000
druid.minEvictableIdleTimeMillis = 300000
druid.validationQuery = SELECT 'x'
druid.testWhileIdle = true
druid.testOnBorrow = false
druid.poolPreparedStatements = false
druid.maxPoolPreparedStatementPerConnectionSize = 20

其中:enable_db_auth来控制是否开启动态认证。

编译打成jar包后,需要放到kafka_2.12-3.5.0\libs目录,还使用了相关的依赖包也要放入
在这里插入图片描述

重启Kafka后生效,Kafka的连接认证就会从数据库去查询,想增加,修改,删除用户,直接在数据库表里操作。

参考链接:
https://www.top8488.top/kafka/458.html/
https://zhuanlan.zhihu.com/p/301343840?utm_medium=social&utm_oi=886243404000944128&utm_id=0
https://www.jianshu.com/p/e4c50e4affb8

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1163867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于CMFB余弦调制滤波器组的频谱响应matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、CMFB余弦调制滤波器组原理 4.2、CMFB调制过程 4.3、CMFB特点 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ......................…

【Linux】 unzip 命令使用

Unzip 是一个在 Unix 或类 Unix 系统&#xff08;例如 Linux&#xff09;中常用的命令行工具&#xff0c;用于解压缩由 ZIP 压缩算法创建的压缩文件。这个工具是开源的&#xff0c;并且通常在大多数的 Linux 发行版中默认安装。 语法 su [选项] [-] [USER [参数]...] unzip命令…

题号1575 C.难度排名 (并查集知识点)

题目&#xff1a; 样例1&#xff1a; 输入 1 4 3 1 4 2 4 3 4 输出 No 样例2&#xff1a; 输入 1 4 2 1 3 2 3 输出 Yes 思路&#xff1a; 这题&#xff0c;有两种情况是由矛盾的。 第一种情况&#xff1a;当前题号存在大于两个题号的相连&#xff0c;情况是矛盾的&#x…

python之pip常用指令

文章目录 pip show xxx 查看是否安装该 module

Git客户端软件 Tower mac中文版特点说明

Tower mac是一款Mac OS X系统上的Git客户端软件&#xff0c;它提供了丰富的功能和工具&#xff0c;帮助用户更加方便地管理和使用Git版本控制系统。 Tower mac软件特点 1. 界面友好&#xff1a;Tower的界面友好&#xff0c;使用户能够轻松地掌握软件的使用方法。 2. 多种Git操…

Numpy数值计算Numpy 进阶在线闯关_头歌实践教学平台

Numpy数值计算进阶 第1关 Numpy 广播第2关 Numpy 高级索引第3关 Numpy 迭代数组 第1关 Numpy 广播 任务描述 本关任务&#xff1a;给定两个不同形状的数组&#xff0c;求出他们的和。 编程要求 首先用 arange() 生成一个数组&#xff0c;然后用 reshape() 方法&#xff0c;将数…

Java入门篇 之 逻辑控制(练习题篇)

博主碎碎念: 练习题是需要大家自己打的请在自己尝试后再看答案哦&#xff1b; 个人认为&#xff0c;只要自己努力在将来的某一天一定会看到回报&#xff0c;在看这篇博客的你&#xff0c;不就是在努力吗&#xff0c;所以啊&#xff0c;不要放弃&#xff0c;路上必定坎坷&#x…

Windows 11 PowerShell 安装 jq 命令

本心、输入输出、结果 文章目录 Windows 11 PowerShell 安装 jq 命令前言jq 命令简介基本语法案例 Windows 11 PowerShell 安装 jq 命令使用 jq 格式化 curl 输出的 json弘扬爱国精神 Windows 11 PowerShell 安装 jq 命令 编辑&#xff1a;简简单单 Online zuozuo 地址&#xf…

微型计算机组成原理

1、微型计算机组成 一个传统微型计算机硬件组成如下图 CPU通过地址线、数据线和控制信号线组成的本地总线&#xff08;内部总线&#xff09;与系统其他部分进行数据通信。 地址线用于提供内存或I/O设备的地址&#xff0c;即指明需要读/写数据的具体位置&#xff1b;数据线用…

项目实战之安装依赖npm install

文章目录 nvmdeasync包和node-gyp报错deasync包node-gyp报错 前言&#xff1a;有些人看着还活着其实已经凉了好一会儿了。 初拿到项目 初拿到项目肯定是先看配置 package.json的啦&#xff0c;看看都需要安装什么依赖&#xff0c;然后 npm install,OK结束 皆大欢喜。 ————…

OMV 介绍及安装

# Time: 2023/11/02 #Author: Xiaohong # 运行电脑: Lenovo X201I (Intel(R) Core(TM) i3 CPU M 370 2.40GHz) # 功能: OMV 介绍及安装 导图 若OMV6 安装Extras 插件失败&#xff0c;可以参考 OMV6 安装Extras 插件失败的解决方法

LV.12 D15 WDT实验 学习笔记

一、WDT简介 WDT Watch Dog Timer即看门狗定时器&#xff0c;其主要作用是当发生软件故障时可产生复位信号使SOC复位&#xff0c;其本质是一个计数器 工作原理 CPU正常工作时&#xff0c;需要定时往看门狗计数器里刷新一个比较大的值&#xff0c;来保证看门狗计数器中的值不会…

网络安全进阶学习第二十一课——XML介绍

文章目录 一、XML简介二、XML文档结构1、XML文档结构包括2、XML树结构 三、XML语法1、声明信息&#xff0c;用于描述xml的版本和编码方式2、XML有且只有一个根元素3、成对标签&#xff08;即标签必须关闭&#xff0c;html可以不关闭也能运行&#xff09;4、区分大小写5、不可交…

广汽传祺E9上市,3DCAT实时云渲染助力线上3D高清看车体验

今年5月21日&#xff0c;中国智电新能源旗舰MPV——广汽传祺智电新能源E9在北京人民大会堂举办上市发布会。 发布会现场&#xff08;图源官方&#xff09; 为了让更多的消费者能够在线上感受到广汽传祺E9的魅力&#xff0c;3DCAT实时渲染云与大圣科技合作为广汽传祺打造了一款…

文件批量改名:不再担心文件名太长,一键高效重命名文件的方法

在日常生活和工作中&#xff0c;我们经常遇到文件名过长的问题。过长的文件名可能会使得文件难以理解和管理&#xff0c;甚至导致一些错误。本文将介绍云炫文件管理器批量修改文件名的技巧&#xff0c;帮助您快速高效地处理文件名过长的问题。这些技巧可以广泛应用于日常生活和…

LeetCode 143. 重排链表(双指针、快慢指针)

题目&#xff1a; 链接&#xff1a;LeetCode 143. 重排链表 难度&#xff1a;中等 给定一个单链表 L 的头节点 head &#xff0c;单链表 L 表示为&#xff1a; L0 → L1 → … → Ln-1 → Ln 请将其重新排列后变为&#xff1a; L0 → Ln → L1 → Ln-1 → L2 → Ln-2 → … 不…

基于51单片机摩尔斯电码收发控制系统设计

**单片机设计介绍&#xff0c;1659【毕设课设】基于51单片机摩尔斯电码收发控制系统设计&#xff08;仿真电路&#xff0c;程序&#xff09; 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于51单片机的摩尔斯电码收发控制系统…

计算机编程入门,编程入门从什么学起,给大家分享一款中文编程工具,零基础学编程

计算机编程入门&#xff0c;编程入门从什么学起&#xff0c;给大家分享一款中文编程工具&#xff0c;零基础学编程 中文编程从入门到精通&#xff0c;中文编程语言开发工具分享 给大家分享一款中文编程工具&#xff0c;零基础轻松学编程&#xff0c;不需英语基础&#xff0c;…

企业上ERP的节奏商讨

导读&#xff1a;目前国内很多企业都选择ERP作为企业信息化系统&#xff0c;ERP系统的实施是一个系统的工程&#xff0c;实施过程中只有遵循正确的步骤&#xff0c;才能达到事半功倍的效果。 企业建立ERP管理系统&#xff0c;不是把现有的手工管理模式照搬到计算机上来操作&…

新体验:万圣节夜晚的新游戏!--愤怒的南瓜

引言&#xff1a; Chatgpt4.0 所带来的冲击似乎远超出人们想象&#xff0c;网页小游戏《愤怒的南瓜》在昨日&#xff08;万圣节夜晚&#xff09;火爆了外网。一位名为 Javi Lopez 的外国小哥使用 Midjourney、DALL•E 3 和 GPT-4 打开了一个无限可能的世界&#xff0c;重新演绎…