「Kafka」Broker篇

news2025/1/16 1:47:30

「Kafka」Broker篇

主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。

Kafka Broker 总体工作流程

Zookeeper 存储的 Kafka 信息

  • 启动 Zookeeper 客户端:

    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
    
  • 通过 ls 命令可以查看 kafka 相关信息:

    [zk: localhost:2181(CONNECTED) 2] ls /kafka
    

    image-20240110143832837

image-20231229163857940

Kafka Broker 总体工作流程

image-20231229163930666

模拟 Kafka 上下线,Zookeeper 中数据变化:

  1. 查看 /kafka/brokers/ids 路径上的节点:

    image-20231229164453868

  2. 查看 /kafka/controller 路径上的数据:

    image-20231229164440799

  3. 查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据:

    image-20231229164521363

  4. 停止 hadoop104 上的 kafka: image-20240110142636596

  5. 再次查看 /kafka/brokers/ids 路径上的节点

    image-20240110142623644

  6. 再次查看 /kafka/controller 路径上的数据

    image-20240110142702080

  7. 再次查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据

    image-20240110142724094

  8. 启动 hadoop104 上的 kafka

    image-20240110142742279

  9. 再次观察 1、2、3 步骤中的内容。

Broker 重要参数

image-20231229164041782

image-20231229164056634

image-20231229164110883

生产经验—节点服役和退役

服役新节点

新节点准备

  1. 关闭 hadoop104,并右键执行克隆操作

  2. 开启 hadoop105,并修改 IP 地址

    image-20240110150510473

  3. 在 hadoop105 上,修改主机名称为 hadoop105

    image-20240110150536171

  4. 重新启动 hadoop104、hadoop105

  5. 修改 haodoop105 中 kafka 的 broker.id 为 3保证唯一

    [atguigu@hadoop105 config]$ vim server.properties
    

    image-20240110155843127

  6. 删除 hadoop105 中 kafka 下的 datas 和 logs

    [atguigu@hadoop105 kafka]$ rm -rf datas/* logs/*
    
  7. 启动 hadoop102、hadoop103、hadoop104 上的 kafka 集群

    [atguigu@hadoop102 ~]$ zk.sh start
    [atguigu@hadoop102 ~]$ kf.sh start
    
  8. 单独启动 hadoop105 中的 kafka

    [atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
    

我们先来看一下 first 主题的信息:

image-20240110160935007

目前 first 主题的信息仍然只存在 broker0、1、2上,但 broker3 并没有帮助分担历史数据,所以我们需要负载均衡的操作。

执行负载均衡操作

  1. 创建一个要均衡的主题:

    [atguigu@hadoop102 kafka]$ vim topics-to-move.json
    
    {
    	"topics": [
    		{"topic": "first"}
    	],
    	"version": 1
    }
    
  2. 生成一个负载均衡的计划

    image-20240110160653990

  3. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)

    [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
    

    输入以下内容(刚生成的计划):

    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
    
  4. 执行副本存储计划:

    image-20240110161449511

  5. 验证副本存储计划:

    image-20240110161658340

    image-20240110161609204

退役旧节点

执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡

把要退役节点的数据导入到其他节点上。

  1. 创建一个要均衡的主题

    [atguigu@hadoop102 kafka]$ vim topics-to-move.json
    
    {
    	"topics": [
    		{"topic": "first"}
    	],
        "version": 1
    }
    
  2. 创建执行计划

    image-20240110162052104

  3. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)

    [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
    
    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
    
  4. 执行副本存储计划

    [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
    
  5. 验证副本存储计划

    [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server  hadoop102:9092  --reassignment-json-file increase-replication-factor.json --verify
    
    Status of partition reassignment:
    Reassignment of partition first-0 is complete.
    Reassignment of partition first-1 is complete.
    Reassignment of partition first-2 is complete.
    Clearing broker-level throttles on brokers 0,1,2,3
    Clearing topic-level throttles on topic first
    

    image-20240110162329053

执行停止命令

在 hadoop105 上执行停止命令即可:

[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh

Kafka 副本

副本基本信息

  • Kafka 副本作用:提高数据可靠性。

  • Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;

    • 太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka 中副本分为:Leader 和 Follower。

    • Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

A R = I S R + O S R AR = ISR + OSR AR=ISR+OSR

I S R ISR ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

O S R OSR OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper 的。

image-20240110153554112

Leader 选举会按照 AR 的顺序进行选取,就是下图中的 Replicas 顺序:

image-20240110153908376

image-20240110153923789

image-20240110153939706

Leader 和 Follower 故障处理细节

Follower 故障处理细节

消费者可见的数据最大 offset 就是 4, H W − 1 HW - 1 HW1

该 Follower 先被踢出 ISR 队列,然后其余的 Leader、Follower继续接受数据。如果该 Follower 恢复了,会读取本地磁盘上次记录的 HW,并裁剪掉 高于 HW 的数据,从 HW 开始向 Leader 进行同步数据。

image-20240111145337546

待该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上了 Leader,

image-20240111145207846

Leader 故障处理细节

broker0 一开始是 Leader,然后挂掉了,选举 broker1 为新的 Leader,然后其余的 Follower 会把各自 log 文件高于 HW 的部分裁剪掉,然后从新的 Leader 同步数据。

image-20240110154045978

分区副本分配

如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?

创建 16 分区,3 个副本

  1. 创建一个新的 topic,名称为 second

    image-20240110154238901

  2. 查看分区和副本情况:

    image-20240110154302249

依次错开,让每一个副本负载均衡,均匀分配,也可以保证数据的可靠性。

image-20240110154314929

生产经验—手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

需求:创建一个新的topic,4个分区,两个副本,名称为 three。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。

image-20240110154440464

手动调整分区副本存储的步骤如下:

image-20240110154459796

image-20240110154534569

image-20240111164010567

生产经验—Leader Partition 负载平衡

image-20240110154628926

image-20240110154640522

真正生产环境建议关闭,或设置 percentage 为 20%、30%,不要频繁的触发自平衡,浪费集群大量性能。

生产经验—增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

  1. 创建 topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
    
  2. 手动增加副本存储

    1. 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)

      [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
      

      输入如下内容:

      {"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
      
    2. 执行副本存储计划

      [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
      

文件存储

文件存储机制

Topic 数据的存储机制

image-20240116153508193

kafka 中默认数据保存 7 天,通过 .timeindex 文件判断日志保存多久,过期会定时清理对应的数据,详情参考下方的 - 文件清理策略。

思考:Topic 数据到底存储在什么位置?

image-20240111204450098
在这里插入图片描述
image-20240111204513026

index 文件和 log 文件详解

image-20240111204551632

说明:日志存储参数配置

image-20240111204615225

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:

  • log.retention.hours,最低优先级,小时,默认 7 天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级,毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略有 deletecompact 两种。

1)delete 日志删除:将过期数据删除
  • log.cleanup.policy = delete 所有数据启用删除策略(默认)

    1. 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

    2. 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

      log.retention.bytes,默认等于 -1,表示无穷大,其实就是关闭掉了。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

image-20240116154803754

以 segment 中所有记录中的最大时间戳作为该文件时间戳,进行删除。

也就是只要这个 segment 中有数据还未过期,就不进行删除操作。

2)compact 日志压缩

image-20240116154918725

高效读写数据

分布式集群

Kafka 本身是分布式集群,可以采用分区技术,并行度高。

稀疏索引

读数据采用稀疏索引,可以快速定位要消费的数据。

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

image-20240116160711136

页缓存 + 零拷贝技术

image-20240116160741389

image-20240116160751069

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1398908.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JavaEE】_网络编程基础

目录 1. 网络编程基础 1.1 网络编程定义 1.2 网络编程中的基本概念 1.2.1 API 1.2.2.发送端和接收端 1.2.3 请求和响应 1.2.4 客户端和服务端 2. Socket 套接字 2.1 概念 2.2 分类 3. UDP数据报套接字编程 3.1 DatagramSocket API 3.1.1 含义 3.1.2 构造方法 3…

全景摄像机行业分析:市场规模不可限量

早期的全景相机行业竞争格局较为多元。近年来随着行业技术不断成熟,市场的竞争格局由多家参与逐步向头部企业聚拢,国内企业凭借图像处理技术优势在全景相机行业中逐步抢占市场份额。 全景摄像机,是可以独立实现大范围无死角监控的摄像机。 一…

OpenVINS学习7——评估工具的简单使用

前言 OpenVINS自带评估工具,这里记录一下使用方法,我是以VIRAL数据集为例,但是目前仍然有问题,发现误差很大,我还没搞明白哪里出了问题。 工具介绍 主要参考 https://docs.openvins.com/eval-error.html https://bl…

ELK 日志分析系统

目录 一、日志管理方案 二、完整日志系统基本特征 三、ELK 简介 ELK组件: 1、ElasticSearch 2、Logstash 3、Kibana 可以添加的其它组件: 1、Filebeat 2、缓存/消息队列(redis、kafka、RabbitMQ等) 3、Fluentd 三、ELK …

作业-数组计数法

目录 数字出现次数 题目描述 输入 输出 输入复制 输出复制 求n个数中每个数出现的次数 题目描述 输入 输出 输入复制 输出复制 声音识别 题目描述 输入 输出 输入复制 输出复制 选班委 题目描述 输入 输出 输入复制 输出复制 数字出现次数 题目描述 …

解析智能酒精壁炉不完全燃烧的成因及潜在问题

解析智能酒精壁炉不完全燃烧的成因及潜在问题 智能酒精壁炉作为一种环保、高效、现代化的取暖工具,其采用酒精作为燃料进行燃烧,但在一些情况下,可能会出现酒精燃烧不完全的问题。下面将深入探讨这一现象的成因以及可能引发的问题。 成因分析…

SpringSecurity Web 权限方案

目录 一、设置登录系统的账号、密码 二、数据库查询用户名密码 三、自定义登录页面 四、基于角色或权限进行访问控制 (一)hasAuthority 方法 (二)hasAnyAuthority 方法 (三)hasRole 方法 &#xff…

Java String基础学习

目录 1、String的构造方法 2、String内存模型 3、字符串的比较 4、字符串的练习 1、用户登录系统 2、遍历字符串 3、统计字符次数 4、拼接字符串 5、字符串的反转 6、金额转换 7、手机号屏蔽 * 8、身份证信息查看 9、敏感词替换 5、StringBuilder 1、概念及练习…

Java毕业设计-基于ssm的网上求职招聘管理系统-第85期

获取源码资料,请移步从戎源码网:从戎源码网_专业的计算机毕业设计网站 项目介绍 基于ssm的网上求职招聘管理系统:前端 jsp、jquery,后端 springmvc、spring、mybatis,角色分为管理员、招聘人员、用户;集成…

【GitHub项目推荐--AI杀入斗地主领域】【转载】

AlphaGo:第一个战胜围棋世界冠军的人工智能机器人。 我不会玩围棋,没办法和 AlphaGO 对局。但是我喜欢玩斗地主,有斗地主人工智能机器人吗? 有,而且还开源了。DouZero:快手团队开发的斗地主AI。别的不说&…

JAVAEE出街 网络编程(一)

网络编程 一. 网络编程二. 客户端与服务器2.1 一问一答2.2 一问多答2.3 多问一答2.4 多问多答 三. TCP与UDP的特点 一. 网络编程 网络编程本质上就是学习传输层给应用层提供的API,把数据交给传输层,通过一层层的封装将数据通过网卡传输出去。 二. 客户端…

LabVIEW电能质量监测系统

系统利用LabVIEW开发一个基于LabVIEW的电能质量监测系统,模拟并监测暂态电能质量扰动,如电压骤升、电压骤降、电压波动和暂态振荡等。系统的硬件部分包括高精度的振动传感器和信号调节设备,以及型号为NI9234的数据采集卡和高性能计算机。这些…

【JavaEE进阶】 SpringBoot配置⽂件

文章目录 🍀配置⽂件的作⽤🌴SpringBoot配置⽂件🎋配置⽂件的格式🎄properties配置⽂件🚩properties基本语法🚩读取配置⽂件🚩properties的缺点 🌳yml配置⽂件yml基本语法&#x1f6…

网络编程01 常见名词的一些解释

本文将讲解网络编程的一些常见名词以及含义 在这之前让我们先唠一唠网络的产生吧,其实网络的产生也拯救了全世界 网络发展史 网络的产生是在美苏争霸的期间,实际上双方都持有核武器,希望把对方搞垮的同时不希望自己和对方两败俱伤. 希望破坏对方的核武器发射,这就涉及到三个方面…

实现分布式锁

背景 分布式锁是一种用于协调分布式系统中多个节点之间并发访问共享资源的机制。在分布式系统中,由于存在多个节点同时访问共享资源的可能性,需要使用分布式锁来保证数据的一致性和正确性。 今天要实现的是分布式场景中的互斥类型的锁。 下面时分布…

免费使用IntelliJ IDEA的7种方式(2024 最新版)

大家好,我是小黑,今天要和大家分享的是如何免费使用 IntelliJ IDEA。我们都知道,作为一名程序员,拥有一个高效的开发工具是至关重要的。IntelliJ IDEA 无疑是市面上最受欢迎的开发工具之一。但是,获取授权的成本有时会…

MySQL 索引(下)

🎉欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克🍹 ✨博客主页:小小恶斯法克的博客 🎈该系列文章专栏:重拾MySQL-进阶篇 🍹文章作者技术和水平很有限,如果文中出现…

leetcode下一个更大的元素---1暴力---2单调栈

1.题目&#xff1a; nums1 中数字 x 的 下一个更大元素 是指 x 在 nums2 中对应位置 右侧 的 第一个 比 x 大的元素。 给你两个 没有重复元素 的数组 nums1 和 nums2 &#xff0c;下标从 0 开始计数&#xff0c;其中nums1 是 nums2 的子集。 对于每个 0 < i < nums1.l…

STL中的map

概述 std::map 是一个模板类&#xff0c;定义在头文件 <map> 中&#xff1a; template<class Key,class T,class Compare std::less<Key>,class Allocator std::allocator<std::pair<const Key, T>> > class map;std::map 是一种有序关联容器…

考研C语言刷编程题篇之分支循环结构基础篇(一)

目录 第一题 第二题 方法一&#xff1a;要循环两次&#xff0c;一次求阶乘&#xff0c;一次求和。 注意&#xff1a;在求和时&#xff0c;如果不将sum每次求和的初始值置为1&#xff0c;那么求和就会重复。 方法二&#xff1a; 第三题 方法一&#xff1a;用数组遍历的思想…