理论部分来自
Seata
官网:http://seata.io/zh-cn/docs/dev/mode/at-mode.html
一、前提
- 基于支持本地
ACID
事务的关系型数据库。 Java
应用,通过JDBC
访问数据库。
二、整体机制
两阶段提交协议的演变:
-
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
-
二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
三、写隔离
- 一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
以一个示例来说明:
两个全局事务 tx1
和 tx2
,分别对 a
表的 m
字段进行更新操作,m
的初始值 1000
。
-
tx1
先开始,开启本地事务,拿到本地锁,更新操作m = 1000 - 100 = 900
。 -
本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。
-
tx2
后开始,开启本地事务,拿到本地锁,更新操作m = 900 - 100 = 800
。 -
本地事务提交前,尝试拿该记录的 全局锁 ,
tx1
全局提交前,该记录的全局锁被tx1
持有,tx2
需要重试等待 全局锁
tx1
二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1
的二阶段全局回滚,则 tx1
需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2
仍在等待该数据的 全局锁,同时持有本地锁,则 tx1
的分支回滚会失败。分支的回滚会一直重试,直到 tx2
的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1
的分支回滚最终成功。
因为整个过程 全局锁 在 tx1
结束前一直是被 tx1
持有的,所以不会发生 脏写 的问题。
3.1 脏写示例
假设你的业务代码是这样的:
updateAll()
用来同时更新A
和B
表记录,updateA()
、updateB()
则分别更新A
、B
表记录updateAll()
已经加上了@GlobalTransactional
@Service
class YourBussinessService {
@Autowired
DbServiceA serviceA;
@Autowired
DbServiceB serviceB;
@GlobalTransactional
public boolean updateAll(DTO dto) {
serviceA.update(dto.getA());
serviceB.update(dto.getB());
}
public boolean updateA(DTO dto) {
serviceA.update(dto.getA());
}
}
@Service
class DbServiceA {
@Transactional
public boolean update(A a) {
}
}
3.2 使用@GlobalTransactional 防止脏写
在updateA()
也加上@GlobalTransactional
@Service
class DbServiceA {
@GlobalTransactional
@Transactional
public boolean updateA(DTO dto) {
serviceA.update(dto.getA());
}
}
updateAll()
先被调用(未完成),updateA()
后被调用
异常信息:
- 底层异常:
io.seata.rm.datasource.exec.LockConflictException: get global lock fail
- 上层异常:
io.seata.rm.datasource.exec.LockWaitTimeoutException: Global lock wait timeout
3.3 使用@GlobalLock+select for update防止脏写
@Service
class DbServiceA {
@GlobalLock
@Transactional
public boolean updateA(DTO dto) {
serviceA.selectForUpdate(dto.getA());
serviceA.update(dto.getA());
}
}
updateAll()
先被调用(未完成),updateA()
后被调用
-
那如果是
updateA()
先被调用(未完成),updateAll()
后被调用呢?
由于2
个业务都是要先获得本地锁,因此同样不会发生脏写。 -
单独用
@GlobalLock
能不能防止脏写? 能 -
利用
@GlobalLock+select for update
方式中select for update
能带来的好处?- 锁冲突更“温柔”些。如果只有
@GlobalLock
,检查到全局锁,则立刻抛出异常,也许再“坚持”那么一下,全局锁就释放了,抛出异常岂不可惜了。 - 在
updateA()
中可以通过select for update
获得最新的A
,接着再做更新。
- 锁冲突更“温柔”些。如果只有
四、读隔离
目前数据库事务的隔离级别一共有 4
种,由低到高分别为:
Read uncommitted
:读未提交Read committed
:读已提交Repeatable read
:可重复读Serializable
:序列化
在数据库本地事务隔离级别 读已提交(Read Committed
) 或以上的基础上,Seata
(AT
模式)的默认全局隔离级别是 读未提交(Read Uncommitted
) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE
语句的代理。
SELECT FOR UPDATE
语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE
语句的本地执行)并重试。这个过程中,查询是被 block
住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata
目前的方案并没有对所有 SELECT
语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句
。
4.1 脏读示例
假设你的业务代码是这样的:
updateAll()
用来同时更新A
和B
表记录,queryA()
查询A
记录,updateAll()
未执行完成,另一业务后调用queryA()
updateAll()
已经加上了@GlobalTransactional
@Service
class YourBussinessService {
@Autowired
DbServiceA serviceA;
@Autowired
DbServiceB serviceB;
@GlobalTransactional
public boolean updateAll(DTO dto) {
serviceA.update(dto.getA());
serviceB.update(dto.getB());
}
public boolean updateA(DTO dto) {
serviceA.update(dto.getA());
}
}
@Service
class DbServiceA {
public A queryA(A a) {
}
}
4.2 使用@GlobalLock+select for update防止脏读
在queryA()
也加上@GlobalLock
并使用select for update
语句
@Service
class DbServiceA {
@GlobalLock
public A queryA(A a) {
}
}
结论:
读隔离:如果业务表的更新操存在于分布式事务中,此时本地方法中对业务表进行查询操作,建议在本地事务方法上使用@GlobalTransactional+select for update
或@GlobalLock+select for update
注解防止出现数据脏读,优选@GlobalLock+select for update
方式
注意事项:
使用select for update
时需动态传入参数列表,不可使用拼接好的完整字符串查询语句,会导致获取lockKeys
为空,引起脏读
五、源码解析
5.1 脉络
- 代理数据源的用途
DataSourceProxy
的作用(返回ConnectionProxy
)- 介绍
ConnectionProxy
的功能:- 存放
undolog
- 判断
inGlobalTransaction() or isGlobalLockRequire()
- 存放
- 介绍
ConnectionProxy
的作用(返回StatementProxy
)StatementProxy.execute()
的处理逻辑io.seata.rm.datasource.exec.UpdateExecutor
的执行逻辑(查前镜像、执行sql
、查后镜像、准备undoLog
)
SelectForUpdateExecutor
的执行逻辑(挣本地锁,查全局锁。有全局锁,回滚,再争…)ConnectionProxy.commit()
的处理逻辑(注册分支事务(争全局锁),写入undoLog
,数据库提交)
- 介绍
RootContext
GlobalTransactionalInterceptor
的不同代理逻辑- 带有
@GlobalTransactional
如何处理 - 带有
@GlobalLock
如何处理
- 带有
5.2 DataSourceProxy的作用
DataSourceProxy
帮助我们获得几个重要的代理对象
- 通过
DataSourceProxy.getConnection()
获得ConnectionProxy
ConnectionProxy
中的ConnectionContext
,它的有一个功能是存放undoLog
。
io.seata.rm.datasource.ConnectionProxy
#appendUndoLog
io.seata.rm.datasourc.ConnectionContext
#appendUndoItem
5.3 通过ConnectionProxy获得PreparedStatement
io.seata.rm.datasource.ConnectionProxy
#ConnectionProxy
io.seata.rm.datasource.AbstractConnectionProxy.prepareStatement()
获取StatementProxy
5.4 StatementProxy.execute()的处理逻辑
- 当调用
io.seata.rm.datasource.StatementProxy.execute()
会将sql
交给io.seata.rm.datasource.exec.ExecuteTemplate.execute()
处理。
ExecuteTemplate.execute()
方法中,Seata
根据不同dbType
和sql
语句类型使用不同的Executer
,调用io.seata.rm.datasource.exec.Executer
类的execute()
方法。
5.4.1 UpdateExecutor处理逻辑
以io.seata.rm.datasource.exec.UpdateExecutor
举例,UpdateExecutor extends AbstractDMLBaseExecutor extends BaseTransactionalExecutor
。 观察execute()
方法的具体操作
execute()
方法继承至BaseTransactionalExecutor
类中execute
方法,内部调用受保护的抽象方法doExecute
(实际调用子类AbstractDMLBaseExecutor
中的doExecute
实现)
io.seata.rm.datasource.exec.BaseTransactionalExecutor
#execute
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor
#doExecute
重写父类BaseTransactionalExecutor
的doExecute
受保护抽象方法
io.seata.rm.datasource.exec.UpdateExecutor
如果是DML
类型Executer
,可以在上面的executeAutoCommitFalse()
中看到,主要做了以下事情:
-
查询前镜像(
select for update
,因此此时获得本地锁)
-
执行业务sql
-
查询后镜像
-
准备
undoLog
使用到上节介绍的
ConnectionProxy
中的ConnectionContext
,它的有一个功能是存放undoLog
5.4.2 SelectForUpdateExecutor的执行逻辑
如果你的sql
是select for update
则会使用SelectForUpdateExecutor
(Seata
代理了select for update
),代理后处理的逻辑是这样的:
-
先执行
select for update
(获取数据库本地锁) -
如果处于
@GlobalTransactional or @GlobalLock
,检查是否有全局锁RootContext.inGlobalTransaction()
和RootContext.requireGlobalLock()
请见5.6章节介绍RootContext
-
如果有全局锁,则未开启本地事务下会
rollback
本地事务,再重新争抢本地锁和查询全局锁,直到全局锁释放
5.5 ConnectionProxy.commit()的处理逻辑
io.seata.rm.datasource.ConnectionProxy
#commit
- 处于全局事务中(即,数据持久化方法带有
@GlobalTransactional
)- 注册分支事务,获取全局锁
undoLog
数据入库- 让数据库
commit
本次事务
io.seata.rm.datasource.ConnectionProxy
#processGlobalTransactionCommit
io.seata.rm.datasource.ConnectionProxy
#register
io.seata.rm.AbstractResourceManager
#branchRegister
- 处于
@GlobalLock
中(即,数据持久化方法带有@GlobalLock
)- 向
tc
查询是否有全局锁存在 - 让数据库
commit
本次事务
- 向
-
除了以上情况(
else
分支)- 让数据库
commit
本次事务
- 让数据库
5.6 介绍RootContext
5.6.1 RootContext.getBranchType()的返回值怎么会是AT?
RootContext.getBranchType()
调用来自于5.4章节**ExecuteTemplate.execute()
方法
io.seata.core.context.RootContext
#getBranchType
io.seata.core.context.RootContext
#inGlobalTransaction
方法
RootContext.inGlobalTransaction()
也被5.4.2章节SelectForUpdateExecutor.doExecute()
方法调用
新的问题:哪里调用了RootContext.bind()
方法?
io.seata.core.context.RootContext
#bind
5.6.2 RootContext.requireGlobalLock()怎么判断当前是否需要全局锁?
RootContext.requireGlobalLock()
调用来自于5.4章节ExecuteTemplate.execute()
方法和5.4.2章节SelectForUpdateExecutor.doExecute()
方法
io.seata.core.context.RootContext
#requireGlobalLock
新的问题:哪里调用了RootContext.bindGlobalLockFlag()
方法?
io.seata.core.context.RootContext
#bindGlobalLockFlag
5.6.3 ConnectionProxy.commit()会根据context的不同状态区分处理,那ConnectionContext是如何判断inGlobalTransaction() or isGlobalLockRequire()的呢?
- 如何判断
inGlobalTransaction()
?(注意下,这里和上面提到的RootContext
不是一个东西)
io.seata.rm.datasource.ConnectionContext
#inGlobalTransaction
哪里调用的
ConnectionContext.bind(xid)
?
- 如何判断
isGlobalLockRequire()
?
哪里调用的
ConnectionContext.setGlobalLockRequire(xid)
?
以上问题的答案都在下面:
io.seata.rm.datasource.exec.BaseTransactionalExecutor
#execute
execute(Object... args)
重点解析:
- 读取
RootContext.getXID()
内容,通过statementProxy.getConnectionProxy().bind
调用ConnectionContext.bind(xid)
方法- 读取
RootContext.requireGlobalLock()
内容,通过statementProxy.getConnectionProxy().setGlobalLockRequire
调用ConnectionContext.setGlobalLockRequire(isLock)
方法- 执行
doExecute
方法
新的问题:RootContext.getXID()
和RootContext.requireGlobalLock()
获取的值来自哪里?
RootContext.getXID()
# 和@GlobalTransactional
有关RootContext.requireGlobalLock()
# 和@GlobalLock
有关
在看过代码后,我们知道,最后的问题回归到5.6.1章节和5.6.2章节提出两个问题:
RootContext.bind()
RootContext.bindGlobalLockFlag()
在哪儿被调用的呢?答案就在下方。
5.8 GlobalTransactionalInterceptor处理带有@GlobalTransactional或@GlobalLock的方法
带有@GlobalTransactional
和@GlobalLock
的方法会被代理,交给GlobalTransactionalInterceptor
处理
io.seata.spring.annotation.GlobalTransactionalInterceptor
#invoke
5.8.1 @GlobalTransactional处理逻辑
io.seata.spring.annotation.GlobalTransactionalInterceptor
#handleGlobalTransaction
来到了经典的seata
事务模板方法,我们要关注开启事务的部分:
io.seata.tm.api.TransactionalTemplate
#execute
io.seata.tm.api.TransactionalTemplate
#beginTransaction
io.seata.tm.api.TransactionalTemplate
#completeTransactionAfterThrowing
#commitTransaction
io.seata.tm.api.DefaultGlobalTransaction
#begin
看到了吗?RootContext.bind(xid);
io.seata.tm.api.DefaultGlobalTransaction
#commit
io.seata.tm.api.DefaultGlobalTransaction
#rollback
5.8.2 @GlobalLock处理逻辑
io.seata.spring.annotation.GlobalTransactionalInterceptor
#handleGlobalLock
也使用了模板方法来处理GlobalLock
io.seata.rm.GlobalLockTemplate
#execute
看到吗,一进模板方法就RootContext.bindGlobalLockFlag();
5.9 @GlobalLock 源码解析
io.seata.spring.annotation.GlobalLock
/**
* declare the transaction only execute in single local RM
* but the transaction need to ensure records to update(or select for update) is not in global transaction middle
* stage
*
* use this annotation instead of GlobalTransaction in the situation mentioned above will help performance.
*
* @see io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary(Object, String, Object) // the scanner for TM, GlobalLock, and TCC mode
* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // the interceptor of GlobalLock
* @see io.seata.spring.annotation.datasource.SeataAutoDataSourceProxyAdvice#invoke(MethodInvocation) // the interceptor of GlobalLockLogic and AT/XA mode
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Inherited
public @interface GlobalLock {
/**
* customized global lock retry interval(unit: ms)
* you may use this to override global config of "client.rm.lock.retryInterval"
* note: 0 or negative number will take no effect(which mean fall back to global config)
* @return lock retry interval
*/
int lockRetryInterval() default 0;
/**
* customized global lock retry interval(unit: ms)
* you may use this to override global config of "client.rm.lock.retryInterval"
* note: 0 or negative number will take no effect(which mean fall back to global config)
* @return lock retry interval
*/
@Deprecated
@AliasFor("lockRetryInterval")
int lockRetryInternal() default 0;
/**
* customized global lock retry times
* you may use this to override global config of "client.rm.lock.retryTimes"
* note: negative number will take no effect(which mean fall back to global config)
* @return lock retry times
*/
int lockRetryTimes() default -1;
}
源码注释大概含义:
- 对于某条数据,如果正在 全局事务 中进行更新(或者选择更新)操作,这时某个本地事务需要更新该数据,需要在本地事务方法上使用
@GlobalLock
注解,确保其不会对全局事务中正在操作的数据造成影响(防止出现脏写)。 - 声明事务仅在单个本地
RM
中执行 - 使用
@GlobalLock
注解而不是@GlobalTransaction
将有助于提高性能
- 属性值
lockRetryInterval
覆盖全局配置client.rm.lock.retryInterval
,校验或占用全局锁重试间隔- 属性值
lockRetryTimes
覆盖全局配置client.rm.lock.retryTimes
,校验或占用全局锁重试次数`