Kafka配置Kerberos安全认证及与Java程序集成

news2025/1/11 18:04:34

Background

  • 本文主要介绍在 Kafka 中如何配置 Kerberos 认证,以及 java 使用 JAAS 来进行 Kerberos 认证连接。
  • 本文演示为单机版。

所用软件版本

查看 Kerberos 版本命令:klist -V

软件名称版本
jdk1.8.0_202
kafka2.12-2.2.1
kerberos1.15.1

1、Kerberos Server 安装

  • Kerberos 是一种由 MIT(麻省理工大学)提出的网络身份验证协议,它旨在通过使用密钥加密技术为客户端和服务器应用程序提供强身份验证。
  • Kerberos 是一种基于加密 Ticket 的身份认证协议,主要由三个部分组成:Key Distribution Center (即KDC)、Client 和 Service:
    客户端会先访问两次 KDC,然后再访问目标服务,如:HTTP 服务、Zookeeper 服务、Kafka 服务等。

12c395d0ab9fd49425fcfe07f585060.png

  • 在线安装
yum install krb5-server krb5-workstation krb5-libs -y
  • 配置主机名映射

/etc/hosts文件中新增本机映射(我这里的主机名是monkey)。

127.0.0.1 monkey
  • 配置 krb5.conf

根据需要修改 /etc/krb5.conf文件,其中WLF.COM你可以改成你需要的,还有monkey是你的主机映射。

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 rdns = false
 pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt
 default_realm = WLF.COM
 #default_ccache_name = KEYRING:persistent:%{uid}

[realms]
WLF.COM = {
 kdc = monkey
 admin_server = monkey
}

[domain_realm]
.monkey = WLF.COM
monkey = WLF.COM
  • 配置 kdc.conf
  • 修改/var/kerberos/krb5kdc/kdc.conf,kdc的专属配置文件。
  • Java 使用 aes256-cts 验证方式需要安装额外的 jar 包,所以为了方便不用哈。
[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88

[realms]
 WLF.COM = {
  #master_key_type = aes256-cts
  acl_file = /var/kerberos/krb5kdc/kadm5.acl
  dict_file = /usr/share/dict/words
  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
  max_file = 24h
  max_renewable_life = 7d
  supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
 }
  • 配置 kadm5.acl
  • 修改权限相关配置文件 /var/kerberos/krb5kdc/kadm5.acl
  • 其中前一个*号是通配符,表示像名为“abc/admin”或“xxx/admin”的人都可以使用此工具(远程或本地)管理kerberos数据库,后一个跟权限有关,*表示所有权限。WLF.COM是上面配置的realm。
*/admin@WLF.COM *
  • 初始化KDC数据库
kdb5_util create -r WLF.COM -s
ll -a /var/kerberos/krb5kdc/

image.png

  • 启动 Kerberos 服务
systemctl start krb5kdc kadmin
systemctl status krb5kdc kadmin

image.png

Kerberos 服务机器上可以使用 kadmin.local 来执行各种管理的操作!

2、Kafka 开启 Kerberos 认证

所有Kerberos 相关配置文件(java连接所需),我们都放在 Kafka 的 config/kerberos/目录下的(kerberos 目录需新建),把krb5.conf也拷贝过去。

  • 创建 keytab
cd /opt/kafka_2.12-2.2.1/
cp /etc/krb5.conf config/kerberos/cd
kadmin.local -q "add_principal -randkey kafka-server/monkey@WLF.COM"
kadmin.local -q "add_principal -randkey kafka-client@WLF.COM"
kadmin.local -q "xst -k config/kerberos/kafka-server.keytab kafka-server/monkey@WLF.COM"
kadmin.local -q "xst -k config/kerberos/kafka-client.keytab kafka-client@WLF.COM"
  • Kafka 服务端配置

修改config/server.properties配置文件,新增如下内容:

listeners=SASL_PLAINTEXT://monkey:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka-server
  • KafkaServer 配置

新建config/kerberos/kafka-server-jaas.conf文件,内容如下:

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/opt/kafka_2.12-2.2.1/config/kerberos/kafka-server.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-server/monkey@WLF.COM";
};
  • KafkaClient 配置
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-client@WLF.COM";
};
  • 修改bin/kafka-server-start.sh,倒数第二行增加如下配置:
export KAFKA_OPTS="-Dzookeeper.sasl.client=false -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/opt/kafka_2.12-2.2.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/kafka_2.12-2.2.1/config/kerberos/kafka-server-jaas.conf"
  • 修改bin/kafka-topics.sh、kafka-console-producer.sh、bin/kafka-console-consumer.sh ,倒数第二行增加如下配置:
export KAFKA_OPTS="-Djava.security.krb5.conf=/opt/kafka_2.12-2.2.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client-jaas.conf"
  • 新建 config/kerberos/client.properties 文件,内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka-server
  • 附赠kafka操作脚本operate.sh
#/bin/bash

:<<!
【脚本说明】
1、此脚本用于操作kafka:启动、停止、查看运行状态、重启、查看日志、查询所有主题、创建主题、删除主题、订阅或消费主题数据;
2、建议把脚本放在kafka安装目录下;
3、适用单机版。
注意安装kafka修改配置文件:
# IP替换为Kafka所在主机的IP
sed -i '31 a listeners=PLAINTEXT://localhost:9092' config/server.properties
!

# kafka安装目录
KAFKA_HOME=/opt/kafka_2.12-2.2.1
# zookeeper地址
ZK_SERVER=monkey
# kafka地址
KAFKA_SERVER=monkey
# zk启动日志
LOG_ZK=$KAFKA_HOME/logs/zookeeper-run.log
# kafka启动日志
LOG_KAFKA=$KAFKA_HOME/logs/kafka-run.log
# sasl
CONF_SASL=config/kerberos/client.properties


# 操作
operate=$1
# 参数
param=$2

# 进程
pids=`ps -ef | egrep "Kafka|QuorumPeerMain" | egrep -v grep | awk '{print $2}'`

# 提示信息
msg='Please input params [<run>|<kil>|<res>|<sta>|<log> [zk]|<list>|<add> <{topic}>|<del> [{topic}]|<consume> <{topic}>|<produce> <{topic}>]'

# 定制化shell输出
function custom_print(){
    echo -e "\033[5;34m ***** \033[0m"
    echo -e "\033[32m $@ ! \033[0m"
    echo -e "\033[5;34m ***** \033[0m"
}

function run(){
    rm -rf $LOG_ZK $LOG_KAFKA
    # 先启动zk
    echo "start zookeeper ..."
    nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties > $LOG_ZK 2>&1 &
    sleep 5
    # 再启动kafka
    nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_KAFKA 2>&1 &
}

function stop(){
    if [[ $pids ]]; then
        kill -9 $pids
        msg='Stopped success'
        custom_print $msg
    else
        msg='The service is already down'
        custom_print $msg
    fi
}

function restart(){
    if [[ $pids ]]; then
        kill -9 $pids
    fi
    run
    msg='Restart success'
    custom_print $msg
}

function status(){
    jps | egrep "Kafka|QuorumPeerMain"
    if [[ $pids ]]; then
        # 黄底蓝字
        msg='RUNNING'
        custom_print $msg
    else
        # 蓝底黑字
        echo -e "\033[5;34m ***** \033[0m"
        echo -e "\033[31m STOPPED ! \033[0m"
        echo -e "\033[5;34m ***** \033[0m"
    fi
}

function log(){
    if [[ -e $1 ]]; then
        tail -f $1
    else
        msg='No log has been generated yet'
        custom_print $msg
    fi
}

# 判断输入参数
if [[ $operate = "run" || $operate = "start" ]]; then
    if [[ $pids ]]; then
        msg='The service is already running'
        custom_print $msg
    else
        run
        msg='Start success'
        custom_print $msg
    fi
elif [[ $operate = "kil" || $operate = "stop" ]]; then
    stop
elif [[ $operate = "res" || $operate = "restart" ]]; then
    restart
elif [[ $operate = "sta" || $operate = "status" ]]; then
    status
elif [[ $operate = "log" ]]; then
    if [[ $param = "zk" ]]; then
        log $LOG_ZK
    else
        log $LOG_KAFKA
    fi
elif [[ $operate = "list" ]]; then
    $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --list
elif [[ $operate = "add" && ! -z $param ]]; then
    $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $KAFKA_SERVER:9092 --replication-factor 1 --partitions 1 --topic $param
    msg="$param create success"
    custom_print $msg
elif [[ $operate = "del" ]]; then
    if [[ -z $param ]]; then
        topics=`$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --list`
        for topic in $topics; do
            if [[ $topic != "__consumer_offsets" ]]; then
                $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --delete --topic $topic> /dev/null
                msg="$topic delete success"
                custom_print $msg
            fi
        done
    else
        $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --delete --topic $param > /dev/null
        msg="$param delete success"
        custom_print $msg
    fi
elif [[ $operate = "consume" && ! -z $param ]]; then
    if [[ -z $CONF_SASL ]]; then
        $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER:9092 --from-beginning --topic $param
    else
        $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER:9092 --from-beginning --topic $param --consumer.config $CONF_SASL
    fi
elif [[ $operate = "produce" && ! -z $param ]]; then
    if [[ -z $CONF_SASL ]]; then
        $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_SERVER:9092 --topic $param 
    else
        $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_SERVER:9092 --topic $param --producer.config $CONF_SASL
    fi
else
    custom_print $msg
fi

3、java 程序连接 Kafka

  • 防火墙放行88/udp端口
  • kdc服务默认端口是88。
firewall-cmd --zone=public --add-port=88/udp --permanent
firewall-cmd --reload
  • 引入maven依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1</version>
</dependency>
  • 程序示例

注意:需要修改 kafka-client-jaas.conf配置文件中配置的kafka-client.keytab路径!

package com.cloudansys;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

public class TestKafkaKerberos {

    public static void main(String[] args) {
        // 消费者
        testConsumer();

        // 生产者
        testProducer();
    }

    private static void testConsumer() {
        System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");
        System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "monkey:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // sasl
        props.put("sasl.mechanism", "GSSAPI");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka-server");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test";
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                            record.offset(), record.partition(), record.key(), record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void testProducer() {
        // JAAS配置文件路径和Kerberos配置文件路径
        System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");
        System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");
        // kafka属性配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "monkey:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // kerberos安全认证
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka-server");

        String topic = "test";
        String msg = "this is a test msg";
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        // 发送消息记录
        Future<RecordMetadata> future = kafkaProducer.send(record);
        try {
            RecordMetadata metadata = future.get();
            System.out.printf("Message sent to Kafka topic=%s, partition=%d, offset=%d\n", metadata.topic(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
        kafkaProducer.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1377588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RT-Thread:STM32 PHY 调试,使用软件包 WIZNET 驱动 W5500

说明&#xff1a; 1. 本文记录使用 RT-Thread 软件包 WIZNET驱动 W5500 的调试笔记。 2. 采用 RT-Thread Studio 工程 STM32F407VET6 芯片&#xff0c;W5500 PHY芯片&#xff0c;两者之间使用SPI接口链接 。 注意&#xff1a; 1.在按流程建立工程&#xff0c;和移植完 wizn…

ASP.NET摄影展示网站源码

ASP.NET摄影展示网站源码 项目描述 网站利用了ext技术&#xff0c;用户自定义了展示控件 前台展示类别有&#xff1a; 协会动态&#xff0c;摄影理论&#xff0c;影展影赛&#xff0c;采风路线&#xff0c; 影友之窗&#xff0c;佳作欣赏&#xff0c;器材专区&#xff0c;展览信…

「达摩院MindOpt」优化FlowShop流水线作业排班问题

FlowShop流水线作业 在企业在面临大量多样化的生产任务时&#xff0c;如何合理地安排流水线作业以提高生产效率及确保交货期成为了一个重要的问题。 一个典型的问题就是FlowShop流水线作业安排问题,也有称为生产下料问题。它涉及到多台机器、多个工序以及多个作业的调度安排。…

QWebEngineView类方法、属性、信号与槽汇总

文章目录 📖 介绍 📖🏡 环境 🏡📒 使用方法 📒📝 使用示例📝 方法📝 属性📝 信号(Signals)📝 槽(Slots)⚓️ 相关链接 ⚓️📖 介绍 📖 QWebEngineView 是 Qt 提供的一个用于呈现 Web 内容的类,基于 Google 的 Chromium 浏览器引擎。它提供了对现…

在线直线度测量仪确保了出厂圆棒无不合格品

在线直线度测量仪确保了出厂圆棒无不合格品 随着生产设备的改进&#xff0c;利用基础材料进行生产的厂家对品质要求也越来越高&#xff0c;其中圆形棒管材的直线度尺寸&#xff0c;也是广受关注&#xff0c;对其进行矫直检测&#xff0c;使其出厂无不合格品。 变抽检为全检 以前…

C 语言每日一题——旋转数组的最小数字

一、题目内容 提供一下该OJ题的链接&#xff1a;旋转数组的最小数字_牛客题霸_牛客网 (nowcoder.com) 二、题目分析 通过示例1可知&#xff0c;我们写代码的目的是在数组中找到一个最大值&#xff0c;并且返回来&#xff1b; 我们很容易的会想到创建一个变量&#xff1a;int…

企业培训系统源码:构建智能、可扩展的学习平台

企业培训系统在现代企业中扮演着至关重要的角色。本文将通过深度解析企业培训系统的源码&#xff0c;介绍如何构建一个智能、可扩展的学习平台&#xff0c;涉及关键技术和代码实例。 1. 技术栈选择与项目初始化 在构建企业培训系统之前&#xff0c;选择适当的技术栈是至关重…

先锋WEB燃气收费系统 Upload.aspx 文件上传漏洞复现

0x01 产品简介 先锋WEB燃气收费系统是一种先进的在线燃气收费解决方案,旨在简化和优化燃气收费的流程和管理。该系统基于Web平台,提供了一系列功能和工具,使燃气公司能够高效地进行收费、账单管理和客户服务。 0x02 漏洞概述 先锋WEB燃气收费系统/AjaxService/Upload.asp…

Windows压缩包的MySQL安装方式

1.下载压缩包 https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-8.0.35-winx64.zip 2.解压压缩包&#xff08;建议将解压到非C盘&#xff0c;路径不要出现特殊符号&#xff09; 3.在MySQL主目录下&#xff0c;创建my.ini空文件&#xff08;先创建一个txt文件&#xff0c;进…

软件测试|selenium 元素无法选择异常的原因及解决

简介 在进行 Web 自动化测试时&#xff0c;使用 Selenium 可能会遇到各种异常情况。其中之一就是 ElementNotSelectableException 异常&#xff0c;该异常通常意味着在尝试选择一个不可选元素时出现了问题。本文将详细介绍这个异常的原因、可能的解决方法&#xff0c;并提供示…

微信小程序开发学习笔记《10》页面导航

微信小程序开发学习笔记《10》页面导航 博主正在学习微信小程序开发&#xff0c;希望记录自己学习过程同时与广大网友共同学习讨论。导航 官方文档 一、介绍 1. 什么是页面导航 页面导航指的是页面之间的相互跳转。例如&#xff0c;浏览器中实现页面导航的方式有如下两种: …

<蓝桥杯软件赛>零基础备赛20周--第14周--BFS

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周。 在QQ群上交流答疑&am…

C++I/O流——(2)预定义格式的输入/输出(第一节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 含泪播种的人一定能含笑收获&#xff…

snmp协议配置

引言 SNMP&#xff08;Simple Network Management Protocol&#xff09;是一种网络管理协议&#xff0c;用于管理和监控网络设备、操作系统和应用程序。它提供了一组用于检索和修改网络设备配置、监视设备状态和性能的标准化方法。 SNMP 是一个客户端-服务器协议&#xff0c;…

【Docker】数据管理之数据卷的挂载

一、什么是数据卷 为了很好的实现数据保存和数据共享&#xff0c;Docker提出了Volume这个概念&#xff0c;简单的说就是绕过默认的联合 文件系统&#xff0c;而以正常的文件或者目录的形式存在于宿主机上。又被称作数据卷。数据卷提供了一些有用的特性&#xff1a; 数据卷可以在…

java自定义排序Comparator

&#x1f4d1;前言 本文主要是【java】——java自定义排序Comparator的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304;每…

《C++ Primer》第14章 重载运算与类型转换(一)

参考资料&#xff1a; 《C Primer》第5版《C Primer 习题集》第5版 14.1 基本概念&#xff08;P490&#xff09; 重载的运算符是具有特殊名字的函数&#xff0c;其名字有 operator 和要定义的运算符组合而成。和其他函数一样&#xff0c;重载运算符也具有返回类型、参数列表…

人工智能推动供应链革命的成功

人工智能推动供应链革命的成功 目录 人工智能推动供应链革命的成功一、供应链管理不断变化的面貌二、拥挤的解决方案景观三、踏上人工智能驱动的转型1. 价值创造识别、战略和路线图2. 目标解决方案设计和供应商选择3. 实施与系统集成4. 变革管理、能力建设和全面价值获取 新技术…

深思这届CES,前沿新物种「辣眼睛」背后

作者 | 陈然 来源 | 洞见新研社 每届CES都会让不少人发出“无趣”的感叹。的确&#xff0c;无论是置身会场还是看网络上的报道&#xff0c;到处都是熙熙攘攘的人群&#xff0c;很难从中发现哪些产品或创意值得把玩一番。 实际上&#xff0c;辣眼睛的新物种常有&#xff0c;制…

发动机装备3d虚拟在线云展馆360度展示每处细节

在当今数字化的时代&#xff0c;消费者对于线上购物的需求与期待日益增长。尤其在购车这一大宗消费行为上&#xff0c;消费者不再满足于传统的图片与文字介绍。为了满足这一市场需求&#xff0c;我们引入了3D线上展示技术。 3D汽车模型实景互动展示是一种通过先进的三维建模技术…