图解Kafka | 5张图讲透Kafka 消费者交付语义

news2024/11/15 18:01:15

Kafka 消费者交付语义指的是 Kafka 消费者在处理消息时如何保证消息的可靠性和一致性。这涉及到消息是否被丢失、重复处理或者按顺序消费。

Kafka消费者交付语义有三种,即:

  • 最多一次
  • 至少一次
  • 精确一次

当消费者组/消费者从 Kafka 消费数据时,仅支持最多一次和至少一次这两种语义。但是您可以通过选择适当的数据存储来实现类似于精确一次的交付语义,例如,任何键值存储、RDBMS(主键)、Elasticsearch或任何其他支持幂等写入的存储。

最多一次

在最多一次传递语义中,消息最多只能传递一次。在这种语义中,宁可丢失消息也不应重复传递消息。采用最多一次语义的应用程序可以轻松实现更高的吞吐量和较低的延迟。默认情况下,由于“enable.auto.commit”为 true,因此Kafka消费者设置为使用“最多一次”传递语义。

这种语义下,如果消费者在将消息提交为已读,但是在处理消息之前宕机了或者消息处理失败,则未处理的消息将丢失,并且不会再次读取,分区重新平衡将导致另一个消费者从上次提交的偏移量读取消息。

如下图所示,消息是分批读取的,批次中的部分或全部消息可能未处理,但仍已提交为已处理,这就造成了消息的丢失。

至少一次

在至少一次传递语义中,可以多次传递消息,但不应丢失任何消息。消费者确保所有消息都被读取和处理,即使这可能导致消息重复。为了在消费数据时做到至少一次语义,需要将“enable.auto.commit”值设置为“false”`,您可以选择在处理完消息后手动提交,这样你就掌握了消费偏移量提交的主动权,只有消费成功的消息偏移量才会提交。

如果消费者在处理消息之前发生故障,未处理的消息不会丢失,因为偏移量未提交,分区重新平衡将导致另一个消费者从上次提交的偏移量再次读取相同的消息进进行处理。

但是如果消费者在处理消息之后、提交消费偏移量之前发生故障,因为偏移量未提交,分区重新平衡将导致另一个消费者从上次提交的偏移量再次读取相同的消息进进行处理,这就导致这批消息会被重复消费。

精确一次

在精确一次传递语义中,一条消息只能传递一次,并且不能丢失任何消息。这是所有传递语义中最困难的。与其他两种语义相比,采用精确一次语义的应用程序可能具有较低的吞吐量和较高的延迟。

就像前面介绍的一样,可以通过选择适当的数据存储来实现类似于精确一次的语义。

我们可以通过选择支持幂等写入数据存储来实现。幂等写入意味着即使重复执行相同的写入操作,结果也不会改变。这可以确保在至少一次语义中,即使消息被多次处理,最终数据存储中的数据不会重复。

假设我们使用MySQL作为数据存储,并且每条消息都有一个唯一的ID(例如,消息偏移量)。我们将消息写入MySQL数据库的表中,并使用消息的唯一ID作为主键。

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 禁用自动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("myTopic"));

try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password")) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            String insertSQL = "INSERT INTO my_table (id, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=?";
            try (PreparedStatement ps = connection.prepareStatement(insertSQL)) {
                ps.setLong(1, record.offset());
                ps.setString(2, record.value());
                ps.setString(3, record.value());
                ps.executeUpdate();
            }
        }
        consumer.commitSync();
    }
} catch (SQLException e) {
    e.printStackTrace();
}

下面图表展示了如何使用消息偏移量作为唯一标识符进行幂等写入:

Kafka Topic: myTopic
+-----------+-----------+-----------+
| Offset 0  | Offset 1  | Offset 2  |
| Message A | Message B | Message C |
+-----------+-----------+-----------+

消费者读取消息并将其写入MySQL数据库:
+-------------------------------+
| MySQL数据库: my_table         |
+-----------+-------------------+
| ID (Offset) | Value           |
+-----------+-------------------+
| 0          | Message A        |
| 1          | Message B        |
| 2          | Message C        |
+-----------+-------------------+

如果消息重复处理(例如,Offset 1的Message B),由于使用了ON DUPLICATE KEY UPDATE,写入操作将更新现有记录,而不会插入重复记录。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2073748.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

昂科烧录器支持Airoha达发科技的蓝牙音频芯片AB1568

芯片烧录行业领导者-昂科技术近日发布最新的烧录软件更新及新增支持的芯片型号列表&#xff0c;其中Airoha达发科技的蓝牙音频芯片AB1568已经被昂科的通用烧录平台AP8000所支持。 AB1568是一款获得蓝牙5.3和LE音频认证的单芯片解决方案&#xff0c;包含一个ARM Cortex-M4F应用…

香蕉梨:自然的甜蜜宝藏

在水果的缤纷世界里&#xff0c;有一种独特的存在&#xff0c;它融合了香蕉的软糯与梨子的清甜&#xff0c;那便是令人惊艳的香蕉梨。 食家巷香蕉梨&#xff0c;外形圆润可爱&#xff0c;色泽金黄中带着一抹清新的嫩绿&#xff0c;宛如大自然精心雕琢的艺术品。当你拿起一个香蕉…

使用Java进行中小学违规教育培训数据采集实践-以某城市为例

目录 前言 一、违规教育信息 1、内容管理 2、转换后的内容 二、数据库设计 1、空间数据库 三、字符地址位置转换空间信息 1、实现时序图 2、后台实体类的设计与实现 3、数据持久化操作 四、总结 前言 时间来到2024年8月24日&#xff0c;时间过得很快&#xff0c;2024…

PowerShell | git log 中文乱码问题解决

总结一下: 乱码核心问题就是对不上编码.改成对应编码即可. 明白‌LESSCHARSET环境变量‌是用来设置less命令的字符集编码的。当在命令行中使用less命令查看文件时&#xff0c;如果文件包含非ASCII字符&#xff08;如中文&#xff09;&#xff0c;可能会出现乱码问题。通过设置…

SpringBoot+Vue实现大文件上传(断点续传-前端控制)

SpringBootVue实现大文件上传&#xff08;断点续传&#xff09; 1 环境 SpringBoot 3.2.1&#xff0c;Vue 2&#xff0c;ElementUI 2 问题 在前一篇文章&#xff0c;我们写了分片上传来实现大文件上传&#xff0c;存在一个问题就是&#xff0c;中间失败的话需要重新上传&#…

QT WIN11 FluentUI APP开发

代码 import QtQuick import QtQuick.Controls import FluentUIItem {property bool autoPlay: trueproperty int loopTime: 2000property var modelproperty Component delegateproperty bool showIndicator: trueproperty int indicatorGravity : Qt.AlignBottom | Qt.Align…

【MySQL】一文带你理清<行级锁>(行锁,间隙锁,临键锁)

前言 大家好吖&#xff0c;欢迎来到 YY 滴MySQL系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C Linux的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

STM32(六):定时器——输出比较实验

PWM驱动呼吸灯 源码&#xff1a; #include "stm32f10x.h" // Device headervoid PWM_Init(void) {RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2,ENABLE);//开启时钟TIM_InternalClockConfig(TIM2);//选择时基单元的时钟TIM_TimeBaseInitTypeDef TI…

怎么管控终端电脑上的移动端口

管控终端电脑上的移动端口&#xff0c;尤其是USB等移动端口&#xff0c;是确保企业数据安全和提升网络管理效率的重要手段。 一、使用注册表编辑器禁用USB端口&#xff08;适用于Windows系统&#xff09; 打开注册表编辑器&#xff1a; 同时按下“WinR”组合键&#xff0c;打…

【C++从小白到大牛】C++智能指针的使用、原理和分类

目录 1、我们为什么需要智能指针&#xff1f; 2、内存泄露 2.1 什么是内存泄漏&#xff0c;内存泄漏的危害 2.2如何避免内存泄漏 总结一下: 3.智能指针的使用及原理 3.1 RAII 3.2关于深拷贝和浅拷贝更深层次的理解&#xff1a; 3.3 std::auto_ptr 3.4 std::unique_pt…

《黑神话:悟空》登顶全球:游戏行业投资新风向与投资洞察

目录 引言 一、原创IP的崛起&#xff1a;文化共鸣与市场潜力 1《黑神话:悟空》的原创IP魅力 2 原创IP在游戏行业中的重要性 3 原创IP成为新的投资热点 4 文化共鸣的关键作用 二、高质量内容为王&#xff1a;技术与创新的双重驱动 1 《黑神话:悟空》的高质量内容展示 2…

Java接口interface(内含练习)

为什么有接口&#xff1f; 接口就是一种规则&#xff0c;更侧向是一种行为 接口的定义和使用 接口用关键字interface来定义 public interface 接口名{} 接口不能实例化 接口和接口之间是实现关系&#xff0c;通过implements关键字表示 public class 类名 implements 接口…

浅谈线性表——链表

文章目录 一、ArrayList的缺陷二、什么是链表&#xff1f;三、自我实现一个单向不带头非循环结构的链表3.1、实现代码3.2、代码解析 四、自我实现一个双向不带头非循环结构的链表4.1、实现代码 一、ArrayList的缺陷 前面学习了顺序表&#xff0c;顺序表在知道下标时可以快速的…

python应用之random模块(居然还有那么多的随机算法函数)

random 是 Python 的一个常用的内置模块&#xff0c;模块提供了生成随机数的功能&#xff0c;包含了多种生成随机数的函数&#xff0c;比如生成随机整数、随机浮点数、从序列中随机选择元素等。 使用 random模块 要使用 random模块&#xff0c;直接导入它即可。 import rand…

spring揭秘09-aop03-aop织入器织入横切逻辑与自动织入

文章目录 【README】【1】spring aop的织入【1.1】使用ProxyFactory 作为织入器【1.2】基于接口的代理&#xff08;JDK动态代理&#xff0c;目标类实现接口&#xff09;【补充】 【1.2】基于类的代理&#xff08;CGLIB动态代理&#xff0c;目标类没有实现接口&#xff09;【1.2…

Nginx: 配置项之autoIndex模块与Nginx变量

autoIndex模块 autoindex模块它所实现的一个基本功能&#xff0c;是当用户请求以 / 结尾式的URL&#xff0c;它会列出对应的目录结构比如说, 在实际的生态环境中&#xff0c;内部系统可能经常需要为用户提供一些下载功能。可能需要列出来某一个磁盘上的一个文件&#xff0c; 比…

【D-DCVRP】求解DCVRP改进贪婪算法(三)

一、Held-Harp模型 海尔德和卡尔普在1970年提出景点模型,用于求解TSP问题的最优解下界 该模型同样可以用于DCVRP问题,既有定理1成立。 定理1:根据Held-Karp模型使用向量 π = ( 0 , π 1 , π 2 , ⋯   , π n ) \pi=(0,\pi_1,\pi_2,\cdots,\pi_n) π=(0,π1​,π2​,⋯…

Datawhale第五期夏令营-CV竞赛

CV竞赛 0.赛事报名租用4090 1.开始运行下载文件提交结果 2.内容解释赛题背景赛题目标社会价值评分规则baseline精读代码什么是YOLO 主要代码内容精读使用Ultraalytics运行代码 0.赛事报名 赛事官网:https://www.marsbigdata.com/competition/details?id3839107548872 租用40…

【Redis】RDB和AOF持久化

RDB和AOF持久化 一、什么是持久化&#xff1f;二、RDB三、AOF 一、什么是持久化&#xff1f; 数据一般写在内存上&#xff0c;但是当重新启动计算机内存数据是会丢失的&#xff0c;而硬盘中的数据是不会丢失的&#xff0c;所以&#xff0c;当我们把数据从内存放到硬盘中的话就…

解决Windows下载完anaconda之后,在pycharm中使用anaconda

怎么下载anaconda我就不详细讲了&#xff0c;就是官方下载基本嫩都是下一步下一步你就可以 一、首先配置环境变量如图 二、查看anaconda情况 三、打开pycharm,如下图操作 ## 注意这里的.bat文件需要在你下载到的anaconda中去找 完毕