【Kafka】消息队列Kafka进阶

news2024/11/13 17:58:49

目录

    • Kafka分区机制
      • 生产者分区写入策略
        • 轮询策略
        • 随机策略(不用)
        • 按key分配策略
        • 乱序问题
        • 自定义分区策略
      • 消费者组Rebalance机制
      • 消费者分区分配策略
        • Range范围分配策略
        • RoundRobin轮询策略
        • Stricky粘性分配策略
    • Kafka副本机制
      • producer的ACKs参数
        • acks配置为0
        • acks配置为1
        • acks配置为-1或者all
    • 高级(High Level)API与低级(Low Level)API
      • 高级(High Level)API
        • 低级(Low Level)API
    • 监控工具Kafka-eagle
      • 安装Kafka-Eagle
    • Kafka原理
      • 分区的leader与follower
        • AR、ISR、OSR
        • Controller
          • Controller的选举
        • leader负载均衡
          • Preferred Replica
    • Kafka 生产、消费数据工作流程
      • Kafka数据写入流程
      • Kafka数据消费流程
        • 两种消费模式
    • Kafka的数据存储形式
      • 存储日志
      • 读取消息
      • 删除消息
    • 消息不丢失机制
      • broker数据不丢失
      • 生产者数据不丢失
      • 消费者数据不丢失
    • 数据积压
      • Lag标签
      • 解决数据积压问题
    • Kafka中数据清理(Log Deletion)
      • 定时日志删除任务
        • 基于时间的保留策略
        • 基于日志大小的保留策略
        • 基于日志起始偏移量保留策略
      • 日志压缩(Log Compaction)
    • Kafka配额限速机制(Quotas)
      • 限制producer端速率
      • 限制consumer端速率
      • 取消Kafka的Quota配置

Kafka分区机制

生产者分区写入策略

生产者写入消息到 topic,Kafka 将依据不同的策略将数据分配到不同的分区中。

  • 轮询分区策略
  • 随机分区策略
  • 按key分区分配策略
  • 自定义分区策略

轮询策略

  默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区。如果在生产消息时,key为null,则使用轮询算法均衡地分配分区。
在这里插入图片描述

随机策略(不用)

  随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
在这里插入图片描述

按key分配策略

  按key分配策略,有可能会出现【数据倾斜】,例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。
在这里插入图片描述

乱序问题

  轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。
  而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。

自定义分区策略

在这里插入图片描述
创建自定义分区器:

public class KeyWithRandomPartitioner implements Partitioner {
    private Random r;

    @Override
    public void configure(Map<String, ?> configs) {
        r = new Random();
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //cluster.partitionCountForTopic 表示获取指定topic的分区数量
        //r.nextInt(1000) 表示从随机数生成器 r 中随机生成一个小于1000的整数,其中参数1000指定了生成的随机数的范围,即生成的随机数是0到999之间的整数。
        //在这段代码中,生成的随机数将被用于计算消息所在的分区编号。由于模运算 % cluster.partitionCountForTopic(topic) 的结果必须小于分区数量,因此这里对1000取模的目的是将随机数的范围缩小到分区数量内,以确保不会选择到超出范围的分区编号。
        return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
    }

    @Override
    public void close() {}
}

在Kafka生产者配置中,自定使用自定义分区器的类名:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());

消费者组Rebalance机制

  Kafka 中的 Rebalance 称之为再均衡,是 Kafka 中确保 Consumer group 下所有的consumer 如何达成一致,分配订阅的 topic 的每个分区的机制。

Rebalance触发的时机有:

  • 消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。
    在这里插入图片描述

  • 订阅的topic个数发生变化,消费者可以订阅多个主题,假设当前的消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。

  • 在这里插入图片描述

  • 订阅的topic分区数发生变化
    在这里插入图片描述
    Rebalance的不良影响:

  • 发生 Rebalance 时,consumer group 下的所有 consumer 都会协调在一起共同参与,Kafka 使用分配策略尽可能达到最公平的分配。

  • Rebalance 过程会对 consumer group 产生非常严重的影响,Rebalance 的过程中所有的消费者都将停止工作,直到 Rebalance 完成。

消费者分区分配策略

  • Range范围分配策略
  • RoundRobin轮询策略
  • Stricky粘性分配策略

Range范围分配策略

Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。

:Range 范围分配策略是针对 每个Topic的。

配置消费者的partition.assignment.strategyorg.apache.kafka.clients.consumer.RangeAssignor

props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");

算法公式

n = 分区数量 / 消费者数量

m = 分区数量 % 消费者数量

前m个消费者消费n+1个

剩余消费者消费n个

在这里插入图片描述
在这里插入图片描述

RoundRobin轮询策略

  RoundRobinAssignor 轮询策略是将消费组内所有消费者以及消费者所订阅的 所有topic的partition 按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

配置消费者的partition.assignment.strategyorg.apache.kafka.clients.consumer.RoundRobinAssignor

props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

在这里插入图片描述

Stricky粘性分配策略

  从Kafka 0.11.x开始,引入此类分配策略。主要目的:分区分配尽可能均匀。
  在发生 rebalance 的时候,分区的分配尽可能与上一次分配保持相同。没有发生rebalance 时,Striky粘性分配策略和 RoundRobin 分配策略类似。
在这里插入图片描述
上面如果 consumer2 崩溃了,此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配,例如:
在这里插入图片描述
可以发现,consumer0 和 consumer1 原来消费的分区大多发生了改变。接下来我们再来看下粘性分配策略。
在这里插入图片描述
  Striky粘性分配策略,保留rebalance之前的分配结果。这样,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费。
  例如:之前consumer0、consumer1之前正在消费某几个分区,但由于rebalance发生,导致consumer0、consumer1需要重新消费之前正在处理的分区,导致不必要的系统开销。(例如:某个事务正在进行就必须要取消了)

Kafka副本机制

  副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。

producer的ACKs参数

  对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。

// 配置ACKs参数
props.put("acks", "all");

acks配置为0

acks = 0:生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的。
在这里插入图片描述
ACK为0,基准测试:

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=10.211.55.8:9092 acks=0

acks配置为1

生产者会等到leader分区写入成功后,返回成功,接着发送下一条,性能中等。
在这里插入图片描述

acks配置为-1或者all

确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的。
在这里插入图片描述
基准测试结果:

指标单分区单副本(ack=0)单分区单副本(ack=1)单分区单副本(ack=-1/all)
吞吐量47359.248314 records/sec 每秒4.7W条记录40763.417279 records/sec 每秒4W条记录540.5 /s 每秒7.3W调记录
吞吐速率45.17 MB/sec 每秒约45MB数据38.88 MB/sec 每秒约89MB数据0.52 MB/sec
平均延迟时间686.49 ms avg latency799.67 ms avg latency120281.8 ms
最大延迟时间1444.00 ms max latency1961.00 ms max latency1884.00 ms

  根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。

  分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。

高级(High Level)API与低级(Low Level)API

高级(High Level)API

  • 消费Kafka的消息很容易实现,写起来比较简单。
  • 不需要执行去管理offset,直接通过ZK管理;也不需要管理分区、副本,由Kafka统一管理。
  • 消费者会自动根据上一次在ZK中保存的offset去接着获取数据。
  • 在ZK中,不同的消费者组(group)同一个topic记录不同的offset,这样不同程序读取同一个topic,不会受offset的影响。
  • 高级API的缺点:不能控制offset,例如:想从指定的位置读取;不能细化控制分区、副本、ZK等。
// 消费者程序:从test主题中消费数据
public class ConsumerTest {
    public static void main(String[] args) {
        // 1. 创建Kafka消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.88.100:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. 创建Kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        consumer.subscribe(Arrays.asList("test"));

        // 4. 使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // 定义100毫秒超时
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

低级(Low Level)API

  通过使用低级API,我们可以自己来控制offset,想从哪儿读,就可以从哪儿读。
  而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前 offset 是自动保存在ZK中,使用低级API,可以将offset不一定要使用 ZK 存储,可以自己来存储offset。
  例如:存储在文件、MySQL、或者内存中。但是低级API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。

示例手动消费分区数据:

  之前高级(High Level)API,我们让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候,需要指定要消费的分区,例如:如果某个程序将某个指定分区的数据保存到外部存储中,例如:Redis、MySQL,那么保存数据的时候,只需要消费该指定的分区数据即可。如果某个程序是高可用的,在程序出现故障时将自动重启(例如:Flink、Spark程序)。这种情况下,程序将从指定的分区重新开始消费数据。

不再使用之前的 subscribe 方法订阅主题,而使用 assign 方法指定想要消费的消息:

String topic = "test";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

一旦指定了分区,就可以就像前面的示例一样,在循环中调用 poll 方法消费消息。

当手动管理消费分区时,即使 GroupID 是一样的,Kafka的组协调器都将不再起作用。

如果消费者失败,也将不再自动进行分区重新分配。

监控工具Kafka-eagle

  开发工作中,当业务前提不复杂时,可以使用 Kafka 命令来进行一些集群的管理工作。但如果业务变得复杂,例如:需要增加 group、topic 分区,此时,使用命令行就感觉很不方便,此时,如果使用一个可视化的工具帮助完成日常的管理工作,将会大大提高对于 Kafka 集群管理的效率,而且使用工具来监控消费者在 Kafka 中消费情况。

  Kafka Eagle是一款结合了目前大数据 Kafka 监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。官网地址:https://www.kafka-eagle.org/

安装Kafka-Eagle

开启Kafka JMX端口:

  JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。

在启动Kafka的脚本前,添加:
【Kafka】消息队列Kafka基础——>编写Kafka一键启动/关闭脚本 已添加

cd ${KAFKA_HOME}
export JMX_PORT=9988
nohup bin/kafka-server-start.sh config/server.properties &

安装Kafka-Eagle

需提前准备好mysql数据库并创建ke数据库。安装JDK,并配置好JAVA_HOME。

# 将kafka_eagle上传,并解压到 /export/server 目录中。
cd cd /export/software/
tar -xvzf kafka-eagle-bin-3.0.1.tar.gz -C ../server/

cd /export/server/kafka-eagle-bin-3.0.1/ 
tar -xvzf efak-web-3.0.1-bin.tar.gz 
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1

# 配置 kafka_eagle 环境变量。
vim /etc/profile
export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
export PATH=$PATH:$KE_HOME/bin
source /etc/profile

配置 kafka_eagle。使用 vi 打开 conf 目录下的 system-config.properties

vim conf/system-config.properties
# 修改第4行,配置kafka集群别名
kafka.eagle.zk.cluster.alias=cluster1
# 修改第5行,配置ZK集群地址
cluster1.zk.list=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
# 注释第6行
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
# 修改第32行,打开图标统计
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30

# 注释第69行,取消sqlite数据库连接配置
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

# 修改第77行,开启mysql
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://10.211.55.8:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=52809329
# 启动脚本`ke.sh`中配置JAVA_HOME:
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin

vim ke.sh
# 在第24行添加JAVA_HOME环境配置
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-arm64

# 修改Kafka eagle可执行权限
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin
chmod +x ke.sh

# 启动 kafka_eagle
./ke.sh start

#访问Kafka eagle,默认用户为admin,密码为:123456
http://10.211.55.8:8048/ke

Kafka eagle界面中Kafka度量指标 topic list
点击Topic下的List菜单,就可以展示当前Kafka集群中的所有topic。
在这里插入图片描述

指标意义
Brokers Spreadbroker使用率
Brokers Skew分区是否倾斜
Brokers Leader Skewleader partition是否存在倾斜

Kafka原理

分区的leader与follower

  在Kafka中,每个 topic 都可以配置多个分区以及多个副本。
  每个分区都有一个leader以及0个或者多个 follower。在创建 topic 时,Kafka会将每个分区的leader均匀地分配在每个broker上。
  正常使用kafka是感觉不到 leader、follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。

总结

  • Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步。
  • 如果leader出现故障,其他follower会被重新选举为leader
  • follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中

在这里插入图片描述
查看某个partition的leader在哪个服务器中:

示例 名为 test 的3个分区、3个副本的topic。

在这里插入图片描述

AR、ISR、OSR

  在实际环境中,leader 有可能会出现一些故障,所以 Kafka 一定会选举出新的leader。
  Kafka中,把follower可以按照不同状态分为三类——AR、ISR、OSR。

  • 分区的所有副本称为 ARAssigned Replicas已分配的副本)
  • 所有与leader副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISRIn-Sync Replicas 在同步中的副本)
  • 由于 follower 副本同步滞后过多的副本(不包括 leader 副本)组成 OSROut-of-Sync Replias
  • AR = ISR + OSR
  • 正常情况下,所有的 follower 副本都应该与 leader 副本保持同步,即 AR = ISROSR集合为空。

在这里插入图片描述

Controller

  Kafka启动时,会在所有的 broker 中选择一个Controller。前面 leader 和 follower 是针对partition,而 Controller 是针对 broker 的。
  创建 topic、或者添加分区、修改副本数量之类的管理任务都是由 Controller 完成的。Kafka 分区leader 的选举,也是由 Controller 决定的。

Controller的选举

  在 Kafka 集群启动的时候,每个 broker 都会尝试去 ZooKeeper 上注册成为Controller(ZK临时节点),但只有一个竞争成功,其他的 broker 会注册该节点的监视器。一但该临时节点状态发生变化,就可以进行相应的处理。Controller 也是高可用的,一旦某个 broker 崩溃,其他的 broker 会重新注册为Controller。

找到当前Kafka集群的 Controller:
Kafka Tools的 Tools 菜单,找到 ZooKeeper Brower,查看到哪个 broker 是 Controller
在这里插入图片描述
Controller 选举 partition leader:

  • 所有 Partition 的 leader 选举都由 Controller 决定
  • Controller 会将 leader 的改变直接通过 RPC 的方式通知需为此作出响应的 Broker
  • Controller 读取到当前分区的 ISR,只要有一个 Replica 还幸存,就选择其中一个作为 leader 否则,则任意选这个一个 Replica 作为leader
  • 如果该 partition 的所有 Replica 都已经宕机,则新的 leader 为-1

  具体来说,当一个分区的 leader 副本失效时,follower 副本会发现并向其它 broker 节点发送请求,申请成为该分区的新 leader。同时,每个 broker 节点会周期性地向 controller 节点发送心跳请求,汇报自己当前的状态和可用性信息。controller 节点会根据这些信息,选择一个健康的、可用的 broker 节点作为该分区的新 leader。

在选举新 leader 的过程中,Controller 节点会参考如下因素

  • 副本状态:只有处于 ISR(in-sync replicas)列表中的 follower 副本才有资格成为新 leader,因为它们的数据已经与 leader 同步。
  • 副本位置:Controller 节点会选择与原 leader 副本相同或更靠前的位置作为新 leader 的位置,以确保最小化数据丢失。
  • 副本健康状况:Controller 节点会优先选择健康的、可用的 broker 节点作为新 leader,以确保高可用性和服务质量。

  总之,Controller 节点会综合考虑多个因素,选出一个最适合成为新 leader 的 broker 节点,从而保障 Kafka 集群的高可用性和稳定性。

为什么不能通过ZK的方式来选举partition的leader?

  Kafka 集群如果业务很多的情况下,会有很多的 partition。假设某个 broker 宕机,就会出现很多的 partiton 都需要重新选举 leader。如果使用 Zookeeper 选举 leader,会给 Zookeeper 带来巨大的压力。所以,Kafka 中 leader 的选举不能使用 ZK 来实现。

leader负载均衡

Preferred Replica

  Kafka 中引入了一个叫做 preferred-replica 的概念,意思就是:优先的 Replica
  在 ISR 列表中,第一个 replica 就是preferred-replica,第一个分区存放的broker,肯定就是 preferred-replica。

执行以下脚本可以将preferred-replica设置为 leader,均匀分配每个分区的leader。

./kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic 主题 --partition=1 --election-type preferred

确保leader在broker中负载均衡:

杀掉 test 主题的某个broker,这样 Kafka 会重新分配leader。等到 Kafka 重新分配 leader 之后,再次启动 Kafka 进程。此时:观察test主题各个分区leader的分配情况。

在这里插入图片描述
此时,会造成leader分配是不均匀的,所以可以执行以下脚本来重新分配leader:

bin/kafka-leader-election.sh --bootstrap-server 10.211.55.8:9092 --topic test --partition=1 --election-type preferred
# Successfully completed leader election (PREFERRED) for partitions test-1

–partition:指定需要重新分配leader的partition编号:
在这里插入图片描述

Kafka 生产、消费数据工作流程

Kafka数据写入流程

在这里插入图片描述

  • 生产者先从 zookeeper 的 “/brokers/topics/主题名/partitions/分区名/state”节点找到该 partition 的leader:

在这里插入图片描述

  • 生产者在ZK中找到该ID找到对应的broker:

在这里插入图片描述

  • broker进程上的 leader 将消息写入到本地 log 中。
  • follower从 leader 上拉取消息,写入到本地 log,并向 leader 发送 ACK
  • leader接收到所有的 ISR 中的 Replica 的 ACK 后,并向生产者返回 ACK。

Kafka数据消费流程

两种消费模式

  Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息。消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
在这里插入图片描述

  • 每个 consumer 都可以根据分配策略(默认 RangeAssignor ),获得要消费的分区
  • 获取到 consumer 对应的offset(默认从 ZK 中获取上一次消费的 offset )
  • 找到该分区的 leader,拉取数据
  • 消费者提交 offset
    在这里插入图片描述

Kafka的数据存储形式

  一个 topic 由多个分区组成。一个分区(partition)由多个segment(段)组成,一个segment(段)由多个文件组成(log、index、timeindex)。

在这里插入图片描述

存储日志

  Kafka中的数据是保存在 /export/server/kafka_2.12-2.4.1/data(自己安装在配置文件设定)中,消息是保存在以:主题名-分区ID 的文件夹中的,数据文件夹中包含以下内容:
在这里插入图片描述
在这里插入图片描述

文件名说明
00000000000000000000.index索引文件,根据offset查找数据就是通过该索引文件来操作的
00000000000000000000.log日志数据文件
00000000000000000000.timeindex时间索引
leader-epoch-checkpoint持久化每个partition leader对应的LEO (log end offset、日志文件中下一条待写入消息的offset)
  • 每个日志文件的文件名为起始偏移量,因为每个分区的起始偏移量是0,所以,分区的日志文件都以0000000000000000000.log 开始

  • 默认的每个日志文件最大为 log.segment.bytes =102410241024 1G

  • 为了简化根据 offset 查找消息,Kafka 日志文件名设计为开始的偏移量

测试:新创建一个topic:test_10m,该topic每个日志数据文件最大为10M

bin/kafka-topics.sh --create --zookeeper 10.211.55.8 --topic test_10m --replication-factor 2 --partitions 3 --config segment.bytes=10485760

使用之前的生产者程序往 test_10m 主题中生产数据:

在这里插入图片描述
写入消息:新的消息总是写入到最后的一个日志文件中,该文件如果到达指定的大小(默认为:1GB)时,将滚动到一个新的文件中。

读取消息

根据 offset 首先需要找到存储数据的 segment 段(注意:offset指定分区的全局偏移量)
然后根据这个 全局分区offset 找到相对于文件的 segment段offset :
在这里插入图片描述
最后再根据 segment段offset 读取消息,为了提高查询效率,每个文件都会维护对应的范围内存,查找的时候就是使用简单的二分查找。
在这里插入图片描述

删除消息

  在Kafka中,消息是会被定期清理的。一次删除一个 segment 段的日志文件,Kafka 的日志管理器,会根据 Kafka 的配置,来决定哪些文件可以被删除。

消息不丢失机制

  • broker数据不丢失
  • 生产者数据不丢失
  • 消费者数据不丢失

broker数据不丢失

  生产者通过分区的leader写入数据后,所有在 ISR 中 follower 都会从 leader 中复制数据,这样,可以确保即使 leader 崩溃了,其他的 follower 的数据仍然是可用的。

生产者数据不丢失

  生产者连接leader写入数据时,可以通过ACK机制来确保数据已经成功写入。ACK机制有三个可选配置:

  • 配置ACK响应要求为 -1 时 —— 表示所有的节点都收到数据(leader和follower都接
    收到数据)

  • 配置ACK响应要求为 1 时 —— 表示leader收到数据

  • 配置ACK影响要求为 0 时 —— 生产者只负责发送数据,不关心数据是否丢失(这种情
    况可能会产生数据丢失,但性能是最好的)

生产者可以采用同步和异步两种方式发送数据:

  • 同步:发送一批数据给kafka后,等待kafka返回结果
  • 异步:发送一批数据给kafka,只是提供一个回调函数。

说明:如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据。

消费者数据不丢失

在消费者消费数据的时候,只要每个消费者记录好 offset 值即可,就能保证数据不丢失。

数据积压

  Kafka 消费者消费数据的速度是非常快的,但如果由于处理 Kafka 消息时,由于有一些外部 IO、或者是产生网络拥堵,就会造成Kafka中的数据积压(或称为数据堆积)。如果数据一直积压,会导致数据出来的实时性受到较大影响。

Lag标签

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

解决数据积压问题

当Kafka出现数据积压问题时,首先要找到数据积压的原因。以下是在企业中出现数据积压的几个类场景。

数据写入MySQL失败


问题描述
某日运维人员找到开发人员,说某个topic的一个分区发生数据积压,这个topic非常重要,而且开始有用户投诉。运维非常紧张,赶紧重启了这台机器。重启之后,还是无济于事。

问题分析

消费这个topic的代码比较简单,主要就是消费topic数据,然后进行判断在进行数据库操作。运维通过kafka-eagle找到积压的topic,发现该topic的某个分区积压了几十万条的消息。
最后,通过查看日志发现,由于数据写入到MySQL中报错,导致消费分区的offset一自没有提交,所以数据积压严重。

因为网络延迟消费失败


问题描述

基于Kafka开发的系统平稳运行了两个月,突然某天发现某个topic中的消息出现数据积压,大概有几万条消息没有被消费。

问题分析

通过查看应用程序日志发现,有大量的消费超时失败。后查明原因,因为当天网络抖动,通过查看Kafka的消费者超时配置为50ms,随后,将消费的时间修改为500ms后问题解决。

Kafka中数据清理(Log Deletion)

  Kafka 的消息存储在磁盘中,为了控制磁盘占用空间,Kafka 需要不断地对过去的一些消息进行清理工作。Kafka 的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在 Kafka 中,提供两种日志清理方式:

  • 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。日志删除是以段(segment日志)为单位来进行定期清理的。

  • 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。

在 Kafka 的broker或topic配置中:

配置项配置值说明
log.cleaner.enabletrue(默认)开启自动清理日志功能
log.cleanup.policydelete(默认)删除日志
log.cleanup.policycompaction压缩日志
log.cleanup.policydelete,compact同时支持删除、压缩

定时日志删除任务

  Kafka 日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。
在这里插入图片描述

  当前日志分段的保留策略有3种:

  • 基于时间的保留策略

  • 基于日志大小的保留策略

  • 基于日志起始偏移量的保留策略

基于时间的保留策略

  以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:log.retention.hourslog.retention.minuteslog.retention.ms
  其中,优先级为 log.retention.ms > > > log.retention.minutes > > > log.retention.hours

  默认情况,在broker中,配置如下:log.retention.hours=168。也就是,默认日志的保留时间为168小时,相当于保留7天。

  删除日志分段时:从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)

  Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。

设置topic 5秒删除一次

为了方便观察,设置段文件的大小为1M
在这里插入图片描述
设置topic的删除策略:key: retention.ms value: 5000
在这里插入图片描述
  尝试往topic中添加一些数据,等待一会,观察日志的删除情况。可以发现,日志会定期被标记为删除,然后被删除。

基于日志大小的保留策略

  日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过 broker 端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。

:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。

基于日志起始偏移量保留策略

  每个 segment 日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

日志压缩(Log Compaction)

  Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的 key 对应的数据只保留一个版本。

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment
  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
  • 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态

在这里插入图片描述

Kafka配额限速机制(Quotas)

  生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用 broker 上的全部资源,造成网络 IO 饱和。有了配额(Quotas)就可以避免这些问题。Kafka 支持配额管理,从而可以对 Producer 和 Consumer 的 produce&fetch 操作进行流量限制,防止个别业务压爆服务器。

限制producer端速率

为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:

bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试,观察生产消息的速率:

bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

# 结果:50000 records sent, 1108.156028 records/sec (1.06 MB/sec)

限制consumer端速率

  对consumer限速与 producer 类似,只不过参数名不一样。为指定的 topic 进行限速,以下为所有 consumer 程序设置 topic 速率不超过1MB/s,即1048576/s。命令如下:

bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试:

bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic test --fetch-size 1048576 --messages 500000

#结果为:MB.sec:1.0743

取消Kafka的Quota配置

使用以下命令,删除Kafka的Quota配置:

bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default

bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/802243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows 10 on ARM, version 22H2 (updated Jul 2023) ARM64 AArch64 中文版、英文版下载

Windows 10 on ARM, version 22H2 (updated Jul 2023) ARM64 AArch64 中文版、英文版下载 基于 ARM 的 Windows 10 请访问原文链接&#xff1a;https://sysin.org/blog/windows-10-arm/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;s…

汽车产业链面临重大变革 大运乘用车加强产业布局 助力低碳出行

当前&#xff0c;国家“双碳”战略的全面实施&#xff0c;全球绿色产业发展理念的不断加深以及汽车产品形态、交通出行模式、能源消费结构变革所呈现的发展机遇等诸多因素&#xff0c;持续推动新能源汽车产业全面转型提速。据悉&#xff0c;2022年&#xff0c;中国新能源汽车销…

C/C++ 动态内存分配与它的指针变量

一、什么是内存的动态分配 全局变量分配在内存中的静态存储区。局部变量&#xff08;包括形参&#xff09;分配在内存中的动态存储区&#xff0c;这个存储区是一个称为栈的区域。除此之外&#xff0c;C语言还允许建立内存动态分配区域&#xff0c;以存放一些临时用的数据&…

HighTec 工程配置详解1

目录 HighTec 工程配置详解编译配置构建配置管理器编译属性编译步骤编译环境变量编译日志编译配置TriCore C CompilerTriCore C LinkerHighTec 工程配置详解 编译配置 构建配置管理器 管理器内,可以创建各种不同用途的配置项。例如用于生产工程的 ROM 配置,用于调试工程的…

【跨代码仓库合并方案】

1、背景&#xff1a; 1、wiser绑定的uiidA的定制修改内容和ELKO绑定的uiidB基本是一样的&#xff0c;需要手动粘贴同步&#xff0c;增加测试保障风险&#xff0c;还会浪费开发资源投入&#xff1b; 2、施耐德wiser和elko面板两套面板基本一致&#xff0c;但是经过new art升级后…

关于浙大MEM项目报考的一些高频但很少有答案的一些问题……

2024年浙大MEM提前批面试申请已经进入材料评审阶段&#xff0c;考生们按照自己的节奏备考等待即可。 针对今年的项目报考&#xff0c;其实有很多细节问题考生们平时很关注的&#xff0c;但获取官方的回应往往会有一定的难度&#xff0c;杭州达立易考教育根据平时的咨询积累了一…

【牛客】CM11链表分割

题目分析&#xff1a; 以链表head->4->2->1->6->0->8->7为例&#xff0c;分割后应该为head->4->2->1->0->6->8->7 定义两个链表&#xff0c;less存储比x小的所有节点&#xff0c;greater存储比x大的所有节点 遍历原链表&#xff0c;…

swiper不生效/切换不生效,点击切换按钮activeIndex值不对应问题@令狐张豪

原因&#xff1a;因为把new Swiper放在mounted实例化的时候可能v-for并未执行完成结构还未完全生成 错误&#xff1a;先执行了swiper实例化后循环的&#xff1b;正确&#xff1a;先循环完数据确保数据完整循环完成后再执行swiper实例化&#xff1b; 解决方案&#xff1a;watch…

临床数据 5. 如何构建微卫星不稳定性结直肠癌预后评分系统?

临床数据分析方案 桓峰基因公众号推出临床数据分析方案教程&#xff0c;整理如下&#xff1a; 临床数据 1. 临床基因突变数据如何发高分&#xff1f; 临床数据 2. 基于NGS的胃癌诊疗全过程的临床应用方案 临床数据 3. 肿瘤微小残留病灶(MRD)如何发文章&#xff1f; 临床数据 4.…

时间复杂度函数图像

复杂度一览 f(n)阶函数y1O(1)常数函数ylogxO(logn)对数函数yxO(n)线性函数yxlogxO(nlogn)线性对数函数yx^2O(n^2)二次函数yx^3O(n^3)三次函数y2^xO(2^n)指数函数 对比图一览 对比结果在线预览 参考 时间复杂度比较及时间复杂度对应函数&#xff0c;函数图像

STL标准模板库 字符与字符串 string,string_view,const char *

文章目录 字符与字符串计算机如何表达字符可显示字符控制字符关于控制字符的一个冷知识 C 语言字符C 语言中的字符类型 charchar 即整数”思想应用举例C 语言帮手函数关于 char 类型的一个冷知识 C 语言中的字符串“0结尾字符串”知识点应用举例C 语言转义符% 和 \ 的异同 C 字…

flutter 打包iOS安装包

flutter iOS Xcode打包并导出ipa文件安装包 1、 Xcode配置 1、 启动打包 1、 等待打包 1、 打包完成、准备导出ipa 1、 选择模式 1、 选择配置文件 1、 导出 1、 选择导出位置 1、 得到ipa

Python数据可视化工具——Pyecharts

目录 1 简介绘图前先导包 2 折线图3 饼图4 柱状图/条形图5 散点图6 箱线图7 热力图8 漏斗图9 3D柱状图10 其他&#xff1a;配置项 1 简介 Pyecharts是一款将python与echarts结合的强大的数据可视化工具 Pyecharts是一个用于生成echarts图表的类库。echarts是百度开源的一个数据…

火山引擎DataLeap的Data Catalog系统公有云实践 (下)

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 Data Catalog公有云遇到的挑战 Data Catalog经历了一个从0到1在火山引擎公有云部署并逐步优化和迭代发布10版本的过程&#xff0c;在这个过程中经历不少挑战&#…

Codeforces Round 888 (Div. 3)(视频讲解全部题目)

[TOC](Codeforces Round 888 (Div. 3)&#xff08;视频讲解全部题目&#xff09;) Codeforces Round 888 (Div. 3)&#xff08;A–G&#xff09;全部题目详解 A Escalator Conversations #include<bits/stdc.h> #define endl \n #define INF 0x3f3f3f3f using namesp…

数据结构:顺序表(C实现)

个人主页 水月梦镜花 个人专栏 C语言 &#xff0c;数据结构 文章目录 一、顺序表二、实现思路1.存储结构2.初始化顺序表(SeqListInit)3.销毁顺序表(SeqListDestroty)4.打印顺序表(SeqListPrint)5.顺序表尾插(SeqListPushBack)and检查容量(SeqListCheckCapacity)6.顺序表头插(Se…

大数据面试题之Elasticsearch:每日三题(六)

大数据面试题之Elasticsearch:每日三题 1. 为什么要使用Elasticsearch&#xff1f;2.Elasticsearch的master选举流程&#xff1f;3.Elasticsearch集群脑裂问题&#xff1f; 1. 为什么要使用Elasticsearch&#xff1f; 系统中的数据&#xff0c;随着业务的发展&#xff0c;时间…

如何创建一个容器并运行docker镜像

文章目录 如何创建一个容器并使用docker镜像docker命令解析nacos启动成功 访问 进入容器&#xff0c;修改配置文件 接上集 CentOS 7安装Docker https://blog.csdn.net/qq_39017153/article/details/131955100 如何创建一个容器并使用docker镜像 还是打开镜像容器官网https://hu…

JS如何获取最近一个月或指定天数的日期,并以数组的形式存储

JS如何获取最近一个月或指定天数的日期,并以数组的形式存储 代码 num为传递的天数 (传递30查最近一个月) get_date(num) {let dateArray []//获取今天日期let myDate new Date()let today myDate.getFullYear() - (myDate.getMonth() 1) "-" myDate.getDate(…

升讯威在线客服系统是如何实现对 IE8 完全完美支持的(怎样从 WebSocket 降级到 Http)【干货】

简介 升讯威在线客服与营销系统是基于 .net core / WPF 开发的一款在线客服软件&#xff0c;宗旨是&#xff1a; 开放、开源、共享。努力打造 .net 社区的一款优秀开源产品。 完整私有化包下载地址 &#x1f4be; https://kf.shengxunwei.com/freesite.zip 当前版本信息 发布…