Kafka原理

news2025/1/10 11:24:25

生产者原理解析

生产者工作流程图:
在这里插入图片描述
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。
在主线程中由kafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。
Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka 中;
RecordAccumulator主要用来缓存消息以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B ,即32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即60秒。
主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,
RecordAccumulator内部为每个分区都维护了一个双端队列,即Deque。
消息写入缓存时,追加到双端队列的尾部;
Sender读取消息时,从双端队列的头部读取。注意:ProducerBatch 是指一个消息批次;
与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。
ProducerBatch 大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord ) 流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch (如果没有则新建),查看 ProducerBatch中是否还可以写入这个ProducerRecord,如果可以写入就直接写入,如果不可以则需要创建一个新的Producer Batch。在新建 ProducerBatch时评估这条消息的大小是否超过 batch.size 参数大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch。
Sender从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群broker节点。对于网络连接来说,生产者客户端是与具体broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node, List>的形式之后, Sender会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个Node了,这里的Request是Kafka各种协议请求;
请求在从sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<Nodeld, Deque>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per. connection ,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。通过比较 Deque 的size与这个参数的大小来判断对应的 Node中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续发送请求会增大请求超时的可能。

Producer往Broker发送消息应答机制

kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在构造producer 时通过acks参数指定(在 0.8.2.X 前是通过 request.required.acks 参数设置的)。这个参数支持以下三种值:

  • acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 kafka 。在这种情况下还是有可能发生错误,比如发送的对象不能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,大概率会丢失一些消息。
  • acks = 1:意味着leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 leader,但在消息被复制到 follower 副本之前 leader发生崩溃。
  • acks = all(这个和 request.required.acks = -1 含义一样):意味着 leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
acks含义
0Producer往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。
1Producer往集群发送数据只要 leader成功写入消息就可以发送下一条,只确保Leader 接收成功。
-1或allProducer往集群发送数据需要所有的ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

生产者将acks设置为all,是否就一定不会丢数据呢?
否!如果在某个时刻ISR列表只剩leader自己了,那么就算acks=all,收到这条数据还是只有一个点;
可以配合另外一个参数缓解此情况: 最小同步副本数>=2

其他的生产者参数

  • acks
    acks是控制kafka服务端向生产者应答消息写入成功的条件;生产者根据得到的确认信息,来判断消息发送是否成功;

  • max.request.size
    这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 1MB
    一般情况下,这个默认值就可以满足大多数的应用场景了。
    这个参数还涉及一些其它参数的联动,比如 broker 端(topic级别参数)的 message.max.bytes参数(默认1000012),如果配置错误可能会引起一些不必要的异常;比如将 broker 端的 message.max.bytes 参数配置为10B ,而 max.request.size参数配置为20B,那么当发送一条大小为 15B 的消息时,生产者客户端就会报出异常;

  • retries和retry.backoff.ms ==> 间隔时间 避免无效的重试
    retries参数用来配置生产者重试的次数,默认值为2147483647,即在发生异常的时候进行任何重试动作。
    消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试 。如果将 retries参数配置为非零值,并且 max .in.flight.requests.per.connection 参数配置为大于1的值,那可能会出现错序的现象:如果批次1消息写入失败,而批次2消息写入成功,那么生产者会重试发送批次1的消息,此时如果批次1的消息写入成功,那么这两个批次的消息就出现了错序。
    对于某些应用来说,顺序性非常重要 ,比如MySQL binlog的传输,如果出现错误就会造成非常严重的后果;一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection 配置为1 ,而不是把retries配置为0,不过这样也会影响整体的吞吐。

  • compression.type
    这个参数用来指定消息的压缩方式,默认值为“none",即默认情况下,消息不会被压缩。该参数还可以配置为 “gzip”,“snappy” 和 “lz4”。对消息进行压缩可以极大地减少网络传输、降低网络I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;

  • batch.size
    每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说batch.size默认值是16KB,那么里面凑够16KB的数据才会发送。理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在recordAccumulator里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。

  • linger.ms
    这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
    ProducerBatch 时间,默认值为0。生产者客户端会在ProducerBatch填满或等待时间超过linger.ms 值时发送出去。

  • enable.idempotence
    是否开启幂等性功能,详见后续原理加强;
    幂等性,就是一个操作重复做,也不会影响最终的结果!
    int a = 1;
    a++; // 非幂等操作
    val map = new HashMap()
    map.put(“a”,1); // 幂等操作
    在kafka中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是具备幂等性;否则,就不具备幂等性!

  • partitioner.class
    用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner

自定义partitioner需要实现org.apache.kafka.clients.producer.Partitioner接口

消费者组再均衡分区分配策略

会触发rebalance(消费者)的事件可能是如下任意一种:

  • 有新的消费者加入消费组。
  • 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
  • 有消费者主动退出消费组(发送LeaveGroupRequest 请求):比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
  • 消费组所对应的 GroupCoorinator节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。
    将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何rebalance也涉及到分区分配策略。
    kafka有两种的分区分配策略:range(默认) 和 roundrobin(新版本中又新增了另外2种)
我们可以通过partition.assignment.strategy参数选择 range 或 roundrobin。
partition.assignment.strategy参数默认的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

Range Strategy

  • 先将消费者按照client.id字典排序,然后按topic逐个处理;
  • 针对一个topic,将其partition总数/消费者数得到商n和 余数m,则每个consumer至少分到n个分区,且前m个consumer每人多分一个分区;

Round-Robin Strategy

  • 将所有主题分区组成TopicAndPartition列表,并对TopicAndPartition列表按照其hashCode 排序
  • 然后,以轮询的方式分配给各消费者

Sticky Strategy

对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特点:

  • 要去达成最大化的均衡
  • 尽可能保留各消费者原来分配的分区
    再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)

Cooperative Sticky Strategy

对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特点:

  • 逻辑与sticky策略一致
  • 支持cooperative再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配)

消费者组再均衡流程

消费组在消费数据的时候,有两个角色进行组内的各事务的协调;
角色1: Group Coordinator (组协调器) 位于服务端(就是某个broker)
组协调器的定位:

coordinator在我们组记偏移量的__consumer_offsets分区的leader所在broker上
查找Group Coordinator的方式:
先根据消费组groupid的hashcode值计算它应该所在__consumer_offsets 中的分区编号;   分区数
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount为__consumer_offsets的分区总数,这个可以通过broker端参数offset.topic.num.partitions来配置,默认值是50;
找到对应的分区号后,再寻找此分区leader副本所在broker节点,则此节点即为自己的Grouping Coordinator;

角色2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)

再均衡流程

eager协议的再均衡过程整体流程如下图:
在这里插入图片描述

Cooperative协议的再均衡过程整体流程如下图:
在这里插入图片描述

再均衡监听器

代码示例:

package com.doitedu;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;


/**
 * 消费组再均衡观察
 */

public class ConsumerDemo2 {
    public static void main(String[] args) {
        //1.创建kafka的消费者对象,附带着把配置文件搞定
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2.订阅主题(确定需要消费哪一个或者多个主题)
        //我现在想看看如果我的消费者组里面,多了一个消费者或者少了一个消费者,他有没有给我做再均衡
        consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
            /**
             * 这个方法是将原来的分配情况全部取消,或者说把所有的分区全部回收了
             * 这个全部取消很恶心,原来的消费者消费的好好的,他一下子就给他全部停掉了
             * @param collection
             */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                System.out.println("我原来的均衡情况是:"+collection + "我已经被回收了!!");
            }
            /**
             * 这个方法是当上面的分配情况全部取消以后,调用这个方法,来再次分配,这是在均衡分配后的情况
             * @param collection
             */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                System.out.println("我是重新分配后的结果:"+collection);
            }
        });

        while (true){
            consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
        }


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/616524.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniApp 页面通讯统一解决方案

文章目录 往期回顾统一解决方案uni.on和eventChannel之间的选择如何设置触发器最终范例距离 往期回顾 uniapp 踩坑记录 uni.$on为什么不能修改data里面的数据 uniApp页面通讯大汇总&#xff0c;如何页面之间传值 统一解决方案 uni.on和eventChannel之间的选择 uni.on和eve…

61082-041502PLF(0.80mm)40 位置 连接器 插座,G846A050210T1HR 集管和线壳 WTB 1.00 PITCH

61082-041502PLF&#xff08;0.80mm&#xff09;FCI紧凑型Bergstak连接器提供广泛的堆叠高度和电路尺寸&#xff0c;以支持广泛的夹层&#xff0c;板堆叠通信&#xff0c;数据和工业应用。 连接器类型&#xff1a;插座&#xff0c;外罩触点 针位数&#xff1a;40 间距&#xff…

浅谈互联网搜索之召回

一、背景 在搜索系统中&#xff0c;一般会把整个搜索系统划分为召回和排序两大子系统。本文会从宏观上介绍召回系统&#xff0c;并着重介绍语义召回。谨以此文&#xff0c;希望对从事和将要从事搜索行业的工作者带来一些启发与思考。 二、搜索系统召回方法 不同于推荐系统&…

6月6号软件资讯更新合集......

Yao 0.10.3 正式发布&#xff0c;拥抱 AIGC 时代&#xff01; ChatGPT 解锁了新的人机交互方式&#xff0c;人类可以与电脑直接交流了&#xff01;AIGC 时代已经到来&#xff0c;万千应用正在升级或重构&#xff0c;Yao 提供了一个开箱即用的解决方案&#xff0c;可以快速开发…

迷茫了3年:做完这个测试项目,我终于决定辞职!

2023年早已过半&#xff0c;来个迟到的年中总结&#xff0c;说实话&#xff0c;2023&#xff0c;很迷茫&#xff0c;然后过的非常不如意&#xff0c;倒不是上一年的职业目标没达到&#xff0c;而是接下来的路根本不知道如何走。在没解决这个问题之前&#xff0c;或者说没搞清楚…

Web3.0概念

学习web3您需要先掌握 JavaScript node React 后续 我们将学习一门新的语言 叫 Solidity 他是一种只能合约语言开发 我们利用web3将不再依赖后端 而是连接只能合约开发 首先 我们先不用急着写代码 还是要概念为先 首先 我们来对比 WEB1.0到3.0的概念 首先 web1.0 更多处于信…

AI实战营第二期——第一次作业:基于RTMPose的耳朵穴位关键点检测

题目&#xff1a;基于RTMPose的耳朵穴位关键点检测 背景 根据中医的“倒置胎儿”学说&#xff0c;耳朵的穴位反映了人体全身脏器的健康&#xff0c;耳穴按摩可以缓解失眠多梦、内分泌失调等疾病。耳朵面积较小&#xff0c;但穴位密集&#xff0c;涉及耳舟、耳轮、三角窝、耳甲…

Unity - 从RG中解压法线贴图

文章目录 环境目的问题解决References 环境 Unity : 2020.3.37f1 Pipeline : BRP 目的 备忘便于索引 问题 之前使用 GPA 还原一些效果的时候&#xff0c;发现 法线贴图的 Y 通道数值不对&#xff0c;感觉被 翻转了 比方说&#xff0c;下面是 GPA 中的法线 这个法线是 DX …

Ubuntu20.04安装EVO工具教程

EVO工具全名为“Python package for the evaluation of odometry and SLAM”&#xff0c;使用Python写的轨迹评估工具&#xff0c;目前在SLAM领域论文中的“使用率”逐渐上升&#xff0c;可以说已经成为了作为SLAMer一定要会用的工具。最近需要使用evo工具评测SLAM算法性能并可…

Dell服务器安装Ubuntu系统

1、下载镜像&#xff0c;做启动盘 镜像链接 http://old-releases.ubuntu.com/releases/20.04.2/ubuntu-20.04.2-live-server-amd64.iso 版本可以根据自己要求选择。 做启动盘 我用的是ultraiso 记得先格式化&#xff0c;再写入。 2、 设置BIOS启动 按F11&#xff0c;进入BIOS…

光线追踪是怎么影响渲染速度的,什么显卡可以支持?

在 3D 世界中&#xff0c;慢慢地人们倾向于让它尽可能逼真。他们可以应用许多技术和技巧&#xff0c;但有一种技术可以为您提供很多帮助&#xff0c;称为光线追踪。然而&#xff0c;众所周知&#xff0c;它是非常计算密集型的。在本文中&#xff0c;让我们进一步探讨它&#xf…

Java JUC并发编程

前言 1、JUC是指有关 java.util.concurrent包以及其子包&#xff0c;这些包都是有关线程操作的包 2、HTTPS服务请求中&#xff0c;WEB服务只负责创建主线程来接收外部的HTTPS请求&#xff0c;如果不做任何处理&#xff0c;默认业务逻辑是通过主线程来做的&#xff0c;如果业务…

Linux文件基础IO

目录 C文件IO相关操作 介绍函数 文件相关系统调用接口 接口介绍 fd文件描述符 重定向 缓冲区 inode 软硬链接 动静态库 库的制作 制作静态库 制作动态库 使用库 使用静态库 使用动态库 C文件IO相关操作 介绍函数 打开文件 参数介绍&#xff1a; const char*…

MySQL的explain字段解释

MySQL的explain字段解释 ,type类型含义:1.id 2.select_type 3.table 4.type(重要) 5.possible_keys 6.possible_keys 7. key 8.key_len 9. ref 10. rows(重要) 11. filtered 12. Extra(重要) 如下: Explain命令是查看查询优化器是如何决定执行查询的主要方法。这个功…

Firewalld防火墙详解

文章目录 Firewalld防火墙什么是防火墙Firewalld防火墙的概念Firewalld防火墙运行模式Firewalld防火墙的命令Firewalld防火墙的高级规则 Firewalld防火墙 什么是防火墙 防火墙&#xff1a;防范一些网络攻击。有软件防火墙、硬件防火墙之分。 硬件防火墙和软件防火墙的主要区…

【软件开发】MyBatis 理论篇

MyBatis 理论篇 1.MyBatis 是什么&#xff1f; MyBatis 是一个半 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;它内部封装了 JDBC&#xff0c;开发时只需要关注 SQL 语句本身&#xff0c;不需要花费精力去处理加载驱动、创建连接、创建 statement 等繁杂的过程。…

初识网络之协议定制

目录 一、数据在网络传输中遇到的部分问题 1. 序列化与反序列化 2. 如何保证接收端读取一个完整的报文 二、实现一个简单的网络计算器 1. 客户端处理 1.1 请求结构体和返回结构体 1.2 解析输入的字符串 1.3 序列化 1.4 添加标识符和字符串长度 1.5 接收服务端返回的数…

浏览器的回流与重绘与事件循环

浏览器的回流与重绘和事件循环 浏览器回流浏览器重绘事件循环 浏览器回流 什么是浏览器回流&#xff1f; 回流是通过JS代码让布局或者几何属性改变&#xff0c;使部分页面或者整个页面更新的过程 浏览器重绘 剩下的是浏览器重绘&#xff1a;比如改变div的visibility, color、…

如何使用Foxmail 7.2.25版本登录Microsoft 365 国内版(即世纪互联版)邮箱

近期微软在全球取消了在Exchange Online 的基本身份验证&#xff0c;取消了之后只有适配微软新式验证的客户端才支持登录&#xff0c;以往的直接配置IMAP/POP服务器地址和邮箱账号密码来登录的方式已经行不通了。 详情可以点击此链接了解&#xff1a;弃用 Exchange Online 中的…

APP性能测试中的几个重要概念,你都知道吗?

目录 前言 一. 内存  二. CPU 三. 流量 四. 电量 五. 启动时间 六. 总结 前言 我们在使用各种 App 的时候基本会关注到&#xff1a;这款软件挺耗流量的&#xff1f;运行起来设备掉电有点快嘛&#xff1f;切换页面的时候还会有卡顿等现象&#xff1f;如果遇到有这些问题…