(十)springboot实战——springboot3下的webflux项目mysql数据库事务处理

news2024/12/26 23:46:27

前言

WebFlux 是 Spring Framework 5.0 中引入的一种新型反应式编程模型,支持非阻塞 I/O,适用于高并发、高吞吐量的应用程序。在 WebFlux 应用程序中使用事务需要注意以下几点。使用 Reactive R2DBC:WebFlux 支持使用 Reactive R2DBC 访问关系型数据库。R2DBC 是一个反应式的数据库连接规范,它允许开发人员以响应式方式访问关系型数据库。在 WebFlux 应用程序中使用 Reactive R2DBC 可以实现非阻塞式的数据库操作。使用 @Transactional 注解:在 WebFlux 应用程序中,可以使用 @Transactional 注解声明事务边界,将多个数据库操作绑定到同一事务中。需要注意的是,@Transactional 注解需要与 Reactive R2DBC 结合使用,确保事务管理器与 R2DBC 兼容。遵循响应式编程原则:在 WebFlux 应用程序中,需要遵循响应式编程的原则,使用 Mono 和 Flux 等响应式类型来处理数据流,而不是传统的阻塞式方法。这意味着在事务中涉及到的所有操作,都需要返回 Mono 或 Flux 对象,并且保证响应式链条的正确性。

本节内容以关系型数据库mysql为例,通过使用spring-boot-starter-data-r2dbc框架完成关系型数据库的调用,并通过具体的案例实现webflux应用下的数据库事务管理,包含最佳的使用实战案例,通过spring的AOP实现最终的事务控制。

正文

①引入必要的pom组件,其中spring-boot-starter-aop可以不引入,如果不使用aop管理r2dbc的数据库事务

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webflux-ui -->
<dependency>
	<groupId>org.springdoc</groupId>
	<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
	<version>2.3.0</version>
</dependency>

<!--        响应式 Spring Data R2dbc-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/io.asyncer/r2dbc-mysql -->
<dependency>
	<groupId>io.asyncer</groupId>
	<artifactId>r2dbc-mysql</artifactId>
	<version>1.0.6</version>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-test</artifactId>
	<scope>test</scope>
</dependency>

②创建控制层UserController请求接口测试方法:新增用户

@Operation(summary = "新增用户", description = "新增用户")
@PostMapping(value = "saveUser")
public Mono<ApiResponse> saveUser(@RequestBody User user) {
	Mono<User> userMono = userService.saveUser(user);
	return  userMono.map(ApiResponse::success);
}

③创建UserService的业务接口层

/**
 * 新增用户
 *
 * @param user
 * @return
 */
Mono<User> saveUser(User user);

 ④创建UserServiceImpl的业务实现层

    @Override
    public Mono<User> saveUser(User user) {
        return userRepository.save(user);
    }

 ⑤创建UserRepository数据操作层

package com.yundi.atp.repository;

import com.yundi.atp.entity.User;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;


@Repository
public interface UserRepository extends R2dbcRepository<User, Integer> {
    /**
     * 根据用户名获取用户信息
     *
     * @param name
     * @return
     */
    Mono<User> findUsersByName(@Param("name") String name);

    /**
     * 更新用户信息
     *
     * @param user
     * @return
     */
    @Query("update user set name = :#{#user.name}, age = :#{#user.age} where id = :#{#user.id}")
    Mono<User> updateUser(@Param("user") User user);


    /**
     * 动态更新用户信息
     *
     * @param user
     * @return
     */
    @Query("UPDATE User SET " +
            "name = CASE WHEN :#{#user.name} IS NULL THEN name ELSE :#{#user.name} END, " +
            "age = CASE WHEN :#{#user.age} IS NULL THEN age ELSE :#{#user.age} END " +
            "WHERE id = :#{#user.id}")
    Mono<User> updateUserQuery(@Param("user") User user);
}

⑥ 正常通过swagger工具访问用户新增接口,数据可以正常插入数据库

⑦在保存用户的业务方法中人为创建一个异常,验证是否能够正常保存数据

@Override
public Mono<User> saveUser(User user) {
	Mono<User> monoUser = userRepository.save(user);
    //人为创建一个数学异常
	System.out.println(1/0);
	return monoUser;
}

 ⑧使用swagger工具访问用户新增接口,从打印的日志来看,数据并没有插入成功,并抛出了异常,但是这里并没有数据库的事务处理

PS:这里可能会误导初学者,认为webflux的r2dbc具备天然的事务处理机制,其实不然,这是因为webflux是响应式非阻塞式编程,所有操作都是异步执行,导致业务方法会跳过用户保存的操作而先去执行异常的部分代码,直接消费异常处理结果,真正的用户保存方法并没有执行。

⑨使用@Transactional,加入事务处理注解,查看是否会有事务执行

@Transactional(rollbackFor = Exception.class)
@Override
public Mono<User> saveUser(User user) {
	Mono<User> mono = userRepository.save(user);
	System.out.println(1/0);
	return mono;
}

⑩使用swagger工具访问用户新增接口,从打印的日志来看,加入@Transactional确实切入了事务,但是由于异步执行,跳过了用户保存的方法,用户保存sql并未执行,这里的事务看起来并没有起什么作用

⑪使用Mono的flatMap方法将业务方法改为如下方式,先执行用户保存方法,在触发异常,查看打印结果

@Transactional(rollbackFor = Exception.class)
@Override
public Mono<User> saveUser(User user) {
	return userRepository.save(user).flatMap(item -> {
		System.out.println("item:" + item);
		System.out.println(1 / 0);
		return Mono.just(item);
	});
}

⑫使用swagger工具访问用户新增接口,从打印的日志来看,数据持久化sql操作真实执行了,但并没有真实插入数据库,事务确定已经真实生效,数据进行了回滚

⑬利用事务的传播机制,测试内层事务出现异常,内外层事务都会回滚

@Transactional(rollbackFor = Exception.class)
@Override
public Mono<User> saveUser(User user) {
	return userRepository.save(user).flatMap(item -> {
		System.out.println("item:" + item);
		return userRepository.findUsersByName(item.getName()).flatMap(it -> {
			System.out.println("it:" + it);
			System.out.println(1 / 0);
			return Mono.just(it);
		});
	});
}

⑭使用swagger工具访问用户新增接口,从打印的日志来看,内外层的sql都执行了,由于内层异常,导致全局的异常回滚,说明异常的传播机制是shengx

⑮最佳实践:使用aop切面实现数据库事务的统一管理,创建全局事务处理配置类ReactiveTransactionConfig

package com.yundi.atp.config;

import io.r2dbc.spi.ConnectionFactory;
import jakarta.annotation.Resource;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;


@Configuration
@EnableTransactionManagement
public class ReactiveTransactionConfig {
    @Resource
    private ConnectionFactory connectionFactory;

    @Bean
    public ReactiveTransactionManager reactiveTransactionManager() {
        return new R2dbcTransactionManager(connectionFactory);
    }

    @Bean(name = "myTransactionInterceptor")
    public TransactionInterceptor myTransactionInterceptor() {
        //写事务控制
        DefaultTransactionAttribute txAttr_REQUIRED = new DefaultTransactionAttribute();
        txAttr_REQUIRED.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        txAttr_REQUIRED.setTimeout(2000);

        //读事务控制
        DefaultTransactionAttribute txAttr_REQUIRED_READONLY = new DefaultTransactionAttribute();
        txAttr_REQUIRED_READONLY.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

        NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
        source.addTransactionalMethod("page*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("find*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("query*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("list*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("get*", txAttr_REQUIRED_READONLY);

        source.addTransactionalMethod("save*", txAttr_REQUIRED);
        source.addTransactionalMethod("add*", txAttr_REQUIRED);
        source.addTransactionalMethod("update*", txAttr_REQUIRED);
        source.addTransactionalMethod("remove*", txAttr_REQUIRED);
        source.addTransactionalMethod("delete*", txAttr_REQUIRED);
        return new TransactionInterceptor(reactiveTransactionManager(), source);
    }

    /**
     * 切点
     *
     * @return
     */
    @Bean
    public Advisor advisor() {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression("execution(* com.yundi.atp.service..*(..))");
        return new DefaultPointcutAdvisor(pointcut, myTransactionInterceptor());
    }


}

⑯使用swagger工具访问用户新增接口,从打印的日志来看,不加@Transactional,依然可以实现事务处理

结语

至此,关于webflux项目数据库事务的内容到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1420392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

20240130在ubuntu20.04.6下给GTX1080安装最新的驱动和CUDA

20240130在ubuntu20.04.6下给GTX1080安装最新的驱动和CUDA 2024/1/30 12:27 缘起&#xff0c;为了在ubuntu20.4.6下使用whisper&#xff0c;以前用的是GTX1080M&#xff0c;装了535的驱动。 现在在PDD拼多多上了入手了一张二手的GTX1080&#xff0c;需要将安装最新的545的驱动程…

JavaScript-for循环的执行顺序

1.目标 掌握for执行顺序 2.实现思路 使用for循环输出0-到5 3.代码实现 Document 4.总结 for执行顺序 1.执行 var i 0 变量初始化 条件判断 是否成立 成立 执行循环体 不成立 退出for循环

GC8838取代DRV8838直流电机驱动芯片,可应用在摄像机,玩具等产品上

GC8838 一款 12V 直流电机驱动芯片&#xff0c;为摄像机、消费类产品、玩具和其他低压或者电池供电的运动控制类应用提供了集成的电机驱动解决方案。芯片一般用了驱动一个直流电机或者使用两颗来驱动步进电机。 可以工作在 0~12V 的电源电压上&#xff0c;能提供高达 1.5A 持续…

「优选算法刷题」:只出现一次的数字Ⅱ

一、题目 给你一个整数数组 nums &#xff0c;除某个元素仅出现 一次 外&#xff0c;其余每个元素都恰出现 三次 。请你找出并返回那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法且使用常数级空间来解决此问题。 示例 1&#xff1a; 输入&#xff1a;nums …

蓝桥杯AT24C02问题记录

问题1&#xff1a;从这个图片上可以看出这两个在IIC的.c文件里延时时间不一样&#xff0c;第一张图使用了15个_nop_(); 12M晶振机器周期是 1/12M*121uS&#xff1b;nop()要延时1个指令周期。延时时间不对会对时序产生影响&#xff0c;时序不对&#xff0c;则AT24C02有没被使用…

百度输入法往选字框里强塞广告

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 国内几乎100%的输入法都有广告&#xff0c;只是你们没发现而已&#xff01;&#xff01;&#xff01; 百度输入法居然在输入法键盘上推送广告&#xff0c;近日&#xff0c;博主阑夕 表示&#xff0c;V2EX论坛上有…

什么是DDOS流量攻击,DDoS防护安全方案

随着互联网的发展普及&#xff0c;云计算成新趋势&#xff0c;人们对生活方式逐渐发生改变的同时&#xff0c;随之而来的网络安全威胁也日益严重&#xff01; 目前在网络安全方面&#xff0c;网络攻击是最主要的威胁之一&#xff0c;其中DDoS攻击是目前最为常见的网络攻击手段…

Kafka 记录

推荐资源 官网http://kafka.apache.org/Githubhttps://github.com/apache/kafka书籍《深入理解Kafka 核心设计与实践原理》 Kafka 架构 Kafka使用ZooKeeper作为其分布式协调框架&#xff0c;其动态扩容是通过ZooKeeper来实现的。Kafka使用Zookeeper保存broker的元数据和消费者信…

Ubuntu22.04 网络图标突然消失

本来好好的&#xff0c;突然就发现没有网络了&#xff0c;图标也不见了。 特别是Ubuntu虚拟机&#xff0c;容易出现此问题。 修复办法 1. sudo service network-manager stop2. sudo rm /var/lib/NetworkManager/NetworkManager.state3. sudo service network-manager start到…

快速理解MoE模型

最近由于一些开源MoE模型的出现&#xff0c;带火了开源社区&#xff0c;为何&#xff1f;因为它开源了最有名气的GPT4的模型结构&#xff08;OPEN AI&#xff09;&#xff0c;GPT4为何那么强大呢&#xff1f;看看MoE模型的你就知道了。 MoE模型结构&#xff1a; 图中&#xff0…

基于单片机的自动浇花系统设计

摘要&#xff1a;快节奏的生活导致人们忙于工作而无暇顾及家中植物的及时浇水&#xff0c;影响了植物的生长发育&#xff0c; 也降低了其种植成功率。针对上述问题&#xff0c;该文设计了一种自动浇花系统&#xff0c;该系统能在无人环境下 根据土壤湿度情况自动启动&#xff0…

第16章_网络编程(网络通信要素,TCP与UDP协议,网络编程API,TCP网络编程,UDP网络编程,URL编程)

文章目录 第16章_网络编程本章专题与脉络1. 网络编程概述1.1 软件架构1.2 网络基础 2. 网络通信要素2.1 如何实现网络中的主机互相通信2.2 通信要素一&#xff1a;IP地址和域名2.2.1 IP地址2.2.2 域名 2.3 通信要素二&#xff1a;端口号2.4 通信要素三&#xff1a;网络通信协议…

Centos 7.9 安装 Veracrypt-1.26.7

1 下载 veracrypt-1.26.7-CentOS-7-x86_64.rpm VeraCrypt - Free Open source disk encryption with strong security for the Paranoid 2 切换到下载目录&#xff0c;打开终端&#xff0c;切换到管理员用户 运行 yum install veracrypt-1.26.7-CentOS-7-x86_64.rpm 3 安装完…

一文掌握 Golang 加密:crypto/cipher 标准库全面指南

一文掌握 Golang 加密&#xff1a;crypto/cipher 标准库全面指南 引言Golang 和加密简介crypto/cipher 库概览使用 crypto/cipher 实现加密高级功能和技巧最佳实践和性能优化总结资源推荐 引言 在现代软件开发领域&#xff0c;安全性是一个不容忽视的重要议题。随着信息技术的…

Java开发分析中文 ---- JProfiler 13

JProfiler 13是一款专业的Java应用程序性能分析工具&#xff0c;可以快速诊断和优化Java应用程序的性能问题。它支持多种操作系统和应用服务器&#xff0c;提供实时性能监控、CPU分析、内存分析、线程分析和数据库访问分析等功能。使用JProfiler 13可以深入了解应用程序的性能和…

修复TabbarButton 中标题展示不完全的问题

遇到一个问题&#xff0c;就是从列表页跳转到详情页之后&#xff0c;再返回的时候&#xff0c;tabbarbutton 中的文字变成…了 打开图层&#xff0c;如图所示&#xff0c; 标题的宽度不够了&#xff0c;本来是23的&#xff0c;返回之后变成20了。 这里就添加了一个容错&#…

Qt/C++音视频开发64-共享解码线程/重复利用解码/极低CPU占用/画面同步/进度同步

一、前言 共享解码线程主要是为了降低CPU占用&#xff0c;重复利用解码&#xff0c;毕竟在一个监控系统中&#xff0c;很可能打开了同一个地址&#xff0c;需要在多个不同的窗口中播放&#xff0c;形成多屏渲染的效果&#xff0c;做到真正的完全的画面同步&#xff0c;在主解码…

Linux(CentOS7)常见指令的常见用法(上)

指令功能hostname查看当前的主机名hostnamectl set-hostname修改主机名adduser添加用户passwd给用户设置密码userdel -r 删除用户ls显示某路径下的文件名ls -l ll 显示某路径下每个文件及其属性ls -la ls -al 显示某路径下所有文件包括隐藏文件及属性ls -d只看指定文件夹&…

作业车间调度问题:P还是NP

获取更多资讯&#xff0c;赶快关注上面的公众号吧&#xff01; 文章目录 基本概念多项式时间指数时间 P问题&#xff08;多项式问题&#xff09;NP问题&#xff08;非确定性多项式问题&#xff09;暴力穷举法动态规划 P与NP关系&#xff1a;作业车间调度问题是典型的NP难问题 …

源码篇--Redis 通信协议

文章目录 前言一、Redis 的通信过程&#xff1a;二、RESP 协议&#xff1a;三、客户端模拟RESP 通信&#xff1a;总结 前言 在我们知道redis 的网络模型后&#xff0c;继续看下 redis 的通信协议。 一、Redis 的通信过程&#xff1a; Redis是一个CS架构的软件&#xff0c;通信…