1,准备好Kafka 镜像包:
- bitnami/kafka:3.9.0 镜像资源包
2,准备好kafka.keystore.jks 和 kafka.truststore.jks证书
具体操作可参考博客文章:
《Docker部署Kafka SASL_SSL认证,并集成到Spring Boot》(CSDN博客)
3,配置文件 docker-compose.yml
配置中使用的 IP 1.14.165.18 为示例主机 IP,用于对外提供访问。
注意:1.14.165.18 要替换成实际的主机 IP。
# Three-node Kafka cluster in KRaft mode (no ZooKeeper). Every node acts as both
# controller and broker. Clients and brokers talk over SASL_SSL on container
# port 9092; the controller quorum uses SASL_PLAINTEXT on container port 9093,
# which is not published to the host.
#
# Replace 1.14.165.18 in every KAFKA_CFG_ADVERTISED_LISTENERS entry with the
# real host IP, and keep the advertised port equal to the published host port.
version: '3.8'

services:
  kafka1:
    image: bitnami/kafka:3.9.0
    container_name: kafka1
    ports:
      # host 9092 -> container 9092 (SASL_SSL listener)
      - "9092:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Host names here must match the service names of the three nodes.
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - KAFKA_KRAFT_CLUSTER_ID=ncc_kafka
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      # Empty value disables TLS hostname verification.
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - '/data/kafka/kafka_1_data:/bitnami/kafka'
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
    networks:
      - kafka-net

  kafka2:
    image: bitnami/kafka:3.9.0
    container_name: kafka2
    ports:
      # host 9093 -> container 9092 (SASL_SSL listener)
      - "9093:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Host names here must match the service names of the three nodes.
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - KAFKA_KRAFT_CLUSTER_ID=ncc_kafka
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      # Empty value disables TLS hostname verification.
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - '/data/kafka/kafka_2_data:/bitnami/kafka'
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
    networks:
      - kafka-net

  # Service key was the typo `kafka-`; it must be `kafka3` to match the quorum
  # voter entry 3@kafka3:9093 and the container name below.
  kafka3:
    image: bitnami/kafka:3.9.0
    container_name: kafka3
    ports:
      # host 9094 -> container 9092 (SASL_SSL listener)
      - "9094:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=3
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Host names here must match the service names of the three nodes.
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - KAFKA_KRAFT_CLUSTER_ID=ncc_kafka
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9094
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      # Empty value disables TLS hostname verification.
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - '/data/kafka/kafka_3_data:/bitnami/kafka'
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
    networks:
      - kafka-net

networks:
  # Shared bridge network so the nodes can resolve each other by name.
  kafka-net:
    driver: bridge
4,创建数据挂载目录
# Create one host data directory per broker; each is bind-mounted to
# /bitnami/kafka in the matching container.
sudo mkdir -p /data/kafka/kafka_1_data /data/kafka/kafka_2_data /data/kafka/kafka_3_data
# The bitnami/kafka image runs as the non-root user UID 1001, so give that user
# ownership instead of making the directories world-writable with chmod 777.
sudo chown -R 1001:1001 /data/kafka/kafka_1_data /data/kafka/kafka_2_data /data/kafka/kafka_3_data
5,创建 Docker 网络
为了确保 Kafka 节点之间可以相互通信,我们需要创建一个 Docker 网络。
# Creates a standalone Docker network named kafka-net.
# NOTE(review): docker-compose.yml declares kafka-net without `external: true`,
# so Compose will presumably create its own project-prefixed network instead of
# reusing this one - confirm whether this step is actually required.
sudo docker network create kafka-net
6,启动服务
在 docker-compose.yml 所在的目录(例如 kafka-cluster)中运行以下命令来启动 Kafka 集群:
# Start every service defined in docker-compose.yml in the background (-d).
sudo docker-compose up -d
7,测试验证:
在容器内修改 producer.properties 和 consumer.properties
(具体操作可查看上篇文章),
增加以下参数:
# Empty value disables TLS hostname verification on the client side.
ssl.endpoint.identification.algorithm=
# NOTE(review): the producer./consumer.-prefixed keys below do not look like
# standard Kafka client settings - confirm they are needed.
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm=
注意1.14.165.18要替换成主机IP
测试发送消息:
# Run the console producer interactively inside container kafka1, connecting to
# the advertised host address and publishing to topic "test".
sudo docker exec -it kafka1 kafka-console-producer.sh --bootstrap-server 1.14.165.18:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
测试接收消息:
# Run the console consumer interactively inside container kafka1, connecting to
# the advertised host address and reading from topic "test".
sudo docker exec -it kafka1 kafka-console-consumer.sh --bootstrap-server 1.14.165.18:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties
8,使用Spring Boot 集成Kafka集群
添加pom依赖:
<!-- Spring for Apache Kafka. No version is given here; presumably it is
     managed by the Spring Boot parent/BOM - confirm for your project. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
配置application.yml,并修改对应服务IP地址
注意1.14.165.18要替换成Kafka服务器IP
# Spring Boot Kafka client settings for the SASL_SSL cluster.
# Replace 1.14.165.18 with the Kafka host IP; the three ports are the host
# ports published by the three broker containers.
spring:
  application:
    name: ncc
  kafka:
    bootstrap-servers:
      - '1.14.165.18:9092'
      - '1.14.165.18:9093'
      - '1.14.165.18:9094'
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      # Quoted so the embedded double quotes and trailing semicolon stay literal.
      sasl.jaas.config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka2024";'
      # Relative paths: the JKS files must be reachable from the working directory.
      ssl.truststore.location: kafka.truststore.jks
      ssl.truststore.password: kafka2024
      ssl.keystore.location: kafka.keystore.jks
      ssl.keystore.password: kafka2024
      ssl.key.password: kafka2024
      # Explicit empty strings (not bare keys, which YAML reads as null);
      # an empty value disables TLS hostname verification.
      ssl.endpoint.identification.algorithm: ""
      producer.ssl.endpoint.identification.algorithm: ""
      consumer.ssl.endpoint.identification.algorithm: ""
并将 kafka.keystore.jks 和 kafka.truststore.jks 文件放到当前项目的根目录下(上面的配置使用的是相对路径,需保证程序运行时的工作目录能找到这两个文件)
9,创建KafkaTest测试类
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * Smoke test that publishes one message to the "test" topic through the
 * KafkaTemplate configured in application.yml.
 */
@SpringBootTest(classes = NccApplication.class)
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a single message and waits for the send result.
     *
     * KafkaTemplate.send is asynchronous; without waiting on the returned
     * future the test would pass even if the send failed (for example on an
     * authentication or TLS error).
     *
     * @throws Exception if the send fails or does not complete within 10 seconds
     */
    @Test
    void send() throws Exception {
        kafkaTemplate.send("test", "hello client ").get(10, TimeUnit.SECONDS);
    }
}
测试通过