Kafka SASL_SSL双重认证

news2024/11/17 11:33:04

文章目录

    • 1. 背景
    • 2. 环境
    • 3. 操作步骤
      • 3.1 生成SSL证书
      • 3.2 配置zookeeper认证
      • 3.3 配置kafka安全认证
      • 3.4 使用kafka客户端进行验证
      • 3.5 使用Java端代码进行认证

1. 背景

kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

  • SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
  • SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。

在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

  • 操作系统:linux
  • kafka版本:kafka_2.13-2.7.1
  • zookeeper版本:apache-zookeeper-3.7.0
  • 应用程序版本:spring-boot-2.6.7、JDK1.8

3. 操作步骤

  1. 生成SSL证书
  2. 配置zookeeper
  3. 配置kafka
  4. 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
  5. 在业务代码中使用请查看(3.5)

3.1 生成SSL证书

按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥

参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_admin="1qaz@WSX"
  user_kafka="1qaz@WSX";
};

第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="1qaz@WSX"
  user_admin="1qaz@WSX"
  user_kafka="1qaz@WSX";
};
 
Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="1qaz@WSX";
};

kafka-client-jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="1qaz@WSX";
};

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi

...

**第三步:**修改 kafka 的 server.properties配置文件

#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。

consumer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

producer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)

#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093  --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>

#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc

3.5 使用Java端代码进行认证

第一步: yaml 配置文件

spring:
  kafka:
    bootstrap-servers: localhost:9093
    properties:
      sasl:
        mechanism: PLAIN
        jaas:
          #此处填写 SASL登录时分配的用户名密码(注意password结尾;)
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
      security:
        protocol: SASL_SSL
    ssl:
      trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jks
      trust-store-password: Q06688
      key-store-type: JKS
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 106384
      acks: -1
      retries: 3
      properties:
        linger-ms: 1
        retry.backoff.ms: 1000
        buffer-memory: 33554432

第二步: 使用 kafkaTemplate 的方式,配置一个 config

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.properties.linger-ms}")
    private int lingerMs;
    @Value("${spring.kafka.producer.properties.buffer-memory}")
    private int bufferMemory;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;
    @Value("${spring.kafka.properties.security.protocol}")
    private String protocol;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.ssl.trust-store-location}")
    private String trustStoreLocation;
    @Value("${spring.kafka.ssl.trust-store-password}")
    private String trustStorePassword;
    @Value("${spring.kafka.ssl.key-store-type}")
    private String keyStoreType;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * the producer factory config
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put("security.protocol", protocol);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        return new DefaultKafkaProducerFactory<String, String>(props);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1435632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SRS视频服务器使用记录

SRS是一个开源的&#xff08;MIT协议&#xff09;简单高效的实时视频服务器&#xff0c;支持RTMP、WebRTC、HLS、HTTP-FLV、SRT、MPEG-DASH和GB28181等协议。 SRS媒体服务器和FFmpeg、OBS、VLC、 WebRTC等客户端配合使用&#xff0c;提供流的接收和分发的能力&#xff0c;是一个…

鸿蒙开发系列教程(十二)--布局应用:Flex布局

相关属性参数与css3的flex布局参数相似 排列方向&#xff1a;direction: FlexDirection.Row, 换行&#xff1a;wrap: FlexWrap.NoWrap, 水平垂直对齐方式&#xff1a; justifyContent: FlexAlign. SpaceBetween, alignItems: ItemAlign.Center Entry Component struct Flex…

【开源】SpringBoot框架开发高校学生管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学生管理模块2.2 学院课程模块2.3 学生选课模块2.4 成绩管理模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 学生表3.2.2 学院课程表3.2.3 学生选课表3.2.4 学生成绩表 四、系统展示五、核心代码5.1 查询课程5.2 新…

目标检测:3采用YOLOv8 API训练自己的模型

​ 目录 ​1.YOLOv8 的新特性 2.如何使用 YOLOv8? 3使用YOLOv8训练模型 4.验证训练集 5.测试训练集 6.测验其他图片 7 其他问题 参考: 1.YOLOv8 的新特性 Ultralytics 为 YOLO 模型发布了一个全新的存储库。它被构建为 用于训练对象检测、实例分割和图像分类模型的统…

【UE】游戏运行流程的简单理解

流程图 官方的游戏流程图&#xff1a; 一般顺序为初始化引擎、创建并初始化 GameInstance、加载关卡&#xff0c;最后开始游戏。 总的来说就是&#xff1a; 开始游戏-》游戏实例-》关卡-》游戏模式-》玩家控制器-》Pawn、玩家状态、HUD、UMG&#xff08;可有可无&#xff09; …

【力扣】Z字形变换,模拟+直接构造

Z字形变换原题地址 方法一&#xff1a;利用二维矩阵模拟 对于特殊情况&#xff0c;z字形变换后只有一行或只有一列&#xff0c;则变换后的字符串和原字符串相同。 对于一般情况&#xff0c;我们可以考虑按照题目要求&#xff0c;把字符串按照Z字形存储到二维数组中&#xff…

亿级流量高并发春晚互动前端技术揭秘

前言 2022年1月&#xff0c;京东成为央视总台2022年春节联欢晚会独家互动合作伙伴&#xff0c;双方在红包互动、电商等方面展开全方位深度合作。在除夕当天产生691亿次互动&#xff0c;送出15亿元红包好物。 如何在这种大规模、高并发的场景下&#xff0c;确保系统的稳定性和…

计算机毕业设计 | SSM 医药信息管理系统(附源码)

1&#xff0c; 概述 1.1 课题背景 本系统由说书客面向广大民营药店、县区级医院、个体诊所等群体的药品和客户等信息的管理需求&#xff0c;采用SpringSpringMVCMybatisEasyui架构实现&#xff0c;为单体药店、批发企业、零售连锁企业&#xff0c;提供有针对性的信息数据管理…

【机器学习与自然语言处理】预训练 Pre-Training 各种经典方法的概念汇总

【NLP概念合集&#xff1a;一】预训练 Pre-Training&#xff0c;微调 Fine-Tuning 及其方法的概念区别 前言请看此正文预训练 Pre-Training无监督学习 unsupervised learning概念&#xff1a;标签PCA 主成分分析&#xff08;Principal Component Analysis&#xff09;降维算法L…

想要精准跟进客户,试试CRM系统!

客户跟进是任何成功企业的命脉&#xff0c;它是从初始联系到转化、从培育到购买之间的桥梁。然而&#xff0c;客户们每天都被各种信息轰炸&#xff0c;很难将注意力集中在任何一个事情上。因此&#xff0c;企业想要在客户中脱颖而出&#xff0c;就必须能够吸引并维持他们的注意…

代驾应用系统(ssm)

登录首页 管理员界面 代驾司机界面 普通用户界面 前台页面 1、系统说明 &#xff08;1&#xff09; 框架&#xff1a;spring、springmvc、mybatis、mysql、jsp &#xff08;2&#xff09; 系统分为前台系统、后端管理系统 2、欢迎留言联系交流学习讨论&#xff1a;qq 97820625…

【JS逆向学习】今日头条

逆向目标 目标网页&#xff1a;https://www.toutiao.com/?wid1707099375036目标接口&#xff1a;https://www.toutiao.com/api/pc/list/feed目标参数&#xff1a;_signature 逆向过程 老规矩先观察网络请求&#xff0c;过滤XHR请求观察加密参数&#xff0c;发现Payload的_s…

代码手术刀-自定义你的代码重构工具

前言 笔者近日在做代码仓库的存量代码缩减工作&#xff0c;首先考虑的是基于静态扫描的缩减&#xff0c;尝试使用了很多工具来对代码进行优化&#xff0c;例如PMD、IDEA自带的inspect功能、findBugs等。但是无一例外&#xff0c;要么过于“保守”&#xff0c;只给出扫描结果&a…

计算机网络自顶向下Wireshark labs-HTTP

我直接翻译并在题目下面直接下我的答案了。 1.基本HTTP GET/response交互 我们开始探索HTTP&#xff0c;方法是下载一个非常简单的HTML文件 非常短&#xff0c;并且不包含嵌入的对象。执行以下操作&#xff1a; 启动您的浏览器。启动Wireshark数据包嗅探器&#xff0c;如Wir…

HGAME 2024 WEEK1 Web方向题解 全

---------【WEEK-1】--------- Bypass it 题目描述&#xff1a;This page requires javascript to be enabled &#x1f603; 开题 不给注册&#xff0c;进注册就弹窗。根据题目描述&#xff0c;禁用JS 注册成功登录给flag 2048*16 前端小游戏出这么难。JS源码各种混淆手段…

Java-并发高频面试题-2

接着之前的Java-并发高频面试题 7. synchronized的实现原理是怎么样的&#xff1f; 首先我们要知道synchronized它是解决线程安全问题的一种方式&#xff0c;而具体是怎么解决的呢&#xff1f;主要是通过加锁的方式来解决 在底层实现上来看 是通过 monitorenter、monitorexit…

LabVIEW多功能接口卡驱动

LabVIEW多功能接口卡驱动 随着自动化测试系统的复杂性增加&#xff0c;对数据采集与处理的需求不断提高。研究基于LabVIEW开发平台&#xff0c;实现对一种通用多功能接口卡的驱动&#xff0c;以支持多通道数据采集及处理功能&#xff0c;展现LabVIEW在自动化和测量领域的强大能…

破除Github API接口的访问次数限制

破除Github API接口的访问次数限制 1、Github介绍2、Github API接口2.1 介绍2.2 使用方法 3、Github API访问限制3.1 访问限制原因3.2 访问限制类别 4、Github API访问限制破除4.1 限制破除原理4.2 限制破除示例 1、Github介绍 Github&#xff0c;是一个面向开源及私有软件项目…

opencv0014 索贝尔(sobel)算子

前面学习的滤波器主要是用来模糊图像&#xff0c;今天一起来了解关于边缘识别的滤波吧&#xff01;嘿嘿 边缘 边缘是像素值发生跃迁的位置&#xff0c;是图像的显著特征之一&#xff0c;在图像特征提取&#xff0c;对象检测&#xff0c;模式识别等方面都有重要的作用。 人眼如…

大数据企业应用场景分析

目录 一、企业分析 1.1 企业领域维度分析 1.2 技术服务型维度分析 1.3 细分领域维度分析 二、大数据应用场景 2.1 数据分析 2.2 智能推荐 2.3 产品/流程优化 2.4 异常监测 2.5 智能管理 2.6 人工智能和机器学习 三、总结 前言&#xff1a;想讲清楚大数据应用对企业…