【RocketMQ系列六】RocketMQ事务消息

news2025/1/18 4:43:04

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 事务消息的定义
    • 2.事务消息的实现流程
    • 3. 事务消息的实现示例
      • 3.1. 事务消息的消费者
      • 3.2. 本地事务的实现
      • 3.3. 事务消息的生产者
        • 运行结果:

1. 事务消息的定义

事务消息可以认为是一个两阶段的提交消息实现,以确保分布式事务的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子执行。

两阶段提交主要保证了分布式事务的原子性:即所有结点要么全做要么全不做,所谓的两个阶段是指:第一阶段:准备阶段;第二阶段:提交阶段。

事务消息有三种状态:

  1. TransactionStatus.CommitTransaction: 提交事务,表示允许消费者消费该消息。
  2. TransactionStatus.RollbackTransaction: 回滚事务,表示该消息将被删除,不允许消费。
  3. TransactionStatus.Unknow: 中间状态,表示需要MQ回查才能确定状态。

2.事务消息的实现流程

RocketMQ事务消息二阶段提交

  1. 生产者发送half消息,broker接收到half消息并回复half消息。
  2. 生产者调用 TransactionListener.executeTransaction() 方法执行本地事务。
  3. 生产者获得本地事务执行状态,提交给broker。如果状态是COMMIT_MESSAGE状态的话则broker会将消息推送给消费者。如果状态是ROLLBACK_MESSAGE状态的话则broker会丢弃此消息。如果状态是中间状态UNKNOW状态则broker会回查本地事务状态。
  4. 生产者调用 TransactionListener.checkLocalTransaction() 方法回查本地事务执行状态,并再次执行5,6,7三步骤,若回查次数超过15次则丢弃。

使用限制:

  1. 事务性消息没有调度和批处理支持。
  2. 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认单条消息的检查次数限制为15次,但用户可以通过更改 transactionCheckMax 来更改此限制,如果一条消息的检查次数超过 transactionCheckMax 次,broker默认会丢弃这条消息,同时打印错误日志。用户可以重写 AbstractTransactionCheckListener 类来改变这种行为。
  3. 事务消息将一定时间后检查,该时间由代理配置中的参数 transactionTimeout 确定。并且用户也可以在发送事务消息时通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,这个参数优先于 transactionMsgTimeout 参数。
  4. 一个事务性消息会被检查或消费不止一次。
  5. 事务性消息的生产者ID不能与其他类型消息的生产者ID共享,与其他类型的消息不同,事务性消息允许向后查询。MQ服务器通过其生产者ID查询客户端。
  6. 提交给用户目标主题的消息reput可能会失败,目前它取决于日志记录,高可用是由RocketMQ本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使用同步双写机制。

3. 事务消息的实现示例

3.1. 事务消息的消费者

事务消息的消费者与普通消息的消费者基本相同,也就是说事务消息是控制生产者端和broker端。

	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_transaction_consumer");
		consumer.setNamesrvAddr("172.31.184.89:9876");
		consumer.subscribe("TransactionTopic", "*");
		// 4.创建一个回调函数
		consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
			// 5.处理消息
			for (MessageExt msg : msgs) {
				System.out.println(msg);
				System.out.println("收到的消息内容:" + new String(msg.getBody()));
			}
			// 返回消费成功的对象
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		});
		// 6.启动消费者
		consumer.start();
		System.out.println("消费者已经启动");

3.2. 本地事务的实现

事务消息最关键的地方是生产者本地事务的实现,生产者本地事务实现 TransactionListener 接口,并实现该接口中的executeLocalTransaction方法和checkLocalTransaction方法。

其中,executeLocalTransaction 方法的作用是执行本地事务。它在生产者每次发送half消息的时候被调用,

  1. 如果调用此方法返回LocalTransactionState.COMMIT_MESSAGE状态,则此消息会被消费者消费到。
  2. 如果返回 LocalTransactionState.ROLLBACK_MESSAGE 状态,则此消息会被broker丢弃
  3. 如果返回 LocalTransactionState.UNKNOW 状态,即中间状态,则broker会调用checkLocalTransaction方法进行回查,最多回查15次。

checkLocalTransaction方法的作用是检查本地事务, 它是生产者发送完所有消息的时候调用,主要是针对的是中间状态的消息进行调用。

同样的如果调用此方法返回前面提到的三种状态,broker也会做出相同的处理。

public class TransactionListenerImpl implements TransactionListener {

	/**
	 * 执行本地事务
	 * 当事务half消息发送成功,这个方法将被执行
	 * 事务的half消息是发到 RMQ_SYS_TRANS_OP_HALF_TOPIC 的topic中
	 *
	 * @param msg 消息
	 * @param arg arg 自定义业务参数
	 * @return {@link LocalTransactionState}
	 */
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		String tags = msg.getTags();
		System.out.println("============执行executeLocalTransaction方法,;消息内容是="+new String(msg.getBody()));
		if (StringUtils.contains(tags, "tagA")) {
			return LocalTransactionState.COMMIT_MESSAGE;
		} else if (StringUtils.contains(tags, "tagB")) {
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
		return LocalTransactionState.UNKNOW;
	}

	/**
	 * 检查本地事务
	 * 回查本地事务状态,当half消息没响应时调用。
	 *
	 * @param msg 消息
	 * @return {@link LocalTransactionState}
	 */
	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		String tags = msg.getTags();
		System.out.println("============执行checkLocalTransaction方法,;消息内容是="+new String(msg.getBody()));
		if (StringUtils.contains(tags, "tagC")) {
			return LocalTransactionState.COMMIT_MESSAGE;
		} else if (StringUtils.contains(tags, "tagD")) {
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
		return LocalTransactionState.UNKNOW;
	}
}

3.3. 事务消息的生产者

事务消息的生产者与普通消息的生产者最核心的区别是事务消息的生产者需要事务监听器,并且是调用sendMessageInTransaction 方法发送 half 消息。

         //1.定义事务监听器
		TransactionListener transactionListener = new TransactionListenerImpl();
		//2.定义生产者
		TransactionMQProducer producer = new TransactionMQProducer("transaction_produce_group");
		producer.setNamesrvAddr("172.31.184.89:9876");
		//3.定义线程池
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 10, TimeUnit.SECONDS,
				new ArrayBlockingQueue<>(100), (runnable, executor) -> {
			BlockingQueue<Runnable> queue = executor.getQueue();
			try {
				queue.put(runnable);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		});
		//4.设置线程池
		producer.setExecutorService(threadPoolExecutor);
		//5.设置事务监听器
		producer.setTransactionListener(transactionListener);
		// 启动生产者
		producer.start();
		String[] tags = {"tagA", "tagB", "tagC", "tagD","tagE"};
		//发送10条half消息,消费者是收不到half消息的
		for (int i = 0; i < 10; i++) {
			Message message = new Message("TransactionTopic", tags[i % tags.length],
					"key" + i, ("飞哥测试事务消息" + tags[i % tags.length]+"_"+i).getBytes(StandardCharsets.UTF_8));
			TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
			System.out.println("本次发送的消息是=" + new String(message.getBody()));
			System.out.printf("%s%n", transactionSendResult);
			Thread.sleep(10);
		}
		System.out.println("==========所有消息发送完成======");
运行结果:

生产者:

image-20231005150155847

从运行结果可以看出中间状态的消息最多回查15次,就像图中的消息 执行checkLocalTransaction方法,;消息内容是=飞哥测试事务消息tagE_9 broker调用checkLocalTransaction 方法回查了15次。

image-20231005150245921

消费者:

我们可以看到最终消费者消费到的是消费的tags是tagA以及tagC的四条消息。

image-20231005150341147

那么,为啥生产者发送的half消息,消费者不会里面收到呢?这是因为half消息会被放到 RMQ_SYS_TRANS_OP_HALF_TOPIC 的topic中,直到本地事务返回 COMMIT_MESSAGE 状态时,消费者才能消费到此消息 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1104214.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

海洋CMS仿爱美剧影视电影视频网站模版源码/自适应手机端

海洋CMS仿爱美剧网站模板&#xff0c;自适应手机端&#xff0c;内含视频、资讯、留言模块。 下载地址&#xff1a;https://bbs.csdn.net/topics/617419787

wps/word 之 word中的两个表格 如何合并成为一个表格(已解决)

第一步&#xff1a;新建两个表格&#xff1a; 如何实现上面的两个表格合并呢&#xff1f; 分别选定每个表格&#xff0c;然后鼠标右键---》表格属性 在表格属性中的 表格---》选择 无文字环绕。 第二个表格按照同样的方法 设置 无文字环绕。 然后将中的文本行删去即可以了。选…

金融液冷数据中心,噱头还是趋势?

当前&#xff0c;全社会数字化进程加速&#xff0c;金融行业已全面进入数字化和智能化时代。与此同时&#xff0c;随着云计算、分布式架构、大数据分析、通用人工智能等技术的广泛运用&#xff0c;金融行业从数据大集中到分布式融合&#xff0c;金融企业的数据中心建设正围绕其…

什么是MIMO?

什么是MIMO&#xff1f;从SISO到MIMO - 华为 (huawei.com) MIMO&#xff08;Multiple-Input Multiple-Output&#xff09;是指在无线通信领域使用多天线发送和接收信号的技术。MIMO技术主要应用在Wi-Fi&#xff08;WiFi&#xff09;领域和移动通信领域&#xff0c;可以有效提高…

Unity之ShaderGraph如何实现光边溶解

前言 今天我们来实现一个最常见的随机溶剂效果。如下图所示&#xff1a; 光边溶解效果&#xff1a; 无光边效果 主要节点 Simple Noise&#xff1a;根据输入UV生成简单噪声或Value噪声。生成的噪声的大小由输入Scale控制。 Step&#xff1a;对于每个组件&#xff0c;如果输…

软件招标测试包含哪些测试?对软件项目起到什么作用?

在当前日益竞争激烈的软件市场中&#xff0c;一款优质的软件产品是企业获得竞争优势的关键。而软件招标测试正是评估软件质量的重要环节。   那么&#xff0c;什么是软件招标测试呢?简单来说&#xff0c;软件招标测试主要是验证软件产品的关键指标是否符合投标书要求。它通过…

多维时序 | MATLAB实现SSA-CNN-GRU-Attention多变量时间序列预测(SE注意力机制)

多维时序 | MATLAB实现SSA-CNN-GRU-Attention多变量时间序列预测&#xff08;SE注意力机制&#xff09; 目录 多维时序 | MATLAB实现SSA-CNN-GRU-Attention多变量时间序列预测&#xff08;SE注意力机制&#xff09;预测效果基本描述模型描述程序设计参考资料 预测效果 基本描述…

中国移动集采120万部,助推国产5G赶超iPhone15

近期媒体纷纷传出消息指中国移动将大规模集采&#xff0c;预计将采购国产5G手机120万台&#xff0c;加上另外两家运营商的集采数量&#xff0c;估计集采数量可能达到300万部&#xff0c;如此将有助于它在国内高端手机市场赶超苹果。 国产5G手机在8月底突然上市&#xff0c;获益…

libportaudio.so.2: cannot open shared object file: No such file or directory

ubuntu安装完pyaudio后报错 ImportError libportaudio.so.2: cannot open shared object file: No such file or directory解决方案 sudo apt-get install libportaudio2 libportaudiocpp0 portaudio19-dev

图片处理AIGC人工智能

以下是人工智能用文字生成的图片 文字&#xff1a;一个好看的亚洲美女站在桥上&#xff0c;周围是开满荷花的池塘 文字&#xff1a;一个好看的亚洲美女站在99道弯的天路上 文字&#xff1a;一个好看的亚洲美女骑在骆驼上 文字&#xff1a;一个好看的亚洲美女站在金字塔前 文字…

元宇宙虚拟展览馆,感受虚拟世界不一样的展览体验

引言&#xff1a; 随着科技的迅猛发展&#xff0c;元宇宙概念逐渐进入了大众的视野。不同于传统展览馆&#xff0c;元宇宙虚拟展览馆为人们提供了一个虚拟的展示空间&#xff0c;打破了时空的限制。 一、什么是元宇宙虚拟展览馆&#xff1f; 元宇宙虚拟展览馆是一种结合了虚拟…

[架构之路-240]:目标系统 - 纵向分层 - 应用层 - 应用层协议与业务应用程序的多样化,与大自然生物的丰富多彩,异曲同工

目录 前言&#xff1a; - 倒金子塔结构 - 大自然的组成 一、应用层在计算机系统中的位置 1.1 计算机应用程序的位置 1.1.1 业务应用程序概述 1.1.2 应用程序的分类 - 按照计算机作用范围 1.1.3 业务应用程序分类 - 按照行业分类 1.2 网络应用协议的位置 1.2.1 网络协…

你的助听器装置效果好吗?

作者&#xff1a;兰明 助听效果的好坏是一个多维的概念&#xff0c;简单的讲就是能使听障人士成功地应付生活的程度。影响助听装置效果的因素主要有三个方面&#xff1a;听障人士自身的因素、助听装置本身的因素以及专业服务的因素。其中病史超过半年的听障人士自身的因素&…

基于ssm的旅游管理系统

功能如下图所示 摘要 基于SSM框架的旅游管理系统代表了信息技术在旅行业中的崭新机遇&#xff0c;为旅行企业提供了强大的工具&#xff0c;以应对现代旅游市场的复杂挑战。这个系统的研发和实施具有广泛的研究意义&#xff0c;它深刻影响了旅游业的发展&#xff0c;具体表现如下…

深度学习——残差网络(ResNet)

深度学习——残差网络&#xff08;ResNet&#xff09; 文章目录 前言一、函数类二、残差块三、ResNet模型四、模型训练五、小结总结 前言 随着设计越来越深的网络&#xff0c;深刻理解“新添加的层如何提升神经网络的性能”变得至关重要。更重要的是设计网络的能力&#xff0c…

Compose Desktop 使用中的几个问题(分平台加载资源、编写Gradle 任务下载平台资源、桌面特有组件、鼠标键盘事件)

前言 在我之前的文章 Compose For Desktop 实践&#xff1a;使用 Compose-jb 做一个时间水印助手 中&#xff0c;我们使用 Compose For Desktop 写了一个用于读取照片 EXIF 中的拍摄日期参数并以文字水印的方式添加到照片上的桌面程序。 但是事实上&#xff0c;这个程序的名字…

C语言 输入输出

输出 printf 发送格式化输出到标准输出 stdout 调用格式 printf("<格式化字符串>",<参数表>); 头文件 stdio.h 声明 int printf(const char *format,...) 参数 format -- 是字符串&#xff0c;包含要被写入到标准输出 stdout 的文本。可以包含…

Pycharm的安装和使用

目录 环境安装 环境安装 下载并安装 打开网站&#xff1a;https://www.jetbrains.com/pycharm/download/#sectionwindows 下拉到最下面 开始安装 运行pycharm

人工智能时代大模型算法之文心大模型4.0

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

VsCode通过Git History插件查看某个页面的版本修改记录

首先需要安装插件Git History 方式一&#xff1a;通过 点击File History 查看某个文件变更&#xff1b;即通过commit的提交记录去查看某个文件的修改 方式二&#xff1a;通过点击选择toggle File Blame 查看当前页面每一行所有提交修改记录