四十、大数据技术之Kafka3.x(3)

news2025/1/14 0:44:28

🌻🌻 目录

  • 一、Kafka Broker
    • 1.1 Kafka Broker工作流程
      • 1.1.1 Zookeeper 存储的Kafka信息
      • 1.1.2 Kafka Broker 总体工作流程
      • 1.1.3 Broker 重要参数
    • 1.2 生产经验——节点服役和退役
      • 1.2.1 服役新节点
      • 1.2.2 退役旧节点
    • 1.3 Kafka 副本
      • 1.3.1 副本基本信息
      • 1.3.2 Leader 选举流程
      • 1.3.3 Leader 和 Follower 故障处理细节
      • 1.3.4 分区副本分配
      • 1.3.5 生产经验——手动调整分区副本存储
      • 1.3.6 生产经验——Leader Partition负载平衡
      • 1.3.7 生产经验——增加副本因子
    • 1.4 文件存储
      • 1.4.1 文件存储机制
      • 1.4.2 文件清理策略
    • 1.5 高效读写数据

一、Kafka Broker

1.1 Kafka Broker工作流程

1.1.1 Zookeeper 存储的Kafka信息

(1)启动Zookeeper客户端。

cd /usr/local/zookeeper/bin/

ls

./zkCli.sh 

在这里插入图片描述

(2)通过ls命令可以查看kafka相关信息。

ls /

ls /kafka/

ls /kafka/brokers/

ls /kafka/brokers/ids

在这里插入图片描述

zookeerper 可视化工具 PrettyZoo:

  • 下载
  • 使用参考

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1.1.2 Kafka Broker 总体工作流程

在这里插入图片描述

1)模拟Kafka上下线,Zookeeper中数据变化

(1)查看/kafka/brokers/ids路径上的节点。

./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

./kafka-server-stop.sh

在这里插入图片描述

在这里插入图片描述

1.1.3 Broker 重要参数

在这里插入图片描述

1.2 生产经验——节点服役和退役

1.2.1 服役新节点

1)新节点准备
(1)关闭linux-102(已经装了hadoop,jdk,zookeeper,kafka),并右键执行克隆操作,linux-103,linux-104(注:之前若有直接可以删掉重新克隆)
(2)开启linux-103,linux-104 并修改IP地址。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

④ 修改ip,将192.168.10.102修改为 192.168.10.103,保存,并重启网络

在这里插入图片描述

在这里插入图片描述

⑤ 修改 主机名

在这里插入图片描述
在这里插入图片描述

⑥ 重启进行远程连接

在这里插入图片描述

⑦ 删除日志,并分别修改linux-103与linux-102 的节点 id 和服务器 ip,并开启集群配置

在这里插入图片描述

在这里插入图片描述

开启集群配置(linux-102,linux-103,linux-104都需修改)

在这里插入图片描述

(3) 先启动linux-102,再启动linux-103,再次启动 linux-104(脚本启动后期更新)

cd /usr/local/zookeeper/bin/

./zkServer.sh start

./zkServer.sh status

cd /usr/local/kafka/bin

./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(4) 查看 linux-102 上有哪些主题,并分配了分区(生产环境建议至少分两个副区,如果不小心删除一个,则还有一个类似备份的)

./kafka-topics.sh --bootstrap-server linux-102:9092 --list

./kafka-topics.sh --bootstrap-server linux-102:9092 --topic first --describe

在这里插入图片描述

思考:如何将linux-102上面的一些分到linux-103达到负载均衡呢?看下面 2)

2)执行负载均衡操作

(1)创建一个要均衡的主题(在 linux-102 服务器的 kafka下面创建)。

cd /usr/local/kafka

vi topics-to-move.json

在这里插入图片描述

{
    "topics": [

        {"topic": "first"}

    ],

    "version": 1
}

在这里插入图片描述

(2)生成一个负载均衡的计划(在 linux-102 服务器的 kafka下面生成)。

# 0,1,2 代表三台服务器
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092  --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

遇到错误java.io.IOException: Unable to read file topics-to-move.json

在这里插入图片描述

解决:直接切换到kafka 根目录进行生成即可:

在这里插入图片描述

(3)创建副本存储计划(所有副本存储在broker2broker3broker4)。

vi increase-replication-factor.json

在这里插入图片描述

输入如下内容:

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":3,"replicas":[2],"log_dirs":["any"]}]}

在这里插入图片描述

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --execute

在这里插入图片描述

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --verify

在这里插入图片描述

上述操作都是在linux-102上操作的。

1.2.2 退役旧节点

1)执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡

(1)创建一个要均衡的主题。

与上面(1)创建一个要均衡的主题(在 linux-102 服务器的 kafka下面创建)。一样无需再创建。

(2)创建执行计划。

#上面是分配到了三台机器 2,3,4,现在移除 4即服务器 linux-104
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "2,3" --generate

在这里插入图片描述

(3)创建副本存储计划(所有副本存储在broker2broker3)。

vi increase-replication-factor.json

在这里插入图片描述

删除前面的,将上面复制的粘贴里面保存即可。

在这里插入图片描述

粘贴上面复制的如下内容:

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":3,"replicas":[2],"log_dirs":["any"]}]}

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --execute

在这里插入图片描述

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --verify

在这里插入图片描述
2)执行停止命令

linux-104上执行停止命令即可

./kafka-server-stop.sh

在这里插入图片描述

1.3 Kafka 副本

下面的所有后期会详细更新(可跳过直接看 👉👉大数据技术之Kafka3.x(4)

1.3.1 副本基本信息

  • (1)Kafka 副本作用:提高数据可靠性
  • (2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • (3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
    (4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。
    AR = ISR + OSR
    ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
    OSR,表示Follower与Leader副本同步时,延迟过多的副本。

1.3.2 Leader 选举流程

(跳过后期更新)

Kafka 集群中有一个brokerController会被选举为Controller Leader,负责管理集群 broker 的上下线,所有topic分区副本分配Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。

在这里插入图片描述

开启 linux-102,linux-103,linux-104,进行如下操作:

(1)创建一个新的topic,4个分区,4个副本(因为我是单节点,所以设置了 1)

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic Daniel1 --partitions 1 --replication-factor 1

在这里插入图片描述

(2)查看Leader分布情况

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic Daniel1

在这里插入图片描述

(3)停止掉linux-103的kafka进程,并查看Leader分区情况

在这里插入图片描述

(4)停止掉linux-104的kafka进程,并查看Leader分区情况
(5)启动linux-104的kafka进程,并查看Leader分区情况
(6)启动linux-104的kafka进程,并查看Leader分区情况
(7)停止掉linux-103的kafka进程,并查看Leader分区情况

1.3.3 Leader 和 Follower 故障处理细节

在这里插入图片描述

1.3.4 分区副本分配

(跳过后期更新)

如果kafka服务器只有4个节点,那么设置kafka的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

1)创建16分区,3个副本

(1)创建一个新的topic,名称为second。

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic second --partitions 16 --replication-factor 3

在这里插入图片描述

(2)查看分区和副本情况。

在这里插入图片描述

在这里插入图片描述

1.3.5 生产经验——手动调整分区副本存储

在这里插入图片描述

手动调整分区副本存储的步骤如下:

(1)创建一个新的topic,名称为three。

在这里插入图片描述

(2)查看分区副本存储情况。

在这里插入图片描述

(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
(4)执行副本存储计划。
(5)验证副本存储计划。
(6)查看分区副本存储情况。

1.3.6 生产经验——Leader Partition负载平衡

在这里插入图片描述

在这里插入图片描述

1.3.7 生产经验——增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

1)创建topic
2)手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
(2)执行副本存储计划。

1.4 文件存储

1.4.1 文件存储机制

1)Topic数据的存储机制

在这里插入图片描述

2)思考:Topic数据到底存储在什么位置?

(1)启动生产者,并发送消息。
(2)查看linux-102(或者linux-103、linux-104)的/usr/local/kafka/datas/first-1(first-0、first-2)路径上的文件。
(3)直接查看log日志,发现是乱码。
(4)通过工具查看index和log信息。
3)index文件和log文件详解

在这里插入图片描述

说明:日志存储参数配置

在这里插入图片描述

1.4.2 文件清理策略

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认7天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认5分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka中提供的日志清理策略有delete和compact两种。
1)delete日志删除:将过期数据删除

  • log.cleanup.policy = delete 所有数据启用删除策略
    (1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
    (2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
    log.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?

在这里插入图片描述

2)compact日志压缩

在这里插入图片描述

1.5 高效读写数据

  • 1)Kafka本身是分布式集群,可以采用分区技术,并行度高
  • 2)读数据采用稀疏索引,可以快速定位要消费的数据
  • 3)顺序写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

在这里插入图片描述

4)页缓存 + 零拷贝技术

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2035019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线程知识及编程

线程定义 在Python中,想要实现多任务还可以使用多线程来完成。 为什么使用多线程? 进程是分配资源的最小单位 , 一旦创建一个进程就会分配一定的资源 , 就像跟两个人聊QQ就需要打开两个QQ软件一样是比较浪费资源的 . 线程是程序执行的最小单位 , 实际…

企业应该如何准备 EcoVadis 审核?

企业准备 EcoVadis 审核可以参考以下步骤: 注册:在网上注册并提供公司的相关信息,包括法律实体名称、国家和地区、企业规模和行业等。如果是受客户邀请参加评估,需按照邀请邮件中的链接进行注册,并确保客户能随时获知评…

【ML】强化学习(Reinforcement Learning)及其拆解

【ML】强化学习(Reinforcement Learning) 1. RL Outline 强化学习(Reinforcement Learning)概述1.1 RL的基本框架 2. RL 引入:从这个小游戏开始3. Policy Gradient 方法4. Actor-Critic 方法5. [奖励塑形(R…

(第二十六天)

上午 1、web01与web02服务器搭建 ip:10.0.0.11 systemctl stop filewalld systemctl disable firewalld setenforce 0 vim /etc/selinux/config SELINUX disabled yum -y install nginx echo "web----------01" > /usr/share/nginx/html/index.h…

力扣热题100_二叉树_230_二叉搜索树中第K小的元素

文章目录 题目链接解题思路解题代码 题目链接 230. 二叉搜索树中第K小的元素 给定一个二叉搜索树的根节点 root ,和一个整数 k ,请你设计一个算法查找其中第 k 小的元素(从 1 开始计数)。 示例 1: 输入&#xff1a…

图论:欧拉路

欧拉路是什么 什么?你对这个名字感到很陌生?再看看是图论的内容,感觉是不是很难?其实一点也不难,这就是生活中的一笔画问题,也就是不重复的经过每一条边并可以访问所有的点,先看看这个图&#…

Python字符串格式化方法输出到控制台

python打印是这样的,我希望resource_id能够对齐输出: 3种格式化方法 %格式化 >>> pi3.141592653589793 >>> e2.718281828459045 >>> print("pi%.2f e%.3f" % (pi, e) ) pi3.14 e2.718“”.format()格式化 >…

操作系统(线程管理-通过条件变量实现消费者与生产者模型)

生产者与消费者模型 生产者:生产数据的线程,这类的线程负责从用户端、客户端接收数据,然后把数据Push到存储中介。 消费者:负责消耗数据的线程,对生产者线程生产的数据进行(判断、筛选、使用、响应、存储&…

x264 编码器 SSIM 算法源码分析

SSIM SSIM(Structural Similarity Index)是一种用于衡量两幅图像之间视觉相似度的指标。它不仅考虑了图像的亮度、对比度和饱和度,还考虑了图像的结构信息。SSIM的值介于-1到1之间,值越接近1表示两幅图像越相似。 SSIM是基于以下三个方面来计算的: 亮度(Luminance):比…

【Hot100】LeetCode—560. 和为 K 的子数组

目录 1- 思路前缀和 2- 实现⭐560. 和为 K 的子数组——题解思路 3- ACM 实现 原题链接:560. 和为 K 的子数组 1- 思路 前缀和 ① 借助 HashMap 存储前缀和:Key 为对应的前缀和,Value 为对应的出现次数 hm 初始化放入 (0,1) 代表和为 0 的情…

linux--chrony时间同步以及局域网同步

文章目录 注意Chrony介绍Chrony 支持的系统与联系方式Chrony Git 仓库环境准备服务端确认是否安装chrony如果没有安装,那就执行以下命令进行安装配置文件/etc/chrony.conf命令参数讲解基本用法选项命令系统时钟命令时间源命令NTP 访问命令执行命令参考 客户端确认是…

Java中等题-乘积最大子数组(力扣)

给你一个整数数组 nums ,请你找出数组中乘积最大的非空连续 子数组 (该子数组中至少包含一个数字),并返回该子数组所对应的乘积。 测试用例的答案是一个 32-位 整数。 示例 1: 输入: nums [2,3,-2,4] 输出: 6 解释: 子数组 […

Telemark电源TT-6E-BEAM SOURCE操作手侧含电路图接线

Telemark电源TT-6E-BEAM SOURCE操作手侧含电路图接线

详解LVS-dr模式

什么是LVS-dr模式 图解 在LVS-DR模式下,负载均衡器(也称为调度器)接收来自客户端的请求,并根据预设的调度算法将请求转发给后端的真实服务器(也称为RS)。真实服务器处理完请求后,直接将响应返回…

Jetpack Compose实战教程(六)

Jetpack Compose实战教程(六) 第六章 没有了margin,如何区分外边距?内边距? 文章目录 Jetpack Compose实战教程(六)一、前言二、本章目标三、开始编码3.1 特殊情况下的margin3.2 使用padding设…

基于Springboot+netty 的IM聊天室弹幕

代码地址 仓库地址 聊天室 创建springboot项目 因为我的NettyServer 是8080 所以我把服务端口改为了8081 引入netty 依赖 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.92.Final</versi…

Prometheus+Grafana保姆笔记(1)——Prometheus+Grafana的安装

Prometheus Grafana 的组合在微服务项目中可以完成许多DevOps任务&#xff0c;它们共同提供了强大的监控和可视化功能。 我们陆续介绍Prometheus Grafana 的相关用法。 首先介绍PrometheusGrafana的安装。 安装 Prometheus Prometheus 是GO写的&#xff0c;并不依赖于 Ja…

【深度学习】注意力机制(Transformer)

注意力机制 1.基础概念 1.1 查询、键和值 在人类的注意力方式中&#xff0c;有自主性的与非自主性的注意力提示两种解释方式。所谓自主性注意力提示&#xff0c;就是人本身主动想要关注到的某样东西&#xff1b;非自主性提示则是基于环境中物体的突出性和易见性&#xff0c;…

8.13面试题目

美团商业分析实习生 一、8个球有一个重一点&#xff0c;最少称几次找出 2次 8个球中有一个重一点&#xff0c;最少称2次能找出来。 具体称重步骤如下&#xff1a; 第一次称重&#xff1a;将8个球分成三组&#xff0c;分别为3个球、3个球和2个球。将两组各3个球放在天平的两端…

【搜索二维矩阵】python刷题记录

R4-二分查找专题 直接二维变一维&#xff0c;然后二分查找就可以了 class Solution:def searchMatrix(self, matrix: List[List[int]], target: int) -> bool:nums[i for row in matrix for i in row]def binfind(the,target):low,high0,len(the)-1while low<high:mid(l…