文章目录
- 一、前言
- 二、TransactionSynchronization
-
- 1. TransactionSynchronization
-
- 1.1 TransactionSynchronization 的定义
- 1.2 TransactionSynchronizationManager
- 2. TransactionSynchronization 原理简述
-
- 2.1 TransactionSynchronization#beforeCommit
- 2.2 TransactionSynchronization#beforeCompletion
- 2.3 TransactionSynchronization#afterCommit
- 2.4 TransactionSynchronization#afterCompletion
- 3. 总结
- 三、TransactionalEventListener
-
- 1. TransactionalEventListener
- 2. 执行流程
-
- 2.1 EventListenerMethodProcessor
-
- 2.1.1 EventListenerMethodProcessor 的注入
- 2.1.2 EventListenerMethodProcessor 的调用
- 2.2 EventListenerFactory
- 2.3 TransactionalApplicationListenerMethodAdapter
-
- 2.3.1 TransactionalApplicationListenerSynchronization
- 2.3.2 ApplicationListenerMethodAdapter#processEvent
一、前言
本文是 Spring源码分析的衍生文章。主要是因为本人菜鸡,在分析源码的过程中还有一些其他的内容不理解,故开设衍生篇来完善内容以学习。
全集目录:Spring源码分析:全集整理
背景不知道写啥
二、TransactionSynchronization
1. TransactionSynchronization
我们这里以一个 Demo 为例,如下:当调用 DemoService#testTransactionSynchronization 方法时会往sys_role 插入role_id = 1 和 role_id = 2 的两条记录。同时该方法通过 @Transactional(rollbackFor = Exception.class) 开启了事务。
@Service
public class DemoServiceImpl implements DemoService {
/**
* 节省空间 addRole 和 addPermission 的实现就不给出. 就是简单的往表里插入一条记录
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public String testTransactionSynchronization() {
// step1 : 在 sys_role 表中插入一条 role_id = 1 的记录
addRole(1);
// step2 : 在 sys_role 表中插入一条 role_id = 2 的记录
addRole(2);
System.out.println("end");
return "";
}
// 在 sys_role 表中插入一条 role_id = id 的记录
@Override
public void addRole(int id) {
...
}
// step3 : 在 sys_rermission 表中插入一条 permission_id = id 的记录。节省空间不给出实现
@Override
public void addPermission(int id) {
...
}
}
那么可以预想到,step1 和 step 2 要么同时成功,要么同时失败、现在更改一下需求 :增加 step3 在 DemoService#testTransactionSynchronization 方法的事务提交后才执行。
实现方法有若干种,这里介绍本文的 TransactionSynchronization 的使用。借助 TransactionSynchronization,可以实现在事务挂起、恢复、提交前后、提交后等时机进行完成指定的操作。具体实现如下:
@Transactional(rollbackFor = Exception.class)
@Override
public String testTransactionSynchronization() {
// step1 : 在 sys_role 表中插入一条 role_id = 1 的记录
addRole(1);
// step2 : 在 sys_role 表中插入一条 role_id = 2 的记录
addRole(2);
// 向事务同步管理器注册一个 TransactionSynchronization 匿名实现类,作用是当前事务提交后,执行 addPermission 方法。
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
// 当前事务提交后触发该方法
// step3 : 在 sys_rermission 表中插入一条 permission_id = id 的记录。
@Override
public void afterCommit() {
addPermission(1);
}
});
System.out.println("end");
return "";
}
上面的实际即是:当 testTransactionSynchronization 的事务提交后,才会执行 TransactionSynchronizationManager#registerSynchronization 注册的TransactionSynchronization。这里我们实现了 TransactionSynchronization#afterCommit,即会在事务提交后执行。
看到了上面的 Demo,下面我们来了解一下涉及到的 TransactionSynchronization 和 TransactionSynchronizationManager
1.1 TransactionSynchronization 的定义
TransactionSynchronization 作为事务同步回调的接口,可以实现 Ordered 接口来影响它们的执行顺序。 未实现 Ordered 接口的同步将附加到同步链的末尾。TransactionSynchronization 提供了很多方法,定义如下 :
public interface TransactionSynchronization extends Flushable {
/** Completion status in case of proper commit. */
// 正确提交时的完成状态
int STATUS_COMMITTED = 0;
/** Completion status in case of proper rollback. */
// 在正确回滚的情况下完成状态
int STATUS_ROLLED_BACK = 1;
/** Completion status in case of heuristic mixed completion or system errors. */
// 在启发式混合完成或系统错误的情况下的完成状态
int STATUS_UNKNOWN = 2;
// 事务挂起时调用。 如果管理任何资源,应该从 TransactionSynchronizationManager 取消绑定资源。
default void suspend() {
}
// 事务恢复时调用。如果管理任何资源,应该将资源重新绑定到 TransactionSynchronizationManager
default void resume() {
}
// 将底层会话刷新到数据存储区(如果适用):例如,Hibernate/JPA 会话。
@Override
default void flush() {
}
// 事务提交前调用。此处若发生异常,会导致回滚。
default void beforeCommit(boolean readOnly) {
}
// 事务提交前, 在 beforeCommit 后调用。此处若发生异常,不会导致回滚。
default void beforeCompletion() {
}
// 事务提交后调用
default void afterCommit() {
}
// 事务提交 或回滚后执行。
default void afterCompletion(int status) {
}
}
1.2 TransactionSynchronizationManager
TransactionSynchronizationManager 事务同步管理器,保存的是各个线程中的事务信息。Spring 在事务过程中通过此类来管理事务。部分定义如下:
public abstract class TransactionSynchronizationManager {
//线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
//事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
// 事务的名称
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
// 事务是否是只读
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
// 事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
// 事务是否开启 actual:真实的
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
....
}
其中 TransactionSynchronizationManager#registerSynchronization 实现如下:
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
// 获取当前线程绑定的 TransactionSynchronization 集合。
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
// 将当前 新增的 TransactionSynchronization 添加到集合中
synchs.add(synchronization);
}
可以看到,在 TransactionSynchronizationManager#registerSynchronization中会将注册的TransactionSynchronization 添加到 TransactionSynchronizationManager#synchronizations集合中,当事务执行到指定阶段后进行触发,我们下面来详细看一看。
2. TransactionSynchronization 原理简述
我们这里不关注 事务的挂起恢复等场景(太复杂,写一半放弃了 ),只关注 beforeCommit、beforeCompletion、afterCommit、afterCompletion 四个常用的方法 的调用时机 。常规的调用顺序为 :
业务代码 -> beforeCommit -> beforeCompletion -> afterCommit -> afterCompletion
在 Spring 中事务提交会调用 AbstractPlatformTransactionManager#processCommit 方法,上述四个方法都在该方法中有调用,其实现如下:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 预留操作
prepareForCommit(status);
// 调用自定义触发器的方法
// 1. 触发 TransactionSynchronization#beforeCommit
triggerBeforeCommit(status);
// 2.1 触发 TransactionSynchronization#beforeCompletion
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 如果设置了保存点信息
if (status.hasSavepoint()) {
// 清除保存点信息
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// 如果是新事物
else if (status.isNewTransaction()) {
unexpectedRollback = status.isGlobalRollbackOnly();
// 如果是独立事务则直接提交
doCommit(status);
}
// 不是新事物并不会直接提交,而是等最外围事务进行提交。
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
// 4.1 触发 TransactionSynchronization#afterCompletion
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
// 2.2 触发 TransactionSynchronization#beforeCompletion
doRollbackOnCommitException(status, ex);
}
else {
// 4.2 触发 TransactionSynchronization#afterCompletion
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 提交过程中会出现异常则回滚
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 3. 触发 TransactionSynchronization#afterCommit
triggerAfterCommit(status);
}
finally {
// 4.3 触发 TransactionSynchronization#afterCompletion
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 清理事务信息
cleanupAfterCompletion(status);
}
}
上面的代码中我们标注了每个方法的调用,下面我们具体来看
2.1 TransactionSynchronization#beforeCommit
TransactionSynchronization#beforeCommit 在事务提交前触发,在当前方法抛出异常会导致整个事务回滚。
上面代码注释标明在 AbstractPlatformTransactionManager#triggerBeforeCommit 方法中调用了该方法,其实现如下:
protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
// 触发 TransactionSynchronization#beforeCommit 方法。入参是当前事务是否只读
TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
}
}
TransactionSynchronizationUtils#triggerBeforeCommit 实现如下:
public static void triggerBeforeCommit(boolean readOnly) {
// 从 TransactionSynchronizationManager 中获取所有注册的 TransactionSynchronization 触发 beforeCommit 方法
for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
synchronization.beforeCommit(readOnly);
}
}
2.2 TransactionSynchronization#beforeCompletion
TransactionSynchronization#beforeCompletion 在事务提交前,在 TransactionSynchronization#beforeCommit 后调用。在AbstractPlatformTransactionManager#triggerBeforeCompletion 方法调用时自身捕获了异常打印了 debug 级别日志,所以如果该方法抛出异常并不会导致事务回滚。
AbstractPlatformTransactionManager#triggerBeforeCompletion 实现如下:
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering beforeCompletion synchronization");
}
TransactionSynchronizationUtils.triggerBeforeCompletion();
}
}
TransactionSynchronizationUtils#triggerBeforeCompletion 实现如下
public static void triggerBeforeCompletion() {
for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
try {
synchronization.beforeCompletion();
}
catch (Throwable ex) {
// 捕获beforeCompletion抛出的异常,打印 debug 日志、
logger.debug("TransactionSynchronization.beforeCompletion threw exception", ex);
}
}
}
这里注意 beforeCompletion 的调用场景有两处。
- 代码注释 2.1 处是常规流程的执行
- 代码注释 2.2 处则是执行出现异常时,即当代码执行了 代码注释1处后抛出异常时执行。
2.3 TransactionSynchronization#afterCommit
TransactionSynchronization#afterCommit 在事务提交后调用。该方法抛出异常也不会导致事务回滚但是会触发 TransactionSynchronization#afterCompletion 方法。
AbstractPlatformTransactionManager#triggerAfterCommit 实现如下:
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
TransactionSynchronizationUtils#triggerAfterCommit 实现如下:
public static void triggerAfterCommit() {
invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}
2.4 TransactionSynchronization#afterCompletion
TransactionSynchronization#afterCompletion 在 TransactionSynchronization#afterCommit 执行之后调用。无论事务正常提交还是异常回滚,都会触发该方法。
需要注意的是 该方法在 AbstractPlatformTransactionManager#processRollback 中也有调用。AbstractPlatformTransactionManager#processRollback 方法是事务回滚时执行的方法,其内部也是通过 AbstractPlatformTransactionManager#triggerAfterCompletion 来触发 afterCompletion 方法,因此在这里就不再展开。
AbstractPlatformTransactionManager#triggerAfterCompletion
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
if (status.isNewSynchronization()) {
List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
TransactionSynchronizationManager.clearSynchronization();
if (!status.hasTransaction() || status.isNewTransaction()) {
// No transaction or new transaction for the current scope ->
// invoke the afterCompletion callbacks immediately
// 当前作用域没有事务或有新事务, 触发 TransactionSynchronization#afterCompletion 方法
invokeAfterCompletion(synchronizations, completionStatus);
}
else if (!synchronizations.isEmpty()) {
// Existing transaction that we participate in, controlled outside
// of the scope of this Spring transaction manager -> try to register
// an afterCompletion callback with the existing (JTA) transaction.
// 我们参与的现有事务,在此 Spring 事务管理器范围之外控制 -> 尝试使用现有(JTA)事务注册 afterCompletion 回调。
registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
}
}
}
protected void registerAfterCompletionWithExistingTransaction(
Object transaction, List<TransactionSynchronization> synchronizations) throws TransactionException {
// 状态置为未知
invokeAfterCompletion(synchronizations, TransactionSynchronization.STATUS_UNKNOWN);
}
protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}
3. 总结
总体来说 TransactionSynchronization 的逻辑还是比较简单的,如下:
- TransactionSynchronizationManager#registerSynchronization 将 TransactionSynchronization 注册到 TransactionSynchronizationManager#synchronizations 中。
- 当当前线程准备提交或回滚时会通过TransactionSynchronizationManager#getSynchronizations 获取到 TransactionSynchronizationManager#synchronizations 中的事务同步类。随后执行相应的方法。
三、TransactionalEventListener
在 Spring framework 4.2 之后还可以使用@TransactionalEventListener处理数据库事务提交成功后再执行操作,这种方式比 TransactionSynchronization 更加优雅。我们仍以上面的Demo为例加以改造。
如下: 同样的功能,事务内在 sys_role 表中插入一条 role_id = 10 和 20 两条记录。事务提交后执行DemoServiceImpl#onApplicationEvent 方法调用 addPermission,在 sys_rermission 表中插入一条记录。需要注意,这里不需要再实现 ApplicationListener 接口,否则会调用两次。
@Service
public class DemoServiceImpl implements DemoService, ApplicationContextAware {
@Autowired
private SysPermissionDao sysPermissionDao;
@Autowired
private SysRoleDao sysRoleDao;
private ApplicationContext applicationContext;
@Transactional(rollbackFor = Exception.class)
@Override
public String testTransactionalEventListener() {
// step1 : 在 sys_role 表中插入一条 role_id = 10 的记录
addRole(10);
// step2 : 在 sys_role 表中插入一条 role_id = 20 的记录
addRole(20);
this.applicationContext.publishEvent(new DemoApplicationEvent(10));
System.out.println("end");
return "";
}
/**
* 在 sys_role 表中插入一条记录
*/
@Override
public void addRole(int id) {
...
}
/**
* 在 sys_rermission 表中插入一条记录
*/
@Override
public void addPermission(int id) {
...
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static class DemoApplicationEvent extends ApplicationEvent {
@Getter
private Integer id;
public DemoApplicationEvent(Integer source) {
super(source);
this.id = source;
}
}
// phase 指定了执行阶段为 事务提交后。
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onApplicationEvent(DemoApplicationEvent demoApplicationEvent) {
// step3 : 在 sys_rermission 表中插入一条记录
addPermission(demoApplicationEvent.getId());
}
}
TransactionalEventListener 通过Spring监听器的方式实现了 TransactionSynchronization 的功能,下面我们来看一看具体原理。
1. TransactionalEventListener
首先来看一下 TransactionalEventListener 的定义,如下:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
// 执行执行阶段,默认事务提交后、除此之外还有 BEFORE_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
// 监听器的唯一id,可为空
String id() default "";
// 如果没有事务正在运行,是否应处理事件
boolean fallbackExecution() default false;
/**
* EventListener classes 属性的别名, 指定监听事件类型
* 此侦听器处理的事件类。如果使用单个值指定此属性,则带注释的方法可以选择接受单个参数。 但是,如果此属性指定了多个值,则注释方法不得声明任何参数。
*/
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};
/**
* EventListener classes 属性的别名, 指定监听事件类型
* 此侦听器处理的事件类。如果使用单个值指定此属性,则带注释的方法可以选择接受单个参数。 但是,如果此属性指定了多个值,则注释方法不得声明任何参数。
*/
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};
/**
* 用于使事件处理有条件的 Spring 表达式语言 (SpEL) 属性。
* 默认值为"" ,表示始终处理事件。
*/
String condition() default "";
}
2. 执行流程
下面我们来看看 TransactionalEventListener 注解是如何实现的。
2.1 EventListenerMethodProcessor
首先我们需要知道,一个被 @TransactionalEventListener 注解修饰的普通方法,如何具有监听器的效果?原因就在于 EventListenerMethodProcessor 类。
EventListenerMethodProcessor 的结构如下:
可以看到 EventListenerMethodProcessor 实现了三个关键接口 :
- SmartInitializingSingleton :afterSingletonsInstantiated 方法会在容器初始化所有非惰性Bean之后调用。EventListenerMethodProcessor 在 afterSingletonsInstantiated 方法中为被 @TransactionalEventListener 注解修饰的方法动态生成了 ApplicationListener 并添加到容器中。当执行的事件发送时,动态生成的 ApplicationListener 监听器则会触发,通过反射调用 注解方法。
- ApplicationContextAware :EventListenerMethodProcessor 通过 setApplicationContext 方法 获取到 ApplicationContext 实例。
- BeanFactoryPostProcessor : EventListenerMethodProcessor 通过 postProcessBeanFactory 方法获取到了Spring容器中的所有 EventListenerFactory 实例对象,保存到 EventListenerFactory#eventListenerFactories 中。
下面我们来看看 EventListenerMethodProcessor 的执行过程,如下:
2.1.1 EventListenerMethodProcessor 的注入
Spring 容器启动时会在通过 SpringApplication#createApplicationContext 创建应用上下文,其中调用链如下:
SpringApplication#createApplicationContext ->
AnnotationConfigServletWebServerApplicationContext构造函数 ->
AnnotatedBeanDefinitionReader构造函数 ->
AnnotationConfigUtils#registerAnnotationConfigProcessors
在 AnnotationConfigUtils#registerAnnotationConfigProcessors 中会向容器中注册 EventListenerMethodProcessor 的 BeanDefinition,如下图:
2.1.2 EventListenerMethodProcessor 的调用
Spring 容器启动时会在 AbstractApplicationContext#finishBeanFactoryInitialization 方法中完成非惰性加载 Bean的 的初始化。初始化结束后会调用SmartInitializingSingleton#afterSingletonsInstantiated 方法。EventListenerMethodProcessor#afterSingletonsInstantiated 的实现如下:
@Override
public void afterSingletonsInstantiated() {
ConfigurableListableBeanFactory beanFactory = this.beanFactory;
Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
// 获取容器中的所有 beanName
String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
if (!ScopedProxyUtils.isScopedTarget(beanName)) {
Class<?> type = null;
try {
// 获取 bean的 类型
type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
}
catch (Throwable ex) {
... 日志打印
}
if (type != null) {
// 如果 bean 是 ScopedObject类型,进一步获取真实类型
if (ScopedObject.class.isAssignableFrom(type)) {
try {
Class<?> targetClass = AutoProxyUtils.determineTargetClass(
beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
if (targetClass != null) {
type = targetClass;
}
}
catch (Throwable ex) {
... 日志打印
}
}
try {
// 处理bean
processBean(beanName, type);
}
catch (Throwable ex) {
... 抛出异常
}
}
}
}
}
可以看到关键逻辑在 EventListenerMethodProcessor#processBean 中,其实现如下:
private void processBean(final String beanName, final Class<?> targetType) {
// nonAnnotatedClasses 缓存未命中当前类 && 给定的类是可携带指定注释的候选者 && 不是 org.springframework. 包下的类或被@Component 注解修饰
if (!this.nonAnnotatedClasses.contains(targetType) &&
AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
!isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null;
try {
// 寻找被 @EventListener 注解修饰的方法
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}
catch (Throwable ex) {
... 日志打印
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
// 缓存没有被 @EventListener 注解修饰的类
this.nonAnnotatedClasses.add(targetType);
... 日志打印
}
else {
// Non-empty set of methods
// 到这里则说明当前类一定有方法被 @EventListener 注解修饰
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
// 在 postProcessBeanFactory 方法中获取到的容器中的 EventListenerFactory 实现类
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
// 遍历 @EventListener 注解修饰的方法
for (Method method : annotatedMethods.keySet()) {
// 遍历所有的 EventListenerFactory
for (EventListenerFactory factory : factories) {
// 如果 EventListenerFactory 支持当前方法
if (factory.supportsMethod(method)) {
// 获取可调用的方法:context.getType(beanName) 获取到的可能是代理类,这里会找到真正要调用的诶类的方法
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
// 通过 EventListenerFactory#createApplicationListener 方法为找到的方法创建一个 ApplicationListener实现类
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
// ApplicationListenerMethodAdapter类型则调用 init 完成初始化
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
// applicationListener 添加到 Spring 容器中
context.addApplicationListener(applicationListener);
break;
}
}
}
... 日志打印
}
}
}
总结如下:
- EventListenerMethodProcessor 在 EventListenerMethodProcessor#postProcessBeanFactory 中获取到了所有 EventListenerFactory 类型的 bean。(在ConfigurationClassPostProcessor 中将所有bean 的BeanDefinition 扫描生成。而 ConfigurationClassPostProcessor 的优先级在EventListenerMethodProcessor 之前 )。
- 随后在EventListenerMethodProcessor#afterSingletonsInstantiated 方法中获取所有bean,筛选出被 @EventListener 注解修饰的类或方法的类。
- 随后找到 EventListenerFactory,通过 EventListenerFactory#supportsMethod 判断是可以处理当前类,如果可以则通过 EventListenerFactory#createApplicationListener 为当前类生成一个 ApplicationListener (ApplicationListenerMethodAdapter 或 TransactionalApplicationListenerMethodAdapter)类并添加到 容器中。
- 当有指定事件触发后触发容器中的 ApplicationListenerMethodAdapter 或 TransactionalApplicationListenerMethodAdapter 的 onApplicationEvent 方法。在这些方法中会通过反射调用被 @EventListener 修饰的方法。
2.2 EventListenerFactory
EventListenerFactory 接口定义如下
public interface EventListenerFactory {
// 是否支持处理当前指定方法
boolean supportsMethod(Method method);
// 为当前指定类的指定方法创建一个 ApplicationListener
ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method);
}
默认情况下 EventListenerFactory 有 DefaultEventListenerFactory 和 TransactionalEventListenerFactory 两个实现类。我们这里看的是 @TransactionalEventListener 注解,所以来看一下 TransactionalEventListenerFactory 的实现,如下:
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
...
@Override
public boolean supportsMethod(Method method) {
// 判断当前方式是否被 TransactionalEventListener 注解修饰
return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
}
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
// 创建一个 TransactionalApplicationListenerMethodAdapter 实现类
return new TransactionalApplicationListenerMethodAdapter(beanName, type, method);
}
}
这里我们可以了解到 TransactionalEventListenerFactory 会判断:如果方法被TransactionalEventListener 注解修饰,则会为其创建一个 TransactionalApplicationListenerMethodAdapter类作为 ApplicationListener。
2.3 TransactionalApplicationListenerMethodAdapter
TransactionalApplicationListenerMethodAdapter 结构如下:
这里我们简单看一下 TransactionalApplicationListenerMethodAdapter#onApplicationEvent 的实现
@Override
public void onApplicationEvent(ApplicationEvent event) {
// 如果当前存在事务. 则通过 TransactionSynchronizationManager 注册TransactionalApplicationListenerSynchronization
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
// 没有事务,判断是否执行事件逻辑
else if (this.annotation.fallbackExecution()) {
// ...
processEvent(event);
}
else {
... 日志打印
}
}
这里我们看到
- 如果当前线程存在活跃事务, 则通过 TransactionSynchronizationManager 注册一个 TransactionalApplicationListenerSynchronization 实例。当事务到达指定阶段后会触发TransactionalApplicationListenerSynchronization 中的阶段方法,而这些方法中则还会调用 TransactionalApplicationListenerMethodAdapter#processEvent 方法。
- 如果当前线程不存在活跃事务,则根据 TransactionalEventListener#fallbackExecution 判断,如果为 true,则执行 TransactionalApplicationListenerMethodAdapter#processEvent。其中 processEvent 的实现在其父类 ApplicationListenerMethodAdapter中。
下面我们来详细看一看
2.3.1 TransactionalApplicationListenerSynchronization
TransactionalApplicationListenerSynchronization 的实现如下,其中主要实现了 beforeCommit 和 afterCompletion 两个方法。
class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
implements TransactionSynchronization {
...
// 事务提交前判断
@Override
public void beforeCommit(boolean readOnly) {
// 如果指定阶段为事务提交前,则执行 processEventWithCallbacks
if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}
// 事务提交后判断
@Override
public void afterCompletion(int status) {
TransactionPhase phase = this.listener.getTransactionPhase();
// 如果执行阶段为事务提交后 && 事务正常提交
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
// 如果执行阶段为事务回滚后 && 事务发生回滚
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
// 如果执行阶段为事务结束 && 事务发生回滚
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
private void processEventWithCallbacks() {
// 执行回调类的回调预处理方法
this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
try {
// 执行监听事件,这里的 listener 即为上层的 TransactionalApplicationListenerMethodAdapter 实例对象
this.listener.processEvent(this.event);
}
catch (RuntimeException | Error ex) {
// 执行回调类的回调后置处理方法
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));
throw ex;
}
// 执行回调类的回调后置处理方法
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
}
}
2.3.2 ApplicationListenerMethodAdapter#processEvent
ApplicationListenerMethodAdapter#processEvent 实现如下,逻辑比较简单,这里不再赘述。
public void processEvent(ApplicationEvent event) {
// 解析 method 的参数。method 指的是被 TransactionalEventListener 注解修饰的方法
Object[] args = resolveArguments(event);
// 判断是否可以处理。根据注解 TransactionalEventListener#condition 属性判断
if (shouldHandle(event, args)) {
// 反射调用 指定的方法,以 args 为入参
Object result = doInvoke(args);
if (result != null) {
// 如果监听方法有返回值,则对返回值处理。会将返回值作为 event 再次发送
handleResult(result);
}
else {
logger.trace("No result object given - no result to handle");
}
}
}
以上:如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正