Kafka~消息发送过程与ISR机制了解

news2024/9/19 10:48:01

消息发送过程

使用Kafka发送消息时,一般有两种方式分别是:

  1. 同步发送
  2. 异步发送

同步发送时,可以在发送消息后,通过get方法等待消息结果,这种情况能够准确的拿到消息最终的发送结果,要么是成功、要么是失败。

而异步发送,是采用了callback的方式进行回调的,可以大大的的提升消息的吞吐量,也可以根据回调来判断消息是否发送成功。

不管是同步发送还是异步发送,最终都需要在Producer端把消息发送到Broker中,那么这个过程大致如下:
在这里插入图片描述
Kafka的Producer在发送消息时通常涉及两个线程,主线程(Main)和发送线程(Sender)和一个消息累加器(RecordAccumulator)

  • Main线程是Producer的入口,负责初始化Producer的配置、创建KafkaProducer实例并执行发送逻辑。它会按照用户定义的发送方式(同步或异步)发送消息,然后等待消息发送完成。一条消息的发送,在调用send方法后,会经过拦截器、序列化器及分区器。拦截器主要用于在消息发送之前和之后对消息进行定制化的处理,如对消息进行修改、记录日志、统计信息等。序列化器负责将消息的键和值对象转换为字节数组,以便在网络上传输。分区器决定了一条消息被发送到哪个Partition中。它根据消息的键(如果有)或者特定的分区策略,选择出一个目标Partition。

  • RecordAccumulator在Kafka Producer中起到了消息积累和批量发送的作用,当Producer发送消息时,不会立即将每条消息发送到Broker,而是将消息添加到RecordAccumulator维护的内部缓冲区中,RecordAccumulator会根据配置的条件(如batch.size、linger.ms)对待发送的消息进行批量处理。当满足指定条件时,RecordAccumulator将缓冲区中的消息组织成一个批次(batch),然后一次性发送给Broker。如果发送失败或发生错RecordAccumulator可以从将消息重新分配到新的批次中进行重试。这样可以确保消息不会丢失,同时提高消息的可靠性。

  • Send线程是负责实际的消息发送和处理的。发送线程会定期从待发送队列中取出消息,并将其发送到对应的Partition的 Leader Broker上。它主要负责网络通信操作,并处理发送请求的结果,包括确认的接收、错误处理等。

  • NetworkClient和Selector是两个重要的组件,分别负责网络通信和1/0多路复用。

发送线程会把消息发送到Kafka集群中对应的Partition的Parrtition Leader,Partition Leader接收到消息后,会对消息进行一系列的处理。它会将消息写入本地的日志文件(Log),存储为segment文件,因为是顺序写,segment文件也是顺序截断,为了保证数据的可靠性和高可用性,Kafka使用了消息复制机制。Leader Broker接收到消息后,会将消息复制到其他副本(Partition Follower)。副本是通过网络复制数据的,它们门会定期从LeaderBroker同步消息。

每一个Partition Follower在写入本地log之后,会向Leader发送一个TACK,但是我们的Producer其实也是需要依赖ACK才能知道消息有没有投递成功的,而这个ACK是何时发送的,Producer又要不要关心呢? 这就涉及到了kafka的ack机制,生产者会根据设置的request.required.acks参数不同,选择等待或或直接发送下一条消息:

  • request.required.acks = 0
    表示Producer不等待来自Leader的ACK确认,直接发送送下一条消息。在这种情况下,如果Leader分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
  • request.required.acks = 1
    表示Producer等待来自Leader的ACK确认,当收到确认人后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader服务器,但并不保证Follow节点已已经同步完成。所以如果在消息已经被写入Leader分片,但是还未同步到Follower节点,此时Leade分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
  • request.required.acks = -1
    Leader会把消息复制到集群中的所有ISR(In-Sync Replicas,同步副本),要等待所有ISR的ACK确认后,再向Producer发送ACK消息,然后Producer再继续发下下一条消息。

ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。以下是关于 Kafka 的 ISR 机制的详细介绍:

  • 意义:

ISR 机制动态维护了一个与 Leader 副本保持同步的副本集合,只有在 ISR 集合中的副本才有资格参与 Leader 的选举。通过 ISR 机制,可以确保在 Leader 副本出现故障时,能够快速从 ISR 集合中选举出新的 Leader,从而避免数据丢失和服务中断。

  • 用途:
  1. 保证数据可靠性:ISR 机制通过副本冗余机制,提供了 Kafka 消息的高可靠性。
  2. 实现故障转移:ISR 机制可以做到故障转移,保障服务的可用性。当 Leader 副本出现故障时,Kafka 会从 ISR 集合中选举出新的 Leader,从而保证服务的连续性。
  3. 平衡复制方案:ISR 机制平衡了主从架构下,复制方案的选择(同步/异步/少数服从多数),让使用者根据参数自行选择。
  • 实现方式:
  1. 数据同步:Leader Replica 接收到 Producer 发送的消息后,将其写入本地日志,并通过 Pull 模式等待 Follower Replica 主动拉取。Follower Replica 从 Leader Replica 拉取数据并写入本地日志后,将拉取偏移量(fetch offset)返回给 Leader。
  2. 同步状态监测:Leader Replica 持续监控每个 Follower Replica 的拉取偏移量,将其与自身的最新消息偏移量(log end offset)进行比较。若 Follower Replica 的拉取偏移量与 Leader 相差不超过一定阈值(由 replica.lag.time.max.ms 参数控制),则认为该 Follower 处于同步状态,将其纳入 ISR。
  3. ISR 调整:当 Follower Replica 因网络延迟、Broker 故障等原因导致拉取偏移量落后过多,超出阈值时,Leader Replica 会将其从 ISR 中移除。当 Follower Replica 恢复同步后,再次将其加入 ISR。
  • 详细过程

当消息被写入Kafka的分区时,它首先会被写入Leader,然后LLeader将消息复制给ISR中的所有副本。只有当ISR中的所有副本都成功地接收到并确认了消息后,主副本才会认为消息已成功提交。这种机制确保了数据的可靠性和一致性。

在Kafka中,ISR(In-Sync Replicas)列表的维护是通过副本状态和配置参数来进行的。具体的ISR列表维护机制在不同的Kafka版本中有所变化。

  • 在0.9.x之前的版本,Kafka有一个核心的参数:replica.lag.max.messages,表示如果Follower落后
    Leader的消息数量超过了这个参数值,就认为Follower就会从ISR列表里移除。

但是,基于replica.lag.max.messages 这种实现,在瞬间高并发访问的情况下会有问题:比如Leader瞬间接收到几万条消息,然后所有Follower还没来得及同步过去,此时所有follower都会被踢出ISR列表。

  • Kafka从0.9.x版本开始,引入了 replica.lag.max.ms参数,表示如果果某个Follower的LEO (latest end
    offset)一直落后Leader超过了10秒,那么才会被从ISR列表里移除。

这样的话,即使出现瞬间流量,导致Follower落后很多数据,但是只要在限定的时间内尽快追上来就行了。

总之,通过 ISR 机制,Kafka 可以保证在 Leader 副本出现故障时,能够快速从 ISR 集合中选举出新的 Leader,从而避免数据丢失和服务中断。同时,ISR 机制也可以提高 Kafka 系统的可靠性和可用性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1877947.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AES加密算法及AES-CMAC原理白话版系统解析

本文框架 前言1. AES加密理论1.1 不同AES算法区别1.2 加密过程介绍1.2.1 加密模式和填充方案选择1.2.2 密钥扩展1.2.3分组处理1.2.4多轮加密1.2.4.1字节替换1.2.4.2行移位1.2.4.3列混淆1.2.4.4轮密钥加1.3 加密模式1.3.1ECB模式1.3.2CBC模式1.3.3CTR模式1.3.4CFB模式1.3.5 OFB模…

社团成员信息系统

ER实体关系图与数据库模型 DDL CREATE TABLE club (club_id int(11) NOT NULL AUTO_INCREMENT,club_name varchar(100) NOT NULL,president_name varchar(50) DEFAULT NULL,foundation_date date DEFAULT NULL,description text,PRIMARY KEY (club_id),KEY president_name (pr…

虚拟化技术(二)

目录 三、存储虚拟化(一)存储虚拟化的一般模型(二)存储虚拟化的实现方式(三)案例分析 四、网络虚拟化(一)核心层网络虚拟化(二)接入层网络虚拟化(…

生成独立的zedboard+ad9361起始项目

文件分享 链接:https://pan.baidu.com/s/17wB_9xVWjO7HhxNvmmZyuA 提取码:94zz 首先下载HDL和NO-OS项目 git clone --recursive https://github.com/analogdevicesinc/hdl git clone --recursive https://github.com/analogdevicesinc/no-OS下载…

L03_Redis知识图谱

这些知识点你都掌握了吗?大家可以对着问题看下自己掌握程度如何?对于没掌握的知识点,大家自行网上搜索,都会有对应答案,本文不做知识点详细说明,只做简要文字或图示引导。 Redis 全景图 Redis 知识全景图都包括什么呢?简单来说,就是“两大维度,三大主线”。 Redis …

基于springboot实现学生用品采购系统项目【项目源码+论文说明】

基于springboot实现学生用品采购系统演示 摘要 传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此,在计算机上安装学生用品采购系统软…

STM32CubeMx的学习记录系列(2)- STM32G474RET6

最近有个小比赛,需要用到G4,不过找了一圈没有找到标准库的代码,只能使用hal,用CubeMX来生成配置代码。 共同特点 ARDUINO Uno V3 扩展连接器 ST morpho 扩展引脚接头,可完全访问所有 STM32 I/O 采用LQFP64或LQFP48封…

算法 —— 双指针

目录 移动零 复写零 快乐数 盛最多水的容器 有效三角形的个数 查找总价格为目标值的两个商品 三数之和 四数之和 移动零 下图以样例1为例,看下图如何做到保证非零元素相对顺序前提下,移动零元素。 代码实现如下: class Solution {…

1,Windows-本地Linux 系统(WSL)

目录 第一步电脑设置 第二步安装Ubuntu 第三文件传递 开发人员可以在 Windows 计算机上同时访问 Windows 和 Linux 的强大功能。 通过适用于 Linux 的 Windows 子系统 (WSL),开发人员可以安装 Linux 发行版(例如 Ubuntu、OpenSUSE、Kali、Debian、Arc…

如何有效保护生物医药企业隔离网数据导出的安全性?

生物医药企业的核心数据保护至关重要,企业为了保护内部的核心数据,会将网络进行物理隔离,将企业内⽹与外⽹隔离。⽹络隔离后,仍存在重要数据从内网导出至外网的隔离网数据导出需求。以下是一些需要特别保护的核心数据类型&#xf…

小米平板6系列对比

小米平板6系列目前有4款,分别为6、6 Pro、6 Max、6S Pro。具体对比如下表所示。 小米平板型号66 Pro6 Max6S Pro实物图发布时间2023年4月21日2023年4月21日2023年8月14日2024年2月22 日屏幕大小11英寸11英寸14英寸12.4英寸分辨率2.8K2.8K2.8K3K刷新率144Hz144Hz120…

EtherCAT笔记(四)——EtherCAT数据帧结构

EtherCAT数据包含2B的数据头和44~1948B的数据区。数据区由多个子报文组成。由于EtherCAT本身是通过以太网数据帧的形式传输,因此其协议帧中会携带以太网的帧头。 其中,解释如下: (1)以太网数据帧头:EtherC…

VSCode + GDB + J-Link 单片机程序调试实践

VSCode GDB J-Link 单片机程序调试实践 本文介绍如何创建VSCode的调试配置,如何控制调试过程,如何查看修改各种变量。 安装调试插件 在 VSCode 扩展窗口搜索安装 Cortex-Debug插件 创建调试配置 在 Run and Debug 窗口点击 create a launch.json …

C语言力扣刷题11——打家劫舍1——[线性动态规划]

力扣刷题11——打家劫舍1和2——[线性动态规划] 一、博客声明二、题目描述三、解题思路1、线性动态规划 a、什么是动态规划 2、思路说明 四、解题代码(附注释) 一、博客声明 找工作逃不过刷题,为了更好的督促自己学习以及理解力扣大佬们的解…

日志分析-windows系统日志分析

日志分析-windows系统日志分析 使用事件查看器分析Windows系统日志 cmd命令 eventvwr 筛选 清除日志、注销并重新登陆,查看日志情况 Windows7和Windowserver2008R2的主机日志保存在C:\Windows\System32\winevt\Logs文件夹下,Security.evtx即为W…

有哪些防爬虫的方法

防爬虫的方法有robots.txt文、user-agent过滤、ip限制、验证码、动态页面生成、频率限制、动态url参数和反爬虫技术等。详细介绍:1、robots.txt文件,用于告诉搜索引擎爬虫哪些页面可以访问,哪些页面禁止访问;2、ip限制&#xff0c…

面试-J.U.C包的梳理

1.J.U.C包的梳理 Java.Util.Concurrent包简称JUC (1)JUC整体架构图 (2)分析 Executor:线程执行器,任务执行和调度的框架。Tools下存在executor相关的executors类,用于创建executorservice,scheduleexecutorservice,…

获取 url 地址栏 ? 后面的查询字符串,并以键值对形式放到对象里面

写在前面 在前端面试当中,关于 url 相关的问题很常见,而对于 url 请求参数的问题也很常见,大部分以笔试题常见,今天就根据这道面试题一起来看一下。 问题 获取 url 地址栏?后面的查询字符串,并以键值对形式放到对象…

CICD持续集成(Jenkins+Git+Gogs)

1.Jenkins Jenkins 是一个开源的、用于构建和自动化软件开发流程的持续集成和交付工具。它提供了一个可扩展的平台,用于构建、测试和部署软件项目。通过使用 Jenkins,开发团队可以实现持续集成和交付,自动化构建和测试过程,提高软…

Rust监控可观测性

可观测性 在监控章节的引言中,我们提到了老板、前端、后端眼中的监控是各不相同的,那么有没有办法将监控模型进行抽象、统一呢? 来简单分析一下: 业务指标实时展示,这是一个指标型的数据( metric )手机 APP 上传的数…