使用背景
在业务中,经常会有这样的需求,在数据库事务提交之后,发送异步消息或者进行其他的事务操作。
例如当用户注册成功之后,发送激活码,如果用户注册后就执行发送激活码,但是在用户保存时出现提交事务异常,数据库进行回滚,用户实际没有注册成功,但是用户却收到了激活码,此时,正确的是应该在用户注册保存事务提交完成之后,然后发送激活码。
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作
使用注解@TransactionalEventListener
demo展示
事务监听器
@Component
public class TransactionListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handler(TransactionEvent transactionEvent) {
System.out.println(transactionEvent.getSource());
}
}
业务代码
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void saveUser() {
User user = new User();
userMapper.insert(user);
eventPublisher.publishEvent(newTransactionEvent("事务提交后发布事件1"));
}
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作
源码解析
EventListenerMethodProcessor
EventListenerMethodProcessor
用来解析带有带有@EventListener
注解的方法。遍历类上的方法,判断工厂是否支持,用对应的工厂生成监听器。
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) &&
AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
!isSpringContainerClass(targetType)) {
//...
if (CollectionUtils.isEmpty(annotatedMethods)) {
//...
}
else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
context.addApplicationListener(applicationListener);
break;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
beanName + "': " + annotatedMethods);
}
}
}
}
TransactionalEventListenerFactory
仅支持TransactionalEventListener
注解,生成ApplicationListenerMethodTransactionalAdapter
的对象。
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
private int order = 50;
public TransactionalEventListenerFactory() {
}
public void setOrder(int order) {
this.order = order;
}
public int getOrder() {
return this.order;
}
public boolean supportsMethod(Method method) {
return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
}
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}
在AbstractTransactionManagementConfiguration
会引入TransactionalEventListenerFactory
。
@Bean(
name = {"org.springframework.transaction.config.internalTransactionalEventListenerFactory"}
)
@Role(2)
public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
return new TransactionalEventListenerFactory();
}
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作
ApplicationListenerMethodTransactionalAdapter
发布事件,主要是创建了TransactionSynchronization
,注册到了TransactionSynchronizationManager
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization transactionSynchronization = this.createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && this.logger.isWarnEnabled()) {
this.logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
this.processEvent(event);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("No transaction is active - skipping " + event);
}
}
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作
事务提交
TransactionSynchronizationUtils#invokeAfterCompletion
,事务提交会遍历TransactionSynchronization
执行afterCompletion
方法
public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations, int completionStatus) {
if (synchronizations != null) {
Iterator var2 = synchronizations.iterator();
while(var2.hasNext()) {
TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();
try {
synchronization.afterCompletion(completionStatus);
} catch (Throwable var5) {
logger.error("TransactionSynchronization.afterCompletion threw exception", var5);
}
}
}
ApplicationListenerMethodTransactionalAdapter.TransactionSynchronizationEventAdapter#afterCompletion
,调用事件监听器的processEvent
方法,会反射调用被@TransactionalEventListener
修饰的方法。
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == 0) {
this.processEvent();
} else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == 1) {
this.processEvent();
} else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
this.processEvent();
}
}
protected void processEvent() {
this.listener.processEvent(this.event);
}
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作
使用TransactionSynchronizationManager
和 TransactionSynchronizationAdapter
demo展示
@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;
public User saveUser(User user) {
// 保存用户
userDao.save(user);
final int userId = user.getId();
// 兼容无论是否有事务
if(TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
jmsProducer.sendEmail(userId);
}
});
} else {
jmsProducer.sendEmail(userId);
}
}
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作
标题复制10行,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作【Spring事务】Spring事务事件控制,解决业务异步操作,并且每行大于10个字符【Spring事务】Spring事务事件控制,解决业务异步操作