源码解析
自动化配置
在spring-boot-autoconfigure
查看spring.factories
引入TransactionAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,\
查看TransactionAutoConfiguration
源码发现包含注解@EnableTransactionManagement
/**
 * Switches on annotation-driven transaction management.
 *
 * <p>Applies only when a {@code TransactionManager} bean is present and no
 * {@code AbstractTransactionManagementConfiguration} bean has been registered.
 * Exactly one of the two nested configurations is selected through the
 * {@code spring.aop.proxy-target-class} property.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(TransactionManager.class)
@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
public static class EnableTransactionManagementConfiguration {

    /**
     * Enables transaction management with {@code proxyTargetClass = true}.
     * Selected when {@code spring.aop.proxy-target-class} is {@code "true"} or is
     * not set at all ({@code matchIfMissing = true}).
     */
    @Configuration(proxyBeanMethods = false)
    @EnableTransactionManagement(proxyTargetClass = true)
    @ConditionalOnProperty(
            prefix = "spring.aop",
            name = "proxy-target-class",
            havingValue = "true",
            matchIfMissing = true)
    public static class CglibAutoProxyConfiguration {
    }

    /**
     * Enables transaction management with {@code proxyTargetClass = false}.
     * Selected only when {@code spring.aop.proxy-target-class} is explicitly
     * {@code "false"}.
     */
    @Configuration(proxyBeanMethods = false)
    @EnableTransactionManagement(proxyTargetClass = false)
    @ConditionalOnProperty(
            prefix = "spring.aop",
            name = "proxy-target-class",
            havingValue = "false",
            matchIfMissing = false)
    public static class JdkDynamicAutoProxyConfiguration {
    }
}
查看@EnableTransactionManagement
源码,发现TransactionManagementConfigurationSelector
/**
 * Type-level annotation that imports {@code TransactionManagementConfigurationSelector}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

    /** Proxy-target-class flag; {@code false} unless overridden. */
    boolean proxyTargetClass() default false;

    /** Advice mode; {@code AdviceMode.PROXY} unless overridden. */
    AdviceMode mode() default AdviceMode.PROXY;

    /** Order value; defaults to the largest {@code int}. */
    int order() default Integer.MAX_VALUE;
}
查看TransactionManagementConfigurationSelector
,引入了ProxyTransactionManagementConfiguration
/**
 * Chooses which configuration classes to import for the advice mode declared on
 * {@code @EnableTransactionManagement}.
 */
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

    /**
     * Returns the class names to import for the given advice mode.
     *
     * @param adviceMode the advice mode to resolve
     * @return the auto-proxy registrar plus the proxy transaction configuration for
     *         {@code PROXY}; a single aspect configuration class name for
     *         {@code ASPECTJ}; {@code null} for any other mode
     */
    protected String[] selectImports(AdviceMode adviceMode) {
        String[] imports;
        switch (adviceMode) {
            case PROXY:
                String registrarName = AutoProxyRegistrar.class.getName();
                String proxyConfigName = ProxyTransactionManagementConfiguration.class.getName();
                imports = new String[] {registrarName, proxyConfigName};
                break;
            case ASPECTJ:
                imports = new String[] {this.determineTransactionAspectClass()};
                break;
            default:
                imports = null;
                break;
        }
        return imports;
    }

    /**
     * Picks the AspectJ configuration class name: the JTA variant when
     * {@code javax.transaction.Transactional} is loadable through this class's
     * class loader, the plain variant otherwise.
     */
    private String determineTransactionAspectClass() {
        ClassLoader loader = this.getClass().getClassLoader();
        if (ClassUtils.isPresent("javax.transaction.Transactional", loader)) {
            return "org.springframework.transaction.aspectj.AspectJJtaTransactionManagementConfiguration";
        }
        return "org.springframework.transaction.aspectj.AspectJTransactionManagementConfiguration";
    }
}
切面和拦截器
ProxyTransactionManagementConfiguration
往容器中注入切面(Advisor)BeanFactoryTransactionAttributeSourceAdvisor
,它通过TransactionAttributeSource(事务属性源)判断方法是否需要事务
,对应的通知(拦截器)是TransactionInterceptor
/**
 * Registers the three beans behind proxy-based transaction management: the
 * advisor, the transaction attribute source and the transaction interceptor.
 */
@Configuration(proxyBeanMethods = false)
@Role(2)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    /**
     * Builds the advisor, wiring in the attribute source and using the interceptor
     * as its advice. When {@code enableTx} is available, its {@code "order"} value
     * is applied to the advisor.
     *
     * @param transactionAttributeSource the attribute source to set on the advisor
     * @param transactionInterceptor the interceptor to use as the advice
     * @return the configured advisor
     */
    @Bean(name = "org.springframework.transaction.config.internalTransactionAdvisor")
    @Role(2)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
        BeanFactoryTransactionAttributeSourceAdvisor txAdvisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        txAdvisor.setTransactionAttributeSource(transactionAttributeSource);
        txAdvisor.setAdvice(transactionInterceptor);
        if (this.enableTx == null) {
            return txAdvisor;
        }
        txAdvisor.setOrder((Integer) this.enableTx.getNumber("order"));
        return txAdvisor;
    }

    /**
     * Supplies an annotation-based transaction attribute source.
     *
     * @return a new {@code AnnotationTransactionAttributeSource}
     */
    @Bean
    @Role(2)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    /**
     * Builds the interceptor, handing it the attribute source and, when one has
     * been set on this configuration, the transaction manager.
     *
     * @param transactionAttributeSource the attribute source to set on the interceptor
     * @return the configured interceptor
     */
    @Bean
    @Role(2)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
        TransactionInterceptor txInterceptor = new TransactionInterceptor();
        txInterceptor.setTransactionAttributeSource(transactionAttributeSource);
        if (this.txManager != null) {
            txInterceptor.setTransactionManager(this.txManager);
        }
        return txInterceptor;
    }
}
判断类是否需要被事务拦截。
AnnotationTransactionAttributeSource#isCandidateClass
,判断是候选类
/**
 * Reports whether any registered annotation parser accepts the given class.
 *
 * @param targetClass the class to test
 * @return {@code true} as soon as one parser accepts the class; {@code false} if
 *         none does (including when there are no parsers)
 */
public boolean isCandidateClass(Class<?> targetClass) {
    Iterator<?> remainingParsers = this.annotationParsers.iterator();
    while (remainingParsers.hasNext()) {
        TransactionAnnotationParser candidateParser = (TransactionAnnotationParser) remainingParsers.next();
        if (candidateParser.isCandidateClass(targetClass)) {
            return true;
        }
    }
    return false;
}
SpringTransactionAnnotationParser#isCandidateClass
/**
 * Delegates to {@code AnnotationUtils.isCandidateClass}, asking about the
 * {@code Transactional} annotation type.
 *
 * @param targetClass the class to test
 * @return the result of the delegated check
 */
public boolean isCandidateClass(Class<?> targetClass) {
    boolean candidate = AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
    return candidate;
}
拦截器的具体逻辑
拦截器TransactionInterceptor
,执行TransactionInterceptor#invoke
方法。
/**
 * Entry point of the interceptor: resolves the target class of the invocation and
 * hands over to {@code invokeWithinTransaction}, passing
 * {@code invocation::proceed} as the callback.
 *
 * @param invocation the intercepted method invocation
 * @return whatever {@code invokeWithinTransaction} returns
 * @throws Throwable anything propagated by {@code invokeWithinTransaction}
 */
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
    // The target class stays null when the invocation has no target object.
    Class<?> targetClass = null;
    if (invocation.getThis() != null) {
        targetClass = AopUtils.getTargetClass(invocation.getThis());
    }
    return this.invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
TransactionAspectSupport#invokeWithinTransaction
,具体的事务处理逻辑。
- createTransactionIfNecessary:创建事务,包含数据库的连接和事务状态等信息;
- completeTransactionAfterThrowing:调用目标方法抛出异常时执行,如果异常满足回滚规则(rollbackOn)则回滚,否则仍然提交;
- commitTransactionAfterReturning:调用目标方法成功,提交事务(里面有一些判断,不符合条件可能会回滚)。
/**
 * Runs the given invocation callback inside a transaction (abridged excerpt: two
 * branches are elided with "// ..." markers, so the braces shown do not balance).
 *
 * <p>Visible flow: look up the transaction attribute for the method, determine the
 * transaction manager, then on the non-reactive path create a transaction if
 * necessary, proceed with the invocation, and either complete the transaction
 * after a throw or commit it after a normal return.
 *
 * @param method the method being invoked
 * @param targetClass the target class, may be {@code null}
 * @param invocation the callback that proceeds with the actual invocation
 * @return the value produced by the callback (possibly replaced by the Vavr handling below)
 * @throws Throwable whatever the callback throws, rethrown after
 *         {@code completeTransactionAfterThrowing} has run
 */
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
TransactionAttributeSource tas = this.getTransactionAttributeSource();
// No attribute source means no transaction attribute (txAttr stays null).
TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
TransactionManager tm = this.determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
// ... (reactive transaction manager branch elided in this excerpt)
} else {
PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);
String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
Object retVal;
// ... (an `if` branch is elided here; the `else` below belongs to it)
} else {
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
try {
// Run the next step of the invocation chain via the callback.
retVal = invocation.proceedWithInvocation();
} catch (Throwable var18) {
// Let the transaction react to the failure, then rethrow the original throwable.
this.completeTransactionAfterThrowing(txInfo, var18);
throw var18;
} finally {
// Runs on both the success and the failure path.
this.cleanupTransactionInfo(txInfo);
}
// Only when Vavr is present and the result is a Vavr Try: let the delegate evaluate it
// against the transaction attribute and status.
if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// Reached only when the callback returned normally.
this.commitTransactionAfterReturning(txInfo);
return retVal;
}
}
}
获取事务
TransactionAspectSupport#createTransactionIfNecessary
,创建事务
/**
 * Obtains a transaction status for the given attribute (when both an attribute and
 * a transaction manager are present) and wraps everything in a
 * {@code TransactionInfo}.
 *
 * @param tm the transaction manager, may be {@code null}
 * @param txAttr the transaction attribute, may be {@code null}
 * @param joinpointIdentification identifier used as the transaction name when the
 *        attribute has none, and in the debug message
 * @return the result of {@code prepareTransactionInfo}
 */
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    TransactionAttribute effectiveAttr = txAttr;

    // An unnamed attribute is wrapped so that it reports the joinpoint identification as its name.
    if (effectiveAttr != null && effectiveAttr.getName() == null) {
        effectiveAttr = new DelegatingTransactionAttribute(effectiveAttr) {
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus txStatus = null;
    if (effectiveAttr != null) {
        if (tm != null) {
            txStatus = tm.getTransaction(effectiveAttr);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
        }
    }

    return this.prepareTransactionInfo(tm, effectiveAttr, joinpointIdentification, txStatus);
}
AbstractPlatformTransactionManager#getTransaction
,获取事务。
/**
 * Returns a transaction status for the given definition, honouring its propagation
 * behavior.
 *
 * <p>Decision order:
 * <ol>
 * <li>an existing transaction is delegated to {@code handleExistingTransaction};</li>
 * <li>a timeout below {@code TIMEOUT_DEFAULT} is rejected;</li>
 * <li>{@code PROPAGATION_MANDATORY} without an existing transaction is rejected;</li>
 * <li>{@code PROPAGATION_REQUIRED}, {@code PROPAGATION_REQUIRES_NEW} and
 *     {@code PROPAGATION_NESTED} start a new transaction;</li>
 * <li>every other propagation yields a status without an actual transaction.</li>
 * </ol>
 *
 * @param definition the transaction definition, or {@code null} to use
 *        {@code TransactionDefinition.withDefaults()}
 * @return the resulting transaction status
 * @throws TransactionException if the timeout is invalid, if a mandatory
 *         transaction is missing, or if a callee throws one
 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    Object transaction = this.doGetTransaction();
    boolean debugEnabled = this.logger.isDebugEnabled();

    if (this.isExistingTransaction(transaction)) {
        return this.handleExistingTransaction(def, transaction, debugEnabled);
    }

    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    }

    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED
            || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW
            || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend(null);
        if (debugEnabled) {
            this.logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            return this.startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error ex) {
            // Starting failed: resume whatever suspend(null) returned before propagating.
            this.resume(null, suspendedResources);
            throw ex;
        }
    }

    // No actual transaction is started on this path, so a custom isolation level has no effect.
    if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && this.logger.isWarnEnabled()) {
        this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
    }
    boolean newSynchronization = (this.getTransactionSynchronization() == AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    return this.prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
/**
 * Starts a new transaction: creates the status object, begins the transaction via
 * {@code doBegin}, then prepares synchronization for the status.
 *
 * @param definition the transaction definition
 * @param transaction the transaction object passed on to {@code doBegin}
 * @param debugEnabled debug flag passed on to the status
 * @param suspendedResources holder for suspended resources, may be {@code null}
 * @return the status of the newly started transaction
 */
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources) {
    // Synchronization is requested unless it is configured as "never".
    boolean newSynchronization = (this.getTransactionSynchronization() != AbstractPlatformTransactionManager.SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    this.doBegin(transaction, definition);
    this.prepareSynchronization(status, definition);
    return status;
}
DataSourceTransactionManager#doBegin
,事务管理器获取数据库连接,并关闭自动提交(setAutoCommit(false))。如果ConnectionHolder是新创建的,则将其绑定到TransactionSynchronizationManager
/**
 * Begins a JDBC transaction for the given transaction object.
 *
 * <p>Steps, in order: obtain a connection if needed, prepare it for the
 * definition, switch off auto-commit when it is on, mark the holder's transaction
 * as active, apply a timeout, and bind a newly created holder to
 * {@code TransactionSynchronizationManager}.
 *
 * @param transaction the transaction object (cast to {@code DataSourceTransactionObject})
 * @param definition the transaction definition supplying read-only flag, isolation and timeout
 * @throws CannotCreateTransactionException wrapping any {@code Throwable} raised while beginning
 */
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
// A fresh connection is fetched when there is no holder yet, or when the current holder
// is already synchronized with a transaction.
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// NOTE(review): the `true` argument presumably flags the holder as new (see the
// isNewConnectionHolder() checks below) - confirm against setConnectionHolder.
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// The value returned by the preparation step is kept on the transaction object.
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Auto-commit is switched off only if it is currently on; the transaction object
// records that it must be restored.
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
// A timeout of -1 is not applied to the holder.
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Only a newly created holder is bound, keyed by the DataSource.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
// On failure, a newly created holder is discarded and its connection released.
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
事务的数据库连接信息将保存到TransactionSynchronizationManager
/** Per-thread map of bound resources, keyed by the unwrapped resource key. */
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

/**
 * Binds the given value to the current thread under the given key.
 *
 * @param key the key to bind under (unwrapped via
 *        {@code TransactionSynchronizationUtils.unwrapResourceIfNecessary} first)
 * @param value the value to bind, must not be {@code null}
 * @throws IllegalStateException if a non-void value was already bound for the key
 */
public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");

    // Create the per-thread map lazily on first use.
    Map<Object, Object> map = resources.get();
    if (map == null) {
        map = new HashMap<>();
        resources.set(map);
    }

    Object oldValue = map.put(actualKey, value);
    // A previous ResourceHolder that reports itself as void counts as "nothing bound".
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }

    // The check runs after put(), so the new value has already replaced the old one
    // in the map when this exception is thrown.
    if (oldValue != null) {
        throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]");
    }
}
事务回滚
TransactionAspectSupport#completeTransactionAfterThrowing
/**
 * Finishes the transaction after the invocation threw: rolls back when the
 * transaction attribute says to roll back on the given throwable, commits
 * otherwise. Does nothing when there is no transaction info or no status.
 *
 * @param txInfo the current transaction info, may be {@code null}
 * @param ex the throwable raised by the invocation
 */
protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
    if (txInfo == null || txInfo.getTransactionStatus() == null) {
        return;
    }

    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
    }

    boolean rollbackRequired = txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex);

    if (rollbackRequired) {
        try {
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
        } catch (TransactionSystemException rollbackFailure) {
            // Attach the application exception to the failure that replaces it.
            this.logger.error("Application exception overridden by rollback exception", ex);
            rollbackFailure.initApplicationException(ex);
            throw rollbackFailure;
        } catch (RuntimeException | Error rollbackFailure) {
            this.logger.error("Application exception overridden by rollback exception", ex);
            throw rollbackFailure;
        }
        return;
    }

    // The attribute does not call for a rollback on this throwable: commit instead.
    try {
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    } catch (TransactionSystemException commitFailure) {
        this.logger.error("Application exception overridden by commit exception", ex);
        commitFailure.initApplicationException(ex);
        throw commitFailure;
    } catch (RuntimeException | Error commitFailure) {
        this.logger.error("Application exception overridden by commit exception", ex);
        throw commitFailure;
    }
}
AbstractPlatformTransactionManager#rollback
/**
 * Rolls back the transaction described by the given status.
 *
 * @param status the status of the transaction to roll back
 * @throws IllegalTransactionStateException if the status is already completed
 * @throws TransactionException if {@code processRollback} raises one
 */
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
    }
    DefaultTransactionStatus defaultStatus = (DefaultTransactionStatus) status;
    this.processRollback(defaultStatus, false);
}
DataSourceTransactionManager#doRollback
,调用到Connection
的回滚方法
/**
 * Rolls back on the JDBC connection held by the status's transaction object.
 *
 * @param status the status whose transaction object holds the connection
 * @throws TransactionSystemException wrapping any {@code SQLException} thrown by
 *         {@code Connection.rollback()}
 */
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionManager.DataSourceTransactionObject jdbcTx =
            (DataSourceTransactionManager.DataSourceTransactionObject) status.getTransaction();
    Connection connection = jdbcTx.getConnectionHolder().getConnection();

    if (status.isDebug()) {
        this.logger.debug("Rolling back JDBC transaction on Connection [" + connection + "]");
    }

    try {
        connection.rollback();
    } catch (SQLException sqlFailure) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", sqlFailure);
    }
}
事务提交
TransactionAspectSupport#commitTransactionAfterReturning
,进行事务提交
/**
 * Commits through the transaction manager after the invocation returned normally.
 * Does nothing when there is no transaction info or no transaction status.
 *
 * @param txInfo the current transaction info, may be {@code null}
 */
protected void commitTransactionAfterReturning(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {
    if (txInfo == null || txInfo.getTransactionStatus() == null) {
        return;
    }
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
获取连接
SpringManagedTransaction#getConnection
/**
 * Returns the connection, opening it on first access.
 *
 * @return the connection held by this instance
 * @throws SQLException if opening the connection fails
 */
public Connection getConnection() throws SQLException {
    boolean notYetOpened = (this.connection == null);
    if (notYetOpened) {
        this.openConnection();
    }
    return this.connection;
}
/**
 * Obtains the connection through {@code DataSourceUtils}, then records its
 * auto-commit state and whether {@code DataSourceUtils} reports it as
 * transactional for this data source.
 *
 * @throws SQLException if reading the auto-commit state fails
 */
private void openConnection() throws SQLException {
    this.connection = DataSourceUtils.getConnection(this.dataSource);
    this.autoCommit = this.connection.getAutoCommit();
    this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
    // The message is supplied as a lambda, so it is built only if the logger asks for it.
    LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will"
            + (this.isConnectionTransactional ? " " : " not ")
            + "be managed by Spring");
}
DataSourceUtils#getConnection
,获取连接:优先从TransactionSynchronizationManager(基于ThreadLocal线程变量)中取出当前线程已绑定的ConnectionHolder,
如果没有可用的持有者,再从DataSource获取新连接。
/**
 * Obtains a connection via {@code doGetConnection}, translating failures into
 * {@code CannotGetJdbcConnectionException}.
 *
 * @param dataSource the data source to obtain the connection for
 * @return the connection
 * @throws CannotGetJdbcConnectionException if {@code doGetConnection} throws a
 *         {@code SQLException} or an {@code IllegalStateException}
 */
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    Connection connection;
    try {
        connection = doGetConnection(dataSource);
    } catch (SQLException sqlFailure) {
        throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", sqlFailure);
    } catch (IllegalStateException stateFailure) {
        // NOTE(review): only the message is forwarded here; the IllegalStateException is not
        // chained as the cause - confirm whether the exception type offers a matching constructor.
        throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + stateFailure.getMessage());
    }
    return connection;
}
/**
 * Returns a connection for the given data source, reusing the holder bound to the
 * current thread when it is usable and fetching a new connection otherwise.
 *
 * <p>A bound holder is usable when it has a connection or is synchronized with a
 * transaction. When a new connection is fetched while synchronization is active,
 * it is placed in a holder, a {@code ConnectionSynchronization} is registered, and
 * a newly created holder is bound to the thread.
 *
 * @param dataSource the data source, must not be {@code null}
 * @return the connection
 * @throws SQLException declared for failures raised while fetching a connection
 */
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    ConnectionHolder boundHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);

    // Reuse path: a usable holder is already bound for this data source.
    if (boundHolder != null && (boundHolder.hasConnection() || boundHolder.isSynchronizedWithTransaction())) {
        boundHolder.requested();
        if (!boundHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            boundHolder.setConnection(fetchConnection(dataSource));
        }
        return boundHolder.getConnection();
    }

    // Fetch path: nothing usable is bound, so get a fresh connection.
    logger.debug("Fetching JDBC Connection from DataSource");
    Connection freshConnection = fetchConnection(dataSource);
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        return freshConnection;
    }

    try {
        ConnectionHolder holderToUse;
        if (boundHolder == null) {
            holderToUse = new ConnectionHolder(freshConnection);
        } else {
            boundHolder.setConnection(freshConnection);
            holderToUse = boundHolder;
        }
        holderToUse.requested();
        TransactionSynchronizationManager.registerSynchronization(new DataSourceUtils.ConnectionSynchronization(holderToUse, dataSource));
        holderToUse.setSynchronizedWithTransaction(true);
        // Only a holder created here still needs binding.
        if (holderToUse != boundHolder) {
            TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
        }
    } catch (RuntimeException ex) {
        // Do not leak the fresh connection when registration or binding fails.
        releaseConnection(freshConnection, dataSource);
        throw ex;
    }
    return freshConnection;
}