Kafka 消息丢失如何处理?

news2024/9/24 19:23:51

今天给大家分享一个在面试中经常遇到的问题:Kafka 消息丢失该如何处理?

这个问题啊,看似简单,其实里面藏着很多“套路”。

来,咱们先讲一个面试的“真实”案例。

图片

面试官问:“Kafka 消息丢失如何处理?”

小明一听,反问:“你是怎么发现消息丢失了?”

面试官顿时一愣,沉默了片刻后,可能有点不耐烦,说道:“这个你不用管,反正现在发现消息丢失了,你就说如何处理。”

小明一头雾水:“问题是都不知道怎么丢的,处理起来岂不是瞎搞吗?”

画面一黑,面试官离开了会议室,留小明一个人凌乱在风中……

👀 这段子虽然搞笑,但实际工作中,确实“消息丢失”这个事儿有点让人摸不着头脑。

大家有没有想过:消息丢失的定义到底是什么?其实,发现消息丢失的过程,才是处理问题的关键!

图片

在用 Pub/Sub 类中间件,比如 Kafka 或 RocketMQ 时,消息丢失可能有很多原因,包括生产者、消费者和网络传输等各个环节。

我们今天就结合实际工作中遇到的情况,聊聊到底怎么发现消息丢失,又该怎么处理。😎

首先,我们要搞清楚消息丢失的几种典型场景:

1.生产者消息发送失败:这个比较简单,如果生产者发消息时,网络抖动、服务宕机或 Kafka broker 挂了,那消息就丢了。

这时候生产者通常会重试,但是如果重试策略不当,还是可能丢消息。

   

2.消费者消费消息失败:最常见的是消费者拉取了消息,但是业务处理失败,或者消费后没有提交 offset,导致消息“看似”消费了,实际根本没处理。

这种情况不算真正的消息丢失,但你业务数据不一致,这锅还是要 Kafka 来背。😂

3. 网络异常导致消息丢失:有时候消息发送成功了,但是因为网络问题,导致消费者没能拉到这些消息,这类情况更难排查。

OK,分析了几种可能性,接下来看看有哪些方法可以帮助我们及时发现这些问题。

1.监控和告警系统

  

监控是最基础的保障手段。一般来说,Kafka 提供了很多指标可以监控,比如生产端和消费端的吞吐量、消息积压(lag)情况、消费者组的 offset 等等。

通过这些监控指标,一旦消费端的消息积压开始异常增长,或者 offset 停滞不前,就说明很可能有消息丢失了。

很多公司会用 Prometheus + Grafana 来做监控和可视化,再配合告警系统(如 Alertmanager)实时提醒。

比如可以监控 `kafka_consumer_lag` 这个指标,一旦消息积压超过预设阈值,就触发告警。

# Prometheus 配置监控 Kafka 消费者积压
kafka_consumer_lag{consumer_group="your-consumer-group", topic="your-topic"} > 100

在工作中,这类告警往往是消息丢失的第一个信号,反应速度极快。

2.消息追踪机制

消息追踪就像在每个消息上打个“追踪码”,确保每条消息都能被追踪到。

具体做法是:生产者在发送每条消息时,生成一个唯一的 `message_id`,消费者在消费时同样记录消费的 `message_id`。

通过对比生产端和消费端的 ID,就可以发现有没有消息“掉队”了。

在实际应用中,通常会通过日志来记录这些 `message_id`,并定期检查对账,保证所有消息都正确处理了。

// 生产者发送消息时生成 message_id
String messageId = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", messageId, messageContent);
producer.send(record);

// 消费者消费消息时记录 message_id
public void consumeMessage(ConsumerRecord<String, String> record) {
    String messageId = record.key();  // 获取 message_id
    // 将 message_id 存储到日志或数据库中,用于后续追踪
    log.info("Consumed message with ID: {}", messageId);
}

3.消息确认机制

Kafka 本身有个很经典的机制,就是手动提交offset。消费者在处理完消息后,才提交消费位置的 offset。

如果消费失败了,不提交 offset,Kafka 就会重新分配这条消息,避免消息丢失。

很多时候,消息丢失的“锅”其实是消费者自己在消费时出了问题,明明没处理完却偷偷提交了 offset,让 Kafka 以为消息已经处理完毕了。

手动提交 offset 就能很好地避免这种情况。

public void consumeMessages() {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // 处理消息逻辑
            processMessage(record);
            // 成功处理后提交 offset
            consumer.commitSync();
        } catch (Exception e) {
            // 处理失败不提交 offset,Kafka 会重试
            log.error("Failed to process message, will retry.", e);
        }
    }
}

​​​​​​​

4.消息重试和补偿机制

为了解决偶发性的消费失败,很多公司会为 Kafka 消费端加一个重试机制。

当消息处理失败时,重新将消息放回队列,或者放到一个死信队列(Dead Letter Queue, DLQ)里,然后专门处理这些异常消息。

// 如果消息处理失败,将其放回死信队列
try {
    processMessage(record);
} catch (Exception e) {
    producer.send(new ProducerRecord<>("dlq-topic", record.key(), record.value()));
}

这个方式虽然不能彻底避免消息丢失,但能保证消息不会轻易丢失,特别是一些重要业务场景中,消息的可靠性至关重要。

5.多副本存储

Kafka 还有一个核心功能,就是多副本机制,即消息在多个 broker 上都有副本。这样即使某个 broker 挂了,其他副本也能提供消息。

通过设置 `replication.factor` 参数,我们可以指定 Kafka 每条消息的副本数,确保即使一台机器挂了,消息也不会丢失。

# Kafka Topic 多副本配置
replication.factor=3
 

最后,真正发现消息丢失了,怎么办呢?这里有一些基本的补救措施:

1.检查消费端日志:首先要确定消息到底有没有消费。如果消费端日志显示消费失败,重新处理即可。

   

2.重发消息:如果消费端确实没处理成功,可以将消息重新发送到 Kafka,或者从备份中恢复并重放消息。

3.处理丢失后的补偿:业务上可能会涉及补偿措施,比如通知相关人员手动处理,或者对丢失的数据进行回补。

总之,消息丢失不算是特别常见的问题,但一旦遇到,还是需要冷静排查问题源头。

Kafka 等 Pub/Sub 中间件本身已经有比较强大的机制来应对这些场景,只要结合业务需求,做好监控和容错机制,基本都能把问题压到最小。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2138424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SSM+Vue+MySQL的在线医疗服务系统

系统展示 用户前台界面 管理员后台界面 系统背景 随着医疗信息化的快速发展和患者对便捷医疗服务需求的日益增长&#xff0c;开发一个高效、可靠的在线医疗服务系统显得尤为重要。基于SSM&#xff08;SpringSpring MVCMyBatis&#xff09;框架、前端采用Vue.js、后端连接MySQL数…

CrossOver24.0.5破解版免费下载和永久激活图文教程,苹果电脑怎么玩《黑神话:悟空》

CrossOver24可以玩《黑神话&#xff1a;悟空》么&#xff1f;答案是可以的。 1、首先我们需要下载CrossOver24软件。 CrossOver24安装包夸克网盘链接&#xff1a;https://pan.quark.cn/s/35e64d746778 2、下载完成后&#xff0c;我们双击CrossOver.pkg开始安装&#xff0c;然…

LeetCode[简单] 141.环形链表

给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#xff08;…

C++初阶学习——探索STL奥秘——模拟实现list类

1、基本框架 list 由三个类构建而成: 节点类:每个节点必须的三部分(指向前一个节点的指针、指向后一个节点的指针、当前节点存储的数据) 迭代器类:此时的迭代器为双向迭代器&#xff0c;比较特殊&#xff0c;需要对其进行封装&#xff0c;如 it并非使迭代器单纯向后移动&…

QT添加图标标题和打包项目

QT项目打包 项目的标题和图标标题项目图标exe图标 可执行文件——生成exeexe运行报错“找不到qt6gui.dll”等 相关库文件——生成zip安装包打包程序——生成exe安装包 项目的标题和图标 项目打包要好看点&#xff0c;得有个好点的标题和图标&#xff0c;这次打包的项目是我上一…

excel如何快速选中某个数字或者某串数字

鼠标光标放在某个数字或者某串数字的末尾&#xff0c;进行双击鼠标左键即可 &#xff08;就会选中当前鼠标光标前相邻的所有数字&#xff09;&#xff1a;

【Node.js】RabbitMQ 延时消息

概述 在 RabbitMQ 中实现延迟消息通常需要借助插件&#xff08;如 RabbitMQ 延迟队列插件&#xff09;&#xff0c;因为 RabbitMQ 本身不原生支持延迟消息。 延迟消息的一个典型场景是&#xff0c;当消息发布到队列后&#xff0c;等待一段时间再由消费者消费。这可以通过配置…

【拦截导弹】

​ 题目 ​​ 题解 题解&#xff1a;1010. 拦截导弹&#xff08;dp与贪心&#xff09; - AcWing 我谈几点&#xff1a; 第一&#xff0c;由此复习了upper_bound和lower_bound函数 第二&#xff0c;由此学习了贪心方式求“最多分割不严格递减子序列的数目”和“最长不严格递…

算法参数对拥塞控制的影响

来看看参数对公平收敛的影响。仅假象一下就知道应该是个加权公平&#xff0c;但事实如何&#xff0c;还是要具体看一下。 首先看 aimd&#xff0c;标准的 reno 算法是每 round 之后 cwnd 加 1&#xff0c;但如果有些流加 1&#xff0c;有些流加 2&#xff0c;会如何&#xff1…

踩坑【已解决】:使用maven打印结果是控制台输出中文乱码

报错原图&#xff1a; 解决方案&#xff1a; 1、修改maven->runner中的配置添加如下信息&#xff1a; -Dfile.encodingUTF-8 2、检查编码的配置信息&#xff1a; 3、检查窗口右下角的配置信息&#xff1a; 解决结果&#xff1a;

SEGGERS实时系统embOS推出Linux端模拟器

SEGGER 发布了两个新的 embOS 仿真模拟器&#xff1a;embOS Sim Linux 和 embOS-MPU Sim Linux。 通过模拟 Linux 主机系统上的硬件&#xff0c;取代物理硬件&#xff0c;为开发人员提供了一种无缝的方式来构建原型和测试应用程序。 embOS Sim Linux 端口支持 32 位和 64 位系…

【在Linux世界中追寻伟大的One Piece】网络命令|验证UDP

目录 1 -> Ping命令 2 -> Netstat命令 3 -> Pidof命令 4 -> 验证UDP-Windows作为client访问Linux 4.1 -> UDP client样例 1 -> Ping命令 Ping命令是一种网络诊断工具&#xff0c;它使用ICMP(Internet Control Message Protocol&#xff0c;互联网控制消…

CAN BUS

CAN BUS 原理 网上资料非常丰富&#xff0c;是车载系统主要BUS之一。 我们关注如下方面 can bus 是什么网络结构CAN BUS 协议ECU node实现其他 What is CAN Bus? Control Area Network (CAN) bus is a serial communication protocol that allows devices to exchange dat…

MySQL:视图【详解】

1、视图 1.1 视图的定义 视图是在数据库中定义的虚拟表。它是一个基于一个或多个实际表的查询结果集&#xff0c;可以像实际表一样被查询和操作。视图可以看作是一个动态生成的数据表&#xff0c;其内容是从其他表中选择、过滤和计算得到的。 视图通过使用SQL查询语句来定义…

Framebuffer应用编程

目录 前言 LCD操作原理 涉及的 API 函数 open函数 ioctl 函数 mmap 函数 Framebuffer程序分析 源码 1.打开设备 2.获取LCD参数 3.映射Framebuffer 4.描点函数 5.随便画几个点 上机实验 前言 本文介绍LCD的操作原理和涉及到的API函数&#xff0c;分析Framebuffer…

配置全新服务器深度学习一套流程

目录 1.安装anaconda2.配置cuda3.配置cudnn4.配置新的pytorch环境5.安装rdkit包6.小问题记录 1.安装anaconda 直接参考视频 总结&#xff1a; 1.下载anaconda安装包&#xff0c;尽量不下载最新的版本 2.bash 对应安装包&#xff0c;一直回车&#xff0c;yes 3.配置环境vim ~/.…

点餐小程序实战教程10权限验证

目录 1 创建员工的全局变量2 创建员工首页3 跳转到员工首页4 给全局变量赋值5 验证权限6 登录的完整代码总结 我们已经实现了员工的注册及登录功能&#xff0c;登录成功后需要跳转到我们的员工首页。在首页加载的时候我们需要去验证当前用户是否已经登录&#xff0c;未登录我就…

深入理解数据分析的使用流程:从数据准备到洞察挖掘

数据分析是企业和技术团队实现价值的核心。 5 秒内你能否让数据帮你做出决策&#xff1f; 通过本文&#xff0c;我们将深入探讨如何将原始数据转化为有意义的洞察&#xff0c;帮助你快速掌握数据分析的关键流程。 目录 数据分析的五个核心步骤1. 数据获取常用数据获取方式 2. 数…

synchronized的详解、锁的升级过程和优缺点比较

本文 详细介绍Java中为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级 锁、重量级锁&#xff0c;以及锁升级过程。 Java中每一个对象都可以作为锁。具体表现形式为以下三种形式&#xff1a; 对于普通的同步方法&#xff0c;锁是当前的实例对象对于静态同步方法&a…

攻防世界--->秘密-银河-300

做题笔记。 适用于reverse的隐写术。。。。啊哈哈哈哈 下载 查壳。(用的WSL->Debian) 64ida打开。 运行程序如下&#xff1a; 反汇编看不出来什么名堂&#xff0c;那就去看汇编代码。 下个断点。 东看看西看看 这是我们程序打印代码 往下翻&#xff1a; SECRET 秘密。 我…