RocketMQ事务消息实现分布式事务

news2025/1/16 7:45:03

文章目录

    • 简介
    • 实现原理
    • 实现逻辑

简介

RocketMQ事务消息
RocketMQ在4.3.0版中支持分布式事务消息,这里RocketMQ的事务消息是采用2PC(两段式协议) +补偿机制(消息回查)的分布式事务功能。提供消息发送与业务落库的一致性。
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。

RocketMQ的事务消息实现方式主要包括以下几个步骤:
1 生产者发送half消息到Broker。half消息在消费者看来是不可见的,这样可以避免消费者消费到事务未提交的数据,类似于数据库的隔离级别读已提交级别,避免脏读。
2.生产者创建订单,根据创建订单成功与否,向Broker发送commit或rollback。
3.生产者还可以提供Broker回调接口,当Broker发现一段时间half消息没有收到任何操作命令,会主动调用此接口来查询订单是否创建成功。
4.一旦half消息commit了,消费者库存系统就会来消费,如果消费成功,则消息销毁,分布式事务成功结束。
5.如果消费失败,则根据重试策略进行重试,最后还失败则进入死信队列,等待进一步处理。

在这里插入图片描述
在这里插入图片描述

实现原理

RocketMQ的事务消息实现原理基于两阶段提交协议(Two-Phase Commit Protocol),具体流程如下:
1.发送准备消息:当一个事务消息需要发送时,生产者会发送一个准备消息,该消息包含了实际的业务数据和事务的标识符。
2.执行本地事务:接收到准备消息后,生产者会执行一个本地事务,如果本地事务执行成功,则返回COMMIT状态,否则返回ROLLBACK状态。
3.提交或回滚本地事务:当生产者返回COMMIT状态时,代表本地事务已经执行成功,此时,RocketMQ会将消息标记为可提交状态,并发送一个commit消息给消费者;当生产者返回ROLLBACK状态时,代表本地事务执行失败,此时,RocketMQ会将消息标记为可回滚状态,并发送一个rollback消息给消费者。
4. 消费者处理消息:消费者在接收到commit或rollback消息后,会根据消息的状态来执行相应的操作。
5.提交或回滚事务:如果所有的消费者都接收到了commit消息,则代表该事务消息已经提交,此时生产者会提交本地事务,否则,如果有任何一个消费者接收到了rollback消息,则代表该事务消息已经回滚,生产者会回滚本地事务。
事务消息的实现原理基于两阶段提交协议,这是一种经典的分布式事务协议,可以保证数据的一致性和可靠性。RocketMQ通过实现TransactionListener接口来支持事务消息,同时也提供了许多配置选项和工具类来方便用户进行使用和扩展。

实现逻辑

RocketMQ的事务消息通过TransactionListener接口来实现。下面是事务消息的基本实现步骤:
1.实现事务监听器:首先,你需要实现RocketMQ提供的TransactionListener接口,该接口包括两个方法:executeLocalTransaction和checkLocalTransaction。
○ executeLocalTransaction方法用于执行本地事务,当发送事务消息时,RocketMQ会调用此方法来执行本地事务。在该方法内部,你需要执行实际的业务逻辑,并根据执行结果返回事务状态,可以是提交、回滚或是未知状态。
○ checkLocalTransaction方法用于检查本地事务状态,当RocketMQ没有收到事务消息的确认或者取消时,会调用此方法来检查本地事务的状态,然后对消息进行处理。
2.发送事务消息:在发送事务消息时,你需要指定事务监听器,并在executeLocalTransaction方法中执行实际的业务逻辑,然后根据业务逻辑的执行结果返回事务状态。
3.事务状态检查:RocketMQ会定期调用checkLocalTransaction方法来检查本地事务的状态,然后对消息进行处理,例如提交或者回滚。
4.事务消息处理:在消息消费端,你需要根据消息的实际状态来执行相应的处理逻辑。
下面是一个简单的示例代码,演示了如何使用RocketMQ的事务消息:

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 设置事务监听器
        TransactionListener transactionListener = new MyTransactionListener();
        producer.setTransactionListener(transactionListener);

        // 启动生产者
        producer.start();

        try {
            // 创建事务消息
            Message msg = new Message("TopicTest", "TagA", "KEY1", "Hello, RocketMQ".getBytes());

            // 发送事务消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

class MyTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务,例如数据库操作等
        // 返回本地事务的状态,可以是COMMIT、ROLLBACK或UNKNOW
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态,返回本地事务的状态,例如COMMIT、ROLLBACK或UNKNOW
    }
}

上述代码展示了如何创建一个事务消息生产者,并实现一个简单的事务监听器。在实际应用中,你需要根据业务需求和具体场景来实现更复杂的事务逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1331520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MicroPython的交互式解释器模式 REPL

MicroPython的交互式解释器模式又名REPL(read-eval-print-loop),就是一种命令输入交互模式,跟Python的REPL是类似的,就是在命令行直接输入Python代码或表达式执行并打印结果。关于MicroPython的REPL跟通常的Python类似…

电子病历编辑器源码,提供电子病历在线制作、管理和使用的一体化电子病历解决方案

概述: 电子病历是指医务人员在医疗活动过程中,使用医疗机构信息系统生成的文字、符号、图表、图形、数据、影像等数字化信息,并能实现存储、管理、传输和重现的医疗记录,是病历的一种记录形式。 医院通过电子病历以电子化方式记录患者就诊的信息,包括&…

【常见的语法糖(详解)】

🟩 说几个常见的语法糖 🟢关于语法糖的典型解析🟢如何解语法糖?🟢糖块一、switch 支持 String 与枚举📙糖块二、泛型📝糖块三、自动装箱与拆箱🍁糖块四、方法变长参数🖥️…

Linux多线程:POSIX信号量,基于信号量的环形队列实现生产者消费者模型

目录 一、POSIX信号量1.1 初始化信号量1.2 销毁信号量1.3 等待信号量1.4 发布信号量1.5 基于环形队列的生产消费模型(用信号量控制生产者和消费者之间的同步互斥关系)1.5.1 makefile1.5.2 RingQueue.hpp1.5.3 Sem.hpp1.5.4 Task.hpp1.5.5 main.cc 二、信号量控制的环形队列原理…

.Net 访问电子邮箱-LumiSoft.Net,好用

序言: 网上找了很多关于.Net如何访问电子邮箱的方法,但是大多数都达不到想要的需求,只有一些 收发邮件。因此 花了很大功夫去看 LumiSoft.Net.dll 的源码,总算做出自己想要的结果了,果然学习诗人进步。 介绍&#xff…

Qt 开源项目

Qt 开源项目 Omniverse View链接技术介绍 QuickQanava链接技术介绍QField链接技术介绍 AtomicDEX链接技术介绍 Status-desktop链接技术介绍 Librum链接技术介绍 A Simple Cross-Platform ReaderQPrompt链接技术介绍 GCompris链接技术介绍 Scrite链接技术介绍 QSkinny链接技术介…

如何在PC上运行大模型

如何在PC上运行大模型 在PC上使用CPU运行大模型不如使用GPU高效,但仍然是可以实现的大模型推理。 大模型训练要求的资源更高,这里直接使用面向开源的Facebook’s LLaMA model(llama-2-7b-chat.Q2_K.gguf)。 连接CPU与LLaMA model的是llama.cpp。 为方便…

认识Linux背景

1.发展史 Linux从哪里来?它是怎么发展的?在这里简要介绍Linux的发展史 要说Linux,还得从UNIX说起 UNIX发展的历史 1968年,一些来自通用电器公司、贝尔实验室和麻省理工学院的研究人员开发了一个名叫Multics的特殊操作系统。Mu…

LLaMA开源大模型源码分析!

Datawhale干货 作者:宋志学,Datawhale成员 花了一晚上照着transformers仓库的LLaMA源码,把张量并行和梯度保存的代码删掉,只留下模型基础结构,梳理了一遍LLaMA的模型结构。 今年四月份的时候,我第一次接触…

第一次记录QPSK,BSPK,MPSK,QAM—MATLAB实现

最近有偶然的机会学习了一次QPSK防止以后忘记又得找资料,这里就详细的记录一下 基于 QPSK 的通信系统如图 1 所示,QPSK 调制是目前最常用的一种卫星数字和数 字集群信号调制方式,它具有较高的频谱利用率、较强的抗干扰性、在电路上实现也较为…

基于STM32单片机模拟智能电梯步进电机控制升降毕业设计3

STM32单片机模拟智能电梯步进电机控制数码管显示3 演示视频(复制到浏览器打开): 基于STM32单片机的智能电梯控制系统模拟智能电梯步进电机控制系统设计数码管显示楼层设计/DIY开发板套件3 产品功能描述: 本系统由STM32F103C8T6单…

技术交底二维码的应用

二维码技术交底可以逐级落实、责任到人、有据可查、是目前最方便、实用的交底方式,下面我们讲解技术交底二维码的应用。 1、生成对应的技术交底二维码,将施工方案、技术资料、安全教育资料等内容上传到二维码里。打印出来现场粘贴,便于作业班…

(一)深入理解Mysql底层数据结构和算法

什么是索引 索引是帮助MySQL高效获取数据的排好序的数据结构 数据结构有哪些 数据结构模拟网站:Data Structure Visualization 二叉树 不适合做自增ID的数据结构。如下示意图,假设采用二叉树作为表自增主键ID的数据存储结果如下:当查询i…

行列式:方程组未知数的计算:克拉默法则

行列式:方程组未知数的计算 ![ ](https://img-blog.csdnimg.cn/direct/4a9c2800da3746ea95c1a3c93057d796.png)

VS Code实现“Ctr+save”保存代码自动格式化

一、下载Prettier - Code formatter插件 点击安装即可 二、配置 【1】打开文件——首选项——设置 或者左下角齿轮打开设置 【2】搜索设置框输入editor default formatter(意思是默认格式化设置),接着下拉选中刚下好的插件名称Prettier - C…

【Vulnhub 靶场】【Corrosion: 1】【简单】【20210731】

1、环境介绍 靶场介绍:https://www.vulnhub.com/entry/corrosion-1,730/ 靶场下载:https://download.vulnhub.com/corrosion/Corrosion.ova 靶场难度:简单 发布日期:2021年07月31日 文件大小:7.8 GB 靶场作者&#xf…

Windows安装cnpm报错 The operation was rejected by your operating system.

Windows在安装cnpm时出现如下错误 npm ERR! The operation was rejected by your operating system. npm ERR! Its possible that the file was already in use (by a text editor or antivirus), npm ERR! or that you lack permissions to access it. npm ERR! npm ERR! If y…

[vue]Echart使用手册

[vue]Echart使用手册 使用环境Echart的使用Echart所有组件和图表类型Echart 使用方法 使用环境 之前是在JQuery阶段使用Echart,直接引入Echart的js文件即可,现在是在vue中使用,不仅仅时echarts包,还需要安装vue-echarts: "…

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…

C语言——小细节和小知识6

一、转义字符相关 \ 反斜杠,转义字符中的转义序列符 \? 将?转义,防止他被识别成三字母词(很早的东西)中的问号 //三字母词 //??(是[ //??)是] printf("%s","??(??)"); //打印结果是[] 二、fopen函数fc…