Kafka Cluster 扩容
针对kafka集群,可以通过向群集添加新节点来扩展群集。新节点将仅服务于新主题或新分区,现有分区将不会自动重新平衡以使用新节点。如果需要对现有的TOPIC进行重新分配分区,需要运维人员手动进行干预。今天学习下如何对已有的kafka集群进行扩容?如何将现有TOPIC分区迁移到新添加的节点上?
新建集群
集群规划
在本机搭建一个3个Broker节点的kafka集群,组成一个简单的单机版集群环境,用于学习使用,各节点信息如下:
-
Node1 节点
broker_id=10 listeners=PLAINTEXT://:19092 log.dirs=/tmp/19092/kafka-logs
-
Node2 节点
broker_id=11 listeners=PLAINTEXT://:19093 log.dirs=/tmp/19093/kafka-logs
-
Node3 节点
broker_id=12 listeners=PLAINTEXT://:19094 log.dirs=/tmp/19094/kafka-logs
集群配置
- 在kafka安装目录 创建clusters目录
mkdir clusters
- 复制kafka默认配置文件到目录中
# 复制node1 节点配置
cp config/server.properties clusters/node1.properties
# 复制node2 节点配置
cp config/server.properties clusters/node2.properties
# 复制node3 节点配置
cp config/server.properties clusters/node3.properties
并根据之前的节点信息,对配置属性进行对应的修改
启动集群
# 1. 启动 zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 1. 启动节点1
./bin/kafka-server-start.sh clusters/node1.properties
# 2. 启动节点2
./bin/kafka-server-start.sh clusters/node2.properties
# 3. 启动节点3
./bin/kafka-server-start.sh clusters/node3.properties
创建主题
./bin/kafka-topics.sh --create --topic clustersTopic --bootstrap-server localhost:19092 --partitions 6
- 查看topic信息
./bin/kafka-topics.sh --describe --topic clustersTopic --bootstrap-server localhost:19092
集群扩容
假设,由于业务发展迅速,现有的3个节点的kafka集群不足以满足实际要求,现在需要对集群进行扩容,重新新增一个broker节点,同时对现有的Topic - clustersTopic 进行扩容,将原来的分区重新分配到新增加的Node4 节点上。接下来看操作步骤
新增节点
- 新增一个Node4节点,节点配置信息如下
broker_id=13
listeners=PLAINTEXT://:19095
log.dirs=/tmp/19095/kafka-logs
- 复制 broker 配置文件 并更改以上配置属性
cp config/server.properties clusters/node4.properties
#更改Node4 以上配置属性
vim clusters/node4.properties
- 启动节点
# 3. 启动节点4
./bin/kafka-server-start.sh clusters/node4.properties
然而,这些新服务器不会自动分配任何数据分区,因此除非手动进行人工干预,否则在创建新主题之前,它们不会执行任何工作。因此,将机器添加到集群时,通常需要将一些现有数据迁移到这些机器。
迁移分区工具
现在手动将之前创建的topic - clusterTopic中的分区6 手动迁移到新的节点- Node4上。kafka 提供对应的脚本工具可以进行分区数据的迁移工作,分区分配工具可以在3种互斥模式下运行:
-
–generate - 在此模式下,给定toic列表和broker列表,该工具将生成一个候选重新分配,以将指定主题的所有分区移动到新broker。该选项仅提供了一种在给定主题和目标broker列表的情况下生成分区重新分配计划的方便方法。
-
–execute - 在此模式下,该工具根据用户提供的重新分配计划开始重新分配分区。(使用–recassignment json文件选项)。这可以是管理员手工编制的自定义重新分配计划,也可以使用–generate选项提供
-
–verify - 在此模式下,该工具将验证上次–execute期间列出的所有分区的重新分配状态。状态可以是成功完成、失败或正在进行
迁移步骤
指定主题
生成一个JSON文件,指定待迁移的topic信息
vim clusters/move_to_topic.json
# 内容如下
{
"topics": [{
"topic": "clustersTopic"
}],
"version": 1
}
生成迁移计划
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --topics-to-move-json-file clusters/move_to_topic.json --broker-list "10,11,12,13" --generate
# --broker-list 表示json文件的topic分区 将在 "10,11,12,13" 节点中进行重新分配
如上图,工具生成两部分内容
# 第一部分内容 表示topic 现有的分区信息 用户可以将内容存储起来,用于后续备份
Current partition replica assignment
{"version":1,"partitions":[{"topic":"clustersTopic","partition":0,"replicas":[11],"log_dirs":["any"]},{"topic":"clustersTopic","partition":1,"replicas":[12],"log_dirs":["any"]},{"topic":"clustersTopic","partition":2,"replicas":[10],"log_dirs":["any"]},{"topic":"clustersTopic","partition":3,"replicas":[11],"log_dirs":["any"]},{"topic":"clustersTopic","partition":4,"replicas":[12],"log_dirs":["any"]},{"topic":"clustersTopic","partition":5,"replicas":[10],"log_dirs":["any"]}]}
# 第二部分内容 表示topic 重新分配后的分区信息 用户需要将内容存储起来 用于执行
# 存储文件名: expand-cluster-reassignment.json
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"clustersTopic","partition":0,"replicas":[10],"log_dirs":["any"]},{"topic":"clustersTopic","partition":1,"replicas":[11],"log_dirs":["any"]},{"topic":"clustersTopic","partition":2,"replicas":[12],"log_dirs":["any"]},{"topic":"clustersTopic","partition":3,"replicas":[13],"log_dirs":["any"]},{"topic":"clustersTopic","partition":4,"replicas":[10],"log_dirs":["any"]},{"topic":"clustersTopic","partition":5,"replicas":[11],"log_dirs":["any"]}]}
// clusters/expand-cluster-reassignment.json
{"version":1,"partitions":[{"topic":"clustersTopic","partition":0,"replicas":[10],"log_dirs":["any"]},{"topic":"clustersTopic","partition":1,"replicas":[11],"log_dirs":["any"]},{"topic":"clustersTopic","partition":2,"replicas":[12],"log_dirs":["any"]},{"topic":"clustersTopic","partition":3,"replicas":[13],"log_dirs":["any"]},{"topic":"clustersTopic","partition":4,"replicas":[10],"log_dirs":["any"]},{"topic":"clustersTopic","partition":5,"replicas":[11],"log_dirs":["any"]}]}
执行迁移
# 执行脚本 进行分区迁移
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --reassignment-json-file clusters/expand-cluster-reassignment.json --execute
验证结果
# 验证校验
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --reassignment-json-file clusters/expand-cluster-reassignment.json --verify
增加副本
为现有TOPIC现有分区增加复制因子很容易。只需在自定义重新分配json文件中指定额外的副本,并使用–execute选项来增加指定分区的复制因子。
# 添加副本之前的topic 状况
$ ./bin/kafka-topics.sh --describe --topic clustersTopic --bootstrap-server localhost:19092
Topic: clustersTopic TopicId: hNgIV8naQj-BMc5hpe9hVQ PartitionCount: 6 ReplicationFactor: 1 Configs:
Topic: clustersTopic Partition: 0 Leader: 10 Replicas: 10 Isr: 10
Topic: clustersTopic Partition: 1 Leader: 11 Replicas: 11 Isr: 11
Topic: clustersTopic Partition: 2 Leader: 12 Replicas: 12 Isr: 12
Topic: clustersTopic Partition: 3 Leader: 13 Replicas: 13 Isr: 13
Topic: clustersTopic Partition: 4 Leader: 10 Replicas: 10 Isr: 10
Topic: clustersTopic Partition: 5 Leader: 11 Replicas: 11 Isr: 11
指定分区
vim clusters/increase-replication-factor.json
# 创建文件,内容如下
{
"version": 1,
"partitions": [{
"topic": "clustersTopic",
"partition": 0,
"replicas": [10, 11, 12]
}]
}
执行脚本
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --reassignment-json-file clusters/increase-replication-factor.json --execute
验证结果
- 验证方式一
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --reassignment-json-file clusters/increase-replication-factor.json --verify
- 验证方式二
./bin/kafka-topics.sh --describe --topic clustersTopic --bootstrap-server localhost:19092
可以很明显看到分区0,多了2个同步副本信息