重学SpringBoot3-集成RocketMQ(二)

news2025/1/23 3:14:18

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成RocketMQ(二)

  • 1. 基础概念
  • 2. 准备工作
  • 3. 实现事务消息的生产者
  • 4. 事务监听器实现
  • 5. 消费者示例
  • 6. 发送事务消息
  • 7. 测试
    • 7.1 模拟本地事务正常提交
    • 7.2 模拟本地事务提交失败,未回查
    • 7.3 模拟本地事务提交失败,回查成功
    • 7.4 模拟本地事务提交失败,回查失败
    • 7.5 模拟本地事务提交成功,消费失败
  • 关键点总结

今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。

1. 基础概念

在 RocketMQ 中,半消息(Half Message)主要用于实现事务消息。它是指生产者在发送事务消息时,RocketMQ 会先将消息保存为 半消息,等待事务状态的最终确认,确保消息的可靠性和一致性。

图片来源:https://www.cnblogs.com/dennyzhangdd/p/14572024.html

半消息的工作流程:

  1. 发送半消息:生产者首先将消息发送到 RocketMQ,RocketMQ 将其标记为半消息,并暂时存储到消息队列中,但这时消费者不会收到该消息。
  2. 执行本地事务:生产者在发送半消息后,开始执行自己的本地事务操作。
  3. 提交或回滚消息
    • 如果本地事务成功,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费者。
    • 如果本地事务失败,生产者会通知 RocketMQ回滚消息,RocketMQ 会删除该半消息。
  4. 事务状态回查:如果 RocketMQ 没有收到生产者的事务状态确认,RocketMQ 会通过回查机制询问生产者事务的最终状态,确保消息的一致性。

现有一个案例,后端文件上传接口,同时上传 OSS 和 MySQL,数据库负责文件元信息的增删改查, OSS负责存储文件对象,如何保证最终一致性?下面用RocketMQ 的事务消息来实现最终一致性。

2. 准备工作

请参考《重学SpringBoot3-集成RocketMQ(一)》进行环境搭建和配置工作。配置文件新增如下配置:

  consumer2:
    group: springboot-consumer-group2  # 新的消费者组名称
    topic: transaction-topic  # 订阅新的主题
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能

3. 实现事务消息的生产者

创建一个事务消息的生产者类,通过事务生产者发送消息,并处理本地事务逻辑。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(String topic, String message) {
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
        System.out.println("Transaction message sent: " + sendResult.getLocalTransactionState());
    }
}

4. 事务监听器实现

通过实现 RocketMQLocalTransactionListener 接口,定义事务的提交或回滚逻辑。

package com.example.boot308rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务逻辑,根据业务情况返回事务的提交或回滚状态
        try {
            // 模拟本地事务处理逻辑
            System.out.println("Executing local transaction...");
            boolean success = performLocalTransaction();
            if (success) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事务回查逻辑,确认本地事务的最终状态
        System.out.println("Checking local transaction...");

        // 根据本地事务的处理结果返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE
        System.out.println("local transaction check success");
        return RocketMQLocalTransactionState.COMMIT;
    }

    private boolean performLocalTransaction() {
        // TODO 模拟本地事务处理文件上传OSS
        try {
            System.out.println("Upload files to OSS...");
            Thread.sleep(3000);
            System.out.println("File upload to OSS completed");
            return true;
        } catch (InterruptedException e) {
            System.out.println("Failed to upload file to OSS");
            return false;
        }
    }

}

5. 消费者示例

创建一个消费者,订阅并消费事务消息。

RocketMQListener<String> 是一个接口类型,用于定义一个 RocketMQ 消息监听器,它指定接收的消息类型为 String。在 RocketMQ 中,消费者可以通过实现 RocketMQListener 接口来自动处理接收到的消息。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionalMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理接收到的消息
        System.out.printf("Received message: %s%n", message);
    }
}

6. 发送事务消息

在服务中调用事务消息生产者:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Resource
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/sendTransactionMessage")
    public ResponseEntity<String> sendTransactionMessage(@RequestParam String message) {
        rocketMQProducer.sendTransactionMessage("transaction-topic", message);
        return ResponseEntity.ok("Transaction message sent: " + message);
    }
}

7. 测试

7.1 模拟本地事务正常提交

如下图观察到,当本地事务即文件上传完成之后,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费进行消费。

本地事务正常提交

7.2 模拟本地事务提交失败,未回查

在 RocketMQ 中,如果消息生产者没有在规定的时间内向消息队列确认事务状态,RocketMQ 会通过回查机制(即“回滚检查”或“事务回查”)来询问生产者事务的最终状态,从而确保消息的一致性。broker.conf 中配置 transactionCheckMax=10000 表示 RocketMQ 最长等待 10 秒后进行事务状态回查。

本地事务提交失败,未回查

7.3 模拟本地事务提交失败,回查成功

本人搭建 RocketMQ 设置的回查时间为15s,所以将本地事务执行时间修改为 16s,这样会触发 RocketMQ 进行事务状态回查。

transactionCheckMax

事务状态回查成功

7.4 模拟本地事务提交失败,回查失败

修改回查方法的返回值,让RocketMQ 回查本地状态值将消息进行回滚,消费者同样不会消费消息。

回查失败

7.5 模拟本地事务提交成功,消费失败

例如生产者本地事务执行成功,但是消费者消费失败的情况,RocketMQ 会进行消息重试,直至成功。

修改一下消费者处理逻辑:

package com.example.boot308rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "springboot-consumer-group2")
public class TransactionalMessageConsumer02 implements RocketMQListener<MessageExt> {


    @Override
    public void onMessage(MessageExt message) {
        try {
            // 处理消息的业务逻辑
            String msgBody = new String(message.getBody(), "UTF-8");
            log.info("Received message:{}", msgBody);

            // 模拟业务处理
            boolean success = processBusinessLogic(msgBody);

            if (!success) {
                throw new RuntimeException("Business processing failure");
            }

            log.info("Business processing successful");

        } catch (Exception e) {
            log.error("MsgID:{},reconsumeTimes:{},e:{}", message.getMsgId(), message.getReconsumeTimes(), e.getMessage());

            // 重新抛出异常,让 RocketMQ 进行重试
            throw new RuntimeException("Message consumption failed, retrying");
        }
    }

    private boolean processBusinessLogic(String message) {
        // 这里是业务逻辑,返回 true 表示成功,false 表示失败
        // 例如:数据库操作或其他远程调用
        return Math.random() > 0.5; // 模拟随机成功或失败
    }
}

消费失败

关键点总结

  1. 事务消息发送:通过 RocketMQTemplate.sendMessageInTransaction() 方法发送事务消息。
  2. 本地事务处理:实现 TransactionListener 接口的 executeLocalTransaction() 方法处理本地事务逻辑。
  3. 事务回查:在 checkLocalTransaction() 方法中定义如何检查事务消息的最终状态。
    以上就是 SpringBoot3 结合 RocketMQ 实现的事务消息,提供了分布式事务中的最终一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2134849.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

rust学习——关联类型

什么是关联类型 关联类型是Rust中一种特殊的泛型抽象机制。在trait中&#xff0c;可以定义一个或多个关联类型&#xff0c;这些关联类型与trait的实现类型相关联。关联类型允许我们在trait中使用泛型&#xff0c;但不需要提前指定具体的类型。 不使用关联类型存在的问题 tra…

AI生成头像表情包,一次十分钟,就能实现月入过万的玩法,无脑操作

今天给大家带来的项目是AI生成表情包和头像&#xff0c;这个项目对于我们做ip来说是真心不错&#xff0c;就比如我这个头像。 为什么说每天只需要10分钟呢&#xff0c;那么我们继续往下看。 "项目介绍 这个项目的核心其实就是使用AI生成表情包或者说生成头像&#xff0c…

华为HCIA、HCIP和HCIE认证考试明细

华为认证体系包括三个主要等级&#xff1a;HCIA&#xff08;华为认证ICT助理&#xff09;、HCIP&#xff08;华为认证ICT高级工程师&#xff09;和HCIE&#xff08;华为认证ICT专家&#xff09;。每个等级的认证都有其特定的考试内容和费用。 HCIA&#xff08;华为认证ICT助理…

天翼云2024年最新版本认证必过资料(应知+从业者+解决方案架构师+高级解决方案架构师)

本资料为2024年认证最新材料&#xff0c;笔者因为工作需要考几个认证。天冀云全套认证包含如下图所示&#xff0c;本材料包含下图中红框内的 4个认证&#xff08;应知从业者解决方案架构师高级解决方案架构师&#xff09;。 笔者&#xff0c;亲测必过。见文章下面第二张考试记录…

模型加载pytorch版本不匹配的解决思路

模型部署总是会遇到pytorch版本推理与训练不匹配的问题&#xff0c;一般报错&#xff1a; AttributeError: Cant get attribute _rebuild_parameter_v2 on <module torch._utils from /usr/local/python3.9.0/lib/python3.9/site-packages/torch/_utils.py>提示pytorch …

医疗机构关于DIP/DRG信息化建设

推进DIP/DRG支付方式改革是一项系统性工程&#xff0c;牵一发而动全身。作为河北省DIP试点医院&#xff0c;河北医科大学第二医院将信息化与创新性管理理念融合&#xff0c;用好支付工具做好精细化管理&#xff0c;积极应对改革。 ■ 改革背景 国家医疗保障局制定的《DRG/DIP支…

18060 删除空格

**思路**: 1. 使用两个指针&#xff0c;一个指向当前字符位置&#xff0c;另一个指向下一个非空格字符应该放置的位置。 2. 遍历字符串&#xff0c;如果当前字符不是空格&#xff0c;则将其移动到目标位置指针处&#xff0c;并递增目标位置指针。 3. 最后在目标位置指针处添加字…

scratch作品练习-- 猫猫接月饼

作品展示 猫猫接月饼 作品设计 1&#xff09;内容&#xff1a; 嫦娥在最上方抛落月饼&#xff0c;猫猫要移动接住从上方掉落的月饼&#xff0c;接到月饼得分&#xff0c;没接住月饼落地消失。 2&#xff09;角色&#xff1a; 猫猫、月饼、嫦娥 3&#xff09;背景&#xff…

这么好用的桌面插件 怎么能不分享给你!

这么好用的桌面插件 怎么能不分享给你&#xff01;大家可能需要在桌面上创建一些功能&#xff0c;比如时间&#xff0c;日期&#xff0c;时钟&#xff0c;天气等&#xff0c;这时候就需要桌面插件才能实现&#xff0c;小编找过很多桌面插件或者桌面组件&#xff0c;发现了一个宝…

从0开始学习 RocketMQ:分布式事务消息的实现

消息队列中的事务&#xff0c;主要是解决消息生产者和消息消费者数据一致性的问题。 应用场景 比如订单系统创建订单后&#xff0c;会发消息给购物车系统&#xff0c;将已下单的商品从购物车中删除。 由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤&a…

Redhat 7,8系(复刻系列) 一键部署Oracle21-xe rpm

Oracle21c-xe前言 无论您是开发人员、DBA、数据科学家、教育工作者,还是仅仅对数据库感兴趣,Oracle Database Express Edition (XE) 都是理想的入门方式。它是全球企业可依赖的强大的 Oracle Database,提供简单的下载、易于使用和功能齐全的体验。您可以在任何环境中使用该…

orangepi部署web环境

orangepi web环境搭建 mysql安装mysql卸载 FTP安装java安装tomcat安装Maven配置 mysql安装 查看MySQL安装包 接下来可以使用以下命令安装MySQL服务器&#xff1a; 安装MySQL 8.0 # 安装最新版本 sudo apt install -y mysql-server # 安装指定版本 sudo apt install -y mysql…

Mystic 会是 Midjourney 的终结者吗?

作者:老余捞鱼 原创不易,转载请标明出处及原作者。 写在前面的话: 这两年来 Midjourney 一直是互联网上最好的人工智能图像生成器。它制作了一些我们所见过的最流行和最具争议的 AI 图像,Midjourney 无与伦比的快速连贯性和照片级真实感使其领先于 OpenAI、谷歌和亚…

喜报!普罗格入选软件百强企业榜单

金秋九月&#xff0c;一个象征着成熟与收获的季节&#xff0c;普罗格迎来了发展历程中的又一里程碑。近期&#xff0c;普罗格以稳健的发展和创新的技术入选武汉软件百强企业榜单。在面对2024年复杂经营环境挑战的背景下&#xff0c;这一荣誉更显珍贵&#xff0c;彰显了武汉企业…

MySQL迁移达梦报错,DMException: 第1 行附近出现错误: 无效的表或视图名[ACT_GE_PROPERTY]

达梦数据库选好模式和登录用户&#xff0c;迁移时的目标模式名要和达梦的当前登录的用户名相同&#xff0c;否则查询的时候需要“form 模式名.表名”&#xff0c;只from表名就会报表不存在的错误。

单机快速部署开源、免费的分布式任务调度系统——Apache DolphinScheduler

本文主要为大家介绍Apache DolphinScheduler的单机部署方式&#xff0c;方便大家快速体验。 环境准备 需要Java环境&#xff0c;这是一个老生常谈的问题&#xff0c;关于Java环境的安装与配置期望大家都可以熟练掌握。 验证java环境 java -version 下载安装包并解压 使用wg…

深度挖掘| 如何高效实现Cloudera 安装之基础环境搭建

Cloudera Manager是CDH市场领先的管理平台。它以其强大的数据管理和分析能力&#xff0c;帮助企业能够轻松驾驭海量数据&#xff0c;实现数据的实时分析与洞察。 作为业界第一的端到端 Apache Hadoop 的管理应用&#xff0c;Cloudera Manager对CDH的每个部件都提供了细粒度的可…

ROS2 std_msg 报错!

编译ros2时候报错如下&#xff1a; CMake Error at CMakeLists.txt:11 (find_package): By not providing "Findstd_msg.cmake" in CMAKE_MODULE_PATH this project has asked CMake to find a package configuration file provided by "…

80V转5V4A同步降压WT6037

80V转5V4A同步降压WT6037 WT6037 被定义为一款高压同步降压转换器&#xff0c;其设计可在 10V 至 90V 的宽泛工作电压区间内稳定运行。该转换器尤其适用于需承受宽电压输入范围的电池组系统&#xff0c;诸如 12V 至 72V 的电池组&#xff0c;以及 60V 至 90V 的降压应用场景。…

Python编码系列—Python建造者模式:构建复杂对象的优雅之道

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…