10.事务消息

news2025/1/16 19:55:36

4.6 事务消息

4.6.1 流程分析

image.png

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1)事务消息发送及提交

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2)事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3)事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • LocalTransactionState.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • LocalTransactionState.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • LocalTransactionState.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

4.6.1 发送事务消息

1) 创建事务消息生产者:TransactionMQProducer

使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。

java public class Producer {    public static void main(String[] args) throws MQClientException, InterruptedException {        //创建事务监听器        TransactionListener transactionListener = new TransactionListenerImpl();        //创建消息生产者        TransactionMQProducer producer = new TransactionMQProducer("group6");        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");        //生产者设置监听器        producer.setTransactionListener(transactionListener);        //启动消息生产者        producer.start();        String[] tags = new String[]{"TagA", "TagB", "TagC"};        for (int i = 0; i < 3; i++) {            try {                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                SendResult sendResult = producer.sendMessageInTransaction(msg, null);                System.out.printf("%s%n", sendResult);                TimeUnit.SECONDS.sleep(1);           } catch (MQClientException | UnsupportedEncodingException e) {                e.printStackTrace();           }       }        //producer.shutdown();   } }

2)实现事务的监听接口:TransactionListener

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

java public class TransactionListenerImpl implements TransactionListener {    /**      * 在该方法中执行本地事务      */    @Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        System.out.println("执行本地事务");        if (StringUtils.equals("TagA", msg.getTags())) {            return LocalTransactionState.COMMIT_MESSAGE;       } else if (StringUtils.equals("TagB", msg.getTags())) {            return LocalTransactionState.ROLLBACK_MESSAGE;       } else {            return LocalTransactionState.UNKNOW;       } ​   }    /**      * 执行事务回查      */    @Override    public LocalTransactionState checkLocalTransaction(MessageExt msg) {        System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");        return LocalTransactionState.COMMIT_MESSAGE;   } }

4.6.2 使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECKIMMUNITYTIMEINSECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。需要做幂等。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

4.6.3 注意事项

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/678511.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Golang -> Go 语言快速开发入门

Go 语言快速开发入门 开发一个 hello.go 程序Golang 执行流程分析两种执行流程的方式区别:编译和运行说明 Go 程序开发的注意事项注释行注释多行注释 开发一个 hello.go 程序 package mainimport "fmt"func main() {fmt.Print("hello") }输出: hello对上图…

Cesium 入门

文章目录 一、了解 Cesium二、创建第一个 Cesium 地球三、案例1. Cesium 查看器、场景、实体、数据源介绍2. Cesium 的坐标与转换3. Cesium 相机系统方法一&#xff1a;setView方法二&#xff1a;flyTo方法三&#xff1a;lookAt方法四&#xff1a; viewBoundingSphere 四、案例…

【Leetcode60天带刷】day27回溯算法——39. 组合总和,40.组合总和II,131.分割回文串

​ 题目&#xff1a; 39. 组合总和 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返回这些组合。 candidates 中的 同一…

Redis 实战:逐步指南,让你轻松在 Linux 上安装与部署

目录 前言为什么会出现 Redis&#xff1f;磁盘、内存数据库缓存中间件 安装Redis5Redis6 使用总结 前言 Redis 中文网站&#xff1a;http://redis.cn/ Redis 是一个开源&#xff08;BSD 许可&#xff09;的&#xff0c;内存中的数据结构存储系统&#xff0c;它可以用作数据库…

【编译、链接、装载十二】动态链接2

【编译、链接、装载十二】动态链接2 四、延迟绑定&#xff08;PLT&#xff09;五、动态链接相关结构1 “.interp”段2 “.dynamic”段3 .动态符号表——dynsym、动态符号字符串表——.dynstr4、动态链接重定位表 六、动态链接的步骤和实现1、动态链接器自举2、装载共享对象3、重…

2.10 高性能异步IO机制:io_uring

一、io_uring的引入 为了方便说明io_uring的作用&#xff0c;先举一个通俗点的例子 1、通过异步提高读写的效率 假设有一批数量很大的货&#xff0c;需要分批次运到厂里处理。这个时候就有两种方式&#xff1a; 1&#xff09;同步方式&#xff1a;运送一批到厂里&#xff0c…

TypeScript ~ TS 掌握编译文件配置项 ④

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; TypeScript ~ TS &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &…

初识EasyX图形库

EasyX图形库 1. EasyX是什么&#xff1f;2. 入手EasyX3. EasyX函数介绍创建和关闭绘图窗口操作initgraphclosegraph 设置绘图背景setbkcolorcleardevice 画图形circlefillcirclerectanglefillrectangle 图形颜色及样式设置setfillcolorsetlinecolorsetbkcolorsetbkmodesetlines…

计算物理专题:有限差分法解决本征值问题

计算物理专题&#xff1a;有限差分法解决本征值问题 定态薛定谔方程差分形式 一维定态薛定谔方程 谐振子 解法代码 import numpy as np def householder(symmetric_matrix):M symmetric_matrixassert np.allclose(M,M.T),"matrix is not symmetric"N len(M)for …

chatgpt赋能python:用Python分析电影评分数据

用Python分析电影评分数据 Python是一种流行的数据分析和可视化工具&#xff0c;它可以让我们更深入地了解电影的评分数据。在本文中&#xff0c;我们将使用Python来分析一些电影评分数据&#xff0c;并试图找出一些有趣的模式和趋势。 数据来源 我们将使用公共数据集IMDb电…

第4章 网络层

1‌、下列关于路由算法描述错误的是&#xff08; &#xff09; A. 链路状态算法是一种全局路由算法&#xff0c;每个路由器需要维护全局状态信息B. OSPF 是一种域内路由协议&#xff0c;核心是基于 Dijkstra 最低费用路径算法C. RIP 是一种域内路由算法&#xff0c;核心是基…

采用SqlSugar的DBFirst相关功能创建数据库表对应的实体类

.NET Core官方教程中推荐使用的EF Core数据库ORM框架虽然能用&#xff0c;但是用起来并不是太方便&#xff08;或者是不习惯&#xff0c;之前用的最多的还是linq&#xff09;。之前下载的开源博客项目中使用的SqlSugar&#xff0c;后者是由果糖大数据科技团队维护和更新 &#…

基于WebAssembly构建Web端音视频通话引擎

Web技术在发展&#xff0c;音视频通话需求在演进&#xff0c;怎么去实现新的Web技术点在实际应用中的值&#xff0c;以及给我们带来更大的收益是需要我们去探索和实践的。LiveVideoStackCon 2022北京站邀请到田建华为我们从实践中来介绍WebAssembly、WebCodecs、WebTransport等…

【裸机开发】IRQ 中断服务函数(一) —— 汇编初始化

IRQ 和前面的Reset 函数不大一样&#xff0c;当一个IRQ中断产生时&#xff0c;我们也不知道这个IRQ中断来自哪个外设&#xff0c;因此&#xff0c;需要先获取到中断ID&#xff0c;随后才会跳转到真正的中断服务函数执行处理逻辑。 整个 IRQ 中断处理可以看做是包含了两个部分&…

MySQL 自增主键一定是连续的吗?

众所周知&#xff0c;自增主键可以让聚集索引尽量地保持递增顺序插入&#xff0c;避免了随机查询&#xff0c;从而提高了查询效率 但实际上&#xff0c;MySQL 的自增主键并不能保证一定是连续递增的。 下面举个例子来看下&#xff0c;如下所示创建一张表&#xff1a; 自增值保…

ORCA优化器浅析——GP数据库调用优化器流程

首先我们需要看CGPOptimizer类(src/include/gpopt/CGPOptimizer.h)为Greenplum数据库提供ORCA优化器export出来的函数的封装。Greenplum数据库主流程调用extern "C"中提供的函数&#xff0c;比如初始化ORCA优化器的函数InitGPOPT&#xff0c;优化查询树的函数GPOPTOp…

springboot+jsp农产品商城宣传网站设计与实现oo6e3

在该在线助农系统设计与实现中&#xff0c;idea能给用户提供更多的方便&#xff0c;其特点一是方便学习&#xff0c;方便快捷&#xff1b;二是有非常大的信息储存量&#xff0c;主要功能是用在对数据库中查询和编程。其功能有比较灵活的数据应用&#xff0c;只需利用小部分代码…

【Leetcode60天带刷】day30回溯算法——332.重新安排行程 , 51. N皇后 ,37. 解数独

​ 题目&#xff1a; 332. 重新安排行程 给你一份航线列表 tickets &#xff0c;其中 tickets[i] [fromi, toi] 表示飞机出发和降落的机场地点。请你对该行程进行重新规划排序。 所有这些机票都属于一个从 JFK&#xff08;肯尼迪国际机场&#xff09;出发的先生&#xff0c;…

【从零开始学习JAVA | 第十四篇】继承

目录 前言&#xff1a; 引入&#xff1a; 继承&#xff1a; 小拓展&#xff1a; 优点&#xff1a; 成员方法的继承问题&#xff1a; 总结&#xff1a; 前言&#xff1a; 继承是面向对象三大特性之一&#xff0c;它是在封装之后我们讲解的一个重要的性质&#xff0c;继承…

在github上创建个人主页的方法【2023更新版】

01-进入github的网站&#xff0c;链接 https://github.com/ &#xff0c;然后注册&#xff0c;登陆&#xff0c;注意登陆时设置的用户名(username)就是将来你个人主页的三级域名&#xff0c;所以这里一定要慎重填写username。如下图所示&#xff1a; 02-注册完成后进入个人主…