Docker搭建kafka集群
- 集群规划
- 镜像版本
- kafka为什么需要依赖zookeeper
- 创建docker网络
- 搭建zk集群
- 新建文件docker-compose-zk.yml
- 启动
- 搭建kafka集群
- 新建docker-compose-kafka.yml
- 启动集群
- 安装kafka-manager
- 新建 docker-compose-kafka-manager.yml
- 启动kafka-manager
- 配置cluster
- 修改kafka-run-class.sh
- 修改原因
- 修改文件
- 测试
集群规划
镜像版本
- Zookeeper采用zookeeper
- Kafka采用wurstmeister/kafka
- Kafka-Manager采用scjtqs/kafka-manager
kafka为什么需要依赖zookeeper
ZooKeeper 作为给分布式系统提供协调服务的工具被 kafka 所依赖。在分布式系统中,消费者需要知道有哪些生产者是可用的,而如果每次消费者都需要和生产者建立连接并测试是否成功连接,那效率也太低了,显然是不可取的。而通过使用 ZooKeeper 协调服务,Kafka 就能将 Producer,Consumer,Broker 等结合在一起,同时借助 ZooKeeper,Kafka 就能够将所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现负载均衡
1、Broker管理
在Kafka的设计中,选择了使用Zookeeper来进行所有Broker的管理,体现在zookeeper上会有一个专门用来进行Broker服务器列表记录的点,节点路径为/brokers/ids
Zookeeper用一个专门节点保存Broker服务列表,也就是 /brokers/ids。
broker启动时,向Zookeeper发送注册请求,Zookeeper会在/brokers/ids下创建这个broker节点,如/brokers/ids/[0…N],并保存broker的IP地址和端口,Broker 创建的是临时节点,在连接断开时节点就会自动删除,所以在 ZooKeeper 上就可以通过 Broker 中节点的变化来得到 Broker 的可用性。
2、负载均衡
broker向Zookeeper进行注册后,生产者根据broker节点来感知broker服务列表变化,这样可以实现动态负载均衡。
3、Topic 信息
在 Kafka 中可以定义很多个 Topic,每个 Topic 又被分为很多个 Partition。一般情况下,每个 Partition 独立存在于一个 Broker 上,所有的这些 Topic 和 Broker 的对应关系都由 ZooKeeper 进行维护。
创建docker网络
docker network create kafaka-net --subnet 172.28.10.0/16
搭建zk集群
新建文件docker-compose-zk.yml
# docker-compose-zk.yml — three-node ZooKeeper ensemble.
# All three services join the pre-created external network "kafaka-net"
# (name kept as-is, typo included: the kafka and kafka-manager compose
# files reference the same network name).
version: '3.4'

services:
  zook1:
    image: zookeeper:latest
    #restart: always  # uncomment to restart the container automatically
    hostname: zook1
    container_name: zook1  # explicit name so it displays readably in rancher
    ports:
      - "39181:2181"  # host:container — publish the ZooKeeper client port
    volumes:
      # host path (left) : container path (right)
      - "/volumn/kafaka/zkcluster/zook1/data:/data"
      - "/volumn/kafaka/zkcluster/zook1/datalog:/datalog"
      - "/volumn/kafaka/zkcluster/zook1/logs:/logs"
    environment:
      # ZooKeeper server id; must match the "server.1" entry in ZOO_SERVERS.
      # (Note: this is NOT the Kafka broker id — KAFKA_BROKER_ID is set
      # separately in docker-compose-kafka.yml.)
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181
    networks:
      docker-net:
        ipv4_address: 172.28.10.11

  zook2:
    image: zookeeper:latest
    #restart: always  # uncomment to restart the container automatically
    hostname: zook2
    container_name: zook2  # explicit name so it displays readably in rancher
    ports:
      - "39182:2181"  # host:container — publish the ZooKeeper client port
    volumes:
      - "/volumn/kafaka/zkcluster/zook2/data:/data"
      - "/volumn/kafaka/zkcluster/zook2/datalog:/datalog"
      - "/volumn/kafaka/zkcluster/zook2/logs:/logs"
    environment:
      # ZooKeeper server id; must match the "server.2" entry in ZOO_SERVERS.
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181
    networks:
      docker-net:
        ipv4_address: 172.28.10.12

  zook3:
    image: zookeeper:latest
    #restart: always  # uncomment to restart the container automatically
    hostname: zook3
    container_name: zook3  # explicit name so it displays readably in rancher
    ports:
      - "39183:2181"  # host:container — publish the ZooKeeper client port
    volumes:
      - "/volumn/kafaka/zkcluster/zook3/data:/data"
      - "/volumn/kafaka/zkcluster/zook3/datalog:/datalog"
      - "/volumn/kafaka/zkcluster/zook3/logs:/logs"
    environment:
      # ZooKeeper server id; must match the "server.3" entry in ZOO_SERVERS.
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181
    networks:
      docker-net:
        ipv4_address: 172.28.10.13

networks:
  docker-net:
    name: kafaka-net
启动
docker compose -p zookeeper -f ./docker-compose-zk.yml up -d
搭建kafka集群
新建docker-compose-kafka.yml
# docker-compose-kafka.yml — three Kafka brokers on the existing
# "kafaka-net" network, connecting to the ZooKeeper ensemble started by
# docker-compose-zk.yml.
#
# Listener pattern: INSIDE advertises the container hostname (for
# inter-broker traffic and clients inside the docker network); OUTSIDE
# advertises an address reachable from the docker host.
version: '2'

services:
  kafka1:
    image: docker.io/wurstmeister/kafka
    #restart: always  # uncomment to restart the container automatically
    hostname: kafka1
    container_name: kafka1
    ports:
      - "39093:9093"  # host:container — INSIDE listener
      - "39193:9193"  # host:container — OUTSIDE listener
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9193
      # Fixed from the original "SKAFKA_ADVERTISED_LISTENERS" (typo: the
      # variable was never read, so kafka1 did not advertise the intended
      # listeners). The OUTSIDE address uses the *published* host port 39193,
      # since host clients reach the broker through the port mapping.
      # "localhost" assumes clients run on the docker host itself — replace
      # with the host's IP/FQDN for remote clients.
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:9093,OUTSIDE://localhost:39193
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zook1:2181,zook2:2181,zook3:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      JMX_PORT: "9999"  # expose JMX so cluster metrics can be monitored
    volumes:
      - /volumn/kafaka/kafka1/wurstmeister/kafka:/wurstmeister/kafka
      - /volumn/kafaka/kafka1/kafka:/kafka
      #- /volumn/kafaka/kafka1/bin/kafka-run-class.sh:/opt/kafka_2.13-2.8.1/bin/kafka-run-class.sh
    external_links:  # containers started by the zookeeper compose project
      - zook1
      - zook2
      - zook3
    networks:
      docker-net:
        ipv4_address: 172.28.10.14

  kafka2:
    image: docker.io/wurstmeister/kafka
    #restart: always  # uncomment to restart the container automatically
    hostname: kafka2
    container_name: kafka2
    ports:
      - "39094:9094"  # host:container — INSIDE listener
      - "39194:9194"  # host:container — OUTSIDE listener
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_LISTENERS: INSIDE://:9094,OUTSIDE://:9194
      # OUTSIDE advertises the published host port 39194 (see kafka1 note).
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka2:9094,OUTSIDE://localhost:39194
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zook1:2181,zook2:2181,zook3:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      JMX_PORT: "9999"  # expose JMX so cluster metrics can be monitored
    volumes:
      - /volumn/kafaka/kafka2/wurstmeister/kafka:/wurstmeister/kafka
      - /volumn/kafaka/kafka2/kafka:/kafka
      #- /volumn/kafaka/kafka2/bin/kafka-run-class.sh:/opt/kafka_2.13-2.8.1/bin/kafka-run-class.sh
    external_links:
      - zook1
      - zook2
      - zook3
    networks:
      docker-net:
        ipv4_address: 172.28.10.15

  kafka3:
    image: docker.io/wurstmeister/kafka
    #restart: always  # uncomment to restart the container automatically
    hostname: kafka3
    container_name: kafka3
    ports:
      - "39095:9095"  # host:container — INSIDE listener
      - "39195:9195"  # host:container — OUTSIDE listener
    environment:
      KAFKA_BROKER_ID: "3"
      KAFKA_LISTENERS: INSIDE://:9095,OUTSIDE://:9195
      # OUTSIDE advertises the published host port 39195 (see kafka1 note).
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka3:9095,OUTSIDE://localhost:39195
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zook1:2181,zook2:2181,zook3:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      JMX_PORT: "9999"  # expose JMX so cluster metrics can be monitored
    volumes:
      - /volumn/kafaka/kafka3/wurstmeister/kafka:/wurstmeister/kafka
      - /volumn/kafaka/kafka3/kafka:/kafka
      #- /volumn/kafaka/kafka3/bin/kafka-run-class.sh:/opt/kafka_2.13-2.8.1/bin/kafka-run-class.sh
    external_links:
      - zook1
      - zook2
      - zook3
    networks:
      docker-net:
        ipv4_address: 172.28.10.16

networks:
  docker-net:
    name: kafaka-net
启动集群
docker compose -f ./docker-compose-kafka.yml up -d
安装kafka-manager
新建 docker-compose-kafka-manager.yml
# docker-compose-kafka-manager.yml — CMAK (kafka-manager) web UI, joined to
# the existing "kafaka-net" network so it can resolve the zk/kafka containers.
version: '2'

services:
  kafka-manager:
    image: scjtqs/kafka-manager:latest
    restart: always
    hostname: kafka-manager
    container_name: kafka-manager
    ports:
      - "39196:9000"  # host:container — web UI, browse http://<host>:39196
    external_links:  # containers started by the other compose projects
      - zook1
      - zook2
      - zook3
      - kafka1
      - kafka2
      - kafka3
    environment:
      ZK_HOSTS: zook1:2181,zook2:2181,zook3:2181
      # Broker addresses use the INSIDE listener ports (in-network access).
      KAFKA_BROKERS: kafka1:9093,kafka2:9094,kafka3:9095
      APPLICATION_SECRET: letmein  # Play framework secret; change in production
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
      docker-net:
        ipv4_address: 172.28.10.20

networks:
  docker-net:
    name: kafaka-net
启动kafka-manager
docker compose -f ./docker-compose-kafka-manager.yml up -d
配置cluster
修改kafka-run-class.sh
修改原因
主要防止kafka-topics.sh命令创建主题时报错
Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 9999; nested exception is:
java.net.BindException: Address in use (Bind failed)
修改文件
kafka-run-class.sh在容器中/opt/kafka_2.13-2.8.1/bin/路径下
40 file=$1
41 if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
42 return 0
43 else
44 return 1
45 fi
46 }
------加入以下4行配置------
- 47 ISKAFKASERVER="false"
- 48 if [[ "$*" =~ "kafka.Kafka" ]]; then
- 49 ISKAFKASERVER="true"
- 50 fi
------------以上-----------
51 base_dir=$(dirname $0)/..
52
53 if [ -z "$SCALA_VERSION" ]; then
54 SCALA_VERSION=2.13.2
55 if [[ -f "$base_dir/gradle.properties" ]]; then
56 SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
57 fi
58 fi
......
187 # JMX settings
188 if [ -z "$KAFKA_JMX_OPTS" ]; then
189 KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false - Dcom.sun.management.jmxremote.ssl=false "
190 fi
191
192 # JMX port to use
- 193 #if [ $JMX_PORT ]; then #注释掉这一行
- 194 if [ $JMX_PORT ] && [ "$ISKAFKASERVER" == "true" ]; then #添加这一行新的配置进去(只有broker进程kafka.Kafka才绑定JMX端口,命令行工具不再重复绑定)
195 KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
196 fi
197
198 # Log directory to use
199 if [ "x$LOG_DIR" = "x" ]; then
200 LOG_DIR="$base_dir/logs"
201 fi
测试
进入kafka
docker exec -ti kafka1 /bin/bash
cd opt/kafka_2.13-2.8.1/
创建topic
bin/kafka-topics.sh --create --zookeeper zook1:2181 --replication-factor 2 --partitions 2 --topic partopic
查看topic的状态
bin/kafka-topics.sh --describe --zookeeper zook1:2181 --topic partopic