一、前言
简介: 在项目实际开发过程中,我们有很多这样的业务场景:一个事务中处理完一个业务逻辑后需要跟着处理另外一个业务逻辑,伪码大致如下:
@Service
public class ProductServiceImpl {
...
public void saveProduct(Product product) {
productMapper.saveOrder(product);
notifyService.notify(product);
}
...
}
很简单并且很常见的一段业务逻辑:首先将产品先保存数据库,然后发送通知。
某一天你们可能需要把新增的产品存到Es中,这时候也需要代码可能变成这样:
@Service
public class ProductServiceImpl {
...
public void saveProduct(Product product) {
productMapper.saveProduct(product);
esService.saveProduct(product)
notifyService.notify(product);
}
...
}:
随着业务需求的变化,代码也需要跟着一遍遍的修改。而且还会存在另外一个问题,如果通知系统挂了,那就不能再新增产品了。
对于上面这种情况非常适合引入消息中间件(消息队列)来对业务进行解耦,但并非所有的业务系统都会引入消息中间件(引入会第三方架构组件会带来很大的运维成本)。
Spring提供了事件驱动机制可以帮助我们实现这一需求。
Spring事件驱动
spring事件驱动由3个部分组成
ApplicationEvent:表示事件本身,自定义事件需要继承该类,用来定义事件
ApplicationEventPublisher:事件发送器,主要用来发布事件
ApplicationListener:事件监听器接口,监听类实现ApplicationListener 里onApplicationEvent方法即可,也可以在方法上增加@EventListener以实现事件监听。
实现Spring事件驱动一般只需要三步:
自定义需要发布的事件类,需要继承ApplicationEvent类
使用ApplicationEventPublisher来发布自定义事件
使用@EventListener来监听事件
这里需要特别注意一点,默认情况下事件是同步的。即事件被publish后会等待Listener的处理。如果发布事件处的业务存在事务,监听器处理也会在相同的事务中。如果需要异步处理事件,可以onApplicationEvent方法上加@Aync支持异步或在有@EventListener的注解方法上加上@Aync。
源码实战
• 创建事件
public class ProductEvent extends ApplicationEvent {
public ProductEvent(Product product) {
super(product);
}
}
• 发布事件
@Service
public class ProductServiceImpl implements IproductService {
...
@Autowired
private ApplicationEventPublisher publisher;
@Override
@Transactional(rollbackFor = Exception.class)
public void saveProduct(Product product) {
productMapper.saveProduct(product);
//事件发布
publisher.publishEvent(product);
}
...
}
• 事件监听
@Slf4j
@AllArgsConstructor
public class ProductListener {
private final NotifyService notifyServcie;
@Async
@Order
@EventListener(ProductEvent.class)
public void notify(ProductEvent event) {
Product product = (Product) event.getSource();
notifyServcie.notify(product, "product");
}
}
• 在SpringBoot启动类上增加@EnableAsync 注解
@Slf4j
@EnableSwagger2
@SpringBootApplication
@EnableAsync
public class ApplicationBootstrap {
...
}
• 使用了Async后会使用默认的线程池SimpleAsyncTaskExecutor,一般我们会在项目中自定义一个线程池。
@Configuration
public class ExecutorConfig {
/** 核心线程数 */
private int corePoolSize = 10;
/** 最大线程数 */
private int maxPoolSize = 50;
/** 队列大小 */
private int queueCapacity = 10;
/** 线程最大空闲时间 */
private int keepAliveSeconds = 150;
@Bean("customExecutor")
public Executor myExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("customExecutor-");
executor.setKeepAliveSeconds(keepAliveSeconds);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
SpringBoot 的事件是基于Spring的事件,所以我们先介绍Spring事件原理。
明白了使用,我们再来看看原理:
1.Spring 事件原理
核心类:
ApplicationEventMulticaster,事件派发器。
ApplicationListener,事件监听类。
ApplicationEvent,事件类。
事件派发器派发事件,事件监听类监听派发的事件。
1.ApplicationEventMulticaster事件派发器的注册时机,何时被注册到Spring容器内部的?
我们找到Spring容器创建Bean的流程中,AbstractApplicationContext#refresh这个方法里:
// Initialize message source for this context.
initMessageSource();
// 在这一步骤中,初始化了事件派发器
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
initApplicationEventMulticaster方法:
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
...
// 这一步骤创建了派发器
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
// 注册到单例池里面
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
...
}
2.当我们调用了publishEvent(ApplicationEvent event);监听类怎么就会执行了呢?
我们点进该方法里面AbstractApplicationContext#publishEvent():
// 会调用ApplicationEventMulticaster的multicastEvent()方法
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
ApplicationEventMulticaster#multicastEvent():
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 从派发器里面获取线程池对象
Executor executor = getTaskExecutor();
// getApplicationListeners(event, type) 或获取该事件对象类型的所有监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
// 线程池异步调用
executor.execute(() -> invokeListener(listener, event));
}
else {
// 直接调用
invokeListener(listener, event);
}
}
}
invokeListener方法:
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
// 这个直接调用 listener.onApplicationEvent(event); 方法
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
总结:当我们调用 applicationContext.publishEvent(xxx);方法时,Spring内部会拿到关于此事件对象的所有监听器类对象,直接调用执行。
SpringBoot 启动事件流程
SpringBoot在启动时,会默认发布一些事件,我们可以自定义监听类实现监听该事件,做一些初始化等等操作,一些框架整合就是通过事件监听的方式进行内部的初始化。
值得注意的是SpringBoot对于有些事件的监听是只能通过读取配置里面配置的监听类才能生效,直接注解的方式是无法监听到的!
为什么会这样呢?
因为我们直接加注解的话,是必须要经过Spring容器的洗礼,有些事件的发布是在容器refresh之前做的,所以注解的方式是没办法生效的!
SpringBoot提供了配置文件的方式去配置监听器,那么配置文件中的实现类是何时获取到的呢?
在构造器里面获取的,SPI机制加载实现类。
我们要监听一些容器刷新之前的内部事件只能在spring.factories中指定。
启动时会发布的事件顺序:
- ApplicationStartingEvent: 准备启动SpringBoot环境之前。
- ApplicationEnvironmentPreparedEvent: 环境变量初始化完成,应用启动之前。
- ApplicationContextInitializedEvent:应用初始化器执行后发布
- ApplicationPreparedEvent:应用准备就绪,容器初始化之前发布。
- ApplicationStartedEvent:容器初始化完成之后发布。
- ApplicationReadyEvent:容器已经初始化完成并且已经可以接收Web请求之后发布。
- ApplicationFailedEvent:容器启动失败。
除此之外,在ApplicationPreparedEvent之后和ApplicationStartedEvent之前还将发布以下事件: - ContextRefreshedEvent:容器刷新完成发布。 WebServerInitializedEvent
:在WebServer准备就绪后发送。ServletWebServerInitializedEvent和ReactiveWebServerInitializedEvent分别是servlet和reactive变体。
以上的事件对象都继承 SpringApplicationEvent 类,SpringApplicationEvent 又继承 ApplicationEvent。
public abstract class SpringApplicationEvent extends ApplicationEvent {
private final String[] args;
public SpringApplicationEvent(SpringApplication application, String[] args) {
super(application);
this.args = args;
}
public SpringApplication getSpringApplication() {
return (SpringApplication) getSource();
}
public final String[] getArgs() {
return this.args;
}
}
以上事件的发布类是:EventPublishingRunListener。
EventPublishingRunListener 类是SpringBoot类似通过SPI机制加载进来的:
public ConfigurableApplicationContext run(String... args) {
StopWatch stopWatch = new StopWatch();
...
// 这一步加载进来的
SpringApplicationRunListeners listeners = getRunListeners(args);
listeners.starting();
}
EventPublishingRunListener 实现了 SpringApplicationRunListener接口,该接口里面规范了SpringBoot启动时机的事件发布流程。
public class EventPublishingRunListener implements SpringApplicationRunListener, Ordered {
private final SpringApplication application;
// 事件派发器对象
private final SimpleApplicationEventMulticaster initialMulticaster;
....
}
参考文献:
1、https://blog.csdn.net/qq_33549942/article/details/122992865
2、https://developer.aliyun.com/article/829271