Kafka - 08 Kafka Broker工作流程 | 节点服役 | 节点退役

news2025/1/18 8:46:38

文章目录

      • 1. Kafka Broker 工作流程
      • 2. Kafka 节点服役
        • 1. 增加一个Kafka节点
        • 2. 执行负载均衡操作
      • 3. Kafka 节点退役

1. Kafka Broker 工作流程

在这里插入图片描述

Kafka上下线时Zookeeper中的数据变化:

在这里插入图片描述

[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, kafka_cluster]

[zk: localhost:2181(CONNECTED) 10] ls /kafka_cluster
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

[zk: localhost:2181(CONNECTED) 11] ls /kafka_cluster/brokers/ids
[0, 1, 2]

[zk: localhost:2181(CONNECTED) 12] get /kafka_cluster/controller
{"version":1,"brokerid":0,"timestamp":"1669535410717"}

[zk: localhost:2181(CONNECTED) 13] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":9,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2,1]}

停止 kafka-01的 kafka节点,执行上面的3个步骤:

在这里插入图片描述

[zk: localhost:2181(CONNECTED) 14] ls /kafka_cluster
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

[zk: localhost:2181(CONNECTED) 15] ls /kafka_cluster/brokers/ids
[1, 2]

[zk: localhost:2181(CONNECTED) 16] get /kafka_cluster/controller
{"version":1,"brokerid":2,"timestamp":"1669542009693"}

[zk: localhost:2181(CONNECTED) 17] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":9,"leader":2,"version":1,"leader_epoch":1,"isr":[2,1]}

启动 kafka-01的 kafka节点,执行上面的3个步骤:

在这里插入图片描述

[zk: localhost:2181(CONNECTED) 18] ls /kafka_cluster/brokers/ids
[0, 1, 2]

[zk: localhost:2181(CONNECTED) 19] get /kafka_cluster/controller
{"version":1,"brokerid":2,"timestamp":"1669542009693"}

[zk: localhost:2181(CONNECTED) 20] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":10,"leader":0,"version":1,"leader_epoch":2,"isr":[2,1,0]}

2. Kafka 节点服役

1. 增加一个Kafka节点

① 将hadoop-103虚拟机关机,右键-管理-克隆一个新的节点,节点名称 hadoop-104。

② 修改hadoop-104的ip地址:vi /etc/sysconfig/network-scripts/ifcfg-ens33

IPADDR=192.168.38.26

③ 修改 hostname 主机名称:vi /etc/hostname

hadoop104

④ 将 hostname 和 ip 绑定:vi /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.38.23 hadoop101
192.168.38.24 hadoop102
192.168.38.25 hadoop103
192.168.38.26 hadoop104

⑤ 修改kafka的 server.properties 配置文件,主要关注以下几个配置:

broker.id=3
listeners=PLAINTEXT://192.168.38.26:9092
advertised.listeners=PLAINTEXT://192.168.38.26:9092
# 使用3台zookeeper集群即可 
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster

⑥ 删除/opt/kafka/kafka_2.12-2.2.1安装目录下之前创建的log目录,并新建该目录:

[root@hadoop104 kafka_2.12-2.2.1]# rm -rf logs/
[root@hadoop104 kafka_2.12-2.2.1]# mkdir logs

⑦ 启动 hadoop-104节点的 kafka:

[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server.properties

2. 执行负载均衡操作

① 我们创建过主题test2,查看下它的主题详情:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2     PartitionCount:3   ReplicationFactor:3   Configs:segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 1,0,2
Topic: test2    Partition: 1    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2
Topic: test2    Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到以前创建的 test2 主题,仍然存储在 hadoop101、hadoop102、hadoop103 上,并没有存储在hadoop104 这个新的节点上,只有新创建的主题才会在 hadoop104 节点上,如何解决?

② 创建一个要均衡的主题:在集群中会有很多的主题,要指定对哪一个主题的存储数据进行负载均衡

[root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
{
    "topics": [
        {"topic": "test2"}
    ],
    "version": 1
}

如果有多个 topic 就添加在 topics 列表中。

③ 创建执行计划:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Missing required argument "[zookeeper]"

上面的命令报错缺少zookeeper参数,加上zookeeper连接集群地址,继续执行:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Current partition replica assignment:当前分区副本分配

Proposed partition reassignment configuration:建议的分区重新分配配置,这个就是副本执行计划文件内容

④ 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中):

[root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

⑤ 执行副本执行计划:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

⑥ 验证副本存储计划:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster  --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test2-2 completed successfully
Reassignment of partition test2-1 completed successfully
Reassignment of partition test2-0 completed successfully
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2  PartitionCount:3   ReplicationFactor:3    Configs:segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 1,0,2
Topic: test2    Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test2    Partition: 2    Leader: 2       Replicas: 2,3,0 Isr: 0,2,3

3. Kafka 节点退役

现在有 hadoop101、hadoop102、hadoop103、hadoop104 四个节点,将 hadoop104 从集群节点中退出,不能直接将 hadoop105 节点关闭,因为test2主题的数据已经有一部分存在 hadoop105 节点上了,那么如何退役呢?

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

① 创建一个要均衡的主题:

[root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
{
    "topics": [
        {"topic": "test2"}
    ],
    "version": 1
}

② 创建执行计划:节点服役时配置–broker-list “0,1,2,3”,退出服役时只需要配置–broker-list “0,1,2” 接口,把3删除,那么 test2 主题的数据就不会存在 broker.id=3 的 hadoop105 节点上了。

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

③ 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中):

[root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

④ 执行副本存储计划:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092  --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

⑤ 验证副本存储计划:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092  --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test2-2 completed successfully
Reassignment of partition test2-1 completed successfully
Reassignment of partition test2-0 completed successfully
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2  PartitionCount:3   ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2,0
Topic: test2    Partition: 1    Leader: 1       Replicas: 0,2,1 Isr: 1,2,0
Topic: test2    Partition: 2    Leader: 2       Replicas: 1,0,2 Isr: 2,1,0

⑥ 执行停止命令,将 hadoop105 节点退役,在 hadoop105 上执行停止命令即可

[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-stop.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/41478.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用nw.js快速开发一个基于浏览器的小型桌面端(适用于高校学生实验作业)

首先讲下退坑事项,节约读者时间 生成的exe会依赖SDK文件夹下的一些dll,所以不能简单的交付这个exe,需要使用额外的软件进行打包,如Enigma Virtual Box、inno setup自定义可执行文件的icon也要额外软件,如Resource Hac…

SCDM 实例教程:基本几何建模

作者 | 张杨 ANSYS SpaceClaim Direct Modeler(简称 SCDM)是基于直接建模思想的新一代3D建模和几何处理软件。SCDM可以显著地缩短产品设计周期,大幅提升CAE分析的模型处理质量和效率,为用户带来全新的产品设计体验。 本文将主要…

常用 CMD 命令

前言 作为一个程序员,可能更多的是在 Linux 中使用命令来操作。但在日常使用 Windows 的过程中,或多或少会使用到命令提示符窗口,也就是 Windows 中的 CMD。这个时候,掌握一些常用的命令就尤为重要了,一方面方便自己使…

排序-指标解读

一、ROC ROC曲线全称是(receiver operating characteristic cure)受试者工作特征曲线。 首先大家看到这里肯定会好奇,为啥名字这么奇怪,来一波背景介绍先。 “ROC起先应用于军事领域,据说在第二次世界大战期间,ROC 曲线最先是由…

几分钟快速学会Linux开启启动服务

背景 最近在银行遇到一个部署问题,uat、prod 两个环境的ECS中的服务要求制作好基础镜像,上环境的时候只需要在对应的ECS中选择更换系统即可,不允许传统连接SSH上去安装,这就要求我们就得提前把需要运行的服务内置到系统中&#x…

债券数据集:绿色债券数据集、历时新发、发行债券、DCM定价估值四大指标数据

1、绿色债券数据集 1、数据来源:wind 2、时间跨度:2016.01-2021.11年 3、区域范围:全国 4、指标说明: 部分指标如下: 数据截图如下: 2、历史新发债券数据库 1、数据来源:wind 2、时间跨度…

领悟《信号与系统》之 傅立叶变换的性质与应用

傅立叶变换的性质与应用一、傅里叶变换性质表二、傅里叶性质详细1. 线性性质2. 尺度变换特性3. 时移特性4. 频移特性5. 时域微分特性6. 频域微分特性7. 时域积分特性8. 频域积分特性9. 卷积定理1. 时域卷积定理2. 频域卷积定理10. 对称性11. 帕塞瓦尔定理依据傅里叶变换对概念&…

xxl-job 快速使用

xxl-job 快速使用xxl-job 简单使用注意事项执行xxl-job 简单使用 xxl-job 官方使用说明 XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 注意事项 详细可…

【Canvas】js用Canvas绘制阴阳太极图动画效果

学习JavaScript是否兴趣缺缺,那就需要来一个兴趣学习,问一下有没有兴趣用Canvas画图呢,可以画很多有趣的事物,自由发挥想象,收获多多哦,这里有一个例子,如何用canvas画阴阳太极图动图效果&#…

王道考研——操作系统(第二章 进程管理)(死锁)

一、死锁的概念 什么是死锁 死锁、饥饿、死循环的区别 死锁产生的必要条件 什么时候会发生死锁 死锁的处理策略 知识回顾与重要考点 二、死锁的处理策略——预防死锁 知识总览 破坏互斥条件 破坏不剥夺条件 破坏请求和保持条件 破坏循环等待条件 知识回顾与重要考点 与前面哲…

分省/市/县最低工资标准(2012-2021年)和 全国/省/市/县GDP数据(1949-2020年)

一、最低工资数据 1、数据来源:各省\市\县政府公布资料 2、时间跨度:2012-2021年 3、区域范围:全国各省\市\县 4、指标说明: 部分数据如下: 二、各省市县人均GDP 1、数据来源:地方统计局 2、时间跨度…

常用PC,移动浏览器User-Agent大全

常用PC,移动浏览器User-Agent大全,提供PC浏览器user-agent,Android手机浏览器user-agent大全 PC端User-Agent IE 9.0 IE 8.0 IE 7.0 IE 6.0 Firefox 4.0.1–MAC Firefox 4.0.1–Windows Opera 11.11–MAC Opera 11.11–Windows Chrome 17.0–MAC safari 5…

现在的湖仓一体像是个伪命题

文章目录开放的计算引擎SPL助力湖仓一体开放且完善的计算能力多数据源混合计算文件计算支持完善的计算能力直接访问源数据数据整理后的高性能计算SPL资料从一体机、超融合到云计算、HTAP,我们不断尝试将多种应用场景融合在一起并试图通过一种技术来解决一类问题&…

AWS Lambda从入门到精通

这里写自定义目录标题AWS Lambda从入门到精通介绍调用AWS Lambda的函数请求响应(RequestResponse)式调用采用请求响应的同步方式以函数作为应用程序的后端后端应用程序的用例和需求AWS Lambda从入门到精通 Amazon发布AWS Lambda服务,云计算的…

Windows配置python3环境变量解决无法识别pip指令报错

Python官网下载 Python官网下载地址 在Windows系统中,推荐下载.msi或.exe安装包。需要注意,python3和python2的语法有很大区别,按照实际情况下载对应版本。本次安装的是Python 3.10.8稳定版。 环境变量配置 安装过程 如果是以.exe安装包方…

机器学习中的数学基础(三):随机变量

机器学习中的数学基础(三):随机变量3 随机变量3.1 离散型随机变量3.2 连续型随机变量3.3 简单随机抽样3.4 似然函数3.5 极大似然估计在看西瓜书的时候有些地方的数学推导(尤其是概率论的似然、各种分布)让我很懵逼&…

【学习笔记46】JavaScript购物车的实现

一、案例效果 1、将通过数据重构页面 查询数据, 渲染页面 2、全选 选中全选按钮后, 根据全选按钮的选中状态, 修改所有商品的选中状态重新渲染视图 3、清空购物车 清空商品数据重新渲染视图 4、结算 找到所有选中的商品计算所有选中商品各自的总价计算所有选中商品的总价…

【MySQL】MVCC原理分析 + 源码解读 -- 必须说透

文章目录前言一、MVCC 介绍二、MySQL MVCC 介绍三、MySQL MVCC实现原理源码分析3.1 隐式字段源码验证3.2 undo logundo log格式undo log源码验证写insert undo log源码写update undo log源码写undo log源码roll_ptr是如何指向insert undo log的?roll_ptr是如何指向update undo…

Thymeleaf模板

Thymeleaf可用于前后端分离, 下图,value"aa", 在本地静态资源可以改变值,但是在web端不可以 前端可以在本地测试,有数据了显示数据 所以前后端分离 th属性 常用th属性解读html有的属性,Thymel…

集合框架----源码解读LinkedList篇

1.LinkedList官方介绍 双链表实现的list和Deque接口。实现所有可选的列表操作,并允许所有元素(包括null)。 所有的操作都按照双链表的预期执行。索引到列表中的操作将从列表的开始或结束遍历列表,以更接近指定索引的为准。 注意,这个实现不是…