前面文章说到在开启事务后,会将数据库连接存放在当前线程的ConnectionHolder。那么后续的数据库持久化操作是怎么感知的呢。这里就要说到一个重要的类TransactionSynchronizationManager。
TransactionSynchronizationManager
TransactionSynchronizationManager是一个桥梁用来连接事务和中间持久化操作逻辑。主要是共享数据库connection及事务的判断。
来看下spring官方文档对TransactionSynchronizationManager解释:
TransactionSynchronizationManager是一个中央委托机制,用于管理每个线程的资源和事务同步。它被用于资源管理代码,而不是典型的应用程序代码。
资源管理代码应该通过getResource方法来检查与线程绑定的资源,例如JDBC连接或Hibernate会话。通常情况下,该代码不应该将资源绑定到线程,因为这是事务管理器的责任。另一个选项是在第一次使用时延迟绑定,如果事务同步处于活动状态,则可以执行跨多个资源的事务。
事务同步必须由事务管理器通过initSynchronization()和clearSynchronization()来激活和停用。AbstractPlatformTransactionManager自动支持此功能,并且所有标准的Spring事务管理器(如JtaTransactionManager和DataSourceTransactionManager)也支持该功能。
当该管理器处于活动状态时,资源管理代码应仅在此注册同步,可以通过isSynchronizationActive()进行检查;否则,应立即执行资源清理操作。如果事务同步未处于活动状态,则表示当前没有事务,或者事务管理器不支持事务同步。
同步机制例如用于在JTA事务中始终返回相同的资源,例如给定DataSource的JDBC连接或给定SessionFactory的Hibernate会话。
下面看下该类的主要结构
主要属性
//事务资源
ThreadLocal<Map<Object, Object>> resources;
ThreadLocal<Set<TransactionSynchronization>> synchronizations;
//当前事务名称
final ThreadLocal<String> currentTransactionName;
//当前事务是否只读
ThreadLocal<Boolean> currentTransactionReadOnly;
//当前事务隔离级别
ThreadLocal<Integer> currentTransactionIsolationLevel;
//当前是否有事务
ThreadLocal<Boolean> actualTransactionActive;
这里看到所有都是ThreadLocal类型变量。同样提供了静态方法对以上属性进行设置和获取
主要方法:
属性的设置
这些相关属性都是事务相关,所以要从开启事务地方开始看
1AbstractPlatformTransactionManager#startTransaction
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
这里有两步
第一步doBegin
DataSourceTransactionManager#doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
}
这里看到最后一步调用bindResource将ConnectionHolder存放到了resources里,这里的key值是datasource。
第二步prepareSynchronization
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
这里设置了其它属性的值,其中入参definition是@Transactionnal注解配置信息
JdbcTemplate
首先jdbcTemplate初始化时候需要注入一个datasource
@Bean
public JdbcTemplate jdbcTemplate() {
return new JdbcTemplate(dataSource());
}
JdbcTemplate#execute方法
private <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources)
throws DataAccessException {
//获取连接
Connection con = DataSourceUtils.getConnection(obtainDataSource());
PreparedStatement ps = null;
try {
ps = psc.createPreparedStatement(con);
applyStatementSettings(ps);
T result = action.doInPreparedStatement(ps);
handleWarnings(ps);
return result;
}
catch (SQLException ex) {
//...
}
finally {
if (closeResources) {//关闭连接
DataSourceUtils.releaseConnection(con, getDataSource());
}
}
}
获取连接getConnection方法最后会调用doGetConnection方法获取连接
DataSourceUtils#doGetConnection
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
// Else we either got no holder or an empty thread-bound holder here.
logger.debug("Fetching JDBC Connection from DataSource");
Connection con = fetchConnection(dataSource);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
//...
}
return con;
}
这里看到获取连接首先会从ConnectionHolder里取获取连接,如果获取不到在从datasource里取一个连接。所以如果开启了事务就是使用事务开启时存放到ConnectionHolder里的Connection。
MyBatis
同样的mybatis在获取sqlsession的时候也是先会从ConnectionHolder里尝试获取。
SqlSessionUtils#getSqlSession
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
LOGGER.debug(() -> "Creating a new SqlSession");
session = sessionFactory.openSession(executorType);
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
这里的SqlSessionHolder是继承自ResourceHolderSupport,包装了一下把mybatis的session,执行类型存放进去,ConnectionHolder也是继承自ResourceHolderSupport。这里getResource的key是sessionFactory,上面分析事务开启设置的resource是connection。
sessionHolder方法就是从holder里取session。那么session是什么时候放到holder呢?
这要看registerSessionHolder方法
SqlSessionUtils#registerSessionHolder
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");
//包装session到holder
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
//注册SqlSessionSynchronization
TransactionSynchronizationManager
.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because DataSource is not transactional");
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because synchronization is not active");
}
}
第一次取肯定取不到session,因为事务开启的时候key值是datasource,sqlsession是mybatis特有的这里以sessionFacotry作为key进行存储,取不到会调用registerSessionHolder方法进行SessionHolder注册,如有下一个mapper在获取session的时候就可以正常取到了。
TransactionSynchronizationManager的resources是一个map结构存储,key值不同不会覆盖原来事务设置的connectionholder。
这里设置resources的同时还注册了一个SqlSessionSynchronization,这个在下面解除绑定的时候会回调完成解除。
解除绑定
解除绑定通过回调Synchronization的beforeCompletion来完成。这里SqlSessionSynchronization是SqlSessionUtils的一个内部类
SqlSessionSynchronization#beforeCompletion
public void beforeCompletion() {
if (!this.holder.isOpen()) {
LOGGER
.debug(() -> "Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResource(sessionFactory);
this.holderActive = false;
LOGGER.debug(() -> "Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
this.holder.getSqlSession().close();
}
}
这里会将sessionFactory从TransactionSynchronizationManager中移除,在事务提交的时候回调。
在事务TransactinoManager处理提交的方法processCommit中有一步triggerBeforeCompletion(status);这里就会调到
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
boolean beforeCompletionInvoked = false;
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
//这一步解除绑定
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
//...
}
最后会调用
TransactionSynchronizationUtils#triggerBeforeCompletion
public static void triggerBeforeCompletion() {
for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
synchronization.beforeCompletion();
}
}
这里就是拿出注册的Synchronizations进行依次调用,回看下前面在绑定的时候注册了一个new SqlSessionSynchronization(holder, sessionFactory)。最后调用其beforeCompletion完成解除绑定。