点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下的内容,基本都是特性概念相关的:
- 分区相关介绍
- 副本机制
- 同步节点
- 宕机恢复
- Leader选举 基础概念、选举过程、为何不少数服从多数
分区重分配
向已经部署好的Kafka集群里添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置文件,然后把里边的 BrokerID 修改为全局唯一的,最后启动这个节点即可让它加入到现有的Kafka集群中。
当前问题
新添加的Kafka节点并不会自动的分配数据,无法分担集群的负载,除非我们新建一个Topic。
在重新分布Topic分区之前,我们先来看看现在Topic的各个分区的分布位置。
启动服务
如果你的Kafka服务还未启动,需要先启动,再进行后续的测试实验。
我这里启动:
kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties
启动结果如下图:
创建主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 5 --replication-factor 1
我们的配置:
- 创建一个5个分区的主题
- Kafka此时的算法会保证所有分区都分配到现有的Kafka代理节点上
创建的结果如下:
查看主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_icu_test
创建的结果如下图,可以观察到5个分区。
新增Kafka
在新的机器上部署Kafka服务,记得修改BrokerID。
刚才我们是单节点的,Kafka在 h121 节点上。
# 配置内容参考 h121 中的配置
# 但是注意要修改 BrokerID
vim config/server.properties
- h121 broker 1
- h122 broker 2
- h123 broker 3 (暂时还不配置3节点)
此时我们来到 h122 用如下的命令启动Kafka,我启动的是临时的,如果你有需要,请用守护方式启动。
# 环境变量别忘了配置
kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties
启动过程如下图:
查看集群
# 先进入ZK 在ZK中进行查看
zkCli.sh
get /cluster/id
执行的过程是:
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] get /cluster/id
{"version":"1","id":"DGjwPmfLSk2OKosFFLZJpg"}
重新分区
我们使用Kafka自带的:kafka-reassign-partitions.sh 工具来重新发布分区,该工具有三种使用模式:
- generate模式,给定需要重新分配的的Topic,自动生成 reassign plan (不会自动执行)
- execute模式,根据指定的 reassign plan重新分配 Partition
- verify模式,验证重新分配Partition是否成功
生成JSON
vim wzk_icu_test_to_move.json
{
"topics": [
{
"topic": "wzk_icu_test"
}
],
"version": 1
}
当前结果如下:
执行如下的脚本,来对分区进行配置:
kafka-reassign-partitions.sh --zookeeper h121.wzk.icu:2181 --topics-to-move-json-file wzk_icu_test_to_move.json --broker-list "0,1" --generate
观察控制台的结果:
执行计划
Proposed Partition Reassignment Configuration 下面生成的就是将分区重新发布到 Broker 1上的结果,我们将这些内容保存到 result.json 中
vim result.json
{"version":1,"partitions":[{"topic":"wzk_icu_test","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"wzk_icu_test","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"wzk_icu_test","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"wzk_icu_test","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"wzk_icu_test","partition":4,"replicas":[1],"log_dirs":["any"]}]}
运行后的写入情况如下:
我们继续执行:
kafka-reassign-partitions.sh --zookeeper h121.wzk.icu:2181 --reassignment-json-file wzk_icu_test_to_move_result.json --execute
显示结果如下,已经完成分区:
校验结果
kafka-reassign-partitions.sh --zookeeper h121.wzk.icu:2181 --reassignment-json-file wzk_icu_test_to_move_result.json --verify
显示结果如下:
重新查看分区情况
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_icu_test
显示的内容如下:
可以看到我们已经顺利的完成了重新分区分配!