目录
一、准备工作
1、安装好Zookeeper集群
2、安装好Kafka集群
二、Kafka扩容broker
三、Kafka数据迁移
1、查看主题列表
2、创建Topic
3、查看Topic详细信息
4、生成需要迁移的json
5、生成迁移计划
6、执行迁移计划
7、查看迁移计划
8、确认topic数据分布
一、准备工作
1、安装好Zookeeper集群
【大数据入门核心技术-Zookeeper】(五)ZooKeeper集群搭建
2、安装好Kafka集群
【大数据入门核心技术-Kafka】(三)Kafka高可用集群部署
二、Kafka扩容broker
kafka集群扩容比较简单,机器配置一样的前提下只需要把配置文件里的brokerid改一个新的启动起来就可以。
复制kafka文件目录到新的节点目录
scp -r /usr/local/kafka_2.13-2.7.1 hadoop104:/usr/local/
cd /usr/local/kafka_2.13-2.7.1/config
vi server.properties
#指定该节点brokerID
broker.id=4
#分区数量
num.partitions=3
#kafka日志目录
log.dirs=/usr/local/kafka_2.13-2.7.1/logs
#指定kafka绑定监听地址
listeners=PLAINTEXT://hadoop104:9092
#指定zk配置
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181
配置完后,启动kafka即可
# 启动Kafka
cd /usr/local/kafka_2.13-2.7.1
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
同理也可以可以拓展更多broker节点
三、Kafka数据迁移
集群扩容后数据是不会自动均衡到新机器上的,需要采用kafka-reassign-partitions.sh这个工具脚本。脚本可以工作在三种模式--generate,--execute,--verify
因服务器数量有限,本文以 某些topic的所在的broker节点为1,迁移到3个broker节点为例进行演示。如果是更多台,也是类似使用方法
1、查看主题列表
cd $KAFKA_HOME
bin/kafka-topics.sh --list --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181
2、创建Topic
bin/kafka-topics.sh --create --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --replication-factor 2 --partitions 2 --topic test_kafka
副本为2,分区为2
3、查看Topic详细信息
bin/kafka-topics.sh --describe --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic test_kafka
注意看参数 partitions为2
4、生成需要迁移的json
vi /tmp/topic-to-move.json
文件内容为需要迁移的topic,若有多个则填入多个:
{"topics": [
{"topic": "test_topic_001"}
],
"version":1
}
多个则为:
{"topics": [
{"topic": "test_topic_001"},
{"topic": "test_topic_002"},
{"topic": "test_topic_003"}
],
"version":1
}
5、生成迁移计划
例如迁移后的正常点的brokerid有1001, 1002, 1003, 1004, 1005, 1006
bin/kafka-reassign-partitions.sh --bootstrap-server 10-1-200-11:9092,10-1-200-15:9092 --topics-to-move-json-file /tmp/topic-to-move.json --broker-list "1001,1002,1003,1004,1005,1006" --generate
vi /tmp/expand-cluster-reassignment.json
复制自动推荐的"Proposed partition reassignment configuration"迁移计划到/tmp/expand-cluster-reassignment.json文件中。
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1001],"log_dirs":["any"]},{"topic":"test_topic_001","partition":0,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"test_topic_001","partition":1,"replicas":[1003,1004],"log_dirs":["any","any"]},{"topic":"test_topic_001","partition":2,"replicas":[1004,1001],"log_dirs":["any","any"]},{"topic":"test_topic_001","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"test_topic_001","partition":4,"replicas":[1002,1004],"log_dirs":["any","any"]}]}
vi /tmp/cur-cluster-assignment.json
备份当前的数据分布即“Current partition replica assignment”,以便有问题回滚
6、执行迁移计划
bin/kafka-reassign-partitions.sh --bootstrap-server 10-1-200-11:9092,10-1-200-15:9092 --reassignment-json-file /tmp/expand-cluster-reassignment.json --execute
7、查看迁移计划
bin/kafka-reassign-partitions.sh --bootstrap-server 10-1-200-11:9092,10-1-200-15:9092 --reassignment-json-file /tmp/expand-cluster-reassignment.json --verify
8、确认topic数据分布
查看kafka-manager UI