目录
- 一、背景
- 二、事务绑定事件介绍
- 三、事务绑定事件原理
- 四、结语
一、背景
实际工作中碰到一个场景,现存系统有10w张卡需要进行换卡,简单来说就是为用户生成一张新卡,批量换卡申请需要进行审核,审核通过后异步进行处理。
为什么要异步呢?首先是每批次的处理量很大,还有就是换卡的过程需要多次外调第三方,整个事务非常耗时,流程图如下:
这里有个问题,就是审核通过后修改申请记录的状态是一个事务,事务可能没提交,申请记录状态未变更为审核通过,MQ消费端就已经处理了,由于没查询到审核通过的记录,导致操作失败。
那么这个问题怎么解决呢?有同学会说审核通过后延迟几秒发送MQ,但事务提交的时间取决于变更的记录,网络耗时等等,不可控的因素比较多。
最好的方式就是等事务提交后再发送MQ进行异步处理,在Spring中有两种方式进行处理:
- 编程式事务,借用
PlatformTransactionManager
平台事务管理器手动提交事务后发送MQ。 - 声明式事务,通过
TransactionalEventListener
监听事务是否提交,然后再进行处理,这也是我们今天要介绍的事务绑定事件监听机制。
当然,通过事件发布和订阅的方式,也利于业务代码之间的解耦。
二、事务绑定事件介绍
从Spring 4.2
版本开始,事件监听器可以绑定到事务的某个阶段,最典型的应用就是当事务完成时再处理事件。注册一个常规的监听器我们可以通过@EventListener
来实现。
如果我们需要将事件和事务绑定可以使用 @TransactionalEventListener
注解,默认情况下,监听器会绑定到事务的提交阶段。
举个例子:
事务事件发布者:
@Component
public class TransactionalEventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
public void publishCreationEvent(CreationEvent creationEvent) {
applicationEventPublisher.publishEvent(creationEvent);
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
public static class CreationEvent extends ApplicationEvent {
public CreationEvent(Object source) {
super(source);
}
}
}
事务事件监听器:
@Component
public class TransactionalEventListener {
@TransactionalEventListener
public void handleOrderCreatedEvent(CreationEvent creationEvent) {
}
}
TransactionalEventListener
注解暴露了一个phase
的属性,通过该属性可以指定监听器绑定到事务的哪个阶段,该属性的值有:
- BEFORE_COMMIT:事务提交前。
- AFTER_COMMIT:事务提交后。
- AFTER_ROLLBACK:事务回滚后。
- AFTER_COMPLETION:事务完成后(提交或者回滚)。
如果事件发布时没有事务运行,事务监听器不会被调用,但可以通过设置fallbackExecution
为true来指定即使事件发布时没有事务运行,监听器也会被调用。
三、事务绑定事件原理
正常的ApplicationEvent
会在事件发布时同步调用事件监听器进行处理,但是当事件在事务环境中运行发布,且被TransacntionApplicationListener
监听时不会直接调用监听器的处理方法,而是会通过回调的方式根据事务所处的阶段进行回调。
那么这种方式到底是怎么实现的呢?
想看实现,先得看监听器的实现,处理事务事件的监听器为TransactionalApplicationListener
,它的核心实现为TransactionalApplicationListenerMethodAdapter
,从名字就能看出来是里面包含了事务事件处理的处理逻辑。
类图如下:
主要看onApplicationEvent
这个方法,可以看到逻辑也很简单:
- 如果当前线程有事务绑定,那么会通过
TransactionSynchronizationManager
事务同步管理器注册一个事务事件监听的同步器。 - 如果当前线程没有事务绑定,且
fallbackExecution
属性为true,那么会直接调用父类ApplicationListenerMethodAdapter
的processEvent
方法,说白了就是调用事件监听方法。
备注:事务事件监听器也只有在由
PlatformTransactionManage
平台事务管理器管理的线程绑定的事务中才生效。
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
else if (this.fallbackExecution) {
if (getTransactionPhase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
至于TransactionalApplicationListenerSynchronization
是啥呢?
其实它就是一个事务同步的回调接口,主要由AbstractPlatformTransactionManager
抽象平台事务管理器在事务提交的各个阶段进行调用。
说到平台事务管理器大家肯定都熟悉,它包含了Spring中事务处理基本流程,比如:
- 当前方法是否有事务运行。
- 应用指定的隔离级别。
- 挂起或者恢复事务。
- 检查提交时的
rollback-only
标识。 - 触发注册的同步回调(TransactionSynchronization)。
public interface TransactionSynchronization extends Ordered, Flushable {
/** Completion status in case of proper commit. */
int STATUS_COMMITTED = 0;
/** Completion status in case of proper rollback. */
int STATUS_ROLLED_BACK = 1;
/** Completion status in case of heuristic mixed completion or system errors. */
int STATUS_UNKNOWN = 2;
@Override
default int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
default void suspend() {
}
default void resume() {
}
@Override
default void flush() {
}
default void beforeCommit(boolean readOnly) {
}
default void beforeCompletion() {
}
default void afterCommit() {
}
default void afterCompletion(int status) {
}
}
从上面的方法可以看出,TransactionSynchronization
包含了一些事务相关的回调方法,我们看看它的实现TransactionalApplicationListenerSynchronization
到底做了什么?
class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
implements TransactionSynchronization {
private final E event;
private final TransactionalApplicationListener<E> listener;
private final List<TransactionalApplicationListener.SynchronizationCallback> callbacks;
public TransactionalApplicationListenerSynchronization(E event, TransactionalApplicationListener<E> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
this.event = event;
this.listener = listener;
this.callbacks = callbacks;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}
@Override
public void afterCompletion(int status) {
TransactionPhase phase = this.listener.getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
private void processEventWithCallbacks() {
this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
try {
this.listener.processEvent(this.event);
}
catch (RuntimeException | Error ex) {
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));
throw ex;
}
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
}
}
上面的代码也很简单,该类的beforeCommit和afterCompletion方法会判断当前事务的状态以及监听器里指定的事务阶段去调用监听器里的业务方法。
四、结语
至此,事务事件监听的实现还是不复杂的,平时工作中也会有很多场景会用到,非常实用,下面是一个简单的流程图: