Table of Contents
- 1. Kafka Broker Workflow
- 2. Commissioning a Kafka Node
  - 1. Adding a Kafka Node
  - 2. Rebalancing Partitions
- 3. Decommissioning a Kafka Node
1. Kafka Broker Workflow
How the data in ZooKeeper changes as Kafka brokers come online and go offline:
[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, kafka_cluster]
[zk: localhost:2181(CONNECTED) 10] ls /kafka_cluster
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 11] ls /kafka_cluster/brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 12] get /kafka_cluster/controller
{"version":1,"brokerid":0,"timestamp":"1669535410717"}
[zk: localhost:2181(CONNECTED) 13] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":9,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2,1]}
Stop the Kafka broker on node kafka-01 and repeat the queries above:
[zk: localhost:2181(CONNECTED) 14] ls /kafka_cluster
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 15] ls /kafka_cluster/brokers/ids
[1, 2]
[zk: localhost:2181(CONNECTED) 16] get /kafka_cluster/controller
{"version":1,"brokerid":2,"timestamp":"1669542009693"}
[zk: localhost:2181(CONNECTED) 17] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":9,"leader":2,"version":1,"leader_epoch":1,"isr":[2,1]}
Start the Kafka broker on kafka-01 again and repeat the queries:
[zk: localhost:2181(CONNECTED) 18] ls /kafka_cluster/brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 19] get /kafka_cluster/controller
{"version":1,"brokerid":2,"timestamp":"1669542009693"}
[zk: localhost:2181(CONNECTED) 20] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":10,"leader":0,"version":1,"leader_epoch":2,"isr":[2,1,0]}
2. Commissioning a Kafka Node
1. Adding a Kafka Node
① Shut down the hadoop103 virtual machine and clone it (right-click the VM, then Manage, then Clone) to create a new node named hadoop104.
② Change hadoop104's IP address: vi /etc/sysconfig/network-scripts/ifcfg-ens33
IPADDR=192.168.38.26
③ Change the hostname: vi /etc/hostname
hadoop104
④ Map the hostname to the IP address: vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.38.23 hadoop101
192.168.38.24 hadoop102
192.168.38.25 hadoop103
192.168.38.26 hadoop104
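Every node in the cluster needs the same mapping. A hedged convenience, assuming passwordless SSH is already set up between the nodes, is to push the updated file to the other hosts:
[root@hadoop104 ~]# for host in hadoop101 hadoop102 hadoop103; do scp /etc/hosts $host:/etc/hosts; done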
⑤ Edit Kafka's server.properties configuration file, paying particular attention to the following settings:
broker.id=3
listeners=PLAINTEXT://192.168.38.26:9092
advertised.listeners=PLAINTEXT://192.168.38.26:9092
# the existing 3-node zookeeper cluster is sufficient
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster
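Before starting the broker, it is worth confirming that the clone did not keep the old values; a quick check:
[root@hadoop104 kafka_2.12-2.2.1]# grep -E '^(broker\.id|listeners|advertised\.listeners|zookeeper\.connect)' config/server.properties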
⑥ Under the /opt/kafka/kafka_2.12-2.2.1 installation directory, delete the log directory that came along with the clone and recreate it. The cloned directory still contains a meta.properties recording the old broker.id, which would make the broker fail at startup because it conflicts with the new broker.id=3:
[root@hadoop104 kafka_2.12-2.2.1]# rm -rf logs/
[root@hadoop104 kafka_2.12-2.2.1]# mkdir logs
⑦ Start Kafka on the hadoop104 node:
[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server.properties
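Started this way, the broker runs in the foreground and the terminal stays attached. The same script also accepts a -daemon flag to run it in the background; jps should then show a Kafka process:
[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop104 kafka_2.12-2.2.1]# jps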
2. Rebalancing Partitions
① We previously created the topic test2; let's take a look at its details:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2
Topic: test2 Partition: 1 Leader: 1 Replicas: 2,1,0 Isr: 1,0,2
Topic: test2 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
As you can see, the existing test2 topic is still stored only on hadoop101, hadoop102, and hadoop103; nothing landed on the new hadoop104 node, and only newly created topics would be placed there. How do we fix this?
② Declare the topics to rebalance. A cluster hosts many topics, so you must specify which topic's data should be load-balanced:
[root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
{
"topics": [
{"topic": "test2"}
],
"version": 1
}
If there are multiple topics, add each of them to the topics list, as in the sketch below.
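A two-topic file would look like this; the topic name test3 is hypothetical, used only for illustration:
{
  "topics": [
    {"topic": "test2"},
    {"topic": "test3"}
  ],
  "version": 1
}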
③ Generate a reassignment plan:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Missing required argument "[zookeeper]"
The command above fails because the zookeeper argument is missing; in this Kafka version the reassignment tool still talks to ZooKeeper directly. Add the ZooKeeper connection string and run it again:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
Current partition replica assignment: the replica layout as it is now.
Proposed partition reassignment configuration: the suggested new layout; this JSON line is exactly the content of the reassignment plan file.
④ Create the replica reassignment plan file (replicas spread across broker 0, broker 1, broker 2, and broker 3):
[root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
⑤ Execute the reassignment plan:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
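On a busy cluster, moving replicas can saturate the network. The --execute step also accepts a --throttle option that caps inter-broker replication traffic in bytes per second; a sketch with an illustrative 50 MB/s limit:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute --throttle 50000000
Remember to run --verify afterwards; besides checking progress, it removes the throttle once the reassignment has completed.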
⑥ Verify the reassignment:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test2-2 completed successfully
Reassignment of partition test2-1 completed successfully
Reassignment of partition test2-0 completed successfully
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test2 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test2 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test2 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 0,2,3
3. Decommissioning a Kafka Node
We now have four nodes: hadoop101, hadoop102, hadoop103, and hadoop104, and we want to remove hadoop104 from the cluster. We cannot simply shut it down, because part of the test2 topic's data already lives on hadoop104. So how do we decommission it?
First generate a reassignment plan that excludes the node being decommissioned, then carry out the same load-balancing procedure used during commissioning.
① Declare the topics to rebalance:
[root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
{
"topics": [
{"topic": "test2"}
],
"version": 1
}
② Generate the reassignment plan. When commissioning the node we passed --broker-list "0,1,2,3"; to decommission, pass --broker-list "0,1,2" instead, dropping 3, so that the test2 topic's data will no longer be placed on the hadoop104 node (broker.id=3):
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
③ Create the replica reassignment plan file (all replicas stored on broker 0, broker 1, and broker 2):
[root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}
④ Execute the reassignment plan:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
⑤ Verify the reassignment:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test2-2 completed successfully
Reassignment of partition test2-1 completed successfully
Reassignment of partition test2-0 completed successfully
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test2 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 1,2,0
Topic: test2 Partition: 1 Leader: 1 Replicas: 0,2,1 Isr: 1,2,0
Topic: test2 Partition: 2 Leader: 2 Replicas: 1,0,2 Isr: 2,1,0
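Note that partitions 1 and 2 are now led by a broker that is not the first replica in their replica lists. Kafka usually corrects this on its own (auto.leader.rebalance.enable defaults to true), but this Kafka version also bundles a tool to trigger a preferred-replica election manually; a sketch covering all partitions:
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-preferred-replica-election.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster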
⑥ Retire the hadoop104 node by running the stop command on hadoop104:
[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-stop.sh
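As a final check: jps on hadoop104 should no longer list a Kafka process, and the broker registration query from section 1 should show only [0, 1, 2]:
[root@hadoop104 kafka_2.12-2.2.1]# jps
[root@hadoop101 kafka_2.12-2.2.1]# bin/zookeeper-shell.sh hadoop101:2181/kafka_cluster ls /brokers/ids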