@Transactional声明式事务回调编程

news2024/11/25 11:32:17

文章目录

  • 1. 理论阐述
  • 2. 代码实现
    • 2.1. 问题代码
    • 2.2. 改进方案

本文参考:

事务回调编程

大事务问题

1. 理论阐述

最近在学习数据库事务的过程中,了解到了大事务的危害:

  1. 并发情况下,数据库连接资源容易耗尽
  2. 锁定数据较多,容易造成大量阻塞和锁超时,进而接口超时
  3. 执行时间长,容易造成主从延迟
  4. 回滚所需要的时间变长

那么大事务又是如何产生的呢?

  1. 单个事务操作数据库操作较多
  2. 事务中存在 RPC/MQ 等非 DB 耗时操作
  3. 大量的锁竞争

项目编程中我们经常会用到 Spring 的声明式事务 @Transactional 注解,我去反思了下项目中对事务的使用,还真的存在事务中嵌套 MQ 的用法,比方说本地数据库操作过程中穿插着 ES 写入消息(牺牲直连写入 ES 的时效性,中间加一层 MQ 可以提升容灾性),这就容易产生大事务,整体架构如下:
在这里插入图片描述

在分布式异常场景下这种模式也是有问题的:

比方说数据库操作执行报错,或者 MQ 消息超时,本地事务需要回滚,但是 MQ 消息已经发出去了,没法执行回滚操作,这就没法保证本地事务+MQ的原子性了。

在这里插入图片描述

想一下怎么尽可能避免发送MQ但又需要回滚的场景,其实就是把发MQ消息的时机往后放放,本地事务执行成功了,才发送 MQ 消息,这样子也避免大事务中嵌套 MQ,这在业务上也是可以接受的。

在这里插入图片描述

这种做法底层避免了数据库操作失败,MQ 需要回滚但是没法回滚的困境,但仍然有它的缺点,就是仍然没法保证 “数据库操作 + MQ” 的原子性,比方说下面,数据库事务提交了之后,App 重启或者宕机了,就不会发出 MQ 消息。
这其实涉及到了分布式事务的处理策略,我们当然可以用本地消息表或者其他分布式处理策略如TCC来解决这个问题。

所以这里谈论到的策略其实并不是一种分布式事务的处理方案,重点在于优化代码结构避免长事务,同时尽量保证“数据库操作 + MQ” 的原子性。

在这里插入图片描述

2. 代码实现

2.1. 问题代码

在@Transactional 声明式事务编程中,两个 insert 操作中穿插着发送MQ消息,典型的大事务问题。

@Transactional
public void doTransaction() {
    log.info("start tx");
    User user1 = new User();
    user1.setId(9);
    user1.setAge(2);
    user1.setName("jxz");
    user1.setEmail("111@qq.com");
    userMapper.insert(user1);
    log.info("insert user1...");

    log.info("调用其他 RPC 或者发送 MQ 消息");

    User user2 = new User();
    user2.setId(10);
    user2.setAge(3);
    user2.setName("jxz");
    user2.setEmail("111@qq.com");
    userMapper.insert(user2);
    log.info("insert user2...");
    log.info("end tx");
}

那正如前面所说的,我们可以在数据库本地事务提交以后,再去调用 RPC 或者 MQ。这个时候代码结构是需要调整的,如果只是单纯把 RPC 或者 MQ 从 @Transactional 注解声明的方法中抽取出来,后置调用,伪代码如下:

public void doRpcAfterTransaction() {
    // 原先 @Transactional 声明的数据库操作,事务失效
    doTransaction();
    log.info("调用其他 RPC 或者发送 MQ 消息");
}

@Transactional 注解也会失效,因为这属于方法内部调用 @Transactional 声明的方法,Spring 不是拿到的代理对象去调用。此外这种方式还增加了代码的复杂性,改动量太大。

2.2. 改进方案

那么是否存在一种代码改动量较小,能让人一眼看懂,最好在静态上还是内嵌在原来 @Transactional 声明式事务编程中;同时还能在当前事务执行完以后,能够及时回调 RPC/MQ 等第三方调用的。

仍然声明一下,这种方案是为了尽可能保证“本地事务+RPC/MQ”的原子性,并且代码结构简单,并不是分布式事务的解决方案

Spring 提供这样的 SPI 扩展,TransactionSynchronization 就提供事务执行完成以后回调的接口。
在这里插入图片描述

其中包括多个事务回调的拓展点:

在这里插入图片描述

其中 TransactionSynchronization#afterCompletion(int status) 就会根据事务执行结果(成功 commit 或者回滚 rollback),status 入参数代表事务执行状态,其实现就会执行事务后置处理。

这一切都建立在当前方法上下文存在活跃的事务,Spring 也提供了静态方法来让我们调用判断 TransactionSynchronizationManager#isActualTransactionActive()

最终写了个工具类实现代码如下:

package com.jxz.util;

import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2024/10/2
 */
public class TransactionUtils {
    /**
     * 事务后置处理 api,可以优化大事务提升数据库性能,尽量保证“本地事务 + RPC/MQ”的原子性
     *
     * @param runnable 事务后置处理任务
     */
    public static void doAfterTransaction(Runnable runnable) {
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronizationManager.registerSynchronization(new DoTransactionCompletion(runnable));
        }
    }

    /**
     * 实现 TransactionSynchronization 接口,重写其中的 afterCompletion 方法
     */
    public static class DoTransactionCompletion implements TransactionSynchronization {
        // 待执行的任务
        Runnable runnable;

        public DoTransactionCompletion(Runnable runnable) {
            this.runnable = runnable;
        }

        // 在事务 commit/rollback 以后回调
        @Override
        public void afterCompletion(int status) {
            // 当事务状态是 COMMITTED 时
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                runnable.run();
            }
        }
    }
}

在原先调用的地方修改也很简单:

内嵌在 @Transactional 声明式事务中,甚至连 RPC/MQ 调用的代码位置都不需要变动,内部实现的就是事务执行完成之后的后置回调。

@Transactional
public void doTransaction2() {
    log.info("start tx");
    User user1 = new User();
    user1.setId(13);
    user1.setAge(2);
    user1.setName("jxz");
    user1.setEmail("111@qq.com");
    userMapper.insert(user1);
    log.info("insert user1...");

    TransactionUtils.doAfterTransaction(() ->
            log.info("afterCommit, 调用其他 RPC 或者发送 MQ 消息"));

    User user2 = new User();
    user2.setId(14);
    user2.setAge(3);
    user2.setName("jxz");
    user2.setEmail("111@qq.com");
    userMapper.insert(user2);
    log.info("insert user2...");
    log.info("end tx");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2195274.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot企业级开发(SpringSecurity安全控制+pringBatch批处理+异步消息+系统集成SpringIntegration)

Spring Security 多个过滤器来实现所有安全的功能,只需要注册一个特殊的DelegatingFilterProxy过滤器到WebAppliationInitializer即可 实际使用中需要让自己的Initializer类继承AbstractSecurity WebApplicationInitializer抽象类即可。 AbstractSecurityWebAppli…

【瑞昱RTL8763E】刷屏

1 显示界面填充 用户创建的各个界面在 rtk_gui group 中。各界面中 icon[]表对界面进行描述,表中的每个元素代表一 个显示元素,可以是背景、小图标、字符等,UI_WidgetTypeDef 结构体含义如下: typedef struct _UI_WidgetTypeDef …

每年每门学科排名第一的学生 和每年总成绩都有所提升的学生

一张学生成绩表(student_scores),有year-学年,subject-课程,student-学生,score-分数这四个字段,请完成如下问题: 问题1:每年每门学科排名第一的学生 问题2:每年总成绩都有所提升的…

【STM32 HAL库】MPU6050 DMP库移植 与 自检失败的处理

【STM32 HAL库】MPU6050 DMP库移植 与 自检失败的处理 本文参考移植步骤文件配置代码修改inv_mpu.cinv_mpu.hinv_mpu_dmp_motion_driver.c 使用 自检失败怎么处理ret -1改正DEBUG过程 ret -9改正DEBUG过程 本文参考 B站 CSDN 移植步骤 文件配置 新建一个 dmp 文件夹 并将…

【斯坦福CS144】Lab1

一、实验目的 1.实现一个流重组器——一个将字节流的小块 (称为子串或段 )按正确顺序组装成连续的字节流的模块; 2.深入理解 TCP 协议的工作方式。 二、实验内容 编写一个名为"StreamReassembler"的数据结构,它负责…

【Nacos入门到实战十四】Nacos配置管理:集群部署与高可用策略

个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119qq.com] &#x1f4f1…

原神5.1前瞻网页HTML+CSS+JS

这篇文章主要是总结一下我在制作页面的时候用到的一些技术以及经验总结,博主也是第一次写网页,代码也是在不断地“进化”,哪里写的不好大家可以随意指出。 下面就是一些经验总结,如果想看具体效果我这里也不好展示,需要…

pytorch导入数据集

1、概念: Dataset:一种数据结构,存储数据及其标签 Dataloader:一种工具,可以将Dataset里的数据分批、打乱、批量加载并进行迭代等 (方便模型训练和验证) Dataset就像一个大书架,存…

QSerialPort 串口通信示例

之前使用过MFC写过串口通信的示例,今年学了Qt,特意使用Qt写了串口通信的示例,发现比MFC要容易一些, MFC串口示例如下: Qt示例如下: Qt这个做的很简单,主要还是想验证一下api, 核心…

今日指数day8实战补充(上)

1.用户管理 1.多条件综合查询 1.1 多条件综合查询接口说明 1)原型效果 2)接口说明 功能描述:多条件综合查询用户分页信息,条件包含:分页信息 用户创建日期范围 服务路径:/api/users 服务方法&#xff1…

Linux的Tomcat安装部署

1.下载jdk11 java11的官方URL 此时进入可能会有登录注册,挺简单的,注册登录就好 2.上传到Linux 3.解压 命令: tar -zxvf /root/linux.jdk/jdk-11.0.24_linux-x64_bin.tar.gz 4.移动解压文件夹到新建文件夹 新建文件夹: mkdir -p /export/server 移动命令: mv jdk-11.0…

联想服务器配置阵列、安装操作系统

文章目录 [toc]1.配置阵列2.制作启动盘3.安装系统 1.配置阵列 1.根据提示进入BIOS设置(F1) 2.系统设置 3.存储 4.第四步可以看到raid卡信息 5.Main Menu 6.Configuration Management 7.Create Virtual Drive 8.Select RAID Level raid5 9.Select Drives…

透明物体的投射和接收阴影

1、让透明度测试Shader投射阴影 (1)同样我们使用FallBack的形式投射阴影,但是需要注意的是,FallBack的内容为:Transparent / Cutout / VertexLit,该默认Shader中会把裁剪后的物体深度信息写入到 阴影映射纹…

降重秘籍:如何利用ChatGPT将重复率从45%降至10%以下?

AIPaperGPT,论文写作神器~ https://www.aipapergpt.com/ 重复率高达45%?很多人一查论文的重复率,瞬间想“完了,这次真的要重写了”。但其实不用这么绝望!有了ChatGPT,降重真的没那么难。今天就教你几招&a…

VGG16模型实现MNIST图像分类

MNIST图像数据集 MNIST(Modified National Institute of Standards and Technology)是一个经典的机器学习数据集,常用于训练和测试图像处理和机器学习算法,特别是在数字识别领域。该数据集包含了大约 7 万张手写数字图片&#xf…

wsl环境下安装MySQL5.7

安装操作需root权限: 1-通过 sudo su - ,切换到root用户。 2-在每一个命令前加上sudo,临时提升权限 1、下载apt仓库文件 wget https://dev.mysql.com/get/mysql-apt-config_0.8.12-1_all.deb 安装包是.deb的文件2、配置仓库,使…

MyBatis 批量插入方案

MyBatis 批量插入 MyBatis 插入数据的方法有几种: for 循环,每次都重新连接一次数据库,每次只插入一条数据。 在编写 sql 时用 for each 标签,建立一次数据库连接。 使用 MyBatis 的 batchInsert 方法。 下面是方法 1 和 2 的…

Linux防火墙-案例(一)filter表

作者介绍:简历上没有一个精通的运维工程师。希望大家多多关注作者,下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 我们经过上小章节讲了Linux的部分进阶命令,我们接下来一章节来讲讲Linux防火墙。由于目前以云服务器为主&#x…

51单片机的水位检测系统【proteus仿真+程序+报告+原理图+演示视频】

1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块水位传感器继电器LED、按键和蜂鸣器等模块构成。适用于水位监测、水位控制、水位检测相似项目。 可实现功能: 1、LCD1602实时显示水位高度 2、水位传感器采集水位高度 3、按键可设置水位的下限 4、按键可手动加…

动手学大模型应用开发之大模型简介

动手学大模型应用开发之大模型简介 主要学习目标什么是大语言模型大模型的能力和特点涌现能力作为基座模型支持多元应用的能力支持对话作为统一入口的能力大模型特点 常见大模型ChatGpt通义千问 LangChainLangChain的核心模块 总结相关学习链接 主要学习目标 学习如何进行大模…