背景:
在 事务-1 事务隔离级别和Spring事务传播机制 中对事务的特性、隔离级别、Spring事务的传播机制结合案例进行了分析;在 事务-2 Spring与Mybatis事务实现原理
中对JDBC、Mybatis、Spring整合Mybatis实现事务的原理结合框架源码进行了介绍,过程中对SqlSession和SqlSessionTemplate的线程安全性也进行了说明。
本文以前两篇文章为基础,补充了一些遗漏的知识点(偏向于Spring),建议读者先阅读完上述两篇文章,有任何疑问或问题可以在品论区留言。
1.Spring事务
ORM框架是连接代码和数据库的桥梁,Spring作为基础框架提供了Spring Data JPA,也提供了适配其他ORM框架的能力,如集成Mybatis和Hibernate等。需要注意ORM框架提供的事务能力依赖于数据库事务,是对数据库事务的一层封装;如果底层数据库不支持事务(如Mysql的MyISAM引擎),在此之上的所有数据库或者DAO操作都无事务特性。
Spring事务提供了两种使用方式:声明式 (通过@Transactional注解的方式) 和编程式 (使用TransactionTemplate包裹代码方式)。其中,声明式使用较为简单且不容易出错;编程式相对比较灵活,可以进行细粒度控制。
声明式和编程式的底层实现原理相同,因编程式不常见,下文以声明式为例对Spring事务进行介绍。
对于声明式事务,Spring通过AOP机制将业务代码包裹起来,在业务代码前后开启和关闭事务,根据执行情况以及自定义条件进行事务的提交或回滚;提交/回滚时需要先获取对应的Connection对象,然后基于该对象进行。如何保证执行sql语句时的Connection对象和提交/回滚时的Connection对象为同一对象,以及同一线程在不同事务上下文的执行过程中获取的Connection对象为不是同一个;前者说明了Spring事务的底层原理,后者提供了线程安全保证。
这部分内容是本文的重点内容,也是理解Spring事务概念的关键;其实读者心里已经有概念了:ThreadLocal。
2.使用方式
参考: 事务-1 事务隔离级别和Spring事务传播机制 和 事务-2 Spring与Mybatis事务实现原理
3.实现原理
Spring Boot项目中常见的ORM框架有JPA、Hibernate、Mybatis, 本章节以Spring Boot整合Mybatis的业务场景为例对Spring事务进行说明。
3.1 Spring事务组件
在Spring项目中,可以通过xml配置(配置AOP)或注解(@EnableTransactionManagement)的方式开启事务功能;但都需要定义PlatformTransactionManager类型的Bean对象, 如下所示:
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"></property>
</bean>
而SpringBoot项目通过自动装配机制,默认开启了Spring事务。
Spring为支持事务功能提供了InfrastructureAdvisorAutoProxyCreator后置处理器、TransactionInterceptor拦截器和BeanFactoryTransactionAttributeSourceAdvisor增强等组件,在本章节后续中会陆续进行介绍。
3.2 代理对象
InfrastructureAdvisorAutoProxyCreator同 Spring系列-8 AOP使用与原理 文中介绍的AnnotationAwareAspectJAutoProxyCreator均为AbstractAdvisorAutoProxyCreator的实现类,而AOP逻辑被封装在了AbstractAdvisorAutoProxyCreator中,且在Spring系列-8 AOP使用与原理 中已对该部分进行了较为全面地介绍,因此相同部分认为读者已知,本文不再赘述。
InfrastructureAdvisorAutoProxyCreator作为BPP,在对象初始化后期会调用InfrastructureAdvisorAutoProxyCreator的postProcessAfterInitialization方法:
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
不考虑循环依赖场景,进入wrapIfNecessary(bean, beanName, cacheKey)
方法:
// 省略缓存、日志、异常分支
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
return createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
}
return bean;
}
逻辑较为清晰:先通过getAdvicesAndAdvisorsForBean获取增强逻辑,后然通过动态代理技术将增强逻辑织入到目标对象中。
跟进getAdvicesAndAdvisorsForBean
方法,查看获取增强逻辑部分的源码实现:
// AbstractAdvisorAutoProxyCreator类中
protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
List<Advisor> candidateAdvisors = findCandidateAdvisors();
List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
extendAdvisors(eligibleAdvisors);
if (!eligibleAdvisors.isEmpty()) {
eligibleAdvisors = sortAdvisors(eligibleAdvisors);
}
return eligibleAdvisors;
}
从IOC中获取所有Avisor类型的Bean对象,过滤出符合条件的部分,排序后返回;整体逻辑同Spring系列-8 AOP使用与原理,区别在于过滤部分:findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
。
过滤逻辑如下:
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
if (advisor instanceof IntroductionAdvisor) {
return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
} else if (advisor instanceof PointcutAdvisor) {
PointcutAdvisor pca = (PointcutAdvisor) advisor;
return canApply(pca.getPointcut(), targetClass, hasIntroductions);
} else {
// It doesn't have a pointcut so we assume it applies.
return true;
}
}
入参:
(1) Advisor advisor:为BeanFactoryTransactionAttributeSourceAdvisor类型,来自IOC容器;
(2) Class<?> targetClass:目标类的class对象;
(3) boolean hasIntroductions:事务增强不存在引介,为false;
BeanFactoryTransactionAttributeSourceAdvisor为PointcutAdvisor子类,进一步跟踪canApply方法:
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
MethodMatcher methodMatcher = pc.getMethodMatcher();
Set<Class<?>> classes = new LinkedHashSet<>();
classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
for (Class<?> clazz : classes) {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
for (Method method : methods) {
if (methodMatcher.matches(method, targetClass)) {
return true;
}
}
}
return false;
}
其中,methodMatcher.matches(method, targetClass)
会返回方法是否添加了@Transactional注解,通过computeTransactionAttribute
方法解析得到的TransactionAttribute是否为空进行确定:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
//。。。 省略
return null;
}
首先判断方法是否为public, 非public方法返回false;尝试从目标方法中解析出TransactionAttribute对象,如果不为空则返回;否则再次尝试从目标类对象中解析TransactionAttribute对象。
因此,需要满足以下条件,才可以生产Spring事务代理对象,实现事务功能:
(1)目标对象是被IOC管理的Bean对象;
(2)需要在类或者方法上注解@Transactional;方法中的优先级高于类;类上的注解对整个类中的方法生效;
(3)要求方法必须为public和非static;
本节最后再看一下BeanFactoryTransactionAttributeSourceAdvisor增强对象的拦截器:
当完成代理后,TransationInterceptor拦截器作为增强逻辑被织入到了目标对象中;当目标方法被调用时,进行拦截器逻辑。
3.3 拦截器
当业务代码被调用时,进入动态代理的拦截器即TransactionInterceptor的invoke方法中:
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
MethodInvocation对象作为CGLIB代理拦截器方法的入参,持有目标对象的类型信息、目标方法以及调用目标方法的逻辑,分别对应上述:AopUtils.getTargetClass(invocation.getThis())
、invocation.getMethod()
、invocation::proceed
.
invokeWithinTransaction的主线逻辑如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, InvocationCallback invocation) {
// ⚠️1.开启事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// ⚠️2.直接调用目标方法,得到返回结果
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// ⚠️3.异常场景处理-根据条件判断是否rollback
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
cleanupTransactionInfo(txInfo);
}
// ⚠️4.正常场景处理-commit
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
step1: 开启事务
Spring通过TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
开启事务,并将信息存储在TransactionInfo对象中。
在进入代码之前,先了解一下TransactionInfo对象的组成结构:
上图可以形象地表示为:
TransactionInfo持有TransactionStatus对象;
TransactionStatus持有DataSourceTransactionObject
DataSourceTransactionObject持有ConnectionHolder
ConnectionHolder持有connection对象;
即,TransactionInfo属性中保存着connection对象信息。
跟进createTransactionIfNecessary方法的主线逻辑:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override public String getName() { return joinpointIdentification;}
};
TransactionStatus status = tm.getTransaction(txAttr);
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
先介绍一下如参:
(1) PlatformTransactionManager tm: 从IOC容器中根据TransactionManager获取的Bean对象,此时为DataSourceTransactionManager类型;
(2) TransactionAttribute txAttr: 对@Transactional注解信息进行的封装;
(3) String joinpointIdentification:字符串类型,目标方法对应的 “包名.类名.方法名”。
createTransactionIfNecessary方法逻辑线如下:先对TransactionAttribute进行了一层简单封装DelegatingTransactionAttribute,增强了一个getName()方法,返回joinpointIdentification信息;
然后通过PlatformTransactionManager和TransactionAttribute获取TransactionStatus对象(该部分是关键);
最后将PlatformTransactionManager、DelegatingTransactionAttribute、TransactionStatus整合成TransactionInfo并返回。
其中,第二步中涉及Connection对象的创建以及事务的开启,需要引起重视:
// 删除日志和异常分支,并尽可能进行简化以突出主线逻辑:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 构造DataSourceTransactionObject对象
Object transaction = doGetTransaction();
return startTransaction(definition, transaction, false, null);
}
上述方法逻辑较为简单:创建一个DataSourceTransactionObject对象,创建一个空的ConnectionHolder对象并复制给DataSourceTransactionObject对象的connectionHolder属性。然后调用startTransaction
方法开启事务:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
startTransaction代码逻辑较为简单:根据已有的事务信息对构建DefaultTransactionStatus对象,并通过doBegin
和prepareSynchronization
对其进行处理。
本文关心的内容在doBegin
方法中:
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
// ⚠️1.根据DataSource对象创建Connection对象
Connection newCon = this.obtainDataSource().getConnection();
// ⚠️2.将Connection对象通过ConnectionHolder包装后赋值给DataSourceTransactionObject对象
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// ⚠️3.对DataSourceTransactionObject对象进行状态设置
Connection con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
// ⚠️4.将Connection对象绑定到线程上下文中
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
}
上述代码逻辑可以分为以下几个步骤:
(1)获取DataSource对象(此时为HikariDataSource), 然后基于该对象创建Connection对象;因此不同事务中 (即使同一线程) 的Connection不同。
(2)将Connection对象通过ConnectionHolder包装后赋值给DataSourceTransactionObject对象,用于后续的事务会滚或提交。
(3)为DataSourceTransactionObject设置超时时间、是否自动提交、隔离级别等;
(4)将Connection对象绑定到线程上下文中,为操作数据库时提供connection对象。进入TransactionSynchronizationManager类中的bindResource方法,ConnectionHolder对象被保存到resources对象中,该对象的定义如下:
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
即resources对象为ThreadLocal,这要求事务内部只有一个线程运行。对于如下场景,事务会失去其作用:
目标方法中包括了数据库操作1-4,线程1在执行目标方法前后开启和关闭了事务;执行过程中,开启了两个子线程:线程2用于处理数据库操作1,线程3用于处理数据库操作2,数据库操作3和数据库操作4在线程1中进行。此时数据库操作3和4在线程1开启的事务中生效;而数据库操作1和2成为了独立的事务。
step2: 直接调用目标方法,得到返回结果
createTransactionIfNecessary
方法的入参InvocationCallback invocation
类型是一个函数式接口:
@FunctionalInterface
protected interface InvocationCallback {
@Nullable
Object proceedWithInvocation() throws Throwable;
}
传参为:invocation::proceed
,当InvocationCallback对象的proceedWithInvocation()方法被调用时,执行invocation::proceed
这个lambda表达式,即目标对象的方法被执行。
step3: 回滚事务
当通过反射调用目标方法抛出Throwable类型的异常时,进入completeTransactionAfterThrowing(txInfo, ex);
分支:
// 这里删除了日志和异常处理带阿妹,突出主线逻辑:
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
// 回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} else {
// 提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
txInfo.transactionAttribute
属性中包含着注解在@Transactional中信息,如下所示:
其中,txInfo.transactionAttribute.rollbackOn(ex)
用于校验目标方法执行时抛出的异常是否在指定异常范围内:范围之内满足回滚条件,触发回滚;否则,触发提交操作。
举例说明:
@Transactional(rollbackFor = Exception.class)
public void updateMoney(int money) {
accountRepository.updateMoney(money);
}
通过@Transactional注解指定的异常类型为Exception,当accountRepository.updateMoney(money);
语句执行抛出Exception或者其子类时(如IOException),执行回滚操作。
本文主题是介绍Spring实现事务的原理,不涉及Spring事务传播机制原理的介绍
因此,提交或者回滚关注主逻辑,略去事务传播机制的逻辑分支
回滚事务时,追踪txInfo.transactionAttribute.rollbackOn(ex)
:
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.rollback();
} catch (SQLException var5) {
throw new TransactionSystemException("Could not roll back JDBC transaction", var5);
}
}
提交事务时,追踪txInfo.transactionAttribute.commit(ex)
:
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.commit();
} catch (SQLException var5) {
throw new TransactionSystemException("Could not commit JDBC transaction", var5);
}
}
无论rollback还是commit都依赖于Connection对象,即依赖于数据库的事务能力。
其中,获取Connection对象来自DefaultTransactionStatus对象,即来源于step1: 开启事务中创建的Connection对象。
step4: 提交事务
当目标方法执行过程中无异常发生时,进入以下逻辑:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
与 step3: 回滚事务中介绍一致,用于提交事务对象。
3.4 数据库操作
在3.3 拦截器节中的step2步骤中调用invocation::proceed表达式反射调用目标方法,本节中结合Mybatis案例进行细节分析。
案例如下所示:
// Mapper类及其配置文件
@Mapper
public interface AccountMapper {
void updateMoney(int money);
}
<mapper namespace="com.seong.dao.AccountMapper">
<update id="updateMoney" parameterType="java.lang.Integer">
update t_account
set money = #{money}
where name = 'a'
</update>
</mapper>
当AccountMapper(动态代理对象)的updateMoney方法被调用时,由拦截器进入SqlSessionTemplate的update方法:
public int update(String statement, Object parameter) {
return this.sqlSessionProxy.update(statement, parameter);
}
sqlSessionProxy也是一个代理对象,再次进入拦截器对象中(上述两次拦截在前文提及的事务文章中有介绍,有需要请参考) :
// 尽可能删除分支,突出核心逻辑:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// ⚠️1.获取sqlSession对象
SqlSession sqlSession = SqlSessionUtils.getSqlSession(SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
// ⚠️2.数据库操作
Object result = method.invoke(sqlSession, args);
// ⚠️3.关闭 SqlSession
SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
return result;
}
这部分的逻辑是获取sqlSession对象,并执行数据库操作;注意这里也有一个ThreadLocal对象,读者可自行分析其作用。这里关注的重点是SqlSessionUtils.getSqlSession
获取sqlsession对象以及执行过程。
SqlSession对象的属性和行为如下图所示:
通过executor间接包含了transaction事务对象,即每个SqlSession对象内部持有一个事务对象。事务对象创建之初内部的Connection属性为空,直到需要执行数据库操作时才会根据事务对象生成。通过SqlSessionUtils.getSqlSession
方法得到的SqlSession对象也是如此。
追踪该方法的调用链,进入openSessionFromDataSource方法:
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Environment environment = configuration.getEnvironment();
TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
Transaction tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
}
从Mybatis的configuration配置中获取环境对象,并从环境对象中获取事务工厂;注意:这里的configuration和环境对象、事务工厂是全局唯一的。当Spring集成Mybatis时,TransactionFactory会被设置为SpringManagedTransactionFactory类型。由SpringManagedTransactionFactory事务工厂创建的对象自然为SpringManagedTransaction类型:
public class SpringManagedTransactionFactory implements TransactionFactory {
public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
return new SpringManagedTransaction(dataSource);
}
}
当执行sql语句前先通过transaction对象获取数据库连接对象:
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
进入SpringManagedTransaction的getConnection方法:
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
所以,SpringManagedTransaction是通过DataSourceUtils.getConnection(this.dataSource)
获取Connection对象,追踪其调用链进入doGetConnection方法:
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource);
if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) {
// ...
} else {
conHolder.requested();
return conHolder.getConnection();
}
}
通过TransactionSynchronizationManager.getResource(dataSource)
获取的ConnectionHolder对象正是开启事务时放入的。
因此,保证了事务在执行sql以及会滚/提交过程中使用的是同一个Connection对象。