kafka全解

news2025/1/21 12:22:29

目录

  • Kafka概述
    • 定义
    • 消息队列
    • 目录结构分析
    • 传统消息队列的应用场景
    • 消息队列的两种模式
      • 点对点模式
      • 发布/订阅模式
    • Kafka基础架构
  • Kafka快速入门
    • 安装部署
      • 集群规划
      • 集群部署
      • 集群启停脚本
    • Kafka命令行操作
      • Kafka基础架构
      • 主题命令行操作
      • 生产者命令行操作
      • 消费者命令行操作
  • kafka可视化工具
  • Kafka重要概念
    • broker
    • zookeeper
    • producer(生产者)
    • consumer(消费者)
    • consumer group(消费者组)
    • 分区(Partitions)
    • 副本(Replicas)
    • 主题(Topic)
    • 偏移量(offset)
    • 消费者组
  • Kafka生产者
    • 生产者消息发送流程
      • 发送原理
      • 生产者重要参数列表
      • 异步发送API
        • 普通异步发送
        • 带回调函数的异步发送
      • 同步发送API
    • 生产者分区
      • 分区和副本机制
      • 分区好处
        • 轮询策略
        • 随机策略(不用)
        • 按key分配策略
        • 乱序问题
    • 副本机制
      • producer的ACKs参数
      • acks配置为0
      • acks配置为1
      • acks配置为-1或者all
    • Kafka生产者幂等性与事务
      • 幂等性
      • Kafka生产者幂等性
      • 幂等性原理
    • Kafka事务
      • 事务操作API
    • 数据有序和数据乱序
  • Kafka Broker
    • Zookeeper存储的Kafka信息
    • Kafka Broker总体工作流程
    • Broker重要参数
    • Kafka副本
      • 副本基本信息
      • Leader 选举流程
      • Leader 和 Follower 故障处理细节
      • 活动调整分区副本存储
      • Leader Partition 负载平衡
    • 文件存储
      • Topic 数据的存储机制
      • 文件清理策略
  • Kafka 消费者
    • Kafka 消费方式
    • Kafka 消费者工作流程
    • 消费者组原理
    • 消费者重要参数
    • offset 位移
      • offset 的默认维护位置
      • 自动提交offset
      • 手动提交offset
      • 指定Offset消费
  • Kafka-Kraft模式
    • Kafka-Kraft架构

Kafka概述

定义

Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。

在这里插入图片描述

消息队列

目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。

在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。

在这里插入图片描述
在这里插入图片描述

目录结构分析

  1. bin:Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
  2. config:Kafka的所有配置文件
  3. libs: 运行Kafka所需要的所有JAR包
  4. logs: Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息
  5. site-docs: Kafka的网站帮助文件

传统消息队列的应用场景

传统的消息队列的主要应用场景包括**:缓存/消峰、解耦异步通信**。

缓冲/消峰: 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

在这里插入图片描述
解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

在这里插入图片描述
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后再需要的时候再去处理它们。

在这里插入图片描述

消息队列的两种模式

点对点模式

消费者主动拉去数据,消息收到后清除消息
在这里插入图片描述

发布/订阅模式

  • 可以有多个topic主题(浏览,点赞,收藏,评论等)
  • 消费者消费数据之后,不删除数据
  • 每个消费者互相独立,都可以消费到数据

在这里插入图片描述

Kafka基础架构

1、为方便扩展,并提高吞吐量,一个topic分为多个partition

2、配合分区的设计,提出消费者组的概念,组内每个消费者并行消费

3、为提高可用性,为每个partition增加若干副本,类似NameNode HA

4、ZK中记录谁是leader,Kafka2.8.0 以后也可以配置不采用ZK.

在这里插入图片描述

  • Producer:消息生产者,就是向Kafka broker 发消息的客户端。

  • Consumer:消息消费者,向Kafka broker 取消息的客户端。

  • Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

  • Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

  • Topic: 可以理解为一个队列,生产者和消费者面向的都是一个topic。

  • Partition: 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。

  • Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。

  • Leader:每个分区多个副本的 “主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。

  • Follower:每个分区多个副本中的 “从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个Follower会成为新的 Leader。

Kafka快速入门

安装部署

集群规划

Hadoop102Hadoop103Hadoop104
zkzkzk
kafkakafkakafka

集群部署

  1. docker部署zk集群:参考《zk全解》

  2. 进入到/usr/local/kafka目录,修改配置文件

    vim server.properties 
    
    
    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=0
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
    # 配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    # 监听所有网卡地址,允许外部端口连接     
    listeners=PLAINTEXT://0.0.0.0:9092 
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
    

    可以提前在hosts文件中配置master,slave1,slave2的ip,之前在学习k8s的时候我已经配置过了,可以直接拿来用。

    listeners=PLAINTEXT://0.0.0.0:9092 ,默认情况下,advertised.listeners不设置的话,则默认使用listeners的属性,然而advertised.listeners是不支持0.0.0.0的,所以需要指定暴露的监听器,如下

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://虚拟机ip:9092
    
  3. 将安装包拷贝到其他服务器

  4. 分别在hadoop103和hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1broker.id=2

  5. 配置环境变量

    1. 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
    sudo vim /etc/profile.d/my_env.sh
    增加如下内容:
    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    

    这里我将kafka直接放在了根目录下的一个文件夹,更加方便:

    在这里插入图片描述

    1. 刷新一下环境变量。
    source /etc/profile
    
    1. 分发环境变量文件到其他节点,并 source。
    sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
    source /etc/profile
    source /etc/profile
    
  6. 分别启动kafka:

bin/kafka-server-start.sh -daemon config/server.properties
  1. 如果遇到cluser_id不符合的问题,直接将日志文件删除重新启动即可。

集群启停脚本

  1. 脚本如下,
#! /bin/bash
case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo " --------启动 $i Kafka-------"
		ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
	daemon /opt/module/kafka/config/server.properties"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo " --------停止 $i Kafka-------"
		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
	done
};;
esac
  1. 添加执行权限
chmod +x kf.sh
  1. 启动集群命令
kf.sh start
  1. 停止集群命令
kf.sh stop

Kafka命令行操作

Kafka基础架构

在这里插入图片描述

主题命令行操作

  1. 查看操作主题命令参数

    ./bin/kafka-topics.sh 
    

    在这里插入图片描述

  2. 查看当前服务器中的所有topic

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
    
  3. 创建 first topic

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first
    

    选项说明:

    1. –topic 定义 topic 名
    2. –replication-factor 定义副本数
    3. –partitions 定义分区数
  4. 查看 first 主题的详情

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
    
  5. 修改分区数(注意:分区数只能增加,不能减少)

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3
    
  6. 查看结果:

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe 
    Topic: first	TopicId: _Pjhmn1NTr6ufGufcnsw5A	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
    	Topic: first	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    	Topic: first	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
    	Topic: first	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
    
  7. 删除 topic

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first 
    

生产者命令行操作

  1. 查看操作者命令参数

    ./bin/kafka-console-producer.sh 
    

    在这里插入图片描述

  2. 发送消息

    ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
    >hello world
    >yooome yooome
    

消费者命令行操作

  1. 查看操作消费者命令参数

    ./bin/kafka-console-consumer.sh
    

    在这里插入图片描述

  2. 消费消息

    1. 消费first 主题中的数据:
    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
    
    1. 把主题中所有的数据都读取出来(包括历史数据)。
    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
    

    在这里插入图片描述

kafka可视化工具

官网:https://www.kafkatool.com/download.html

在这里插入图片描述

Kafka重要概念

broker

在这里插入图片描述

  1. 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
  2. broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
  3. 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

zookeeper

在这里插入图片描述

  1. ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

  2. ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

  3. Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

producer(生产者)

生产者负责将数据推送给broker的topic

consumer(消费者)

消费者负责从broker的topic中拉取数据,并自己进行处理

consumer group(消费者组)

在这里插入图片描述

  1. consumer group是kafka提供的可扩展且具有容错性的消费者机制
  2. 一个消费者组可以包含多个消费者
  3. 一个消费者组有一个唯一的ID(group Id)
  4. 组内的消费者一起消费主题的所有分区数据

分区(Partitions)

在这里插入图片描述
在Kafka集群中,主题被分为多个分区

副本(Replicas)

在这里插入图片描述
副本可以确保某个服务器出现故障时,确保数据依然可用,在Kafka中,一般都会设计副本的个数>1,

主题(Topic)

在这里插入图片描述

  1. 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
  2. Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
  3. 在主题中的消息是有结构的,一般一个主题包含某一类消息
  4. 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

偏移量(offset)

在这里插入图片描述

  1. offset记录着下一条将要发送给Consumer的消息的序号
  2. 默认Kafka将offset存储在ZooKeeper中
  3. 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
  4. 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

消费者组

  1. Kafka支持有多个消费者同时消费一个主题中的数据。
    在这里插入图片描述

  2. 同时运行两个消费者,我们发现,只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息,必须要给test主题,添加一个分区。

  3. 设置 test topic为2个分区bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test

Kafka生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及到了两个线程 — main 线程和Sender线程。在main线程中创建了一个双端队列 RecordAccumulator。main线程将消息发送给ResordAccumlator,Sender线程不断从 RecordAccumulator 中拉去消息发送到 Kafka Broker。
在这里插入图片描述

生产者重要参数列表

在这里插入图片描述

异步发送API

普通异步发送

  1. 需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker。

在这里插入图片描述
2、代码编程go get github.com/Shopify/sarama

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "first"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{
		"192.168.71.128:9092", "192.168.71.129:9092", "192.168.71.130:9092",
	}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	} else {
		fmt.Println(client)
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

带回调函数的异步发送

在这里插入图片描述
【注意:】消息发送失败会自动重试,不需要我们在回调函数中手动重试。

同步发送API

在这里插入图片描述

生产者分区

分区和副本机制

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中

  1. 轮询分区策略
  2. 随机分区策略
  3. 按key分区分配策略
  4. 自定义分区策略

分区好处

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

  2. 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行 消费数据

在这里插入图片描述

轮询策略

在这里插入图片描述

  1. 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
  2. 如果在生产消息时,key为null,则使用轮询算法均衡地分配分区

随机策略(不用)

随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。

在这里插入图片描述

按key分配策略

在这里插入图片描述
按key分配策略,有可能会出现「数据倾斜」,例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。

乱序问题

轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。

副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。

producer的ACKs参数

对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。

acks配置为0

在这里插入图片描述

acks配置为1

在这里插入图片描述
当生产者的ACK配置为1时,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。

acks配置为-1或者all

在这里插入图片描述

Kafka生产者幂等性与事务

幂等性

拿http举例来说,一次或多次请求,得到地响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

在这里插入图片描述
如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。

Kafka生产者幂等性

在这里插入图片描述
在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

幂等性原理

为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。

  1. PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
  2. Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
  3. 幂等性只能保证的是在单分区单会话内不重复

在这里插入图片描述

Kafka事务

  1. Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

  2. 开启事务,必须开启幂等性

在这里插入图片描述

事务操作API

Producer接口中定义了以下5个事务相关方法:

  1. initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
  2. beginTransaction(开始事务):启动一个Kafka事务
  3. sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
  4. commitTransaction(提交事务):提交事务
  5. abortTransaction(放弃事务):取消事务

数据有序和数据乱序

在这里插入图片描述

Kafka Broker

Zookeeper存储的Kafka信息

[zk: localhost:2181(CONNECTING) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

在这里插入图片描述

Kafka Broker总体工作流程

在这里插入图片描述

Broker重要参数

在这里插入图片描述
在这里插入图片描述

Kafka副本

副本基本信息

  1. Kafka副本作用:提高数据可靠性。
  2. Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  3. Kafka中副本为:Leader和Follower。Kafka生产者只会把数据发往 Leader,然后Follower 找 Leader 进行同步数据。
  4. Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

AR = ISR + OSR

ISR:表示 Leader 保持同步的 Follower 集合。如果 Follower 长时间未 向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s 。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

OSR:表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader ,负责管理集群 broker 的上下线,所有 topic 的分区副本分配 和 Leader 选举等工作。

在这里插入图片描述

  1. 创建一个新的 topic,4 个分区,4 个副本
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
Created topic atguigu1.
  1. 查看 Leader 分布情况
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
  1. 停止掉 hadoop105 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
  1. 停止掉 hadoop104 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
  1. 启动 hadoop105 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
  1. 启动 hadoop104 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
  1. 停止掉 hadoop103 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2

Leader 和 Follower 故障处理细节

LEO(Log End Offset): 每个副本的最后一个offset,LEO其实就是最新的 offset + 1。

HW(High Watermark):所有副本中最小的LEO。

在这里插入图片描述
LEOLog End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1

HWHigh Watermark):所有副本中最小的LEO

在这里插入图片描述

活动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

需求:创建一个新的 topic ,4个分区,两个副本,名称为three 。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。

在这里插入图片描述

手动调整分区副本存储的步骤如下:

  1. 创建一个新的 topic,名称为 three。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --create --partitions 4 --replication-factor 2 --
topic three
  1. 查看分区副本存储情况
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --describe --topic three
  1. 创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

输入如下内容:

{
  "version":1,
  "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
  {"topic":"three","partition":1,"replicas":[0,1]},
  {"topic":"three","partition":2,"replicas":[1,0]},
  {"topic":"three","partition":3,"replicas":[1,0]}] 
}
  1. 执行副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file 
increase-replication-factor.json --execute
  1. 验证副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file 
increase-replication-factor.json --verify
  1. 查看分区副本存储情况。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --describe --topic three

Leader Partition 负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某 些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

在这里插入图片描述

参数名称描述
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。生产环
境中,leader 重选举的代价比较大,可能会带来
性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader
的比率。如果每个 broker 超过了这个值,控制器
会触发 leader 的平衡。
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔
时间。

文件存储

Topic 数据的存储机制

在这里插入图片描述
查看 hadoop102(或者 hadoop103、hadoop104)的/opt/module/kafka/datas/first-1 (first-0、first-2)路径上的文件

[atguigu@hadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata

直接查看 log 日志,发现是乱码。

通过工具查看 index 和 log 信息。

[atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments 
--files ./00000000000000000000.index 
Dumping ./00000000000000000000.index
offset: 3 position: 152

在这里插入图片描述
日志存储参数配置

参数描述
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分
成块的大小,默认值 1G。
log.index.interval.bytes默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),
然后就往 index 文件里面记录一个索引。 稀疏索引。

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • Log.retention.hours,最低优先级小时,默认7天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略有 delete 和 compact 两种。

  1. delete 日志阐述:将过期数据删除
  • log.cleanup.policy = delete 所有数据启用阐述策略

(1) 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

(2) 基于大小:默认关闭。超过设置的所有日志总大小,阐述最早的 segment 。

log.retention.bytes,默认等于-1,表示无穷大。

在这里插入图片描述

  1. compact 日志压缩

compact日志压缩:对于相同 key 的不同 value 值,值保留最后一个版本。

  • log.cleanup.policy = compact所有数据启动压缩策略

在这里插入图片描述

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个 offset 大的 offset 对应的消息,实际上会拿到 offset 为 7 的消息,并从这个位置开始消费。

​ 这种策略只适合特殊场景,比如消息的 key 是用户 ID,value 是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

Kafka 消费者

Kafka 消费方式

  • pull(拉)模式:consumer 采用从 broker 中主动拉去数据。Kafka 采用这种方式。
  • push(推)模式:Kafka没有采用这种方式,因为由 broker 决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是 50m/s,Consumer1,Consumer2就来不及处理消息。

pull 模式不足之处是,如果Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。

在这里插入图片描述

Kafka 消费者工作流程

在这里插入图片描述

消费者组原理

Consumer Group (CG):消费者组,由多个consumer组成。形成一个消费者组的条件是所有消费者的 groupid 相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

在这里插入图片描述
在这里插入图片描述

消费者重要参数

参数名称描述
bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了
消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在
(如,数据被删除了),该如何处理? earliest:自动重置偏
移量到最早的偏移量。 latest:默认,自动重置偏移量为最
新的偏移量。 none:如果消费组原来的(previous)偏移量
不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。
该条目的值必须小于 session.timeout.ms ,也不应该高于
session.timeout.ms 的 1/3。
session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。
超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该
消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字
节数。该时间到,仍然会返回数据。
fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批
消息最大的字节数。如果服务器端一批次的数据大于该值
(50m)仍然可以拉取回来这批数据,因此,这不是一个绝
对最大值。一批次的大小受 message.max.bytes (broker
config)or max.message.bytes (topic config)影响。
max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

offset 位移

offset 的默认维护位置

在这里插入图片描述

自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true

  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

在这里插入图片描述

参数名称描述
enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

手动提交offset

虽然自动提交offset十分简单比那里,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。一次 Kafka 还提供了手动提交 offset 的API。

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

在这里插入图片描述

指定Offset消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量

时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。

(2)latest(默认值):自动将偏移量重置为最新偏移量。

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

在这里插入图片描述

Kafka-Kraft模式

Kafka-Kraft架构

在这里插入图片描述

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。

这样做的好处有以下几个:

  • Kafka 不再依赖外部框架,而是能够独立运行;

  • controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;

  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;

  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/377883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法 # SimHash 算法:文本相似度、文本去重、海量文本快速查询

SimHash SimHash 是 Google 发明的海量网页去重的高效算法,将原始的文本映射为 64 位的二进制串,然后通过比较二进制的差异进而表示原始文本内容的差异。 传统的 Hash 算法只负责将原始内容尽量均匀随机地映射为一个 hash 值,原理上相当于伪随机数产生算法。SimHash 本身属…

javascript刷链表——链表基础知识、虚拟头节点操作链表及链表基本操作

javascript刷链表1javascript刷链表11.链表基础知识2.虚拟头节点操作链表力扣:203.移除链表元素3.链表基本操作力扣:707. 设计链表javascript刷链表1 今天开始刷链表,冲冲冲~ 1.链表基础知识 链表是一种通过指针串联在一起的一种线性结构&…

STM32之中断和事件

中断和事件什么是中断当CPU正在执行程序时,由于发生了某种事件,要求CPU暂时中断当前的程序执行,转而去处理这个随机事件,处理完以后,再回到原来被中断的地方,继续原来的程序执行,这样的过程称为…

TDengine创建database报错 DB error: Out of dnodes (0.000000s)

TDengine 创建数据库报错 CREATE DATABASE power KEEP 365 DURATION 10 BUFFER 16 WAL_LEVEL 1; DB error: Out of dnodes (0.000000s)这个错误提示表明 TDengine 的数据节点已经用完,无法继续创建新的数据库。这可能是由于您的 TDengine 数据库中已经存在了大量的…

阶段十:总结专题(第五章:数据库篇)

阶段十:总结专题(第五章:数据库篇 )Day-第五章:数据库篇1. 隔离级别1.1、**未提交读**(不使用)1.2、**提交读(RC)**(经常使用)1.3、**可重复读&am…

RDO一体化部署OpenStack

RDO一体化部署OpenStack 环境准备 安装centos7 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J785hZvT-1677578418769)(C:\Users\HONOR\AppData\Roaming\Typora\typora-user-images\image-20230228171254675.png)] 使用vmware安装安装centos7&a…

【Django功能开发】如何正确使用定时任务(启动、停止)

系列文章目录 【Django开发入门】ORM的增删改查和批量操作 【Django功能开发】编写自定义manage命令 文章目录系列文章目录前言一、django定时任务二、django-apscheduler基本使用1.安装django-apscheduler2.配置settings.py的INSTALLED_APPS3.通过命令生成定时记录表3.如何创…

MySQL索引类型(type)分析

type索引类型 system > const > eq_ref > ref > range > index > all 优化级别从左往右递减,没有索引的⼀般为’all’。推荐优化目标:至少要达到 range 级别, 要求是 ref 级别, 如果可以是 const 最好&#xff…

线程池源码和CompletableFuture使用总结

线程池 线程池的创建方式 通过Executors的静态方法通过 new ThreadPoolExecutor方式创建 七大参数的作用 参数作用corePoolSize核心线程数,线程池创建好后就准备就绪的线程数量,一直存在maximumPoolSize最大线程数量,控制资源keepAliveTim…

嵌入式 linux 系统开发网络的设置

目录 一、前言 二、linux网络静态地址设置 前言 为什么要对linux系统下的ubuntu进行网络设置呢? 因为我们在嵌入式开发中,我们要保证windows系统、linux系统、开发板的ip要处于同一个网段,而默认ubuntu下的linux系统的ip是动态分配的&#…

ACM-蓝桥杯训练第一周

🚀write in front🚀 📝个人主页:认真写博客的夏目浅石.CSDN 🎁欢迎各位→点赞👍 收藏⭐️ 留言📝​ 📣系列专栏:ACM周训练题目合集.CSDN 💬总结&#xff1a…

debian 部署nginx https

我是flask 处理请求单进程, 差点意思 , 考虑先flask 在往下走 一:安装nginx 因为我是debian 系统,所以我的建议是直接 sudo apt-get install nginx 你也可以选择在官网下载, 但是我搭建ssl 的时候安装openssl非常的麻…

记住这3点,有效提高江苏专转本上岸率

记住这3点,有效提高上岸率 我们都知道,在江苏统招专转本考试中想岸并不是一件容易的事情。考生能否顺利上岸,往往受多方面因素影响,这其中包括:个人基础、学习方式、信息搜索能力。 如何提高自己的专转本上岸几率&…

粗心小编被云拯救,那云上数据谁来拯救?

开工第三天      小编已忙的焦头烂额      不是因为工作积压      而是因为自己的疏忽      也许是没有进入工作状态,一大早先经历了自行车钥匙丢失,手机遗落在家,好不容易坐到工位上才发现,备份数据的U盘忘带了。    不过,好在提前将工作文件上传到了云端…

10 大项目管理知识领域 4大核心领域 5大辅助领域

有人说:一个人从1岁活到80岁很平凡,但如果从80岁倒着活,那么一半以上的人都可能不凡。 生活没有捷径,我们踩过的坑都成为了生活的经验,这些经验越早知道,你要走的弯路就会越少。 项目管理的10大知识领域&a…

LeetCode 1145. 二叉树着色游戏 -- 简单搜索

二叉树着色游戏 提示 中等 199 相关企业 有两位极客玩家参与了一场「二叉树着色」的游戏。游戏中,给出二叉树的根节点 root,树上总共有 n 个节点,且 n 为奇数,其中每个节点上的值从 1 到 n 各不相同。 最开始时: 「一…

【C++】vector的基本使用

难道向上攀爬的那条路,不是比站在顶峰更让人热血沸腾吗? 文章目录一、vector和string的联系与不同二、vector的扩容操作1.resize() (缺省值为匿名对象)&& reserve()2.reserve在g和vs上的扩容机制3.reserve异地扩容和shri…

Pinia 介绍、使用、实践

1. Pinia 介绍1.1 Pinia 是什么Pinia 官网https://pinia.vuejs.org/vuex Githubhttps://github.com/vuejs/vuex上面是 Vuex Github 中置顶说明,我们可以得知:Pinia 现在是新的默认设置,Vue 的官方状态管理库已更改为 Pinia,Vue3、…

数据结构与算法系列之时间与空间复杂度

这里写目录标题算法的复杂度大O的渐进表示法实例分析空间复杂度每日一题算法的复杂度 衡量一个算法的好坏,一般 是从时间和空间两个维度来衡量的, 即时间复杂度和空间复杂度。 时间复杂度主要衡量一个算法的运行快慢, 空间复杂度主要衡量一个…

Linux -- 程序 进程 线程 概念引入

程序与进程 :程序 :什么是程序 ???伪官方 : 二进制文件,文件存储在磁盘中,例如 /usr/bin 目录下 。 是静态。 简单讲 :# 我们都学习了语言,比如下面这串代…