实现分布式事务的新标杆:RocketMQ的全面解析与应用指南

news2025/1/11 5:12:16

在分布式系统中,实现事务的一致性和可靠性是一项重要的挑战。本文将详细介绍如何利用 RocketMQ 的半消息机制来实现分布式事务,并提供具体的代码示例和最佳实践。

1. 引言

在分布式系统中,事务处理是一项复杂而关键的任务。传统的 ACID 事务难以跨多个服务和数据库进行操作。RocketMQ 是一个分布式消息中间件,通过其半消息机制,我们可以实现分布式事务的一致性和可靠性。

2. RocketMQ 半消息概述

2.1 半消息的定义

使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

2.2 半消息的工作原理

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  2. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置 :::

事务消息回查步骤如下: 7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

3. 示例代码和最佳实践

3.1 RocketMQ 事务生产者配置

首先,配置 RocketMQ 事务生产者的相关参数,包括 nameserver 地址、生产者组、事务监听器等。

TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();

3.2 事务消息的发送

在发送事务消息时,需要使用 TransactionSendResult 的 sendMessageInTransaction 方法,并指定一个实现了 TransactionListener 接口的类。

Message message = new Message("transaction_topic", "transaction_tag", "Transaction Message".getBytes());
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);

3.3 事务监听器的实现

在实现 TransactionListener 接口的类中,需要编写本地事务逻辑和消息回查逻辑。

public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务逻辑
            // 在这里编写执行本地事务的代码,包括数据库操作、服务调用等。

            // 本地事务成功,返回 COMMIT_MESSAGE
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务失败,返回 ROLLBACK_MESSAGE
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 消息回查逻辑
        // 在这里编写消息回查的代码,根据本地事务的状态返回 COMMIT_MESSAGE、ROLLBACK_MESSAGE 或 UNKNOW。
    }
}

3.4 消费者的消息确认

在消费者端,接收到消息后,需要根据本地事务的状态进行确认。

public class MessageListenerImpl implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息
            // 在这里编写处理消息的代码,包括业务逻辑的执行等。

            // 根据本地事务状态确认消息
            if (transactionState == LocalTransactionState.COMMIT_MESSAGE) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } else if (transactionState == LocalTransactionState.ROLLBACK_MESSAGE) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

4. 注意事项

在使用 RocketMQ 的半消息实现分布式事务时,需要注意以下几点:

  • 设置合适的回查间隔和次数:根据业务需求和系统性能,设置合理的消息回查间隔和次数,以保证事务的最终一致性。
  • 处理消息重复问题:由于消息回查机制的存在,可能会出现消息重复的情况。在消费者端,需要考虑如何处理重复消息,以避免对业务造成影响。
  • 保证消息的幂等性:在消息的处理过程中,需要保证消息的幂等性,以防止重复处理已经成功的消息。
  • 监控和报警:建立合适的监控和报警机制,及时发现和处理异常情况,保证系统的稳定性和可靠性。

5. 总结

本文详细介绍了如何使用 RocketMQ 的半消息机制实现分布式事务。通过发送半消息、执行本地事务逻辑、消息回查和消息确认,可以保证消息的可靠处理和一致性。同时,提供了具体的代码示例和最佳实践,帮助读者更好地理解和应用 RocketMQ 的半消息机制。

使用 RocketMQ 的半消息实现分布式事务可以有效解决传统事务在分布式系统中的挑战,并提高系统的可靠性和一致性。在实际应用中,需要根据具体场景进行适当的调整和优化,以满足系统的需求。

=================================

如果文章对你有帮助,请不要忘记加个关注、点个赞!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringCloud微服务毕业论文管理系统设计与实现

一、概述 1.1 课题背景及意义 随着学校不断扩大和学生人数的猛增,关于各类教学信息也越来越多。毕业论文的管理也成为了不可避免的一道关卡,学生需要及时获取论文相关进度,学校的管理者要求能方便对论文进行处理。基于这些需求,开发一个实用的微服务管理系统,以满足双方…

13-使用调度框架quartz,为系统增加定时调度功能

1 、创建后端模块batch 1.1、创建maven项目 1.2、pom.xml文件中导入依赖 <dependencies><!-- common--><dependency><groupId>com.wei</groupId><artifactId>common</artifactId></dependency><!-- 热部署…

电磁兼容常用测量单位及转换关系 dB、dBm、dBw、dBμV、dBmV、dBV、dBA等单位介绍

目 录 摘要分贝简介分贝在EMC测试中电磁兼容常用测量单位常用线性单位的对数转换单位间转换基于50Ω额定阻抗的电压、电流和功率单位间的换算基于50Ω额定阻抗的场强单位间的换算 单位换算的应用参考文献总结 摘要 为获得更大的相对幅度显示范围&#xff0c;电磁兼容测试常使用…

Graalvm Native Image 元数据适配

Graalvm Native Image 元数据适配 本文章主要叙述在 Java 应用适配 Graalvm Native Image 中的步骤和遇到的一些问题&#xff01;因为 Graalvm 官方文档相关概念叙述过于简单。基本靠问才能知道些许有用信息。所以写此文章。 关于 Graalvm 基础知识的相关学习&#xff0c;可以…

部署 LAMP平台Linux,Apache,MySQL ,PHP源码编译安装

目录 一、.LAMP简介与概述 1.LAMP平台概述 2. 构建LAMP平台顺序 3. 编译安装的优点 4. 各组件作用 5. 数据流向 二、编译安装Apache httpd服务 1.关闭防火墙&#xff0c;将安装Apache所需软件包传到/opt目录下 2.安装环境依赖包 3.配置软件模块 4.编译及安装 5.优…

chatgpt赋能python:Python获取父类:探究继承关系的技巧

Python获取父类&#xff1a;探究继承关系的技巧 Python是一种高级编程语言&#xff0c;它被广泛用于Web开发、数据分析、人工智能等领域。Python的面向对象编程是其特色之一。在面向对象编程中&#xff0c;我们经常需要使用继承关系来定义不同类之间的关系。在这篇文章中&…

小程序学习(一):基本知识点笔记

1.小程序与普通网页开发的区别 1.运行环境不同 网页运行在浏览器环境中 小程序运行在微信环境中 2.API不同 由于运行环境的不同,小程序无法调用DOM和BOM的API。 但是,小程序中可以调用微信环境提供的各种API,例如:地理定位、扫码、支付... 3.开发模式不同 网页的开发模式:浏览器…

C语言笔记-4 输入输出

目录 输入输出简单举例scanf()getchar()putchar()puts()gets() 标准文件%d 格式化输出整数%f 格式化输出浮点型数据 getchar() & putchar() 函数gets() & puts() 函数scanf() 和 printf() 函数 输入输出 简单举例 scanf() 空格、回车也是字符&#xff0c;下面情况也…

APP盲盒系统开发前端后台详细功能讲解

一、栏目 功能 说明 登录注册 注册 输入手j号&#xff0c;获取验zm&#xff0c;输入密m 密码登录 手机号密码 忘记密码 输入手j号&#xff0c;获取y证m&#xff0c;输入新m码 底部导航 开盲盒、寄售中心、商城、个人中心 开盲盒 显示盲盒 可滑动更换其他盲盒 立即开盒 点…

从程序员到架构师——数据持久化层场景

全文摘自&#xff1a;从程序员到架构师&#xff08;王伟杰著&#xff09; 购买链接&#xff1a;https://item.jd.com/13626926.html 程序员之间的能力差异在哪里&#xff1f;如果是学技术&#xff0c;大家可以阅读同样的书籍和网络文章&#xff0c;为什么还会造成最终专业能力的…

SC7515运算放大器(OPA)可pin对pin兼容AD8138

对于一般运算放大器&#xff0c;SC7515 在差分信号处理方面获得了巨大进步。SC7515 即可以用作单端至差分放大器或也可以差分至差分放大器&#xff0c;像运算放大器一样易于使用&#xff0c;并且大大简化了差分信号放大与驱动。可pin对pin兼容AD8138。该放大器输入噪声低、-3 d…

Spring之BeanFactory与ApplicationContext区别、实例化Bean的三种⽅式、延迟加载(lazy-Init )

Spring IoC进阶 IOC之BeanFactory与ApplicationContext区别启动 IoC 容器的方式 实例化Bean的三种⽅式使用无参构造方法实例化工厂静态方法实例化工厂实例方法实例化 Spring IOC之延迟加载(lazy-Init )Bean的延迟加载&#xff08;延迟创建&#xff09;应用场景 IOC之BeanFactor…

案例分享|梅雨季的机房湿度问题如何解决?

6月底&#xff0c;和Q3的KPI一起赶来的是南方的梅雨季。 持续不断的降雨&#xff0c;使空气湿度一度高达75%-80%&#xff0c; 南方人正式开启高温酷暑下的桑拿模式。 高温高湿除了带来体感不适&#xff0c;还会导致心情抑郁、烦燥、易疲倦...... 毕竟&#xff0c;与此强相关…

Sentinel spring的全局异常处理器,导致熔断规则(异常数规则)失效解决方案

最近在使用sentinel过程中发现&#xff0c;如果使用springboot的RestControllerAdvice全局异常捕获&#xff0c;那么设置sentinel的异常数熔断规则就会失效&#xff0c;去github上看sentinel的Issues发现已经有人提过这问题&#xff0c;官方也是没有正面回复 官方文档更不用说&…

matlab之table Excel对大量数据的缺失

Excel表格对于数据量的限制 ####### matlab之table table好处&#xff0c;可完整保留导入数据&#xff0c;不限于数值 matlab中table的切片等操作

C/C++编程安全标准GJB-8114解读——声明定义类

软件检测实验室在建立软件测试体系或申请cnas/cma相关资质时&#xff0c;需要依据相关标准&#xff0c;使用有效的方法开展检验检测活动&#xff0c;GJB-8114是一部嵌入式软件安全测试相关的国家标准&#xff0c;本系列文章我们就针对GJB-8114《C/C语言编程安全子集》的具体内容…

js逆向-md5加密算法逆向-案例网站

今天逆向的网站&#xff1a;aHR0cHM6Ly9mYW55aS55b3VkYW8uY29tL2luZGV4Lmh0bWwjLw &#xff08;去在线网站进行base64解密即可&#xff09; 1、点击翻译&#xff0c;触发请求 可以看到sign参数加密&#xff0c;加密长度为32为 md5加密特征&#xff1a; **长度固定。**无论输…

基于树莓派4B的YOLOv5-Lite目标检测的移植与部署(含训练教程)

前言&#xff1a;本文为手把手教学树莓派4B项目——YOLOv5-Lite目标检测&#xff0c;本次项目采用树莓派4B&#xff08;Cortex-A72&#xff09;作为核心 CPU 进行部署。该篇博客算是深度学习理论的初步实战&#xff0c;选择的网络模型为 YOLOv5 模型的变种 YOLOv5-Lite 模型。Y…

记一次解决vmware安装windows server 2019时, 虚拟机网络电缆被拔出,连不上网的问题

项目场景&#xff1a; 项目需要基于electron开发桌面端软件&#xff0c;实现前后端项目的自动化部署、可视化配置等功能&#xff0c;经过需求分析后&#xff0c;确定首先要适配的场景即&#xff1a;通过桌面端软件远程连接&#xff0c;在桌面端软件中执行安装、运行、配置等一…

用ZLmediaKit流媒体服务器时候遇到的常规问题

照zlmediakit的源码 自己复制了一份 然后有的地方编译不过修改了部分 测试的时候发现有两个问题 第一是 ffmpeg的ffplay 能播放 vlc不能播放 第二个问题是directProxy设置为0的时候 推流的时候 然后用ffplay播放 只有音频没有视频 查了好久终于解决这个问题 第一个…