15.Kafka系列之事务原理及实践

news2025/1/9 1:33:29

我们先来回顾下6.Kafka系列之设计思想(四)-消息传递语义中的一些内容

1. 消息传递保证

  • At most once:最多一次。消息可能会丢失,但永远不会重新传递
  • At least once:至少一次。消息永远不会丢失,但可能会重新传递
  • Exactly once:这正是人们真正想要的,每条消息只传递一次
1.1 发布消息的持久性保证

1.发布消息时,我们有消息被“提交”到日志的概念。一旦发布的消息被提交,只要复制该消息写入的分区的一个代理保持“活动”状态,它就不会丢失。如果生产者尝试发布消息并遇到网络错误,则无法确定此错误是发生在消息提交之前还是之后。在 0.11.0.0 之前,如果生产者未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已经成功,则消息可能会在重新发送期间再次写入日志
2.从 0.11.0.0 开始,Kafka 生产者还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目​​。为此,代理为每个生产者分配一个 ID,并使用生产者随每条消息发送的序列号对消息进行重复数据删除
3.同样从 0.11.0.0 开始,生产者支持使用类似事务的语义将消息发送到多个主题分区的能力:即 要么所有消息都已成功写入,要么都没有

1.2 消费消息时的保证

1.它可以读取消息,然后保存它在日志中的位置,最后处理消息。在这种情况下,消费者进程有可能在保存其位置之后但在保存其消息处理的输出之前崩溃。在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息还没有被处理。这对应于“至多一次”语义,因为在消费者失败消息的情况下可能不会被处理
2.它可以读取消息、处理消息并最终保存其位置。在这种情况下,消费者进程有可能在处理消息之后但在保存其位置之前崩溃。在这种情况下,当新进程接管它接收到的前几条消息时,它已经被处理过了。这对应于消费者失败情况下的“至少一次”语义。在许多情况下,消息有一个主键,因此更新是幂等的(两次接收相同的消息只是用它自己的另一个副本覆盖记录)
3.那么 exactly once 语义(即你真正想要的东西)呢?从 Kafka 主题消费并生产到另一个主题时(如Kafka Streams 应用程序),我们可以利用上面提到的 0.11.0.0 中的新事务生产者功能。消费者的位置作为消息存储在主题中,因此我们可以在与接收处理数据的输出主题相同的事务中将偏移量写入 Kafka。如果交易被中止,消费者的位置将恢复到它的旧值,并且输出主题的产生的数据将对其他消费者不可见,这取决于他们的“隔离级别”。在默认的“read_uncommitted”隔离级别中,所有消息对消费者都是可见的,即使它们是中止事务的一部分,但在“read_committed”中,消费者将只返回来自已提交事务的消息(以及任何不属于一部分的消息)交易)

2. Kafka幂等性

我们在客户端参数显示设置enable.idempotence=true就会开启生产者幂等消息传递

  • 每个新的生产者实例在初始化时会被分配一个PID(producer id)
  • 对于每个PID,消息发送到的每一个分区都有对应的序列号,序列号从0开始单调递增,生产者每发送一条消息就会将<PID,分区>对应的序列号值加1
  • broker端会在内存中为每一对<PID,分区>维护一个序列号,对于收到的每一条消息,只有当它的序列号的值(SN_new)正好比broker端中维护的对应序列号的值(SN_old)大1,broker才会接收该消息。如果 SN_new < SN_old + 1,说明消息被重复写入,broker会将该消息丢弃。否则,说明中间有数据尚未写入,暗示可能有消息丢失,对应生产者会抛出 OutOfOrderSequenceException 异常

注意:序列号实现幂等只是针对每一对<PID,分区>,即Kafka的幂等性只能保证单个生产者会话(session)中单分区的幂等

3. Kafka事务

通过事务可以弥补幂等性不能跨多个分区的缺陷,且可以保证对多个分区写入操作的原子性

3.1 创建主题与查看分区
kafka-topics.sh --create --topic my-topic --partitions 10 --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

3.2 编写KafkaTransactionDemo代码
/**
 * Kafka事务DEMO
 *
 * @author shenjian
 * @since 2023/5/27
 */
public class KafkaTransactionDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:30092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        // 开启幂等传递
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }
}
3.3 运行并消费结果
kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092

# --time -1 表示获取的最新位移值
# --time -2 表示获取的最早的位移值,可能由于最早的数据由于过期被删除,所以最早的位移不一定是0
# 通过两数相减,就可以知道当前分区的数据条数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic  --time -2
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic  --time -1

不放心的小伙伴可以去统计要总数量是否一致奥

通过事务,从生产者角度,Kafka可以保证:

  • 跨生产者会话的消息幂等发送: transactionId与PID一一对应,如果新的生产者启动,具有相同transactionId的旧生产者会立即失效(每个生产者通过 transactionId获取PID的同时,还会获取一个单调递增的 producer epoch)
  • 跨生产者会话的事务恢复: 当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事物要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作(通过 producer epoch判断)

从消费者角度,事务能保证的语义相对偏弱,对于一些特殊的情况,Kafka并不能保证已提交的事务中的所有消息都能被消费:

  • 对采用日志压缩策略的主题,事务中的某些消息可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)
  • 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失

4.Kafka事务实现原理

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator 负责实施的
broker节点有一个专门管理事务的内部主题 __transaction_state,TransactionCoodinator 会将事务状态持久化到该主题中

  1. 查找 TransactionCoordinator:生产者会先向某个broker发送 FindCoordinator 请求,找到 TransactionCoordinator 所在的 broker节点
  2. 获取PID:生产者会向 TransactionCoordinator 申请获取 PID,TransactionCoordinator 收到请求后,会把 transactionalId 和对应的 PID 以消息的形式保存到主题__transaction_state 中,保证 <transaction_Id,PID>的对应关系被持久化,即使宕机该对应关系也不会丢失
  3. 开启事务:调用 beginTransaction()后,生产者本地会标记开启了一个新事务
  4. 发送消息:生产者向用户主题发送消息,过程跟普通消息相同,但第一次发送请求前会先发送请求给TransactionCoordinator 将 transactionalId 和 TopicPartition 的对应关系存储在 __transaction_state 中
  5. 提交或中止事务:Kafka除了普通消息,还有专门的控制消息(ControlBatch)来标志一个事务的结束,控制消息有两种类型,分别用来表征事务的提交和中止。该阶段本质就是一个两阶段提交过程:
  • 将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state
  • 将COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets
  • 将 COMPLETE_COMMIT 或 COMPLETE_COMMIT_ABORT 消息写入主题 __transaction_state

如此一来,表面当前事务已经结束,此时就可以删除主题 __transaction_state 中所有关于该事务的消息

欢迎关注公众号算法小生

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/579024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

rust 初识基础: 变量、数据类型、函数、所有权、枚举

了解到 rust 和 WebAssembly 的结合使用&#xff0c;可以构建前端应用&#xff0c;而且性能也比较好。初步学习使用 rust 是预编译静态类型语言。 安装 rust 官网下载 rust-CN , 大致了解下为什么选择&#xff1a;高性能、可靠性、生产力。 打开控制台啊&#xff0c;执行安装…

【Servlet】

目录 &#x1f382;1. 第一个 Servlet 程序&#xff1a;使用 Servlet 写 hello world &#x1f95e;1.1 创建项目 &#x1f373;1.2 引入依赖 &#x1f383;1.3 创建目录 &#x1f358;1.4 开始写代码 &#x1f30d;1.5 打包代码 &#x1f364;1.6 部署 &#x1f451;1…

如何在华为OD机试中获得满分?Java实现【获取最大软件版本号】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…

使用kotlin用回溯法解决电话号码的字母组合问题

17. 电话号码的字母组合 难度中等 2474 给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#…

23种设计模式中之中介者模式(Mediator Pattern)

前言&#xff1a;大家好&#xff0c;我是小威&#xff0c;24届毕业生&#xff0c;在一家满意的公司实习。本篇文章将23种设计模式中的迭代器模式&#xff0c;此篇文章为一天学习一个设计模式系列文章&#xff0c;后面会分享其他模式知识。 如果文章有什么需要改进的地方还请大佬…

运维工程师面试总结(含答案)

运维工程师面试总结 原文链接&#xff1a;https://www.cuiliangblog.cn/detail/article/2 一、linux 1. linux系统启动流程 第一步&#xff1a;开机自检&#xff0c;加载BIOS第二步&#xff1a;读取&#xff2d;&#xff22;&#xff32;第三步&#xff1a;Boot Loader grub…

uni-app常用场景速查记录

记录一下uniapp开发过程中遇到的问题场景,方便后期查看. 1.elementUI中textarea文本如何设置换行显示 2.uniapp中实现文字滚动显示 3.下拉刷新和触底分页查询 1.elementUI中textarea文本设置换行显示 el-input标签中type为textarea中录入的文本内容,在表格中显示…

关键词搜索1688商品数据采集、1688商品列表数据接口

1688&#xff1a;指中国最大的电子商务综合平台&#xff0c;类似于美国的亚马逊。 关键词&#xff1a;是用于描述检索文档或记录的词语或短语&#xff0c;通常是用户输入的查询信息。 搜索&#xff1a;是在数据库、网页搜索引擎或其他信息存储库中查找信息内容的过程。 商品数…

springboot+springsecurity+jwt+elementui图书管理系统

​​图书管理系统​​ 关注公号&#xff1a;java大师&#xff0c;回复“图书”&#xff0c;获取源码 一、springboot后台 1、mybatis-plus整合 1.1添加pom.xml <!--mp逆向工程 --><dependency><groupId>org.projectlombok</groupId><artifactId&…

腾讯云服务器可用区是什么意思?可用区详细说明

腾讯云服务器可用区什么意思&#xff1f;可用区&#xff08;Zone&#xff09;是指腾讯云在同一地域内电力和网络互相独立的物理数据中心&#xff0c;一个可用区故障不会影响另一个可用区的正常运行&#xff0c;所以可用区用于构建高容灾、高可靠性应用。腾讯云服务器网来详细说…

华为OD机试真题B卷 Java 实现【食堂供餐】,附详细解题思路

一、题目描述 某公司员工食堂以盒饭的方式供餐。 为将员工取餐排队时间降为0&#xff0c;食堂的供餐速度必须要足够快。 现在需要根据以往员工取餐的统计信息&#xff0c;计算出一个刚好能达到排队时间为0的最低供餐速度。 即&#xff0c;食堂在每个单位时间内必须至少做出…

分布式事务的21种武器 - 7

在分布式系统中&#xff0c;事务的处理分布在不同组件、服务中&#xff0c;因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式&#xff0c;并分析其实现原理和优缺点&#xff0c;在面对具体分布式事务问题时&#xff0c;可以选择合适的模式…

Rocketmq学习之路(一)从生产上的问题引出MQ

前言&#xff1a; 从来没有真正弄明白mq是什么&#xff0c;只知道他有消峰&#xff0c;异步&#xff0c;解耦的作用。但是在日常开发工作中&#xff0c;就是简单的生产者发送消息&#xff0c;消费者接受消息。所以&#xff0c;从今天开始。我要吃掉这个技术。 一.这该死的订单…

word解决文字与公式mathtype不对齐

修改字体和段落里面的这两个。

每日学术速递5.23

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CL 1.Tree of Thoughts: Deliberate Problem Solving with Large Language Models 标题&#xff1a;思想树&#xff1a;用大型语言模型有意识地解决问题 作者&#xff1a;Shunyu Yao, …

每日学术速递5.24

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CV 1.Reprompting: Automated Chain-of-Thought Prompt Inference Through Gibbs Sampling 标题&#xff1a;重新提示&#xff1a;通过 Gibbs 采样的自动思维链提示推理 作者&#xff1…

《java核心卷Ⅰ》读书笔记

&#x1f6eb; JDK和JRE傻傻分不清?&#x1f6eb; HelloWorld的输出都经历了啥&#xff1f;&#x1f6eb; Java的三个版本都是啥&#xff1f;&#x1f6eb; 关于main方法你都知道啥&#xff1f;main方法被声明为private会怎样&#xff1f;&#x1f6eb; 强制and自动类型转换都…

数据结构基础内容-----第四章 栈与队列

文章目录 栈栈的定义站的抽象数据类型两栈共享空间栈的作用递归的定义 栈运算 队列循环队列队列链式存储结构及实现 栈 栈的定义 栈&#xff08;Stack&#xff09;是计算机科学中的一种抽象数据类型&#xff0c;它是一个只能在一端进行插入和删除操作的线性数据结构。栈按照后…

tomcat what

tomcat是什么 对于tomcat是什么有什么作用。曾经看到一个大神是这样解释tomcat的&#xff0c;现在分享给大家 内容大体是&#xff1a; 我家有一台机器&#xff0c;可以把石头变成金子。你快递给我一箱石头&#xff0c;让我把它们变成一箱金子再快递给你。 这个机器就是web项…

【Python从入门到进阶】21、爬虫相关概念介绍

接上篇《20、HTML页面结构的介绍》 上一篇我们正式进入了Python爬虫的实战教程&#xff0c;主要讲解了要爬取的HTML页面的结构。本篇我们来介绍爬虫的相关概念。 一、什么是互联网爬虫 如果我们把互联网比作一张大的蜘蛛网&#xff0c;那一台计算机上的数据便是蜘蛛网上的一个…