Kafka 中的事务

news2025/4/9 11:14:25

Kafka 中的 事务(Transactions) 是为了解决 消息处理的原子性和幂等性问题,确保一组消息要么全部成功写入、要么全部失败,不出现中间状态或重复写入。事务机制尤其适合于 “精确一次(Exactly-Once)” 的处理语义(EOS, Exactly Once Semantics)。

🧠 Kafka 中为什么需要事务?

在实际业务中,可能有这样的场景:

一个消费者从 Topic A 读取一条消息,然后处理它,并将处理结果写入 Topic B —— 我们希望这个“读取 + 写入”是一个整体,要么都成功,要么都失败,否则可能造成重复消费或数据不一致

普通情况下 Kafka 只能做到:

  • 最多一次(At most once):消息可能丢;
  • 至少一次(At least once):消息可能重复;
  • 不能保证精确一次,除非业务层做幂等控制。

因此 Kafka 引入了事务机制来支持真正的 Exactly Once Semantics(EOS)

✅ Kafka 事务的核心概念

概念说明
Transactional Producer开启事务功能的生产者,可以保证一组写入的原子性。
Transactional ID每个事务性生产者的唯一标识,用于区分和恢复未完成的事务。
事务协调器(Transaction Coordinator)Kafka 集群中的一个 Broker 组件,负责管理事务的状态、提交与回滚。
Producer ID(PID)Kafka 为每个事务性生产者分配的唯一 ID,用于实现幂等性和事务追踪。

✅ Kafka 事务的使用流程(简化)

  1. 初始化事务生产者(开启事务功能):
    Properties props = new Properties();
    props.put("transactional.id", "txn-001");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    
  2. 开始事务
    producer.beginTransaction();
    
  3. 执行写入操作(可以写入多个 Topic、多个 Partition):
    producer.send(new ProducerRecord<>("topicA", "key1", "msg1"));
    producer.send(new ProducerRecord<>("topicB", "key2", "msg2"));
    
  4. 提交事务(成功)或 中止事务(失败):
    producer.commitTransaction(); // 或者 producer.abortTransaction();
    

✅ Kafka 事务的特点与保障

1. 原子性

  • 一次事务中的多条消息,要么全部写入成功,要么全部失败并回滚。
  • 对消费者来说,要么能消费到完整事务内的消息,要么一条都看不到。

2. 幂等性(Idempotence)

  • 自动启用,配合事务使用时,可以避免消息重复写入,即使重试也不会写入重复数据。

3. 隔离性

  • Kafka 使用 读已提交(read_committed)读未提交(read_uncommitted) 的消费模式控制事务可见性。
  • 默认:消费者只能读取已提交的事务消息,未提交或中止的事务消息不会暴露给消费者。

✅ Kafka 事务与消费者的协作(消费 + 生产)

配合 enable.auto.commit=falseread_committed,可以实现精确一次语义

producer.beginTransaction();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    // 处理数据,并写入结果
    producer.send(new ProducerRecord<>("output-topic", process(record)));
}

// 手动提交 offset,作为事务的一部分
Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets(records);
producer.sendOffsetsToTransaction(offsets, consumerGroupId);

producer.commitTransaction();

通过把 消费 Offset 的提交生产消息的提交 绑定到同一个事务中,Kafka 实现了端到端的 Exactly-Once 保证。

✅ Kafka 事务机制本质

Kafka 事务的回滚机制,并不是自动触发的,开发者必须在代码逻辑中显式地判断是否出错,然后手动调用:

producer.abortTransaction();

如果开发者只调用了 producer.commitTransaction(),而没有判断出错,也没有手动调用 abortTransaction(),那么出问题时 Kafka 不会自动回滚!需要开发者自己判断、自己调用!

✅ 正确的事务控制流程应该是这样的:

try {
    producer.beginTransaction();

    // 消费 + 处理 + 发送
    producer.send(...);
    producer.send(...);

    // 提交 offset
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);

    // 提交事务
    producer.commitTransaction();   // ✅ 成功,整个事务写入生效!

} catch (Exception e) {
    // 出现任何异常,都应该回滚
    producer.abortTransaction();   // ❗回滚事务,所有写入 + offset 统统丢弃
    e.printStackTrace();
}

🚨 如果直接调用 commitTransaction() 会怎样?

如果前面的 send()sendOffsetsToTransaction() 抛了异常,但没 try-catch 捕获,程序继续调用 commitTransaction(),结果是:

  • Kafka 会检测到前面出了问题,会抛出 ProducerFencedExceptionIllegalStateException 等;
  • 此时事务已经处于非法状态;
  • commitTransaction() 会失败,Kafka 不会自动回滚!
  • 如果你不手动调用 abortTransaction(),这个事务就会卡在中间状态,不生效,也没回滚。

✅ 做法:必须写 try-catch 包住整个事务过程,出错就 abortTransaction()

这就是标准的 事务控制模式(跟数据库事务的 try-catch 是一样的思路):

⚠️ 常见会触发事务失败的场景:

场景会发生什么
网络波动、Broker 写入超时send() 抛异常
offset 提交失败sendOffsetsToTransaction() 抛异常
重复使用 transactional.id 被踢出ProducerFencedException
使用了错误的调用顺序IllegalStateException
commitTransaction() 时事务非法提交失败,不会自动回滚

✅ 补充一句

Kafka 的事务机制本质上是「声明式事务」,但实现方式是「编程式事务」,不像数据库事务那样自动提交或自动回滚 —— 所以你写代码的时候,一定要有清晰的事务控制逻辑。

✅ 总结一下你该怎么做

✅ 成功就 commitTransaction()
❌ 出错就 abortTransaction()
🧠 判断错没错,靠自己的业务代码来 try-catch 控制

什么是事务非法?

commitTransaction() 时事务非法,这句话是什么意思?“事务非法”到底是个啥意思?

在 Kafka 中,一个事务是有“状态”的,它不是你想提交就能提交的。只有当事务状态是“合法/活跃”的时候,才能 commitTransaction(),否则就会抛异常。

所以,“事务非法” = 事务已经处于异常、失效、终止状态,不能提交。

📊 Kafka 中事务的几种状态(简化理解)

事务状态描述
InitializedinitTransactions() 调用后,初始化完成
Started调用 beginTransaction() 后,事务已开始
InFlight事务进行中,已发送消息,或 offset
ReadyToCommit一切正常,可以提交
Invalid出现异常、被踢出、操作错误 → 事务非法
Committed已提交成功
Aborted已主动回滚

🚨 什么情况下事务会变成“非法状态”?

以下几种情况会让事务“非法化”,从而你调用 commitTransaction() 时直接失败:

1. ❌ 你调用顺序错了

比如你根本没有调用 beginTransaction(),就直接调用 send()commitTransaction()

producer.initTransactions();
// producer.beginTransaction();  // ❌ 忘了这行!

producer.send(...);             // ❌ 错误用法
producer.commitTransaction();   // ❌ 会抛 IllegalStateException

这时候 Kafka 会认为你“乱搞”,把事务标为非法状态。

2. ❌ 事务内某个操作失败(比如 send 抛异常)

producer.beginTransaction();

try {
    producer.send(...);  // ⚠️ 如果这里失败了,比如网络问题
    producer.commitTransaction(); // ❌ 事务状态非法,提交失败
} catch (Exception e) {
    producer.abortTransaction();  // ✅ 你得主动回滚!
}

3. ❌ 你被 Kafka 判定为“被踢出事务”

Kafka 是通过 transactional.id 标识一个事务性的 producer 的,一个 transactional.id 只能在一个 producer 实例中使用
如果你重复使用了这个 ID(比如程序重启未清理),Kafka 会抛:

org.apache.kafka.common.errors.ProducerFencedException

这时,Kafka 会把你当前的事务标记为非法,你必须关闭 producer 实例,否则不能提交也不能继续。

4. ❌ offset 提交失败了

producer.sendOffsetsToTransaction(...);  // 如果这里异常,事务就“坏了”
producer.commitTransaction();            // ❌ 再提交,事务非法

🧠 为什么 Kafka 要这么严格?

因为 Kafka 的事务要保证:

要么全写入、全提交,要么一个字节都不留下。

所以一旦你有步骤失败,它就会保护性地禁止你再提交,以免产生脏数据(比如你写了一半就崩了,还提交 offset,那就“假成功”了)。

✅ 那应该怎么做?

使用事务时,标准模板写法如下:

producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    try {
        producer.beginTransaction();

        for (ConsumerRecord<String, String> record : records) {
            // 处理 + 转发
            producer.send(new ProducerRecord<>("output-topic", record.key(), process(record.value())));
        }

        // 把消费 offset 提交到事务中
        Map<TopicPartition, OffsetAndMetadata> offsets = ...;
        producer.sendOffsetsToTransaction(offsets, consumerGroupId);

        producer.commitTransaction();  // ✅ 事务提交

    } catch (Exception e) {
        producer.abortTransaction();  // ❗出现问题,事务回滚
    }
}

✅ 总结一句话

Kafka 中事务“非法” = 你违反了事务的规则(出错、顺序错、异常未处理等),Kafka 把这个事务锁住,不让你再提交,以避免脏数据。

你只要记得:

  • 成功就 commitTransaction()
  • 出错必须 abortTransaction()
  • 所有事务逻辑必须放在 try-catch 中;

就不会踩坑 ✅

🚫 注意事项和限制

  1. 事务有开销
    • 每次事务需要额外的协调和状态管理,吞吐会略低于普通模式。
    • 大量小事务不如少量大事务高效。
  2. 只能配合 Kafka 使用
    • Kafka 的事务不能覆盖外部数据库、Redis 等操作,无法实现跨系统的分布式事务
  3. 事务状态会持久化到内存和日志中
    • 若事务未正常提交或中止,Kafka 会在恢复后重新协调这些事务状态。
  4. 事务 ID 要保持稳定
    • 如果你频繁变更 transactional.id,会导致事务协调器无法追踪事务状态。

🧠 总结一句话

Kafka 中的事务机制提供了跨多个 topic/partition 的消息写入 原子性,配合幂等性和 offset 提交绑定,可实现 精确一次语义(Exactly Once) —— 特别适用于金融、电商、订单系统、数据管道等对一致性要求极高的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2329702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Golang系列 - 内存对齐

Golang系列-内存对齐 常见类型header的size大小内存对齐空结构体类型参考 摘要: 本文将围绕内存对齐展开, 包括字符串、数组、切片等类型header的size大小、内存对齐、空结构体类型的对齐等等内容. 关键词: Golang, 内存对齐, 字符串, 数组, 切片 常见类型header的size大小 首…

网络原理 - HTTP/HTTPS

1. HTTP 1.1 HTTP是什么&#xff1f; HTTP (全称为 “超文本传输协议”) 是⼀种应用非常广泛的应用层协议. HTTP发展史&#xff1a; HTTP 诞生于1991年. 目前已经发展为最主流使用的⼀种应用层协议 最新的 HTTP 3 版本也正在完善中, 目前 Google / Facebook 等公司的产品已经…

OCC Shape 操作

#pragma once #include <iostream> #include <string> #include <filesystem> #include <TopoDS_Shape.hxx> #include <string>class GeometryIO { public:// 加载几何模型&#xff1a;支持 .brep, .step/.stp, .iges/.igsstatic TopoDS_Shape L…

深度学习入门(四):误差反向传播法

文章目录 前言链式法则什么是链式法则链式法则和计算图 反向传播加法节点的反向传播乘法节点的反向传播苹果的例子 简单层的实现乘法层的实现加法层的实现 激活函数层的实现ReLu层Sigmoid层 Affine层/SoftMax层的实现Affine层Softmax层 误差反向传播的实现参考资料 前言 上一篇…

Linux:页表详解(虚拟地址到物理地址转换过程)

文章目录 前言一、分页式存储管理1.1 虚拟地址和页表的由来1.2 物理内存管理与页表的数据结构 二、 多级页表2.1 页表项2.2 多级页表的组成 总结 前言 在我们之前的学习中&#xff0c;我们对于页表的认识仅限于虚拟地址到物理地址转换的桥梁&#xff0c;然而对于具体的转换实现…

PostgreSQL 一文从安装到入门掌握基本应用开发能力!

本篇文章主要讲解 PostgreSQL 的安装及入门的基础开发能力,包括增删改查,建库建表等操作的说明。navcat 的日常管理方法等相关知识。 日期:2025年4月6日 作者:任聪聪 一、 PostgreSQL的介绍 特点:开源、免费、高性能、关系数据库、可靠性、稳定性。 官网地址:https://w…

WEB安全--内网渗透--LMNTLM基础

一、前言 LM Hash和NTLM Hash是Windows系统中的两种加密算法&#xff0c;不过LM Hash加密算法存在缺陷&#xff0c;在Windows Vista 和 Windows Server 2008开始&#xff0c;默认情况下只存储NTLM Hash&#xff0c;LM Hash将不再存在。所以我们会着重分析NTLM Hash。 在我们内…

8.用户管理专栏主页面开发

用户管理专栏主页面开发 写在前面用户权限控制用户列表接口设计主页面开发前端account/Index.vuelangs/zh.jsstore.js 后端Paginator概述基本用法代码示例属性与方法 urls.pyviews.py 运行效果 总结 欢迎加入Gerapy二次开发教程专栏&#xff01; 本专栏专为新手开发者精心策划了…

室内指路机器人是否支持与第三方软件对接?

嘿&#xff0c;你知道吗&#xff1f;叁仟室内指路机器人可有个超厉害的技能&#xff0c;那就是能和第三方软件 “手牵手” 哦&#xff0c;接下来就带你一探究竟&#xff01; 从技术魔法角度看哈&#xff1a;好多室内指路机器人都像拥有超能力的小魔法师&#xff0c;采用开放式…

从代码上深入学习GraphRag

网上关于该算法的解析都停留在大概流程上&#xff0c;但是具体解析细节未知&#xff0c;由于代码是PipeLine形式因此阅读起来比较麻烦&#xff0c;本文希望通过阅读项目代码来解析其算法的具体实现细节&#xff0c;特别是如何利用大模型来完成图谱生成和检索增强的实现细节。 …

【Redis】通用命令

使用者通过redis-cli客户端和redis服务器交互&#xff0c;涉及到很多的redis命令&#xff0c;redis的命令非常多&#xff0c;我们需要多练习常用的命令&#xff0c;以及学会使用redis的文档。 一、get和set命令&#xff08;最核心的命令&#xff09; Redis中最核心的两个命令&…

微前端随笔

✨ single-spa&#xff1a; js-entry 通过es-module 或 umd 动态插入 js 脚本 &#xff0c;在主应用中发送请求&#xff0c;来获取子应用的包&#xff0c; 该子应用的包 singleSpa.registerApplication({name: app1,app: () > import(http://localhost:8080/app1.js),active…

C++中的浅拷贝和深拷贝

浅拷贝只是将变量的值赋予给另外一个变量&#xff0c;在遇到指针类型时&#xff0c;浅拷贝只会把当前指针的值&#xff0c;也就是该指针指向的地址赋予给另外一个指针&#xff0c;二者指向相同的地址&#xff1b; 深拷贝在遇到指针类型时&#xff0c;会先将当前指针指向地址包…

车载诊断架构 --- 整车重启先后顺序带来的思考

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 周末洗了一个澡,换了一身衣服,出了门却不知道去哪儿,不知道去找谁,漫无目的走着,大概这就是成年人最深的孤独吧! 旧人不知我近况,新人不知我过…

【C++11(下)】—— 我与C++的不解之缘(三十二)

前言 随着 C11 的引入&#xff0c;现代 C 语言在语法层面上变得更加灵活、简洁。其中最受欢迎的新特性之一就是 lambda 表达式&#xff08;Lambda Expression&#xff09;&#xff0c;它让我们可以在函数内部直接定义匿名函数。配合 std::function 包装器 使用&#xff0c;可以…

Windows 10/11系统优化工具

家庭或工作电脑使用时间久了&#xff0c;会出现各种各样问题&#xff0c;今天给大家推荐一款专为Windows 10/11系统设计的全能优化工具&#xff0c;该软件集成了超过40项专业级实用程序&#xff0c;可针对系统性能进行深度优化、精准调校、全面清理、加速响应及故障修复。通过系…

浅谈在HTTP中GET与POST的区别

从 HTTP 报文来看&#xff1a; GET请求方式将请求信息放在 URL 后面&#xff0c;请求信息和 URL 之间以 &#xff1f;隔开&#xff0c;请求信息的格式为键值对&#xff0c;这种请求方式将请求信息直接暴露在 URL 中&#xff0c;安全性比较低。另外从报文结构上来看&#xff0c…

LightRAG实战:轻松构建知识图谱,破解传统RAG多跳推理难题

作者&#xff1a;后端小肥肠 &#x1f34a; 有疑问可私信或评论区联系我。 &#x1f951; 创作不易未经允许严禁转载。 姊妹篇&#xff1a; 2025防失业预警&#xff1a;不会用DeepSeek-RAG建知识库的人正在被淘汰_deepseek-embedding-CSDN博客 从PDF到精准答案&#xff1a;Coze…

C++多线程编码二

1.lock和try_lock lock是一个函数模板&#xff0c;可以支持多个锁对象同时锁定同一个&#xff0c;如果其中一个锁对象没有锁住&#xff0c;lock函数会把已经锁定的对象解锁并进入阻塞&#xff0c;直到多个锁锁定一个对象。 try_lock也是一个函数模板&#xff0c;尝试对多个锁…

垃圾回收——三色标记法(golang使用)

三色标记法(tricolor mark-and-sweep algorithm)是传统 Mark-Sweep 的一个改进&#xff0c;它是一个并发的 GC 算法&#xff0c;在Golang中被用作垃圾回收的算法&#xff0c;但是也会有一个缺陷&#xff0c;可能程序中的垃圾产生的速度会大于垃圾收集的速度&#xff0c;这样会导…