Docker部署Kafka SASL_SSL认证,并集成到Spring Boot

news2025/1/19 16:59:51

1,创建证书和密钥

需要openssl环境,如果是Window下,下载openssl

Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions

还需要keytool环境,此环境是在jdk环境下 

本案例所使用的账号密码均为:

kafka: kafka2024

2, 生成 CA 证书

# 创建CA
openssl req -new -x509 -keyout ca-key.pem -out ca-cert.pem -days 3650 -nodes -passin pass:kafka2024 -passout pass:kafka2024 -subj "/CN=kafka CA"

# 创建Kafka服务器证书请求
keytool -keystore kafka.keystore.jks -validity 3650 -genkey -keyalg RSA -storepass kafka2024 -keypass kafka2024 -dname "CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown"

# 将CA证书导入Kafka服务器的密钥库
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert.pem -storepass kafka2024 -noprompt

# 创建证书签名请求(CSR)
keytool -keystore kafka.keystore.jks -certreq -file cert.csr -storepass kafka2024 -keypass kafka2024


# 使用CA签署证书
openssl x509 -req -CA ca-cert.pem -CAkey ca-key.pem -in cert.csr -out cert.pem -days 3650 -CAcreateserial -passin pass:kafka2024

# 导入签署后的证书到密钥库
keytool -keystore kafka.keystore.jks -import -file cert.pem -storepass kafka2024 -keypass kafka2024 -noprompt

# 创建信任库并导入CA证书
keytool -keystore kafka.truststore.jks -alias CARoot -import -file cert.pem -storepass kafka2024 -noprompt

3,得到kafka.keystore.jks、kafka.truststore.jks:

4,准备好镜像包: 

  1. bitnami/kafka:3.9.0 镜像资源包

 

5,配置文件docker-compose.yml 

注意:将 IP地址改成主机IP 1.14.165.18

version: '3.8'

services:
  kafka:
    image: 'bitnami/kafka:3.9.0'
    container_name: kafka
    ports:
      - '9092:9092'
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
    volumes:
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'

6,在当前目录下,准备好如下三个文件:

7,运行以下命令启动Kafka服务:

sudo docker-compose up -d

8,测试验证:

测试发送消息

sudo docker exec -it kafka kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

 发现有报错:

[2024-11-14 08:41:04,117] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2024-11-14 08:41:04,118] WARN [Producer clientId=console-producer] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-11-14 08:41:04,120] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
        at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:383)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169)
        at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209)
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:444)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:533)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:382)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:302)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:563)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:501)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:596)
        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
        at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:569)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:490)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:251)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.security.cert.CertificateException: No subject alternative names present
        at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:142)
        at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:101)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:458)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:432)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:292)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329)

解决办法:

在容器内配置文件两个文件加上参数algorithm

/opt/bitnami/kafka/config/producer.properties
/opt/bitnami/kafka/config/consumer.properties

ssl.endpoint.identification.algorithm=
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm= 

 

如果容器内没办法编辑,可以先把文件拷贝出来修改,然后再拷贝覆盖

sudo docker cp kafka:/opt/bitnami/kafka/config/consumer.properties consumer.properties
sudo docker cp kafka:/opt/bitnami/kafka/config/producer.properties producer.properties


sudo vi consumer.properties
sudo vi producer.properties

sudo docker cp consumer.properties kafka:/opt/bitnami/kafka/config/consumer.properties
sudo docker cp producer.properties kafka:/opt/bitnami/kafka/config/producer.properties

再次执行发送消息:

sudo docker exec -it kafka kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

 

另外开一个窗口测试接受消息:

sudo docker exec -it kafka kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties

10,使用Spring Boot 集成Kafka

在Spring Boot应用中配置Kafka客户端以使用SASL_SSL认证。
在pom.xml中添加Kafka客户端依赖。

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

 配置application.yml,修改对应IP地址

spring:
  application:
    name: ncc
  kafka:
    bootstrap-servers: 1.14.165.11:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka2024";
      ssl.truststore.location: kafka.truststore.jks
      ssl.truststore.password: kafka2024
      ssl.keystore.location: kafka.keystore.jks
      ssl.keystore.password: kafka2024
      ssl.key.password: kafka2024
      ssl.endpoint.identification.algorithm:
      producer.ssl.endpoint.identification.algorithm:
      consumer.ssl.endpoint.identification.algorithm:












并将kafka.keystore.jks 和 kafka.truststore.jks 文件放到当前项目:

11,创建KafkaTest测试类

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest(classes = NccApplication.class)
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void send() {
        kafkaTemplate.send("test","hhh");
    }

}

测试通过

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【进阶系列】python简单爬虫实例

python有一个很强大的功能就是爬取网页的信息&#xff0c;这里是CNBlogs 网站&#xff0c;我们将以此网站为实例&#xff0c;爬取指定个页面的大标题内容。代码如下&#xff1a; 首先是导入库&#xff1a; # 导入所需的库 import requests # 用于发送HTTP请求 from bs4 impor…

基于Java和Vue实现的上门做饭系统上门做饭软件厨师上门app

市场前景 生活节奏加快&#xff1a;在当今快节奏的社会中&#xff0c;越来越多的人因工作忙碌、时间紧张而无法亲自下厨&#xff0c;上门做饭服务恰好满足了这部分人群的需求&#xff0c;为他们提供了便捷、高效的餐饮解决方案。个性化需求增加&#xff1a;随着人们生活水平的…

CentOS 7中查找已安装JDK路径的方法

使用yum安装了jdk8&#xff0c;但是其他中间件需要配置路径的时候&#xff0c;却没办法找到&#xff0c;如何获取jdk路径&#xff1a; 一、确认服务器是否存在jdk java -version 二、查找jdk的 java 命令在哪里 which java 三、找到软链指向的地址 ls -lrt /usr/bin/java l…

分布式----Ceph部署

目录 一、存储基础 1.1 单机存储设备 1.2 单机存储的问题 1.3 商业存储解决方案 1.4 分布式存储&#xff08;软件定义的存储 SDS&#xff09; 1.5 分布式存储的类型 二、Ceph 简介 三、Ceph 优势 四、Ceph 架构 五、Ceph 核心组件 #Pool中数据保存方式支持两种类型&…

UE5 材质里面画圆锯齿严重的问题

直接这么画圆会带来锯齿&#xff0c;我们对锯齿位置进行模糊 可以用smoothstep&#xff0c;做值的平滑过渡&#xff08;虽然不是模糊&#xff0c;但是类似&#xff09;

即插即用的3D神经元注意算法!

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;编程探索专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月18日10点39分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 论文连接 点击开启你的论文编制之旅…

Mac的Terminal随机主题配置

2024年8月8日 引言 对于使用Mac的朋友&#xff0c;如果你是一个程序员&#xff0c;那肯定会用到Terminal。一般来说Terminal就是一个黑框&#xff0c;但其实Terminal是有10款官方皮肤。 每个都是不一样的主题&#xff0c;颜色和字体都会有所改变。现在就有一个方法可以很平均…

《Probing the 3D Awareness of Visual Foundation Models》论文解析——单图像表面重建

一、论文简介 论文讨论了大规模预训练产生的视觉基础模型在处理任意图像时的强大能力&#xff0c;这些模型不仅能够完成训练任务&#xff0c;其中间表示还对其他视觉任务&#xff08;如检测和分割&#xff09;有用。研究者们提出了一个问题&#xff1a;这些模型是否能够表示物体…

泷羽sec学习打卡-云技术基础1-docker

声明 学习视频来自B站UP主 泷羽sec,如涉及侵权马上删除文章 笔记的只是方便各位师傅学习知识,以下网站只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负 关于云技术基础的那些事儿-Base1 一、云技术基础什么是云架构&#xff1f;什么是云服务&#xff1f;什么…

03-axios常用的请求方法、axios错误处理

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

Spring Boot 与腾讯云 MySQL 监听 Binlog 数据变化,并使用 UI 展示页面效果

引言 在现代的分布式系统和微服务架构中&#xff0c;数据同步和变更监控是保证系统一致性和实时性的核心问题之一。MySQL 数据库的 binlog&#xff08;二进制日志&#xff09;功能能够记录所有对数据库的修改操作&#xff0c;如插入&#xff08;INSERT&#xff09;、更新&…

Spring Boot汽车资讯:科技与速度的新纪元

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了汽车资讯网站的开发全过程。通过分析汽车资讯网站管理的不足&#xff0c;创建了一个计算机管理汽车资讯网站的方案。文章介绍了汽车资讯网站的系统分析部分&…

thinkphp6模板调用URL方法生成的链接异常

var uul params.url ;console.log(params.url);console.log("{:Url(UserLog/index)}");console.log("{:Url("uul")}"); 生成的链接地址 UserLog/index /jjg/index.php/Home/UserLog/index.html /jjg/index.php/Home/Index/UserLog/index.html…

NodeJS 百度智能云文本转语音(实测)

现在文本转语音的技术已经非常完善了&#xff0c;尽管网络上有许多免费的工具&#xff0c;还是测试了专业的服务&#xff0c;选择了百度的TTS服务。 于是&#xff0c;在百度智能云注册和开通了文本转语音的服务&#xff0c;尝试使用NodeJS 实现文本转语音服务。但是百度的文档实…

UML 类图讲解

UML 类图符号含义 在 UML 类图中&#xff0c;每个符号都有其特定的含义。以下是常见符号的解释&#xff1a; : Public&#xff08;公共访问权限&#xff09;-: Private&#xff08;私有访问权限&#xff09;#: Protected&#xff08;受保护访问权限&#xff09;~: Package&…

【GAT】 代码详解 (1) 运行方法【pytorch】可运行版本

GRAPH ATTENTION NETWORKS 代码详解 前言0.引言1. 环境配置2. 代码的运行2.1 报错处理2.2 运行结果展示 3.总结 前言 在前文中&#xff0c;我们已经深入探讨了图卷积神经网络和图注意力网络的理论基础。还没看的同学点这里补习下。接下来&#xff0c;将开启一个新的阶段&#…

远程控制步骤

当远在千里之外的朋友想求助你帮他找到他电脑上的文件、或者是给他安装软件时。但是你给他说了他又找不到&#xff0c;那么这时你就可以通过控制对方的电脑去做一系列的操作。 如何远程控制对方的电脑非常关键。 方法一&#xff08;Windows自带远程桌面功能&#xff09;&#…

C指针之舞——指针探秘之旅

❤博客主页&#xff1a;折枝寄北-CSDN博客 ❤专栏内容&#xff1a;C语言学习专栏https://blog.csdn.net/2303_80170533/category_12794764.html?spm1001.2014.3001.5482 指针基础学习 在之前的博客文章中&#xff0c;简单总结了指针的基础概念 我们知道了指针的概念&#xf…

前端 JS 浅拷贝与深拷贝

目录 一、问题引出 二、浅拷贝 1、通过解构重构实现浅拷贝 三、深拷贝 1、自定义实现深拷贝 2、JSON实现深拷贝 四、总结 一、问题引出 基础类型的数据存放&#xff1a; let a 100let b aconsole.log("a:" a, "b:" b)a 50console.log("a…