从0开始学习 RocketMQ:分布式事务消息的实现

news2025/1/23 6:12:35

消息队列中的事务,主要是解决消息生产者和消息消费者数据一致性的问题。

应用场景

比如订单系统创建订单后,会发消息给购物车系统,将已下单的商品从购物车中删除。

由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤,所以使用消息队列来异步清理购物车是更合理的设计。

在这里插入图片描述

对于订单系统来说,它做了两件事情

  1. 在订单库中插入了一条订单数据,创建了订单;
  2. 给 MQ 发送了一条订单消息。

对于购物车系统来说,它做了一件事情

  1. 接收订单消息,删除购物车库中的商品,清理购物车。

在分布式系统中,上面的这几个步骤,都有可能失败,如果失败了不做处理的话,就会造成订单数据和购物车数据不一致的情况。

比如:

  1. 创建了订单,没有清理购物车;
  2. 购物车中的商品清掉了,订单没有创建成功。

所以,我们需要做的就是,要保证在任何步骤失败的情况下,订单数据和购物车数据的一致性。

对于购物车系统,失败的处理比较简单,只有成功删除商品后再提交消费确认,如果发生失败,因为没有提交消费确认,消息队列会重试。

所以,问题的重点在于,怎么保证订单系统创建订单和发送消息的步骤,要么都成功,要么都失败,不能一个成功一个失败。

分布式事务

消息队列是如何实现分布式事务的?就要用到事务消息了。

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

在这里插入图片描述半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

在上面的步骤中,如果第 4 步提交事务消息失败了(比如网络异常),怎么办?

对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。

  • Kafka :简单粗暴,直接抛出异常,让用户自行处理。可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿;
  • RocketMQ:事务反查机制。

RocketMQ方案

在 RocketMQ 的分布式事务实现中,增加了事务反查机制来解决事务消息提交失败的问题。

如果订单系统在第 4 步提交或回滚事务消息失败(如网络异常),Broker 迟迟没有收到提交或回滚的消息,Broker 会定期去订单系统上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

所以,订单系统需要提供一个反查本地事务状态的接口,即根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

在这里插入图片描述

使用限制

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

使用建议

避免大量未决事务导致超时

RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
  • 程序能正确识别正在进行中的事务。

使用示例

创建事务主题

sh bin/mqadmin updatetopic -n localhost:9876 -t TransactionTopic -c DefaultCluster -a +message.type=TRANSACTION

生产者代码

模拟正常流程,本地事务成功提交

public class ProducerTransactionExample {


    public static void main(String[] args) throws Exception {
        String endpoint = "182.92.198.60:8080";
        String topic = "TransactionTopic";

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        builder.setRequestTimeout(Duration.ofSeconds(20));
        ClientConfiguration configuration = builder.build();

        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .setTransactionChecker(messageView -> {
                    System.out.println("5.broker回查事务状态");

                    String orderId = messageView.getProperties().get("orderId");

                    if (Strings.isNullOrEmpty(orderId)) {
                        return TransactionResolution.ROLLBACK;
                    }

                    if (checkOrderById(orderId)) {
                        System.out.println("7.本地事务状态成功,提交消息");
                        return TransactionResolution.COMMIT;
                    } else {
                        System.out.println("7.本地事务状态失败,回滚消息");
                        return TransactionResolution.ROLLBACK;
                    }

                })
                .build();


        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
            System.out.println("1.开启事务");
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            System.out.println("1.事务开启失败");
            return;
        }


        // 普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("transaction")
                .addProperty("orderId", "o10086")
                // 消息体。
                .setBody(("测试事务消息,订单号o10086").getBytes())
                .build();


        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
            System.out.println("2.半消息发送成功,messageId:" + sendReceipt.getMessageId());
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            System.out.println("2.半消息发送失败");
            return;
        }

        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
                System.out.println("4.commit事务消息");
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
                System.out.println("4.commit事务消息失败");
            }
        } else {
            try {
                transaction.rollback();
                System.out.println("4.rollback事务消息");
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
                System.out.println("4.rollback事务消息失败");
            }
        }
    }

    /**
     * 模拟本地事务的执行结果
     *
     * @return
     */
    private static boolean doLocalTransaction() {
        System.out.println("3.执行本地事务,处理中");
        try {
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("3.执行本地事务成功,提交事务");
        return true;
    }

    /**
     * 模拟本地事务反查
     *
     * @param orderId
     * @return
     */
    private static boolean checkOrderById(String orderId) {
        System.out.println("6.反查本地事务状态,订单号:" + orderId + "能查到");
        return true;
    }
}

在这里插入图片描述
消费端在第4步后可以消费到消息。

在这里插入图片描述

模拟异常流程,将第4步提交/回滚的代码注释掉
在这里插入图片描述
消费端在第7步后可以消费到消息。

在这里插入图片描述

设置第一次事务回查时间
CHECK_IMMUNITY_TIME_IN_SECONDS 属性定义了从事务消息发送到 Broker 后,Broker 在多长时间内不会对这条消息发起回查。这个时间窗口为生产者提供了一个缓冲期,以确保即使在网络延迟或短暂的服务中断情况下,事务消息也不会被过早地回查。

Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("transaction")
                .addProperty("orderId", "o10086")
                .addProperty("CHECK_IMMUNITY_TIME_IN_SECONDS", "300")
                // 消息体。
                .setBody(("测试事务消息,订单号o10086").getBytes())
                .build();

在这里插入图片描述
消费端消费

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2134834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redhat 7,8系(复刻系列) 一键部署Oracle21-xe rpm

Oracle21c-xe前言 无论您是开发人员、DBA、数据科学家、教育工作者,还是仅仅对数据库感兴趣,Oracle Database Express Edition (XE) 都是理想的入门方式。它是全球企业可依赖的强大的 Oracle Database,提供简单的下载、易于使用和功能齐全的体验。您可以在任何环境中使用该…

orangepi部署web环境

orangepi web环境搭建 mysql安装mysql卸载 FTP安装java安装tomcat安装Maven配置 mysql安装 查看MySQL安装包 接下来可以使用以下命令安装MySQL服务器: 安装MySQL 8.0 # 安装最新版本 sudo apt install -y mysql-server # 安装指定版本 sudo apt install -y mysql…

Mystic 会是 Midjourney 的终结者吗?

作者:老余捞鱼 原创不易,转载请标明出处及原作者。 写在前面的话: 这两年来 Midjourney 一直是互联网上最好的人工智能图像生成器。它制作了一些我们所见过的最流行和最具争议的 AI 图像,Midjourney 无与伦比的快速连贯性和照片级真实感使其领先于 OpenAI、谷歌和亚…

喜报!普罗格入选软件百强企业榜单

金秋九月,一个象征着成熟与收获的季节,普罗格迎来了发展历程中的又一里程碑。近期,普罗格以稳健的发展和创新的技术入选武汉软件百强企业榜单。在面对2024年复杂经营环境挑战的背景下,这一荣誉更显珍贵,彰显了武汉企业…

MySQL迁移达梦报错,DMException: 第1 行附近出现错误: 无效的表或视图名[ACT_GE_PROPERTY]

达梦数据库选好模式和登录用户,迁移时的目标模式名要和达梦的当前登录的用户名相同,否则查询的时候需要“form 模式名.表名”,只from表名就会报表不存在的错误。

单机快速部署开源、免费的分布式任务调度系统——Apache DolphinScheduler

本文主要为大家介绍Apache DolphinScheduler的单机部署方式,方便大家快速体验。 环境准备 需要Java环境,这是一个老生常谈的问题,关于Java环境的安装与配置期望大家都可以熟练掌握。 验证java环境 java -version 下载安装包并解压 使用wg…

深度挖掘| 如何高效实现Cloudera 安装之基础环境搭建

Cloudera Manager是CDH市场领先的管理平台。它以其强大的数据管理和分析能力,帮助企业能够轻松驾驭海量数据,实现数据的实时分析与洞察。 作为业界第一的端到端 Apache Hadoop 的管理应用,Cloudera Manager对CDH的每个部件都提供了细粒度的可…

ROS2 std_msg 报错!

编译ros2时候报错如下: CMake Error at CMakeLists.txt:11 (find_package): By not providing "Findstd_msg.cmake" in CMAKE_MODULE_PATH this project has asked CMake to find a package configuration file provided by "…

80V转5V4A同步降压WT6037

80V转5V4A同步降压WT6037 WT6037 被定义为一款高压同步降压转换器,其设计可在 10V 至 90V 的宽泛工作电压区间内稳定运行。该转换器尤其适用于需承受宽电压输入范围的电池组系统,诸如 12V 至 72V 的电池组,以及 60V 至 90V 的降压应用场景。…

Python编码系列—Python建造者模式:构建复杂对象的优雅之道

🌟🌟 欢迎来到我的技术小筑,一个专为技术探索者打造的交流空间。在这里,我们不仅分享代码的智慧,还探讨技术的深度与广度。无论您是资深开发者还是技术新手,这里都有一片属于您的天空。让我们在知识的海洋中…

Spring-cloud-gateway报错问题总结

1. 访问接口出现 There was an unexpected error (typeService Unavailable, status503).Unable to find instance for order 假设我们有服务 spring-appication-name: order 但命名路由id 也为order 就会出现这类错误 因为 gateway 有默认路由

Mysql | 知识 | 理解是怎么加锁的

文章目录 一、怎么加行级锁的?二、唯一索引加锁2.1 唯一索引等值查询1、记录存在的情况2、记录不存在的情况 2.2 唯一索引范围查询a. 针对「大于」的范围查询b. 针对「大于等于」的范围查询的情况。c. 「小于」范围查询,记录「不存在」表中的情况d. 「小…

JMeter压力测试

下载地址 第一步:修改配置,并启动软件 进入bin目录,修改jmeter.properties文件中的语言支持为language=zh_CN,然后点击jmeter.bat 启动软件。 第二步:添加线程组 第三步:添加Http取样

STL之Vector容器

容器 容器的分类 序列式容器(Sequence containers) - 每个元素都有固定位置--取决于插入时机和地点和元素值无关 - vector、deque、list、stack、queue 关联式容器(Associated containers) - 元素位置取决于特定的排序准则,和插入顺序无关 - set、multiset、ma…

ARADEX伺服驱动器电源维修G565 D565/60 M5ref

伺服驱动器维修常见故障:无显示、缺相、过流、过压、欠压、过热、过载、接地、参数错误、有显示无输出、模块损坏。我们本着诚信待人的宗旨,凭借娴熟的技术和丰富的维修经验,为国内外诸多企业修了各种不同的伺服电机、驱动器和电源。 短路保…

SpringBoot - 广场舞团

专业团队,咨询就送开题报告,欢迎留言私信 摘 要 随着信息技术和网络技术的飞速发展,人类已进入全新信息化时代,传统管理技术已无法高效,便捷地管理信息。为了迎合时代需求,优化管理效率,各种各…

Web 安全基础教程:从零基础入门到精通

一、Web 安全概述 (一)Web 安全的定义与重要性 1.定义 Web 安全是指保护 Web 应用程序免受各种网络威胁,确保 Web 服务的保密性、完整性和可用性。在当今数字化时代,Web 应用广泛存在于各个领域,从电子商务到社交媒…

Vue2 qrcode+html2canvas 实现二维码的生成和保存

1.安装 npm install qrcode npm install html2canvas 2.引用 import QRCode from qrcode import html2canvas from html2canvas 效果&#xff1a; 1. 二维码生成&#xff1a; 下载二维码图片&#xff1a; 二维码的内容&#xff1a; 实现代码&#xff1a; <template>…

Linux进程优先级

&#x1f4dd;目录 &#x1f31f; 查看进程信息&#x1f31f; PRI and NI 风过无痕 忘川如斯 如日方升 策引千问 &#x1f31f; 查看进程信息 ps -l命令 UID : 代表执行者的身份PID : 代表这个进程的代号PPID &#xff1a;代表这个进程是由哪个进程发展衍生而来的&#xff0c…

二维码模组扫码器C#实现串口自动监听功能

C# Demo&#xff0c;调用二维码模块的tx_windows_hidpos.dll扫码库&#xff0c;支持QR-M20 、QR-M10、QR-M30等二维码型号。 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Text; using…