【Kafka系列 07】Kafka 如何保证消息不丢失

news2024/11/20 7:30:02

一、Kafka 消息不丢失的边界

一直以来,很多人对于 Kafka 丢失消息这件事情都有着自己的理解,因而也就有着自己的解决之道。在讨论具体的应对方法之前,我觉得我们首先要明确,在 Kafka 的世界里什么才算是消息丢失,或者说 Kafka 在什么情况下能保证消息不丢失。这点非常关键,因为很多时候我们容易混淆责任的边界,如果搞不清楚事情由谁负责,自然也就不知道由谁来出解决方案了。

那 Kafka 到底在什么情况下才能保证消息不丢失呢?

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

第一个核心要素是 已提交的消息。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。

那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是 有限度的持久化保证,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署 Kafka Broker 服务器了。

现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说 Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

二、消息丢失案例

案例1:生产者程序丢失数据

Producer 程序丢失消息,这应该算是被抱怨最多的数据丢失场景了。我来描述一个场景:你写了一个 Producer 应用向 Kafka 发送消息,最后发现 Kafka 没有保存,于是大骂:“Kafka 真烂,消息发送居然都能丢失,而且还不告诉我?!”如果你有过这样的经历,那么请先消消气,我们来分析下可能的原因。

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。

这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。

如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这么来看,让 Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。

不过,就算不是 Kafka 的“锅”,我们也要解决这个问题吧。实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

你可能会问,发送失败真的没可能是由 Broker 端的问题造成的吗?当然可能!如果你所有的 Broker 都宕机了,那么无论 Producer 端怎么重试都会失败的,此时你要做的是赶快处理 Broker 端的问题。但之前说的核心论据在这里依然是成立的:Kafka 依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。

案例2:消费者程序丢失数据

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。

 比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。

正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。

同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。

当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。在专栏后面的内容中,我会跟你分享如何应对重复消费的问题。

除了上面所说的场景,其实还存在一种比较隐蔽的消息丢失场景。

我们依然以看书为例。假设你花钱从网上租借了一本共有 10 章内容的电子书,该电子书的有效阅读时间是 1 天,过期后该电子书就无法打开,但如果在 1 天之内你完成阅读就退还租金。

为了加快阅读速度,你把书中的 10 个章节分别委托给你的 10 个朋友,请他们帮你阅读,并拜托他们告诉你主旨大意。当电子书临近过期时,这 10 个人告诉你说他们读完了自己所负责的那个章节的内容,于是你放心地把该书还了回去。不料,在这 10 个人向你描述主旨大意时,你突然发现有一个人对你撒了谎,他并没有看完他负责的那个章节。那么很显然,你无法知道那一章的内容了。

对于 Kafka 而言,这就好比 Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。

这里的关键在于 Consumer 自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。

这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移

三、最佳实践

看完这两个案例之后,总结一下 Kafka 无消息丢失的最佳实践配置,每一个其实都能对应上面提到的问题。

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数,用来设置分区的副本数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

四、小结

今天,我们讨论了 Kafka 无消息丢失的方方面面。我们先从什么是消息丢失开始说起,明确了 Kafka 持久化保证的责任边界,随后以这个规则为标尺衡量了一些常见的数据丢失场景,最后通过分析这些场景,我给出了 Kafka 无消息丢失的“最佳实践”。总结起来,我希望你今天能有两个收获:

  • 明确 Kafka 持久化保证的含义和限定条件。
  • 熟练配置 Kafka 无消息丢失参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1500524.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为云开年采购季Web及移动App上云体验,助力软件行业创新发展

随着云化、智能化浪潮的进一步深入,越来越多的应用软件开发商选择将核心产品从本地IDC机房搬迁到公有云上。但同时,软件开发商们也非常在意公有云厂商的可靠性与安全性,希望能够选择一家更加稳定可靠的云服务商,确保自身业务的连续…

02极简LLM逻辑与PyTorch快速入门

文章目录 02极简LLM逻辑与PyTorch快速入门极简LLM逻辑PyTorch环境安装(重要,不难)PyTorch 主要概念Tensors张量张量常见的形式:scalar、vector、matrix、n-dimensinal张量初始化张量参数:shape、datatype、device张量运…

从huggingface下载模型像本地加载但是UnicodeDecodeError

我自己是在Linux下出现了这个问题 原文:https://github.com/huggingface/transformers/issues/13674 The path for the AutoModel should be to a directory pointing to a pytorch_model.bin and to a config.json. Since you’re pointing to the .bin file dire…

论文笔记:Efficient Bootstrapping for Confidential Transactions

EcoBoost: Efficient Bootstrapping for Confidential Transactions 设计了一种被称为EcoBoost的新方法,以提高支持机密交易的区块链的引导效率。具体来说,利用随机抽样来验证高概率保密交易的正确性。因此,与事务数量相比**,验证…

Promise async await

简介:回调 JS会提供很多函数,允许异步行为。换句话说,现在开始执行的行为。但它们会在稍后完成。异步执行某项功能的函数应该提供一个 callback 参数用于在相应事件完成时调用。处理Error: 加载成功时,它会调用 callb…

Z Potentials | 星爵,他的征途不止向量数据库

纵观过去几十年的科技发展史,每一代新的技术架构的出现往往都伴随着新的数据范式的出现,也催生了多家百亿到千亿美金数据平台的诞生。如果说 2023 年科技领域的关键词是 LLM,那么数据库领域的关键词一定非向量数据库莫属。向量数据库是一种专…

我们是如何测试人工智能产品的

在当今数字化时代,人工智能(AI)技术已经成为我们生活中不可或缺的一部分。然而,要构建出可信赖的AI系统并非易事。这需要我们不仅深入理解人工智能的核心原理,还需要将这些理论知识应用到实际场景中。 为了帮助大家系…

一个不错的空间视频收集论坛

该网站收录了来自世界各地的空间视频、空间照片和全景照片,以突出令人惊叹的 Apple Vision Pro 的功能。 网站地址:

Java 客户端向服务端上传文件(TCP通信)

一、实验内容 编写一个客户端向服务端上传文件的程序,要求使用TCP通信的的知识,完成将本地机器输入的路径下的文件上传到D盘中名称为upload的文件夹中。并把客户端的IP地址加上count标识作为上传后文件的文件名,即IP(count&#…

mysql中insert … select锁范围

1、执行 insert … select 的时候,对目标表也不是锁全表,而是只锁住需要访问的资源。 例如, CREATE TABLE t (id int(11) NOT NULL AUTO_INCREMENT,c int(11) DEFAULT NULL,d int(11) DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY c (c) ) EN…

IP定位技术在金融风控中的应用研究

随着金融科技的快速发展,金融行业的风险也呈现出多样化、复杂化的特点。金融风控作为保障金融安全的重要手段,其面临的挑战也日益加剧。在这样的背景下,IP定位技术作为一种先进的信息技术手段,正逐渐成为金融风控领域的重要工具。…

精酿啤酒:原料来源的追溯与认证体系

为了确保啤酒品质的可靠性和安全性,Fendi Club啤酒建立了一套完善的原料来源追溯与认证体系。这套体系旨在确保从原料采购到生产过程的每一个环节都能得到进一步监控和管理,从而提高产品质量,降低风险。 Fendi Club啤酒对原料供应商进行严格的…

【kerberos】hadoop集群使用keytab认证的逻辑

一、背景: haoop的kerberos认证核心是org.apache.hadoop.security.UserGroupInformation类。 UserGroupInformation一般有两种:(1)apache原生的(2)cdh hdp改良过的,即cloudera改良过的。 由此衍…

Kube-Prometheus 监控Istio

推荐 Istio 多集群监控使用 Prometheus,其主要原因是基于 Prometheus 的分层联邦(Hierarchical Federation)。 通过 Istio 部署到每个集群中的 Prometheus 实例作为初始收集器,然后将数据聚合到网格层次的 Prometheus 实例上。 网…

【Web】浅聊Java反序列化之C3P0——不出网Hex字节码加载利用

目录 简介 原理分析 EXP 前文:【Web】浅聊Java反序列化之C3P0——URLClassLoader利用 简介 不出网的情况下,这个C3P0的Gadget可以和fastjson,Snake YAML , JYAML,Yamlbeans , Jackson,Blazeds,Red5, Castor等配合使用(调用setter和初始化…

【php】【mysql】 原生初级简易新闻发布系统成品代码动态网站开发网页WEB浏览器端B/S结构

【php】【mysql】 原生初级简易新闻发布系统成品代码动态网站开发网页WEB浏览器端B/S结构 一级目录二级目录三级目录 获取源码方式项目说明:项目运行环境文件包含运行截图程序代码结构图新闻首页图新闻详情图新闻列表管理图个人信息管理图登录界面新闻详情新闻发布 …

基于JavaWeb的NBA赛事信息管理系统

目 录 摘 要 I Abstract II 引 言 1 1 系统概述 3 1.1 系统开发背景及意义 3 1.2 开发环境、工具 3 1.3 相关技术 4 1.3.1 前端技术 4 1.3.2 数据库技术 4 1.4 本章小结 4 2 系统分析 5 2.1 功能需求分析 5 2.2 性能需求分析 5 2.3 本章小结 6 3 系统设计 7 3.1 功能设计 7 3.…

拼多多1000元虚拟店铺免4万保证金

众所周知拼多多现在流量非常大,虚拟也算是蓝海,想做的人大部分都被保证金拦在门外,高达4W的保证金不是每个人都能承受的,正好在当下有一个方法可以解决这个苦恼。 拼多多虚拟店铺免保证金玩法现在处于前期阶段,很多人…

如何搭建 Jenkins 自动化测试平台?

前言 在进行平台搭建前,我们首先要问自己:我需要搭建的平台的功能是什么,要实现什么目标? 在我的理解中,自动化构建平台的执行流程(目标)是: 我们将代码提交到代码托管工具上&…

Linux 关于NTP同步硬件时钟的可靠性验证

Linux关于NTP同步硬件时钟的可靠性验证 1. 常见的时钟类型1.1 系统时钟1.2 硬件时钟 2. 常见时钟同步方式2.1 ntpd服务2.1.1 推荐配置/etc/ntp.conf2.1.2 推荐配置/etc/sysconfig/ntpd 2.2 定时任务ntpdate2.3 ntp命令同步状态相关命令解读2.3.1 ntpq -pn解读2.3.2 ntpdate -u解…