Kafka安全模式之身份认证

news2025/2/24 14:59:14

一、简介

Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决,以及对可能会碰到的问题进行总结。

二、原理介绍

Kafka身份认证主要分为以下几种:

(1)客户端与broker之间的连接认证

(2)broker与broker之间的连接认证

(3)broker与zookeeper之间的连接认证

日常项目中,无论是生产者还是消费者,我们都是作为客户端与kafka进行交互,因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图,客户端提交认证数据,服务端会根据认证数据对当前客户端进行身份校验,校验成功后的客户端即可成功登录kafka,进行后续操作。

图1 客户端与broker之间认证过程图

目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制,而SASL认证又分为了以下几种方式:

(1)基于Kerberos的GSSAPI

SASL-GSSAPI提供了一种非常安全的身份验证方法,但使用前提是企业中有Kerberos基础,一般使用随机密码的keytab认证方式,密码是加密的,在0.9版本中引入,目前是企业中使用最多的认证方式。

(2)SASL-PLAIN

SASL-PLAIN方式是一个经典的用户名/密码的认证方式,其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的,当客户端使用PLAIN模式进行认证时,密码是明文传输的,因此安全性较低,但好处是足够简单,方便我们对其进行二次开发,在0.10版本引入。

(3)SASL-SCRAM

SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式,它将用户名/密码存储在zookeeper中,并且可以通过脚本动态增减用户,当客户端使用SCRAM模式进行认证时,密码会经过SHA-256或SHA-512哈希加密后传输到服务器,因此安全性较高,在0.10.2版本中引入。

对Kafka集群来说,要想实现完整的安全模式,首先为集群中的每台机器生成密钥和证书是第一步,其次利用SASL对客户端进行身份验证是第二步,最后对不同客户端进行读写操作的授权是第三步,这些步骤即可以单独运作也可以同时运作,从而提高kafka集群的安全性。

三、具体实现

本文主要介绍作为kafka生产者,如何基于Kerberos进行身份认证给第三方kafka发送数据。

Kerberos主要由三个部分组成:密钥分发中心Key Distribution Center(即KDC)、客户端Client、服务端Service,大致关系图如下图2所示,其中KDC是实现身份认证的核心组件,其包含三个部分:

  1. Kerberos Database:储存用户密码以及其他信息
  2. Authentication Service(AS):进行用户身份信息验证,为客户端提供Ticket Granting Tickets(TGT)
  3. Ticket Granting Service(TGS):验证TGT,为客户端提供Service Tickets

我们作为生产者向第三方kafka发送数据,因此需要第三方提供以下安全认证文件:

  • 用户名principle:标识客户端的用户身份,也即用于登录的用户名
  • 指定用户名对应的秘钥文件xx.keytab:存储了用户的加密密码
  • 指定安全认证的服务配置文件krb5.conf:客户端根据该文件中的信息去访问KDC

获取以上安全认证文件后,即可编写java代码连接第三方kafka,步骤如下:

1、将安全认证文件xx.keytabkrb5.conf放置于某一路径下,确保后续java代码可进行读取

2、添加kafka配置文件,开启安全模式认证,其中kerberos.path是第一步中认证文件所在的目录

3、修改Kafka生产者配置,开启安全连接

4、调用认证工具类进行登录认证

LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件,该文件是kafka安全模式下认证的核心。代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

/**
 * @ProjectName: stdp-security-demo
 * @Package: 
 * @ClassName: LoginUtil
 * @Author: stdp
 * @Description: ${description}
 */
public class LoginUtil {
	
	public enum Module {
		KAFKA("KafkaClient"), ZOOKEEPER("Client");

		private String name;

		Module(String name) {
			this.name = name;
		}

		public String getName() {
			return name;
		}
	}

	private static final Logger LOGGER = LoggerFactory.getLogger(LoginUtil.class);

	/**
	 * line operator string
	 */
	private static final String LINE_SEPARATOR = System.getProperty("line.separator");

	/**
	 * jaas file postfix
	 */
	private static final String JAAS_POSTFIX = ".jaas.conf";

	private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf";

	public static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config";

	private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

	
	private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");

	/**
	 * oracle jdk login module
	 */
	private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";


	public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)
			throws IOException
	{
		// 1.check input parameters
		if ((userPrincipal == null) || (userPrincipal.length() <= 0))
		{
			LOGGER.error("input userPrincipal is invalid.");
			throw new IOException("input userPrincipal is invalid.");
		}

		if ((userKeytabPath == null) || (userKeytabPath.length() <= 0))
		{
			LOGGER.error("input userKeytabPath is invalid.");
			throw new IOException("input userKeytabPath is invalid.");
		}

		if ((krb5ConfPath == null) || (krb5ConfPath.length() <= 0))
		{
			LOGGER.error("input krb5ConfPath is invalid.");
			throw new IOException("input krb5ConfPath is invalid.");
		}

		// 2.check file exsits
		File userKeytabFile = new File(userKeytabPath);
		if (!userKeytabFile.exists())
		{
			LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");
			throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");
		}
		if (!userKeytabFile.isFile())
		{
			LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");
			throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");
		}

		File krb5ConfFile = new File(krb5ConfPath);
		if (!krb5ConfFile.exists())
		{
			LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");
			throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");
		}
		if (!krb5ConfFile.isFile())
		{
			LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");
			throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");
		}

		// 3.set and check krb5config
		setKrb5Config(krb5ConfFile.getAbsolutePath());

//        LOGGER.info("check zookeeper server Principal =============================================");
        setZookeeperServerPrincipal(userPrincipal);
//        LOGGER.info("check jaas.conf +++++++++++++++++++++++++++++++++++++++++++++++++");
        setJaasFile(userPrincipal,userKeytabPath);
		LOGGER.info("Login success!!!!!!!!!!!!!!");
	}


	public static void setKrb5Config(String krb5ConfigFile) throws IOException {
		System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);
		String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);
		if (ret == null) {
			LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");
			throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");
		}
		if (!ret.equals(krb5ConfigFile)){
			LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");
			throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");
		}
	}

	public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {
		String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX;
		LOGGER.info("jaasPath = {}",jaasPath);
		//windows路径下分隔符替换
		jaasPath = jaasPath.replace("\\","\\\\");
		userKeytabPath = userKeytabPath.replace("\\","\\\\");
		//删除jaas文件
		deleteJaasFile(jaasPath);
		writeJaasFile(jaasPath,userPrincipal,userKeytabPath);
		System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);
	}

	private static void deleteJaasFile(String jaasPath) throws IOException {
		File jaasFile = new File(jaasPath);
		if (jaasFile.exists()){
			if (!jaasFile.delete()){
				throw new IOException("failed to delete exists jaas file.");
			}
		}
	}

	private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {
		FileWriter writer = new FileWriter(new File(jaasPath));
		try{
			writer.write(getJaasConfContext(userPrincipal,userKeytabPath));
			writer.flush();
		}catch (IOException e){
			throw new IOException("Failed to create jaas.conf File.");
		}finally {
			writer.close();
		}
	}


	private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{
		Module[] allModule = Module.values();
		StringBuffer builder = new StringBuffer();
		for (Module module: allModule){
			String serviceName = null;
			if ("Client".equals(module.getName())){
				serviceName = "zookeeper";
			}else if ("KafkaClient".equals(module.getName())){
				serviceName = "kafka";
			}
			builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));
		}
		return builder.toString();
	}

	private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {
		StringBuffer builder = new StringBuffer();
		if (IS_IBM_JDK){
			builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
			builder.append("credsType=both").append(LINE_SEPARATOR);
			builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);
			builder.append("useKeytab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);
            builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);
			builder.append("debug=true;").append(LINE_SEPARATOR);
			builder.append("};").append(LINE_SEPARATOR);
		}else {
			builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
			builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
			builder.append("useKeyTab=true").append(LINE_SEPARATOR);
			builder.append("keyTab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);
			builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);
            builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);
			builder.append("useTicketCache=false").append(LINE_SEPARATOR);
			builder.append("storeKey=true").append(LINE_SEPARATOR);
			builder.append("debug=true;").append(LINE_SEPARATOR);
			builder.append("};").append(LINE_SEPARATOR);
		}
		return builder.toString();
	}


	public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {
		System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);
		String ret = System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);
		if (ret == null) {
			LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");
			throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");
		}
		if (!ret.equals(zkServerPrincipal)){
			LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");
			throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");
		}
	}
}

经过以上四步的配置,启动项目后即可自动连接kafka进行身份校验,若登录成功,会输出如下提示信息:Login success,并且会将生成的jaas文件路径打印出来。

四、常见问题

1、认证文件找不到

这是因为步骤1中kerberos.path配置有问题,检查path路径下是否存在认证文件keytab和krb5.conf。

2、 principal和keytab不匹配

不同的用户名对应不同的密码,在身份校验时,需保证用户名principle和密码keytab的一致性,否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景:

  •  配置文件中出现问题:检查kerberos.principle和kerberos.keytab中的用户名(即hkjj)是否一致。

  •  检查生成的jaas文件中用户名和配置的用户名是否相同

如果步骤1检查没用问题,则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中,就是由于现场技术配置kerberos.principle时后面多打了一个空格,导致自动生成的jaas文件中的principle后多一个空格,因此和keytab认证失败。

为了彻底解决这个误打空格的问题,可以直接修改认证工具类LoginUtil,在生成jaas文件的principle时去掉可能存在的空格。

3、用户密码keytab更新,导致出现checksum failed

这是由于principal对应的密码修改了,但是程序中使用的还是旧的密码,就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab

4jaas文件找不到

该问题是由于找不到jaas.conf 这个文件导致的,而基于kerberos认证时一般不会出现,这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。

该问题通常出现在SASL-PLAIN方式的认证中,因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径,如果文件路径出错则会报以上错误。

五、总结

在kafka身份认证的过程中,需要的principal,keytab,ServiceName等信息均配置在jaas文件中,因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。

基于kerberos认证时,可根据安全认证文件自动生成jaas配置文件,从而保证了密码加密传输,相比于SASL-PLAIN模式更具安全性,并且认证实现过程也较为简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1474806.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

六、防御保护---防火墙内容安全篇

六、防御保护---防火墙内容安全篇 一、IAE&#xff08;Intelligent Awareness Engine&#xff09;引擎二、深度检测技术(DFI和DPI&#xff09;2.1 DPI -- 深度包检测技术2.1.1 基于“特征字”的检测技术2.1.2 基于应用网关的检测技术2.1.3 基于行为模式的检测技术 2.2 DFI -- 深…

CGI程序与ShellShock漏洞

CGI是什么&#xff1f; CGI&#xff08;通用网关接口&#xff0c;Common Gateway Interface&#xff09;程序是一种用于在Web服务器上执行动态内容的技术。与服务器上普通的后端代码相比&#xff0c;CGI程序有几个区别&#xff1a; 执行环境&#xff1a; CGI程序在服务器上作为…

k8s部署java微服务程序时,关于配置conusl acl token的方法总结

一、背景 java微服务程序使用consul作为服务注册中心&#xff0c;而consul集群本身的访问是需要acl token的&#xff0c;以增强服务调用的安全性。 本文试着总结下&#xff0c;有哪些方法可以配置consul acl token&#xff0c;便于你根据具体的情况选择。 个人认为&#xff…

BL0942 内置时钟免校准计量芯片 用于智能家居领域 低成本

BL0939是上海贝岭股份有限公司开发的一款用于智能家居领域进行电能测量的专用芯片&#xff0c;支持两路测量&#xff0c;可同时进行计量和漏电故障检测&#xff0c;漏电检测电流可设&#xff0c;响应时间快&#xff0c;具有体积小&#xff0c;外围电路简单&#xff0c;成本低廉…

C++ 前缀和

目录 1、DP34 【模板】前缀和 2、DP35 【模板】二维前缀和​编辑 3、724. 寻找数组的中心下标 4、238. 除自身以外数组的乘积 5、560. 和为 K 的子数组 6、974. 和可被 K 整除的子数组 7、525. 连续数组 8、1314. 矩阵区域和 1、DP34 【模板】前缀和 思路&#xff1a;…

Project_Euler-11 题解

Project_Euler-11 题解 题目 题目中给出的数据如下&#xff1a; 08 02 22 97 38 15 00 40 00 75 04 05 07 78 52 12 50 77 91 08 49 49 99 40 17 81 18 57 60 87 17 40 98 43 69 48 04 56 62 00 81 49 31 73 55 79 14 29 93 71 40 67 53 88 30 03 49 13 36 65 52 70 95 23 04 …

ESP32语音转文字齐护百度在线语音识别

一、导入(10分钟&#xff09; 学习目的 二、新授(70分钟) 1.预展示结果(5分钟) 2.本节课所用的软硬件(5分钟) 4.图形化块介绍(10分钟) 5.单个模块的简单使用(10分钟) 6.在线语音转换工具逻辑分析(10分钟) 7.在线语音转换工具分步实现(30分钟) 三、巩固练习(5分钟) 四、课堂小结…

考研数据结构算法机试训练1

中南大学上机压轴题 测试数据&#xff1a; 3 500 0.6 100 0.8 200 0.7 100 输出 390首先要对输入的折扣进行排序&#xff0c;优先使用比率低的z进行支付。 然后用lowcost记录目前多少钱是打过折的。T-lowcost就是剩余没打折的。 每次循环用上一个人的折扣额度。若所有人折扣额…

VR转接器:破解虚拟与现实边界的革命性设备

VR转接器&#xff0c;这一革命性的设备&#xff0c;为虚拟现实体验带来了前所未有的自由度。它巧妙地连接了虚拟与现实&#xff0c;使得用户在享受VR眼镜带来的奇幻世界的同时&#xff0c;也能自由地在现实世界中活动。这一设计的诞生&#xff0c;不仅解决了VR眼镜续航的瓶颈问…

react-组件基础

1.目标 能够使用函数创建组件 能够使用class创建组件 能够给React元素绑定事件 能够使用state和setState() 能够处理事件中的this指向问题 能够使用受控组件方式处理表单 2.目录 React组件介绍 React组件的两种创建方式 React事件处理 有状态组件和无状态组件 组件中的state…

Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题

博主历时三年精心创作的《大数据平台架构与原型实现&#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行&#xff0c;点击《重磅推荐&#xff1a;建大数据平台太难了&#xff01;给我发个工程原型吧&#xff01;》了解图书详情&#xff0c;…

数据结构:树/二叉树

一、树的概念 逻辑结构&#xff1a;层次结构&#xff0c;一对多 节点&#xff1a;树中的一个数据元素根节点&#xff1a;树中的第一个节点&#xff0c;没有父节点孩子节点&#xff1a;该节点的直接下级节点父(亲)节点&#xff1a;该结点的直接上级节点兄弟节点&#xff1a;有…

代码随想录算法训练营第44天|● 完全背包 ● 518. 零钱兑换 II ● 377. 组合总和 Ⅳ

文章目录 ● 完全背包卡码网&#xff1a;52. 携带研究材料-完全背包理论练习代码&#xff1a; ● 518. 零钱兑换 II思路&#xff1a;五部曲 代码&#xff1a;滚动数组代码二&#xff1a;二维数组 ● 377. 组合总和 Ⅳ思路&#xff1a;五部曲 代码&#xff1a; ● 完全背包 卡码…

第十二篇【传奇开心果系列】Python文本和语音相互转换库技术点案例示例:深度解读SpeechRecognition语音转文本

传奇开心果系列 系列博文目录Python的文本和语音相互转换库技术点案例示例系列 博文目录前言一、SpeechRecognition语音转文本一般的操作步骤和示例代码二、SpeechRecognition 语音转文本的优势和特点三、易用性深度解读和示例代码四、多引擎支持深度解读和示例代码五、灵活性示…

windows系统使用Vscode在WSL调试golang本地进程

背景&#xff1a; windows10企业版 vscodegolang1.20 wsl编译运行。 vscode 使用本地wsl进行进程attach操作&#xff0c;发现&#xff1a;Access is denied. 本地进程启动&#xff0c;vscode调试进程。windows-Linux控制台: Starting: C:\Users\book\go\bin\dlv.exe dap --l…

express+mysql+vue,从零搭建一个商城管理系统5--用户注册

提示&#xff1a;学习express&#xff0c;搭建管理系统 文章目录 前言一、新建user表二、安装bcryptjs、MD5、body-parser三、修改config/db.js四、新建config/bcrypt.js五、新建models文件夹和models/user.js五、index.js引入使用body-parser六、修改routes/user.js七、启动项…

vscode不能远程连接ubuntu18.04.6

目录 问题解决Portable Mode 安装vscode 补充说明学习资料 问题 vscode远程ssh连接ubuntu18.04.6时&#xff0c;出现如下提示框&#xff0c;单击Learn More后&#xff0c;定位到问题。Can I run VS Code Server on older Linux distributions? 原始是&#xff1a;需要glibc …

LeetCode 热题 100 | 图论(上)

目录 1 200. 岛屿数量 2 994. 腐烂的橘子 2.1 智障遍历法 2.2 仿层序遍历法 菜鸟做题&#xff0c;语言是 C 1 200. 岛屿数量 解题思路&#xff1a; 遍历二维数组&#xff0c;寻找 “1”&#xff08;若找到则岛屿数量 1&#xff09;寻找与当前 “1” 直接或间接连接在…

未来新质生产力Agent的起源与应用

Agent是什么&#xff1f; AI Agent的发展经历了从哲学思想启蒙到计算机科学助力、专家系统兴起、机器学习崛起、深度学习突破等多个阶段。如今&#xff0c;AI Agent已经成为人工智能领域的重要组成部分&#xff0c;为人类带来了巨大的便利和发展机遇。早在古希腊时期&#xff0…

《opencv实用探索·二十二》支持向量机SVM用法

1、概述 在了解支持向量机SVM用法之前先了解一些概念&#xff1a; &#xff08;1&#xff09;线性可分和线性不可分 如果在一个二维空间有一堆样本&#xff0c;如下图所示&#xff0c;如果能找到一条线把这两类样本分开至线的两侧&#xff0c;那么这个样本集就是线性可分&#…