【Kafka】Kafka的Broker概述

news2025/1/9 5:03:09

【Kafka】Kafka的Broker概述

文章目录

  • 【Kafka】Kafka的Broker概述
    • 1. Broker的工作流程
      • 1.1 Zookeeper存储的Kafka信息
      • 1.2 Broker 总体工作流程
      • 1.3 Broker重要参数
    • 2. 节点服役和退役
      • 2.1 服役新节点
      • 2.2 退役旧节点
    • 3. Kafka副本
      • 3.1 副本信息
      • 3.2 Leader选举流程
      • 3.3 Leader 和 Follower 故障处理细节
      • 3.4 分区副本分配
      • 3.5 手动调整分区副本存储
      • 3.6 Leader Partition 负载均衡

1. Broker的工作流程

1.1 Zookeeper存储的Kafka信息

image-20230708163601517

1.2 Broker 总体工作流程

image-20230708163826939


1.3 Broker重要参数

参数名称描述
replica.lag.time.max.msISR 中,如果 Follower 长时间未向 Leader 发送通 信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值,默认 30s。
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是 指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes默认 4kb,kafka 里面每当写入了 4kb 大小的日志 (.log),然后就往 index 文件里面记录一个索引。
log.retention.hoursKafka 中数据保存的时间,默认 7 天
log.retention.minutesKafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.msKafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是 5 分钟
log.retention.bytes默认等于-1,表示无穷大。超过设置的所有日志总 大小,删除最早的 segment。
log.cleanup.policy默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策 略。
num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占 总核数的 50%。
num.replica.fetchers副本拉取线程数,这个参数占总核数的 50%的 1/3
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最 大值,9223372036854775807。一般不建议修改, 交给系统自己管理。
log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建 议修改,交给系统自己管理。

2. 节点服役和退役

2.1 服役新节点

  1. 克隆集群中任意一个节点,以node3节点为例,关闭node3节点,克隆该节点为node4
  2. 开启node4,修改ip地址,修改主机名为node4
  3. 重启node3和node4
  4. 修改node4中的kafka的broker.id为3
  5. 删除node4中kafka目录下的data目录和logs目录
  6. 启动node1,node2,node3中的kafka集群
  7. 单独启动node4中的kafka,这样就完成了新节点的服役。

但是我们发现,新节点服役之后,之前创建的topic还是只存在之前的节点中,于是我们创建一个要均衡的主题。

  1. 创建一个要均衡的主题。
[atguigu@hadoop102 kafka]$ vim topics-to-move.json

//粘贴如下内容
{
 "topics": [
 {"topic": "first"}
 ],
 "version": 1
}

  1. 生成一个负载均衡的计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

Current partition replica assignment
{"version":1,
 "partitions":[
     {"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},
     {"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},
     {"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}
 ]
}
                            
Proposed partition reassignment configuration
{"version":1,
 "partitions":[
    {"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}
    ]
}
  1. 创建副本存储计划(所有副本存储在broker0、broker1、broker2、broker3 中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

//粘贴如下内容
{"version":1,"partitions":[
    {"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}
    ]
}
  1. 执行副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  1. 验证副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

2.2 退役旧节点

退役node4节点,退役旧节点操作如下:

  1. 创建一个要均衡的主题
[atguigu@hadoop102 kafka]$ vim topics-to-move.json

{
 "topics": [
 {"topic": "first"}
 ],
 "version": 1
}

  1. 创建执行计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

Current partition replica assignment
{"version":1,"partitions":[
    {"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}
    ]
}
                            
Proposed partition reassignment configuration
{"version":1,"partitions":[
    {"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}
    ]
}
  1. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

{"version":1,"partitions":[
    {"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
    {"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}
    ]
}
  1. 执行副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  1. 验证副本存储计划
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first
  1. 在退役节点上执行停止命令
bin/kafka-server-stop.sh 

3. Kafka副本

3.1 副本信息

  • Kafka副本作用:提高数据可靠性
  • Kafka默认副本1个,生产环境一般配置2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka中副本分为:Leader 和 Follower 。Kafka生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
  • Kafka分区中所有副本统称为 AR(Assigned Replicas)
    • AR=ISR+OSR
    • ISR:表示和Leader保持同步的Follower集合,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader
    • OSR:表示Follower与Leader副本同步时,延迟过多的副本。

3.2 Leader选举流程

Kafka集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作。

Controller的信息同步工作是依赖于Zookeeper的。

Leader选举流程如下:

  1. broker启动后依次在ZK中注册。
  2. 由第一个注册的broker中的Controller监听brokers节点的变化
  3. 由第一个Controller决定Leader的选举
    • 选举规则:在isr中存活作为前提,按照AR中排在前面的优先。例如ar[1,0,2],isr[1,0,2],那么leader就会按照1,0,2的顺序轮询。
  4. Controller将节点信息上传到ZK,其他broker的Controller从ZK中同步相关信息
  5. 假设Broker1中的Leader挂了,由于第一个Controller一直监听brokers节点的变化,于是拉取ISR进行新的Leader选举

image-20230708221527198


3.3 Leader 和 Follower 故障处理细节

  • LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1
  • HW(High Watermark):所有副本中最小的LEO

image-20230708230138764

Follower故障

  1. Follower发生故障后会被临时提出ISR
  2. 这个期间Leader和Follower继续接收数据
  3. 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截掉,从HW开始向Leader进行同步
  4. 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

Leader故障

  1. Leader发生故障之后,会从ISR中选出一个新的Leader
  2. 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。


3.4 分区副本分配

如果kafka服务器只有4个节点,那么设置kafka的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

  1. 创建16个分区,3个副本
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second
  1. 查看分区和副本情况
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic second
Topic: second4 Partition: 0  Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1  Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2  Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3  Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

Topic: second4 Partition: 4  Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5  Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6  Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7  Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Topic: second4 Partition: 8  Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9  Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0

Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

image-20230708233201671

这样分配的目的是为了负载均衡,让每个节点均匀分配副本


3.5 手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所以需要手动调整分区副本的存储。

需求:创建一个新的topic,4个分区,2个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。

image-20230708235131193

手动调整分区副本存储的步骤如下:

  1. 创建一个新的topic,名称为three
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
  1. 查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
  1. 创建副本存储计划(所有副本都指定存储在broker0,broker1中)
vim increase-replication-factor.json

//粘贴如下内容
{
	"version":1,
	"partitions":[
					{"topic":"three","partition":0,"replicas":[0,1]},
					{"topic":"three","partition":1,"replicas":[0,1]},
					{"topic":"three","partition":2,"replicas":[1,0]},
					{"topic":"three","partition":3,"replicas":[1,0]}
				]
}
  1. 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  1. 验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
  1. 查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

3.6 Leader Partition 负载均衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于几种在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡

image-20230709000709353

  • auto.leader.rebalance.enable默认是true。自动Leader Partition平衡。
  • leader.imbalance.per.broker.percentage默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
  • leader.imbalance.check.interval.seconds,默认值300s,检查leader负载是否平衡的间隔时间。

假设集群只有一个主题如下图所示:

image-20230709001027408

针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点,所以不平衡数加1,AR副本总数是4,所以broker0节点不平衡率为1/4>10%,需要重平衡。

broker2、broker3节点和broker0不平衡率一样,需要再平衡。Broker1的不平衡数为0,不需要再平衡。

参数名称描述
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。生产环 境中,leader 重选举的代价比较大,可能会带来 性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/733653.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【成都】EFDC建模方法、SWAT模型高阶研修

EFDC建模方法及在地表水环境评价、水源地划分、排污口论证应用 为了定量地描述地表水环境质量与污染排放之间的动态关系,EFDC、MIKE、Delft3D、Qual2K等数值模型被广泛应用在环境、水务、海洋等多个领域。Environmental Fluid Dynamics Code(EFDC&#…

[NISACTF 2022]checkin

[NISACTF 2022]checkin 直接给了源码,乍一看非常的简单,但是这题有坑。其实看注释颜色不一样,也能发现不对劲了。 贴一个payload,?ahahahahajitanglailo&%E2%80%AE%E2%81%A6Ugeiwo%E2%81%A9%E2%81%A6cuishiyuan%E2%80%AE%E2…

ARM异常处理详解

前言: 学习一门处理器最重要的就是掌握该处理器的指令集和异常处理。 异常概念: 处理器在正常执行程序时可能会遇到一些不正常的事件发生,这时处理器就要将当前的程序暂停下来转去处理这个异常的事件,异常处理后再返回到被异常打…

需求分析引言:架构漫谈(五)架构师成长之路

我研发领域也从事了一些年,期间也做过一些架构设计工作,包括C#单体转型为Java微服务、Python单体转型为Java微服务等, 也尝试着从自己的经验角度,来汇总一些知识点,同时描述一下如何成长为一个合格的软件架构师&#x…

权限管理系统后端实现1-SpringSecurity执行原理概述

spring security的简单原理: SpringSecurity有很多很多的拦截器,在执行流程里面主要有两个核心的拦截器 1,登陆验证拦截器AuthenticationProcessingFilter 2,资源管理拦截器AbstractSecurityInterceptor 但拦截器里面的实现需要…

IDEA+Spring Boot + MyBatis + Layui+Mysql垃圾回收管理系统源码

IDEASpring Boot MyBatis LayuiMysql垃圾回收管理系统源码 一、系统介绍1.环境配置 二、系统展示1. 管理员登录2.垃圾回收管理3.添加需要回收的垃圾4.垃圾去向管理5.申请需要打包运出的垃圾6.系统公告管理7.个人信息管理8.修改密码 三、部分代码UserMapper.javaUserControlle…

Python的网络爬虫框架-网络爬虫常用框架

Python的网络爬虫框架-网络爬虫常用框架 一、前言二、引言三、Scrapy 爬虫框架四、Crawley 爬虫框架五、PySpider 爬虫框架 一、前言 个人主页: ζ小菜鸡大家好我是ζ小菜鸡,让我们一起来了解Python的网络爬虫框架-网络爬虫常用框架如果文章对你有帮助、欢迎关注、点…

Redis缓存同步1-策略介绍

缓存数据同步策略示意图 在大多数情况下,我们通过浏览器查询到的数据都是缓存数据,如果缓存数据与数据库的数据存在较大差异的话,可能会产生比较严重的后果的。所以,我们应该也必须保证数据库数据、缓存数据的一致性,…

基于simulink使用颜色识别来进行道路跟踪(附源码)

一、前言 此示例演示如何使用颜色信息来检测和跟踪在可能不存在车道标记的主要住宅环境中设置的道路边缘。基于颜色的跟踪示例说明了如何使用色彩空间转换块、霍夫变换块和卡尔曼滤波器块来检测和跟踪使用色调和饱和度的信息。 二、模型 下图显示了基于颜色的道路跟踪模型&a…

MATLAB的num2str,把循环变量作为字符串的内容

MATLAB的num2str,把循环变量作为字符串的内容 输入代码: i 2; abc [sdfg,num2str(i),dsfg]运行结果: 解析: MATLAB里面的[ ]是会把元素组合的意思 现在有: a1 3; a2 4; a3 5; 然后我想通过for循环,循…

mac与pd虚拟机之间不能粘贴文字或粘贴文件

首先确保共享打开: 然后检查虚拟机的Parallels Tools是否正常 一个简单的判断方式就是,退出虚拟机全屏之后,如果能够正常进入融合模式,那么Parallels Tools可用,否则就要排查问题 检查Parallels Tools是否随系统正常启…

C++11 | 智能指针

智能指针 前面的文章中我们介绍了C中的异常有关的知识点,同时在其中我们遇到了有关内存方面的问题,如下所示: int div() {int a, b;cin >> a >> b;if (b 0)throw invalid_argument("除0错误");return a / b; } void…

数据结构算法题——链表

leetcode-2.两数之和 leetcode-2.两数之和 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返回一个表示和的链表。 你可以假设除了数…

猿人学做题笔记

简单记录一下做题的思路步骤 1、第一题说的是无混淆加密(简单): 刚开始观察请求,发现链接和请求携带的参数都没有什么异常,然后直接请求会拿不到数据,于是仔细看了一下请求包,发现请求头里面有…

03-Vue基础语法之指令语法与条件渲染

个人名片: 😊作者简介:一名大一在校生,web前端开发专业 🤡 个人主页:python学不会123 🐼座右铭:懒惰受到的惩罚不仅仅是自己的失败,还有别人的成功。 🎅**学习…

【GaussDB(DWS)】数据分布式存储-三种类型的表

toc 一、环境说明 华为数据仓库服务DWS,集群版本8.1.3.320集群拓扑结构: 二、数据分布式方式 DWS采用水平分表的方式,将业务数据表的元组打散存储到各个节点内。这样带来的好处在于,查询中通过查询条件过滤不必要的数据&#…

[工业互联-18]:常见EtherCAT主站方案:SOEM的Windows/Linux解决方案

目录 第1章 SOEM 简介 第2章 SOEM创建EtherCAT主站 2.1 支持Linux和Windows操作系统 2.2 SOEM创建EtherCAT主站的步骤 第3章 QT添加SOEM主站 第1章 SOEM 简介 SOEM (Simple Open EtherCAT Master) 是一种开源的EtherCAT主站协议栈。 EtherCAT(Ethernet for C…

飞轮储能系统的建模与MATLAB仿真(永磁同步电机作为飞轮驱动电机)

简介 飞轮储能系统由于其高储能密度、高效率、轻污染的优点而越来越受到重视。飞轮储能系统以高速旋转的飞轮为依托,通过电力电子设备实现电能与动能的相互转化,从而在负载调峰、功率平抑、不间断电源等多领域都有很好的应用表现。 本文选用永磁同步电机…

基于simulink仿真车道偏离警告系统(附源码)

一、前言 此示例演示如何在视频序列中检测和跟踪道路车道标记,并在驾驶员穿过车道时通知驾驶员。该示例说明了如何使用霍夫变换、霍夫线和卡尔曼滤波器模块来创建线检测和跟踪算法。该示例使用以下步骤实现此算法:1) 检测当前视频帧中的车道…

计算机通信地址【图解TCP/IP(笔记六)】

文章目录 地址地址的唯一性地址的层次性 地址 通信传输中,发送端和接收端可以被视为通信主体。它们都能由一个所谓“地址”的信息加以标识出来。当人们使用电话时,电话号码就相当于“地址”。当人们选择写信时,通信地址加上姓名就相当于“地…