SpringBoot中使用RocketMQ实现事务消息来保证分布式事务的一致性(有代码)

news2025/1/23 11:52:44

前言

分布式事务是分布式系统中非常常见的问题。是非常必要钱常见的。实现的方式也是多种多样。今天这个视频主要来分享一下RocketMQ实现事务消息来保证分布式事务的一致性。不知道大家使用过这种方式没有。这种分布式事务的原理其实和本地消息表一样。

本地消息表实现分布式事务的基本原理

本地消息表实现分布式事务的基本原理是通过两个阶段的事务处理来保证分布式环境中的数据一致性。以下是其基本步骤:
大致就是将本地消息表和要执行的第一个业务逻辑放在一个事务中,这样就可以一起成功一起失败。当第一阶段成功后。根据本地消息表中的记录去让下游的业务执行成功。扫描本地消息表中的消息然后执行下游业务。执行成功后在删除本地消息表中消息。不成功则重试。

1.本地事务:

在开始分布式事务时,首先执行本地操作。例如,更新某个服务的数据。
如果本地操作成功,事务进入下一步;如果失败,则回滚本地事务,并结束流程。
消息记录:
创建一条消息记录,通常称为“本地消息”,将需要在后续阶段执行的远程操作信息保存在本地数据库的一个消息表中。这个消息记录包含了执行远程操作所需的所有数据。
消息发送:

将本地消息发送到消息队列,如RocketMQ或其他消息中间件。此时,消息队列并不保证消息已经被消费,只是简单地将消息放入队列。
消息消费:

消息队列的消费者监听并处理消息。消费者通常是另一个服务,它接收消息并执行相应的远程操作,比如更新另一个服务的数据。
确认与补偿:

如果远程操作成功,消费者会发送一个确认信号(ACK),通知生产者操作已完成。这时,生产者可以删除本地消息表中的记录。
如果远程操作失败,消费者可能会尝试重新消费消息,或者根据策略回滚本地事务,然后通知生产者消息处理失败。
最终一致性:

尽管可能有短暂的延迟,但最终所有服务的数据状态会达到一致,因为本地操作和远程操作都会成功完成,或者在失败时都会回滚。
异常处理:

为了处理异常情况,系统通常会有超时和重试机制。如果消费者长时间没有确认,生产者可能会重新发送消息,或者在一定时间后回滚本地事务。
本地消息表方案的优点在于它避免了分布式事务的复杂性,实现了最终一致性,而不是强一致性。但是,它也有一些缺点,比如增加了系统的复杂性,需要维护额外的消息表,以及可能出现消息丢失或重复消费的问题。因此,它更适合对实时性要求不高,但对最终一致性有要求的场景。

本地消息表是一种最终一致性方案。并不是强一致性方案。

rocketmq事务消息

今天重点来说一下rocketmq事务消息是怎么做的。先理解一下Rocketmq事务消息
在这里插入图片描述

这种类似的图片挺多的。简单的来看一下 然后一会结合代码看一下。生产者先送消息到MQserve。然后mq去执行本地事务。通过回查的方式来保证第一阶段消息执行的成功。然后下游消费者来消费这个消息。

代码

我们需要实现分布式事务的两个服务分别是用户中心的服务以及im业务服务。功能是注册的功能。用户的注册信息基本信息存储在用户中心表。然后其他信息存储在im_user表里面。这个听起来有点奇怪。因为我这套代码是计划用户中心存储多个app的用户信息。通义提供鉴权服务什么的。然后基本信息存储在自己的业务用户表里面。大概是这样的设计思路。可以看代码。

	/**
	 * 使用rocketmq实现事务
	 * @param dto
	 * @return
	 * @throws Exception
	 */
	@ApiOperation("使用邮箱和密码注册")
	@PostMapping("/sys/registByWeb")
	public GenericResponse registByWebTX(@RequestBody SysRegisterForm dto) throws Exception {
		String uuid = UUID.randomUUID().toString() + new Random().nextInt();
		SysUserEntity sysUserEntity = new SysUserEntity();
		sysUserEntity.setPassword(dto.getPassword());
		sysUserEntity.setUsername(dto.getUsername());
		sysUserEntity.setOpenid(uuid);
		//注册需要的实体类
		RegisterFeign registerFeign = new RegisterFeign();
		registerFeign.setOpenid(uuid);
		registerFeign.setUsername(dto.getUsername());
		registerFeign.setEmail(dto.getEmail());
		TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
				MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
		String sendStatus = sendResult.getSendStatus().name();
		String localTXState = sendResult.getLocalTransactionState().name();
		logger.info("sendStatus---" + sendStatus);
		logger.info("localTXState---"+localTXState);

		// 注意:这里不能立即返回成功,因为事务还未完成,实际应用中可能需要设计异步回调通知客户端事务结果
		// 以下仅为示例逻辑,实际应用中需根据业务需求调整
		return GenericResponse.response(ServiceError.NORMAL);
	}

这里实现注册功能。然后
TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
这行代码用来发送事务消息;
需要给rocketmq配置一个生产者端的消息监听器

@Slf4j
@RocketMQTransactionListener
public class UserRegistrationTransactionListener implements RocketMQLocalTransactionListener {


    @Autowired
    private SysUserService sysUserService;


    @Autowired
    SysUserDao sysUserDao;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 执行本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
            SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
            sysUserService.saveUser(sysUserEntity);
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
//        return  RocketMQLocalTransactionState.UNKNOWN;
        return result;
    }


    /**
     * 步骤四
     * 描述:mq回调检查本地事务执行情况
     * @param msg
     * @return
     */

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
        // 检查本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
//            List<OrderEntity> list = orderService.selectOrder(order);
            List<Map> list = sysUserDao.queryUserByOpenid(sysUserEntity.getOpenid(),sysUserEntity.getUsername());
            if(list.size()<=0){
                result = RocketMQLocalTransactionState.UNKNOWN;
            }

        } catch (Exception e) {
            // 异常就回滚
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }


}

@RocketMQTransactionListener注意这个注解不能落下。
然后可以配置一下下游消费者。

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumeRegister", topic = "TX_REGISTER_ADD",consumeMode = ConsumeMode.ORDERLY)
public class RegisterListener implements RocketMQListener<RegisterFeign> {

    @Autowired
    private WeChatService weChatService;


    /**
     *
     * @param dto
     */
    @Override
    public void onMessage(RegisterFeign dto) {
        log.info("接收到消息,开始消费..dto" + dto);
        weChatService.registByOpenid(dto);

    }

}

我们在这个地方来接受一下消息。然后调用这个服务的保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1658228.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电脑中的两个固态硬盘比一个好,想知道为什么吗

你当前的电脑很有可能有一个NVME SSD作为主驱动器&#xff0c;但可能至少还有一个插槽可以放另一个SSD&#xff0c;而且这样做可能是个好主意。 两个SSD可以提高性能 如果你有两个固态硬盘&#xff0c;你可以从中获得比有一个更好的性能。一种方法是使用RAID 0将两个驱动器组…

HR招聘面试,如何测评候选人的执行力和岗位胜任力

执行力是人才测评中的重要组成&#xff0c;尤其是对于小微企业那就更加重要了&#xff0c;几乎每个岗位都需要员工有独挡一面的能力&#xff0c;没有执行力的员工是无法在中小企业生存的&#xff0c;那么对于大型企业来说&#xff0c;是不是执行力不重要&#xff1f;非也&#…

报错(已解决):无法加载文件 D:\code\NodeJs\pnpm.ps1,因为在此系统上禁止运行脚本。

问题&#xff1a; 在vscode运行uniapp项目需要拉取全部依赖&#xff0c;需要使用到pnpm&#xff0c;在vscode终端运行命令&#xff1a;pnpm install后报错&#xff1a; 解决办法&#xff1a; 1&#xff1a;我未安装pnpm&#xff0c;首先打开电脑cmd&#xff0c;运行下列命令&a…

Selenium 自动化 —— 常用的定位器(Locator)

什么是定位器 定位器&#xff08;Locator&#xff09;是识别DOM中一个或多个特定元素的方法。 也可以叫选择器 Selenium 通过By类&#xff0c;提供了常见的定位器。具体语法如下&#xff1a; By.xxx("");我们选择单个元素时可以使用findByElement&#xff1a; Web…

JUC下的ForkJoinPool详解

详细介绍 ForkJoinPool 是 Java 并发包 (java.util.concurrent) 中的一个特殊线程池&#xff0c;专为分治算法设计&#xff0c;能够高效地处理大量可分解的并行任务。它基于工作窃取&#xff08;work-stealing&#xff09;算法&#xff0c;当一个工作线程的任务队列为空时&…

13 华三三层链路聚和

13 华三三层链路聚和 AI 解析 华三三层静态路由是指在华三交换机上配置的一种路由方式。它通过在交换机上手动配置路由表&#xff0c;将不同网络之间的数据进行转发。 华三三层静态路由的配置步骤如下&#xff1a; 1. 配置交换机接口的IP地址&#xff1a;在交换机上选择要配…

拦截器添加以及注册

自定义拦截器 自定义一个类 实现 HandlerInterceptor 接口 并重写里面的方法 preHandle、postHandle、afterCompletion preHandle&#xff1a;在执行具体的Controller方法之前调用 postHandle&#xff1a;controller执行完毕之后被调用 afterCompletion&#xff1a;方法需要…

NOIP,CSP-J,CSP-S——函数

一、函数概念 /*函数返回类型 函数名&#xff08;参数&#xff09;{语句 } */ int add&#xff08;int x&#xff0c;int y&#xff09;{return xy; } 调用这个函数add int main(){int x,y,z;scanf("%d%d",&x,&y);zadd(x,y);printf("%d",z); } …

我从这些书籍中学来的财务以及税务知识

“你不能指望在开始工作的头两年攒下任何积蓄。” 这句话一直是我的座右铭&#xff0c;也是我给大学生的个人理财建议。这也就难怪我二十出头的时候&#xff0c;基本就是靠薪水过日子。 回想起来&#xff0c;我意识到其实这并不是最好的建议&#xff0c;甚至非常不好。 我现…

纹理映射技术在AI去衣应用中的关键作用

引言&#xff1a; 随着人工智能技术的飞速发展&#xff0c;其在图像处理领域中的应用也日益广泛。AI去衣&#xff0c;作为一种颇具争议的技术应用&#xff0c;指的是利用深度学习算法自动移除或替换图片中的衣物。在这一过程中&#xff0c;纹理映射技术扮演了不可或缺的角色。本…

Anaconda安装和深度学习环境的安装(TensorFlow、Pytorch)

换了新电脑&#xff0c;重新装一下anaconda这些编程环境。好久没装过了&#xff0c;自己也需要查查资料&#xff0c;然后记录一下&#xff0c;分享给别人。 目标&#xff0c;三个环境&#xff1a;1.anaconda基础环境&#xff08;包含xgboost和lightgbm&#xff09;&#xff0c…

地图位置的二维码怎么做?在线制作地图二维码的方法

怎么定位一个位置做成二维码呢&#xff1f;随着互联网的不断发展&#xff0c;现在通过扫描二维码来获取导航位置的方式有很多的场景都在应用。这种方式的好处在于其他人都可以通过这个二维码来获取位置&#xff0c;有利于分享。 导航地图二维码可以在电脑的二维码生成器上快速…

springboot3项目练习详细步骤(第一部分:用户业务模块)

目录 环境准备 用户模块 注册 注册接口文档 ​编辑 实现结构 Spring Validation 登录 登录的接口文档 实现登录逻辑 JWT令牌 完善登录认证 拦截器 获取用户详细信息 接口文档 Usercontroller类中编写方法接口 忽略属性返回 优化代码ThreadLocal 更新用户基本信…

win11 安装oracle11g详细流程及问题总结

1.安装包下载地址 本案例操作系统&#xff0c; Oracle 11g下载-Oracle 11g 64位/32位下载官方版(附详细的安装图解教程) - 多多软件站多多为大家免费提供Oracle 11g下载&#xff0c;包含64位/32位官方版本&#xff0c;并附详细的Oracle 11g安装图解教程&#xff0c;同时希望能…

全网最详细的Python自动化测试(unittest框架)

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

【AIGC】深入探索AIGC技术在文本生成与音频生成领域的应用

&#x1f680;文章标题 &#x1f680;AIGC之文本生成&#x1f680;应用型文本生成&#x1f680;创作型文本生成&#x1f680;文本辅助生成&#x1f680;重点关注场景 &#x1f680;音频及文字—音频生成&#x1f680;TTS(Text-to-speech)场景&#x1f680;乐曲/歌曲生成&#x…

鸿蒙开发-ArkTS语言-容器-非线性容器

鸿蒙开发-UI-web 鸿蒙开发-UI-web-页面 鸿蒙开发-ArkTS语言-基础类库 鸿蒙开发-ArkTS语言-并发 鸿蒙开发-ArkTS语言-并发-案例 鸿蒙开发-ArkTS语言-容器 文章目录 前言 一、非线性容器 1.HashMap 2.HashSet 3.TreeMap 4.TreeSet 5.LightWeightMap 6.LightWeightSet 7.P…

【qt】QString字符串

前言&#xff1a; 这节很轻松&#xff0c;大家可以放心食用 ♪(&#xff65;ω&#xff65;)&#xff89; QString目录 一.与cString的区别二.隐式共享三.初始化四.判断是否为空串五.字符串的长度六.添加字符串1.尾加2.任意位置加 七.替换字符串八.修改字符串九.删除字符串1.清…

《吸血鬼崛起》大剑技能是什么 大剑武器连招教学

V Rising《吸血鬼崛起》是一款热门游戏&#xff0c;在STEAM刚刚推出了正式版&#xff0c;而在游戏中如何利用武器连招输出高是新手玩家常常困扰的问题。如果你还不太清楚&#xff0c;那么一起来看看V Rising中的武器连招推荐吧。 在V Rising中&#xff0c;你可以在数字栏里装备…

物联网实战--平台篇之(五)账户界面

目录 一、界面框架 二、首页(未登录) 三、验证码登录 四、密码登录 五、帐号注册 六、忘记密码 本项目的交流QQ群:701889554 物联网实战--入门篇https://blog.csdn.net/ypp240124016/category_12609773.html 物联网实战--驱动篇https://blog.csdn.net/ypp240124016/cat…