kafka如何保证消息不丢?

news2025/1/12 10:06:06

概述

我们知道Kafka架构如下,主要由 Producer、Broker、Consumer 三部分组成。一条消息从生产到消费完成这个过程,可以划分三个阶段,生产阶段、存储阶段、消费阶段。

图片

  • 产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到Consumer上。

那么如何保证消息不丢我们可以从这三部分来分析。

消息传递语义

在深度剖析消息丢失场景之前,我们先来聊聊「消息传递语义」到底是个什么玩意?

所谓的消息传递语义是 Kafka 提供的 Producer 和 Consumer 之间的消息传递过程中消息传递的保证性。主要分为三种, 如下图所示:

  1. 1. 首先当 Producer 向 Broker 发送数据后,会进行 commit,如果 commit 成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。

  2. 2. 在 Kafka 0.11.0.0 之前, 如果 Producer 没有收到消息 commit 的响应结果,它只能重新发送消息,确保消息已经被正确的传输到 Broker,重新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。为了实现这个, Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。也支持了类似事务语义来保证将消息发送到多个 Topic 分区中,保证所有消息要么都写入成功,要么都失败,这个主要用在 Topic 之间的 exactly once 语义。 其中启用幂等传递的方法配置enable.idempotence = true。 启用事务支持的方法配置:设置属性 transcational.id = "指定值"

  3. 3. 从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。

  4. 4. 如果 Consumer 消费消息完成后, 再更新 Offset,如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。

总结: 默认 Kafka 提供「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset的方式提供 「at mostonce」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。

接下来我们从生产阶段、存储阶段、消费阶段这几方面看下kafka如何保证消息不丢失。

生产阶段

通过深入解析Kafka消息发送过程我们知道Kafka生产者异步发送消息并返回一个Future,代表发送结果。首先需要我们获取返回结果判断是否发送成功。

// 异步发送消息,并设置回调函数 
producer.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) { 
            System.err.println("消息发送失败: " + exception.getMessage()); 
        } else { 
            System.out.println("消息发送成功到主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset()); 
        } 
    } 
});

消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

Producer(生产者)保证消息不丢失的方法:

  1. 1. 发送确认机制:Producer可以使用Kafka的acks参数来配置发送确认机制。通过设置合适的acks参数值,Producer可以在消息发送后等待Broker的确认。确认机制提供了不同级别的可靠性保证,包括:

    • • acks=0:Producer在发送消息后不会等待Broker的确认,这可能导致消息丢失风险。

    • • acks=1:Producer在发送消息后等待Broker的确认,确保至少将消息写入到Leader副本中。

    • • acks=all或acks=-1:Producer在发送消息后等待Broker的确认,确保将消息写入到所有ISR(In-Sync Replicas)副本中。这提供了最高的可靠性保证。

  2. 2. 消息重试机制:Producer可以实现消息的重试机制来应对发送失败或异常情况。如果发送失败,Producer可以重新发送消息,直到成功或达到最大重试次数。重试机制可以保证消息不会因为临时的网络问题或Broker故障而丢失。

Broker存储阶段

正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

在kafka高性能设计原理中我们了解到kafka为了提高性能用到了 Page Cache 技术.在读写磁盘日志文件时,其实操作的都是内存,然后由操作系统决定什么时候将 Page Cache 里的数据真正刷入磁盘。如果内存中数据还未刷入磁盘服务宕机了,这个时候还是会丢消息的。

为了最大程度地降低数据丢失的可能性,我们可以考虑以下方法:

  1.  持久化配置优化:可以通过调整 Kafka 的持久化配置参数来控制数据刷盘的频率,从而减少数据丢失的可能性。例如,可以降低 flush.messages 和 flush.ms 参数的值,以更频繁地刷写数据到磁盘。

  2.  副本因子增加:在 Kafka 中,可以为每个分区设置多个副本,以提高数据的可靠性。当某个 broker 发生故障时,其他副本仍然可用,可以避免数据丢失。

  3. 使用acks=all:在生产者配置中,设置 acks=all 可以确保消息在所有ISR(In-Sync Replicas)中都得到确认后才被视为发送成功。这样可以确保消息被复制到多个副本中,降低数据丢失的风险。

  4. 备份数据:定期备份 Kafka 的数据,以便在发生灾难性故障时可以进行数据恢复。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

  1. 自动提交位移:Consumer可以选择启用自动提交位移的功能。当Consumer成功处理一批消息后,它会自动提交当前位移,标记为已消费。这样即使Consumer发生故障,它可以使用已提交的位移来恢复并继续消费之前未处理的消息。

  2. 手动提交位移:Consumer还可以选择手动提交位移的方式。在消费一批消息后,Consumer可以显式地提交位移,以确保处理的消息被正确记录。这样可以避免重复消费和位移丢失的问题。

下面是手动提交位移的例子:

// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题
consumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        // 消费消息
        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {
            // 处理消息逻辑
            System.out.println("消费消息:Topic = " + record.topic() +
                    ", Partition = " + record.partition() +
                    ", Offset = " + record.offset() +
                    ", Key = " + record.key() +
                    ", Value = " + record.value());

            // 手动提交位移
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);
            consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1451053.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Godot 游戏引擎个人评价和2024年规划(无代码)

文章目录 前言Godot C# .net core 开发简单评价Godot相关网址可行性 Godot(GDScirpt) Vs CocosGodot VS UnityUnity 的裁员Unity的股票Unity的历史遗留问题&#xff1a;Mono和.net core.net core的开发者&#xff0c;微软 个人的独立游戏Steam平台分成说明独立游戏的选题美术风…

面试经典150题——无重复字符的最长子串

我生来就是高山而非溪流&#xff0c;我欲于群峰之巅俯视平庸的沟壑 1. 题目描述 2. 题目分析与解析 2.1 思路一——暴力解法 看到这个题目&#xff0c;我们是不是发现和上一篇内容刚刚讲过的长度最小的子数组题目很像&#xff1f;首先自然的暴力解法&#xff0c;就是遍历字符…

Linux线程 分离和同步与互斥 条件变量

Linux线程 分离和同步与互斥 条件变量 1. 分离线程2. 线程互斥与互斥量3. 线程同步与竞态条件4. pthread库与条件变量5. 生产者-消费者 1. 分离线程 什么是线程分离&#xff1f; 线程分离是指线程在结束时&#xff0c;操作系统会自动回收其资源&#xff0c;而无需其他线程显式地…

SpringMVC-入门

1.概念 SpringMVC是一种软件架构思想&#xff0c;把软件按照模型(Model)、视图(View)、控制器(Controller)这三层来划分。Model&#xff1a;指的是工程中JavaBean&#xff0c;用来处理数据View&#xff1a;指的是工程中的html、jsp等页面&#xff0c;用来展示给用户数据Control…

LGAMEFI基于BPL公链开发的第一生态:开启RWA游戏娱乐与DeFi融合的新纪元

在去中心化金融&#xff08;DeFi&#xff09;与游戏娱乐的结合趋势中&#xff0c;BPL公链上的LGAMEFI项目代表了前沿的技术革新和市场领导。这种将web2上成熟页游进行RWA链改&#xff0c;不仅仅是将游戏热门领域融合&#xff0c;更是在寻找一种全新的参与者经验&#xff0c;将玩…

Linux网络编程——tcp套接字

文章目录 主要代码关于构造listen监听accepttelnet测试读取信息掉线重连翻译服务器演示 本章Gitee仓库&#xff1a;tcp套接字 主要代码 客户端&#xff1a; #pragma once#include"Log.hpp"#include<iostream> #include<cstring>#include<sys/wait.h…

C# CAD2016 判断多边形的方向正时针或逆时针旋转

方法一&#xff1a;基于相邻顶点相对位置判断顺时针排列 // 计算当前子序列是否为顺时针排列 for (int i 1; i < outerPoints.Count; i) {int index (startVertexIndex i) % outerPoints.Count;int prevIndex (startVertexIndex i - 1) % outerPoints.Count;Point2d c…

Acwing---873. 欧拉函数

欧拉函数 1.题目2.基本思想3.代码实现 1.题目 给定 n 个正整数 ai&#xff0c;请你求出每个数的欧拉函数。 欧拉函数的定义 输入格式 第一行包含整数 n n n。 接下来 n n n 行&#xff0c;每行包含一个正整数 a i ai ai。 输出格式 输出共 n n n 行&#xff0c;每行输出…

Fluke ADPT 连接器新增对福禄克万用 Fluke 101 的支持

所需设备&#xff1a; 1、Fluke ADPT连接器&#xff1b; 2、Fluke 101&#xff1b; Fluke 101 拆机图&#xff1a; 显示界面如下图&#xff1a; 并且可以将波形导出到EXCEL: 福禄克万用表需要自己动手改造&#xff01;&#xff01;&#xff01;

电商+支付双系统项目------设计数据库

这篇文章将详细介绍电商支付双系统项目的数据库设计。数据库在该项目中扮演着至关重要的角色&#xff0c;它负责存储和管理用户信息、商品数据、订单记录以及支付交易等关键数据。通过精心设计和优化数据库结构&#xff0c;可以实现高效的数据存储和检索&#xff0c;确保系统的…

【Linux】 Linux 小项目—— 进度条

进度条 基础知识1 \r && \n2 行缓冲区3 函数介绍 进度条实现版本 1代码实现运行效果 版本2 Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读&#xff01;&#xff01;&#xff01;下一篇文章见&#xff01;&#xff01;&#xff01; 基础知识 1 \r &&a…

利用修改邻接变量

资源下载 【免费】突破密码认证程序&#xff08;修改邻接变量&#xff09;资源-CSDN文库 资源内容 源码 /*****************************************************************************To be the apostrophe which changed "Impossible" into "Im possib…

java.lang.NoClassDefFoundError: org/springframework/core/GenericTypeResolver

前言 小编我将用CSDN记录软件开发求学之路上亲身所得与所学的心得与知识&#xff0c;有兴趣的小伙伴可以关注一下&#xff01; 也许一个人独行&#xff0c;可以走的很快&#xff0c;但是一群人结伴而行&#xff0c;才能走的更远&#xff01;让我们在成长的道路上互相学习&…

Codeforces Round 926 (Div. 2)(A~C)

A. Sasha and the Beautiful Array 分析&#xff1a;说实话&#xff0c;打比赛的时候看到这题没多想&#xff0c;过了一下样例发现将数组排序一下就行&#xff0c;交了就过了。刚刚写题解反应过来&#xff0c;a2-a1a3-a2.....an-a(n-1) an - a1&#xff0c;所以最后结果只取决…

35岁转行,是我人生中最正确的选择

前言 经常听到有人说&#xff0c;35岁是职场的分水岭&#xff0c;但我觉得我的35岁&#xff0c;人生才刚刚开始。 35岁前后&#xff0c;我生二胎&#xff0c;考研&#xff0c;跳槽&#xff0c;转行&#xff0c;从传统行业到服务业&#xff0c;从服务业到新能源行业&#xff0…

第7讲 SpringSecurity执行原理概述

SpringSecurity执行原理概述 spring security的简单原理&#xff1a; SpringSecurity有很多很多的拦截器&#xff0c;在执行流程里面主要有两个核心的拦截器 1&#xff0c;登陆验证拦截器AuthenticationProcessingFilter 2&#xff0c;资源管理拦截器AbstractSecurityInterc…

问题:在解决思想认识上存在的问题讲解过程中和大家分享谁的一段原话() #其他#媒体

问题&#xff1a;在解决思想认识上存在的问题讲解过程中和大家分享谁的一段原话&#xff08;&#xff09; A&#xff0e;鲁迅 B&#xff0e;稻盛和夫 C&#xff0e;戴尔卡耐基 D&#xff0e;奥格曼狄诺 参考答案如图所示

应急响应实战笔记02日志分析篇(3)

第3篇:Web日志分析 ox01 Web日志 Web访问日志记录了Web服务器接收处理请求及运行时错误等各种原始信息。通过对WEB日志进行的安全分析&#xff0c;不仅可以帮助我们定位攻击者&#xff0c;还可以帮助我们还原攻击路径&#xff0c;找到网站存在的安全漏洞并进行修复。 我们来…

【Spring源码解读 底层原理高级进阶】【上】探寻Spring内部:BeanFactory和ApplicationContext实现原理讲解

&#x1f389;&#x1f389;欢迎光临&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《Spring 狂野之旅&#xff1a;底层原理高级进阶》 &#x1f680…

java8并行数据处理与性能

块并用不同的线程分别处理每个数据块的流。这样一来&#xff0c;你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。让我们用一个简单的例子来试验一下这个思想。假设你需要写一个方法&#xff0c;接受数字n作为参数&#xff0c;并返回从1到给定参数…