RocketMQ 事务消息发送

news2024/12/24 21:36:08

目录

事务消息介绍

应用场景

功能原理

使用限制

使用示例

使用建议​


事务消息介绍

在一些对数据一致性有强需求的场景,可以用 RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

应用场景

分布式事务的诉求

分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。

  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。

  • 积分系统状态变更:变更用户积分,更新用户积分表。

  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

传统XA事务方案:性能不足

为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。

基于普通消息方案:一致性保障困难

将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:

  • 消息发送成功,订单没有执行成功,需要回滚整个事务。

  • 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。

  • 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。

基于RocketMQ分布式事务消息:支持最终一致性

上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。

而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

Apache RocketMQ事务消息的方案,具备高性能、可扩展、业务开发简单的优势。具体事务消息的原理和流程,请参见下文的功能原理。

功能原理

什么是事务消息

事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

  1. 生产者将消息发送至RocketMQ服务端。

  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

 

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例

创建主题

RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.typeTRANSACTION的属性用来支持事务消息

以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
    private static boolean checkOrderById(String orderId) {
        return true;
    }
    //演示demo,模拟本地事务的执行结果。
    private static boolean doLocalTransaction() {
        return true;
    }
    public static void main(String[] args) throws ClientException {
        ClientServiceProvider provider = new ClientServiceProvider();
        MessageBuilder messageBuilder = new MessageBuilderImpl();
        //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
        Producer producer = provider.newProducerBuilder()
                .setTransactionChecker(messageView -> {
                    /**
                     * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
                     * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
                     */
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
                        // 错误的消息,直接返回Rollback。
                        return TransactionResolution.ROLLBACK;
                    }
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            return;
        }
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
                .addProperty("OrderId", "xxx")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            return;
        }
        /**
         * 执行本地事务,并确定本地事务结果。
         * 1. 如果本地事务提交成功,则提交消息事务。
         * 2. 如果本地事务提交失败,则回滚消息事务。
         * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        }
    }

使用建议​

避免大量未决事务导致超时

Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

  • 程序能正确识别正在进行中的事务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1045651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】深度剖析最优建堆及堆的经典应用 - 堆排列与topk问题

&#x1f6a9;纸上得来终觉浅&#xff0c; 绝知此事要躬行。 &#x1f31f;主页&#xff1a;June-Frost &#x1f680;专栏&#xff1a;数据结构 &#x1f525;该文章分别探讨了向上建堆和向下建堆的复杂度和一些堆的经典应用 - 堆排列与topk问题。 ❗️该文章内的思想需要用到…

排序学习总结

取每个对象的内接矩形框&#xff0c;然后再排序&#xff0c;根据排序的结果确定原对象顺序。 inner_rectangle1(RegionAffineTrans1, Row1, Column1, Row2, Column2) gen_rectangle1(Rect,Row1, Column1, Row2, Column2) sort_region(Rect,RectSort,character,true, row)count…

Oracle物化视图(Materialized View)

与Oracle普通视图仅存储查询定义不同&#xff0c;物化视图&#xff08;Materialized View&#xff09;会将查询结果"物化"并保存下来&#xff0c;这意味着物化视图会消耗存储空间&#xff0c;物化的数据需要一定的刷新策略才能和基表同步&#xff0c;在使用和管理上比…

【嵌入式】使用嵌入式芯片唯一ID进行程序加密实现

目录 一 背景说明 二 原理介绍 三 设计实现 四 参考资料 一 背景说明 项目程序需要进行加密处理。 考虑利用嵌入式芯片的唯一UID&#xff0c;结合Flash读写来实现。 加密后的程序&#xff0c;可以使得从芯片Flash中读取出来的文件&#xff08;一般为HEX格式&#xff09;不能…

C#WPF命令Command使用实例

本文实例演示C#WPF命令使用实例 定义: 命令(Command):命令表示一个任务单元,并且可跟踪该任务的状态,实际上是实现了ICommand接口的类。然而,命令实际上可以包括任务执行的逻辑代码,也可以不包括从而仅作为联系命令源与命令目标的媒介。比如,WPF 默认的接口实现类Ro…

el-collapse 嵌套中 el-checkbox作为标题,选中复选框与el-tree联动

<el-drawertitle"应用授权":visible.sync"menuDrawer"><el-collapse accordion style"padding: 15px"><el-collapse-item v-for"item in platList"><template slot"title"><el-checkbox v-model…

PostMan、ApiFox等工具Post请求中@RequestParam和@RequestBody的混合使用如何传参

方法签名 PostMapping("/mms/sendAudit")public R sendAudit(RequestParam("mmsId") Long mmsId,RequestParam("ecId") Long ecId,RequestBody(required false) SignMatchRule signMatchRule) {以ApiFox为例子 RequestParam的Params的参数正常…

基于51单片机0.001s精度秒表9.999s仿真设计(源码+仿真+原理图+PCB+报告+器件清单+讲解)

基于51单片机0.001s精度秒表9.999s仿真设计&#xff08;源码仿真原理图PCB报告器件清单讲解 讲解视频1 功能说明&#xff1a;2 仿真电路&#xff1a;3 原理图&#xff1a;4 PCB&#xff1a;5 程序&#xff1a;6 资料清单&&下载链接&#xff1a; 基于51单片机0.001s精度…

2023年【煤气】报名考试及煤气免费试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 煤气报名考试根据新煤气考试大纲要求&#xff0c;安全生产模拟考试一点通将煤气模拟考试试题进行汇编&#xff0c;组成一套煤气全真模拟考试试题&#xff0c;学员可通过煤气免费试题全真模拟&#xff0c;进行煤气自测…

什么是图像翻译

域(Domain)&#xff1a;一系列具有相同风格的图像集合。 图像翻译(Image Translation)&#xff1a;从一张图像到另一张图像的变换&#xff0c;也是域迁移。 &#xff08;风格迁移、图像上色、图像分割...&#xff09; 图像翻译GAN模型分类 根据作用区域&#xff1a;…

Qt学习_13_可执行文件.exe添加图标/logo

本文简单记录一下如何给Qt生成的exe&#xff08;可执行&#xff09;文件&#xff0c;添加图标/logo 第一步 去选一个你喜欢的图标&#xff0c;下载下来 ByteDance IconPark (oceanengine.com) iconfont-阿里巴巴矢量图标库 第二步 用第一步下载的图片&#xff0c;在线生成一…

python 绘制 graphviz

dot 绘图 python 绘制 graphviz 环境 上一节中在本地安装了 graphviz&#xff0c; python 要想使用还需安装 pip 包 pip install graphvizpython 使用 dot Digraph(comment"My Graph") # 添加一些节点 dot.node("A", "Node A") dot.node(&q…

杭州亚运会开幕式惊现数字人火炬手,动捕设备迸发动画制作新动能

在第十九届亚运会开幕式上&#xff0c;首次出现了“数字人”点火形式&#xff0c;打造了亚运史上首个数字点火仪式&#xff0c;这种点火方式是一种颠覆性创作的同时&#xff0c;这也是裸眼3D技术、现实增强和AI人工智能技术的完美结合。 此次数字火炬手的背后是采用了动捕设备&…

九、完整打印立方体贴图的一个面

从上一节可以看出&#xff0c;打印出来的图片是有背景色的&#xff0c;也就是摄像机位置不对。那应该放在哪里呢&#xff1f; 答案是&#xff1a;给定投影矩阵的 fov 为 90 度以捕捉整个面&#xff0c;且摄像机距离该面的距离是立方体边长的一半。 即、 这里我用的立方体是长度…

it运维监控运维方案主要应用在哪些场景

公司越来越依赖IT基础设施。为了保证业务的高效运行&#xff0c;IT运维监控已经成为公司不可或缺的一部分。本文将详细介绍IT运维监控方案&#xff0c;以及如何优化运维效率&#xff0c;并将其应用于各种场景。 IT运维监控方案 监视系统 监控系统是IT运维监控的基础&#xff…

代码随想录 Day6 哈希 LeetcodeT454 四数之和II T383赎金信 T15 三数之和 T18 四数之和

本文代码思路来源于: 代码随想录 前言 希望大家在刷这部分题的时候先熟悉熟悉哈希结构的基本常用api,比较方便理解. LeetCode T454 四数之和 题目链接:454. 四数相加 II - 力扣&#xff08;LeetCode&#xff09; 题目思路 暴力解法仍然是遍历四个数组解决此题, 哈希的思路有…

干货分享 | TSMaster—CCP/XCP标定功能详解

众所周知&#xff0c;CCP是CAN Calibration Protocol CAN 标定协议的缩写&#xff0c;XCP是Universal Measurement and Calibration Protocol 通用测量与标定协议的缩写。二者都普遍使用于开发、测试和车载标定&#xff0c;由ASAM&#xff08;自动化和测量系统标准化协会&#…

k8s--storageClass自动创建PV

文章目录 一、storageClass自动创建PV1.1 安装NFS1.2 创建nfs storageClass1.3 测试自动创建pv 一、storageClass自动创建PV 这里使用NFS实现 1.1 安装NFS 安装nfs-server&#xff1a; sh nfs_install.sh /mnt/data03 10.60.41.0/24nfs_install.sh #!/bin/bash### How to i…

STM32cubeIDE 更改Repository folder

使用STM32CubeIDE时&#xff0c;会调用STM32CubeMX&#xff0c;但是这两个软件下载的更新包都放在C:/user/目录下面&#xff0c;而且文件很大&#xff0c;用不了多久就会把C盘填满&#xff0c;所以刚开始安装的时候就要把更新目录更换掉。具体更换方法如下&#xff1a; Window…

为您的视频编辑应用添加动力,美摄视频剪辑SDK

在当今的数字化时代&#xff0c;视频已经成为了最受欢迎的媒体形式之一。无论是社交媒体平台&#xff0c;还是在线教学站点&#xff0c;甚至是商业广告&#xff0c;都离不开视频的支持。而在这个领域&#xff0c;美摄视频剪辑SDK无疑是您的最佳选择。它不仅功能强大&#xff0c…