Kafka 深入客户端 — 事务

news2025/1/30 11:45:50

Kafka 事务确保了数据在写入Kafka时的原子性和一致性。

1 幂等

幂等就是对接口的多次调用所产生的结果和调用一次是一致的。

Kafka 生产者在进行重试的时候可能会写入重复的消息,开启幂等性功能后就可以避免这种情况。将生产者客户端参数enable.idempotence设置为true即可。

1.1 实现原理

Kafka 引入了producer id(简称PID)和序列号(sequence number)这两个概念。分别对应v2版的日志格式中RecordBatch的producer id 和first sequence这两个字段。

每个新的生产者实例在初始化时都会由broker分配一个PID(对于用户不可见)。

对于每个PID,消息发送到每个分区都有对应的序列号,序列号从0开始单调递增。生产者每发送一条消息,就会将<PID,分区>对应的序列号的值加1。

1.1.1 序列号值比对

broker端会在内存中为每一对<PID,分区>维护一个序列号(SN_old),对于收到的每一条消息,将比对它的序列号值(SN_new)。以下有三种情况:

SN_new < SN_old+1:消息被重复写入,broker可以直接将其丢弃。

SN_new = SN_old+1:是新消息,且消息没有丢失。

SN_new > SN_old+1:消息可能丢失,生产者会抛出OutOfOlderSequenceException异常。

1.1.2 局限性

Kafka 的幂等只能保证单个生产者会话(session)中单分区的幂等。

2 事务

事务可以保证跨生产者会话的消息幂等发送及新生产者实例及跨生产者会话的事务恢复。

前者指具有相同事务id的新生产者示例被创建且工作的时候,旧的且拥有相同事务id的生产者实例将不再工作。

后者指某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交,要么被终止。使新的生产者实例从一个正常的状态开始工作。

2.1 实现原理

图 生产者及事务协调器初始化事务到提交事务的流程

开启事务,生产者客户端需要提供唯一的transactionalId(通过客户端参数transactional.id来设置)。并且需要开启幂等特性。

2.1.1 查找事务协调器

KafkaProducer的initTransactions()方法初始化事务。

TransactionCoordinator事务协调器负责分配PID和管理事务。

生产者首先要找到对应的事务协调器所在broker节点。发送FindCoorinatorRequest请求,Kafka根据请求体中的coordinator_key(事务id)来查找节点(具体方式是根据事务id的哈希值计算其在主题__transaction_state中的分区编号),算法如下:

分区编号 = 事务id的哈希值 % transaction_state的分区数

根据分区编号,寻找此分区leader副本所在的broker节点,并将节点信息返回给生产者。

2.1.2 获取PID

生产者在找到事务协调器的节点后,发送InitProducerIdRequest请求(如果未开启事务特性而只开启幂等性,这个请求可以方式给任意broker,否则只会发给事务协调器所在的broker)来获取PID。

事务协调器生成PID后,会把事务id和对应的PID以消息的形式保存到主题__transaction_state中。

该请求还会触发协调器执行以下任务:

1)增加该PID对应的producer_epoch(单调递增)。具有相同PID但producer_epoch小于该值的其他生产者新开启的事务会被拒绝。

producer_epoch 确保了相同事务id任何时刻只有一个生产者实例。

具有相同事务id的新生产者实例被创建且工作的时,旧的且拥有相同事务id的生产者实例将不再工作。

2)恢复(Commit)或终止(Abort)之前生产者未完成的事务。

2.1.3 开启事务

KafkaProducer的beginTransaction()方法开启一个事务。调用该方法后,生产者本地会标记已经开启了一个新的事务。只有在生产者发送第一条消息之后事务协调器才会认为该事务已开启。

2.1.4 发送消息

在生产者给一个新的分区发送数据库之前,它需要先向事务协调器发送AddPartitionsToTxnRequest请求,让事务协调器将<transactionId,TopicPartition>的对应关系存储在主题__transaction_state中。

如果该分区是对应事务中的第一个分区,那么事务协调器还会启动会该事务的计时。

随后生产者通过ProduceRequest请求发送消息(ProducerBatch)到用户自定义主题中。和普通消息不同的是,ProducerBatch会包含实质的PID、producer_epoch和sequence_number。

2.1.5 提交或终止事务

调用KafkaProducer的commitTransaction()或abortTransaction()方法来结束当前的事务。

生产者会向事务协调器发送EndTxnRequest请求。协调器收到请求后会执行如下操作:

  1. 事务协调器将PREPARE_COMMIT或PREPARE_ABORT消息写入主题__transaction_state。
  2. 协调器向事务中各个分区的leader节点发送WriteTxnMarkersRequest请求,当leader节点收到请求后,会在相应分区中写入控制消息来标识事务的终结。它和普通消息一样存储在日志文件中。
  3. 事务协调器将最终的COMPLETE_COMMIT或COMPLETE_ABORT信息写入主题__transaction_state以表明当前事务已结束。

此时可以删除主题__transaction_state中所有关于该事务的消息(将相应的消息设置为墓碑消息即可)。

2.1.6 控制消息ControlBatch

各个分区的leader收到事务协调器的WriteTxnMarkersRequest请求后,会在相应的分区写入控制消息(ControlBatch)。来标识事务的终结。它和普通消息一样存储在日志文件中。不同点在于RecordBatch中某些字段的值。

图 控制消息日志格式

key 中的type表示控制类型:0 ABORT,1 COMMIT。

value中的coordinator_epoch 表示协调器的纪元(版本)。

2.2 消费者与事务

Kafka并不能保证已提交的事务中的所有消息都能被消费:

  1. 事务中的某些消息可能被清理(压缩或删除)。
  2. 消费者通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
  3. 消费者在消费时可能没有分配到事务内的所有分区。

2.2.1 消费者的隔离级别

消费端的参数isolation.level,默认值为“read_uncommitted”表示可以消费未提交的事务,而“read_committed”表示只能消费已提交的事务。

如果隔离级别为“read_committed”,生产者开启事务,并发送3条消息,在生产者执行commitTransaction()或abortTransaction()方法前,KafkaConsumer看不到这些消息,但是其内部会缓存消息,直到生产者执行commitTransaction(),它才会将消息推送给消费端应用;如果生产者执行abortTransaction(),那它就会丢弃这些消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2286408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ZZNUOJ(C/C++)基础练习1011——1020(详解版)

1011 : 圆柱体表面积 题目描述 输入圆柱体的底面半径r和高h&#xff0c;计算圆柱体的表面积并输出到屏幕上。要求定义圆周率为如下宏常量 #define PI 3.14159 输入 输入两个实数&#xff0c;表示圆柱体的底面半径r和高h。 输出 输出一个实数&#xff0c;即圆柱体的表面积&…

Baklib探索内容中台的核心价值与实施策略

内容概要 在数字化转型的背景下&#xff0c;内容中台逐渐成为企业数字化策略中的关键组成部分。内容中台是一个集成的内容管理体系&#xff0c;旨在打破信息孤岛&#xff0c;使内容能够在各个业务部门和平台之间高效流通。这种管理体系不仅能够提升内容的生产效率&#xff0c;…

网络安全攻防实战:从基础防护到高级对抗

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 引言 在信息化时代&#xff0c;网络安全已经成为企业、政府和个人必须重视的问题。从数据泄露到勒索软件攻击&#xff0c;每一次…

论文阅读(十三):复杂表型关联的贝叶斯、基于系统的多层次分析:从解释到决策

1.论文链接&#xff1a;Bayesian, Systems-based, Multilevel Analysis of Associations for Complex Phenotypes: from Interpretation to Decision 摘要&#xff1a; 遗传关联研究&#xff08;GAS&#xff09;报告的结果相对稀缺&#xff0c;促使许多研究方向。尽管关联概念…

“““【运用 R 语言里的“predict”函数针对 Cox 模型展开新数据的预测以及推理。】“““

主题与背景 本文主要介绍了如何在R语言中使用predict函数对已拟合的Cox比例风险模型进行新数据的预测和推理。Cox模型是一种常用的生存分析方法&#xff0c;用于评估多个因素对事件发生时间的影响。文章通过具体的代码示例展示了如何使用predict函数的不同参数来获取生存概率和…

Oracle Primavera P6 最新版 v24.12 更新 1/2

目录 引言 P6 PPM 更新内容 1. 在提交更新基线前预览调整 2. 快速轻松地取消链接活动 3. 选择是否从 XER 文件导入责任经理 4. 提高全局变更报告的清晰度 5. 将整个分层代码值路径导出到 CPP 6. 里程碑活动支持所有关系类型 6. 时间表批准 7. 性能改进 8. 安装改进 …

AI大模型开发原理篇-2:语言模型雏形之词袋模型

基本概念 词袋模型&#xff08;Bag of Words&#xff0c;简称 BOW&#xff09;是自然语言处理和信息检索等领域中一种简单而常用的文本表示方法&#xff0c;它将文本看作是一组单词的集合&#xff0c;并忽略文本中的语法、词序等信息&#xff0c;仅关注每个词的出现频率。 文本…

本地部署deepseek模型步骤

文章目录 0.deepseek简介1.安装ollama软件2.配置合适的deepseek模型3.安装chatbox可视化 0.deepseek简介 DeepSeek 是一家专注于人工智能技术研发的公司&#xff0c;致力于打造高性能、低成本的 AI 模型&#xff0c;其目标是让 AI 技术更加普惠&#xff0c;让更多人能够用上强…

【deepseek】deepseek-r1本地部署-第二步:huggingface.co替换为hf-mirror.com国内镜像

一、背景 由于国际镜像国内无法直接访问&#xff0c;会导致搜索模型时加载失败&#xff0c;如下&#xff1a; 因此需将国际地址替换为国内镜像地址。 二、操作 1、使用vscode打开下载路径 2、全局地址替换 关键字 huggingface.co 替换为 hf-mirror.com 注意&#xff1a;务…

sunrays-framework配置重构

文章目录 1.common-log4j2-starter1.目录结构2.Log4j2Properties.java 新增两个属性3.Log4j2AutoConfiguration.java 条件注入LogAspect4.ApplicationEnvironmentPreparedListener.java 从Log4j2Properties.java中定义的配置读取信息 2.common-minio-starter1.MinioProperties.…

【大模型】Ollama+AnythingLLM搭建RAG大模型私有知识库

文章目录 一、AnythingLLM简介二、搭建本地智能知识库2.1 安装Ollama2.2 安装AnythingLLM 参考资料 一、AnythingLLM简介 AnythingLLM是由Mintplex Labs Inc.开发的一个全栈应用程序&#xff0c;是一款高效、可定制、开源的企业级文档聊天机器人解决方案。AnythingLLM能够将任…

代理模式 -- 学习笔记

代理模式学习笔记 什么是代理&#xff1f; 代理是一种设计模式&#xff0c;用户可以通过代理操作&#xff0c;而真正去进行处理的是我们的目标对象&#xff0c;代理可以在方法增强&#xff08;如&#xff1a;记录日志&#xff0c;添加事务&#xff0c;监控等&#xff09; 拿一…

JVM_类的加载、链接、初始化、卸载、主动使用、被动使用

①. 说说类加载分几步&#xff1f; ①. 按照Java虚拟机规范,从class文件到加载到内存中的类,到类卸载出内存为止,它的整个生命周期包括如下7个阶段: 第一过程的加载(loading)也称为装载验证、准备、解析3个部分统称为链接(Linking)在Java中数据类型分为基本数据类型和引用数据…

ProfibusDP主机与从机交互

ProfibusDP 主机SD2索要数据下发&#xff1a;68 08 F7 68 01 02 03 21 05 06 07 08 1C 1668&#xff1a;SD2 08&#xff1a;LE F7&#xff1a;LEr 68&#xff1a;SD2 01:目的地址 02&#xff1a;源地址 03:FC_CYCLIC_DATA_EXCHANGE功能码 21&#xff1a;数据地址 05,06,07,08&a…

Java设计模式:结构型模式→组合模式

Java 组合模式详解 1. 定义 组合模式&#xff08;Composite Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许将对象组合成树形结构以表示“部分-整体”的层次。组合模式使得客户端能够以统一的方式对待单个对象和对象集合的一致性&#xff0c;有助于处理树形结构…

【福州市AOI小区面】shp数据学校大厦商场等占地范围面数据内容测评

AOI城区小区面样图和数据范围查看&#xff1a; — 字段里面有name字段。分类比较多tpye&#xff1a;每个值代表一个类型。比如字段type中1549代表小区住宅&#xff0c;1563代表学校。小区、学校等占地面积范围数据 —— 小区范围占地面积面数据shp格式 无偏移坐标&#xff0c;只…

【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR

【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR 1 算法原理 Tarun A K, Chundawat V S, Mandal M, et al. Fast yet effective machine unlearning[J]. IEEE Transactions on Neural Networks and Learning Systems, 2023. 本文提出了一种名为 UNSIR&#xff08;Un…

基于SpringBoot的阳光幼儿园管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

【开源免费】基于SpringBoot+Vue.JS景区民宿预约系统(JAVA毕业设计)

本文项目编号 T 162 &#xff0c;文末自助获取源码 \color{red}{T162&#xff0c;文末自助获取源码} T162&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

安卓逆向之脱壳-认识一下动态加载 双亲委派(一)

安卓逆向和脱壳是安全研究、漏洞挖掘、恶意软件分析等领域的重要环节。脱壳&#xff08;unpacking&#xff09;指的是去除应用程序中加固或保护措施的过程&#xff0c;使得可以访问应用程序的原始代码或者数据。脱壳的重要性&#xff1a; 分析恶意软件&#xff1a;很多恶意软件…