EventListener
JDK
JDK1.1开始就提供EventListener,一个标记接口,源码如下:
/**
* A tagging interface that all event listener interfaces must extend.
*/
public interface EventListener {
}
JDK提供的java.util.EventObject
:
public class EventObject implements Serializable {
protected transient Object source;
}
用户需要定义继承自EventObject的事件对象,然后定义具体的Listener接口继承EventListener。事件源通过添加监听器(addXXXListener方法)手动管理事件的订阅和发布,需要用户手动调用监听器的回调方法,事件管理逻辑往往由开发者编码完成。
典型场景:主要用于早期GUI应用开发(如AWT和Swing),用于组件间的事件通信。
优点:轻量级,易于理解;适合简单场景或单一模块。
缺点:
- 需显式注册和管理事件监听器,手动处理事件流转,容易导致代码复杂度增加;
- 缺乏高级特性,如事件异步处理、条件过滤等。
Spring
@EventListener
当在实现某些特定业务逻辑时,通常可通过发送事件的方式实现解耦,这也是观察者模式的一种体现。从Spring 4.2开始提供注解@EventListner,不再需要单独编写监听器类,只需在Spring Bean方法上标记@EventListener即可。
源码如下:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {
@AliasFor("classes")
Class<?>[] value() default {};
@AliasFor("value")
Class<?>[] classes() default {};
String condition() default "";
}
解读:
- value和classes:作用相同,表示监听的一个或一组事件,用于支持方法中同一个父类的事件;
- condition:支持Spring EL表达式,用来做Event中的变量或者方法判断。
Event的整个生命周期,从Publisher发出,经过applicationContext容器通知到EventListener,都是发生在单个Spring容器中。
事件可直接使用ApplicationEvent,或继承它。源码如下:
public abstract class ApplicationEvent extends java.util.EventObject {
// 事件发生时间
private final long timestamp;
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
public ApplicationEvent(Object source, Clock clock) {
super(source);
this.timestamp = clock.millis();
}
}
示例
多个监听器,监听Account创建,完成不同的业务逻辑。
创建Event事件监听
/**
* 账号监听,处理账号创建成功的后续逻辑
*/
@Component
public class AccountListener {
@EventListener
@Async
public void processAccountCreatedEvent1(AccountCreatedEvent event) {
// 1. 发送邮件、短信
}
@EventListener
@Order(100)
public void processAccountCreatedEvent2(AccountCreatedEvent event) {
// 2. 添加积分等,@Order(100)用来设定执行顺序
}
}
使用ApplicationEventPublisher发送事件:
@Autowired
private ApplicationEventPublisher publisher;
public boolean save(Account account) {
// 数据库保存成功
if (true) {
publisher.publishEvent(new AccountCreatedEvent(account));
}
return false;
}
一个发布者可对应多个监听者:
@EventListener(value = {AccountCreatedEvent.class, AccountUpdatedEvent.class}, condition = "#event.account.age > 10")
public void processAccountCreatedEvent2(AccountEvent event) {
// 业务逻辑
}
监听执行顺序:可使用@Order(100)
来标记事件的执行顺序,异步情况下只保证按顺序将监听器丢入进线程池,具体执行顺序得看线程。
监听异步执行:使用@Async标记即可,前提条件:使用@EnableAsync开启Spring异步。
优点:
- 更加简洁,方法注解即声明监听;
- 支持异步处理,条件过滤,功能更强大;
- 与其他Spring特性(如AOP和事务)集成更好。
缺点:
- 不如ApplicationListener明确(可能难以追踪监听器逻辑);
- 依赖注解,可能不适合部分代码风格偏向接口设计的团队。
ApplicationListener
Spring提供两种事件监听机制:
- 基于@EventListener注解:
- 基于ApplicationListener泛型接口:早期提供,实现onApplicationEvent方法
通过上下文来发布一个事件,监听器收到订阅的事件作相应的处理,Spring提供的方便高效的事件驱动模型。
ApplicationListener源码:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends java.util.EventListener {
void onApplicationEvent(E event);
default boolean supportsAsyncExecution() {
return true;
}
static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
return event -> consumer.accept(event.getPayload());
}
}
简单使用:
@Component
public class MyEventListener implements ApplicationListener<MyEvent> {
@Override
public void onApplicationEvent(MyEvent event) {
System.out.println("Received event: " + event.getMessage());
}
}
优点:
- 明确,适合固定、简单的监听逻辑;
- 提供对所有事件的统一入口,可基于事件类型实现复杂的分发逻辑。
缺点:
- 相对冗长,代码侵入性高(必须实现接口);
- 不支持直接使用条件过滤和异步处理;
- 扩展性和灵活性低。
@EventListener与ApplicationListener
特性 | ApplicationListener | @EventListener |
---|---|---|
定义方式 | 实现ApplicationListener接口 | 使用@EventListener注解 |
写法复杂度 | 必须实现接口 | 注解方式更简洁 |
类型绑定 | 泛型绑定到特定事件类型 | 自动通过方法参数类型推断 |
条件过滤 | 不支持 | 支持(condition属性) |
异步处理 | 需手动实现多线程 | 支持@Async开箱即用 |
事务集成 | 支持,但需手动配置 | 与Spring的事务注解自然集成 |
适用场景 | 简单、固定事件监听逻辑 | 更现代化、复杂事件管理逻辑 |
引入版本 | 早期版本已有 | Spring 4.2+ |
JDK与Spring
特性 | JDK EventListener | Spring @EventListener |
---|---|---|
触发方式 | 显式调用监听器 | 基于事件发布自动触发 |
实现方式 | 通过接口 | 通过注解声明 |
事件管理 | 手动注册、调用 | 自动管理(基于Spring容器) |
适用场景 | 简单GUI或模块内部通信 | 企业级应用、跨模块解耦 |
异步支持 | 不支持 | 支持 |
条件过滤 | 不支持 | 支持 |
事务集成 | 不支持 | 支持 |
复杂性 | 简单,手动管理 | 较高,但更自动化和灵活 |
EventBus
使用异步的方式来发送事件,或触发另外一个动作,即Publish/Subscribe Event。
EventBus有不同的实现框架,一般都指Guava EventBus。
框架
Guava EventBus包路径下:
类之间的关系如:
EventBus组成部分:
- EventBus、AsyncEventBus:事件发送器
- Event:事件承载单元
- SubscriberRegistry:订阅者注册器,将订阅者注册到Event上,即将有注解Subscribe的方法和Event绑定起来
- Dispatcher:事件分发器,将事件的订阅者调用来执行
- Subscriber、SynchronizedSubscriber:订阅者,并发订阅还是同步订阅
原理
EventBus是基于注册监听的方式来运行的,首先需将EventBus实例化,然后才会有事件及监听者:
// 同步
EventBus eventBus = new EventBus();
注册监听者,底层就是将类eventListener中所有注解有Subscribe的方法与其Event对放在一个map中(一个Event可以对应多个Subscribe的方法):
eventBus.register(eventListener);
在高并发的环境下使用AsyncEventBus时,发送事件可能会出现异常,因为它使用的线程池,当线程池的线程不够用时,会拒绝接收任务,就会执行线程池的拒绝策略,如果需要关注是否提交事件成功,就需要将线程池的拒绝策略设为抛出异常,并且try-catch来捕获异常:
try {
eventBus.post(new LoginEvent("user", "pass"));
} catch (Exception e) {
// log
}
内部的订阅通知模型,无需使用事件+事件listener模型,只有一个事件类。
示例
一个简单的实例,值得一提的是,注册和发布事件,与消费事件不在一个类里:
// 事件类中方法以@Subscribe注解
class EventBusChangeRecorder {
@Subscribe
public void recordCustomerChange(ChangeEvent e) {
// 业务逻辑
System.out.println("事件触发");
}
}
// 创建事件总线
EventBus eventBus = new EventBus();
// 注册事件
eventBus.register(new EventBusChangeRecorder());
ChangeEvent event = new ChangeEvent(new EventBusChangeRecorder());
// 发布事件
eventBus.post(event);
// 需要异步执行可使用EventBus的子类AsyncEventBus
另外,EventBus也可作为Spring Bean使用,可被注入:
@Service
public class TestService implements InitializingBean {
@Resource
private MyListener myListener;
@Resource
private EventBus eventBus;
public void postEvent() {
eventBus.post(new LoginEvent("johnny", "success"));
}
@Override
public void afterPropertiesSet() throws Exception {
eventBus.register(myListener);
}
}
源码
EventBus的register方法:
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
事件发送:执行指定事件类型的订阅者(包含method),从订阅者中获取指定事件的订阅者,然后按照规则(同步、异步)执行指定的方法,如果事件没有监听者,就当作死亡事件来对待。EventBus的post方法:
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
EventBus的dispatcher为PerThreadQueuedDispatcher,其dispatch方法如下:
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
dispatchEvent方法:
// Dispatches event to this subscriber using the proper executor.
final void dispatchEvent(final Object event) {
executor.execute(
(Runnable) () -> {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
});
}
execute方法由Executor来执行。EventBus的executor为MoreExecutors.directExecutor()
:
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}
enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
}
其execute方法直接执行线程的run方法,即同步调用run方法。另外,invokeSubscriberMethod方法如下:
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException | IllegalAccessException | InvocationTargetException e) {
// 省略catch代码
}
}
因此,整个执行过程如下:
整个过程都是同步方式执行,EventBus是同步的。
AsyncEventBus
AsyncEventBus是异步EventBus,其dispatcher为LegacyAsyncDispatcher,executor为自己指定的线程池,如
@Configuration
public class ConfigBean {
@Bean
public EventBus executorService() {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, workQueue);
return new AsyncEventBus(executor);
}
}
运行流程如下:
AllowConcurrentEvents
Guava提供的注解:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@ElementTypesAreNonnullByDefault
public @interface AllowConcurrentEvents {
}
注解判断逻辑位于Subscriber.isDeclaredThreadSafe
方法内:
private static boolean isDeclaredThreadSafe(Method method) {
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
此方法被create()
方法调用:
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
解读:如果订阅者方法上有注解@AllowConcurrentEvents,则返回Subscriber(异步),否则,返回SynchronizedSubscriber(同步)。即没有使用注解AllowConcurrentEvents的订阅者,在并发环境中是串行执行,会影响性能。
SynchronizedSubscriber是同步的,从其类名可知,从其实现的invokeSubscriberMethod方法里也可看出:
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
拓展
EventListener和EventBus
相同点:都是基于事件驱动模式,都可用于实现解耦的组件通信。
不同点:
- 功能:
- EventListener:订阅事件并执行操作的组件;粒度更细,聚焦于局部业务逻辑;一般是自定义类或方法,绑定到特定事件类型;
- EventBus:管理事件的发布、订阅、流转;侧重于全局管理事件,通常是单例模式或共享对象。
- 实现方式:
- EventListener:Spring的@EventListener是一种更广义的事件监听机制,可配合ApplicationEvent或自定义事件;
- EventBus:Guava EventBus通过注解定义事件处理器,自动绑定到特定事件类型。
联系:EventListener通常依赖EventBus进行注册,接收由EventBus发布的事件。
EventBus对比MQ
特性 | EventBus | MQ |
---|---|---|
使用范围 | 单JVM | 跨进程、分布式 |
通信模式 | 内存中发布-订阅 | 发布-订阅、点对点 |
可靠性 | 无内置持久化或重试机制 | 支持持久化、重试、消息确认 |
延迟 | 低延迟 | 相对稍高(取决于网络和队列机制) |
复杂性 | 简单,适合轻量场景 | 较高,需要额外运维 |
Guava EventBus现状
- 局限性:
基于内存的轻量级工具,仅适用于单JVM内的事件管理;在分布式跨服务场景下,无法满足可靠性、持久性和扩展性要求。 - 依然适用的场景:
- 轻量化需求:在小型项目中,快速实现解耦的优秀选择;
- 单JVM内部事件管理:用于模块间事件流转(如前端事件或服务内异步操作);
- 开发测试:在测试环境中模拟事件驱动的逻辑。
Netflix EventBus
除了Guava EventBus外,Netflix也提供一套框架:
<dependency>
<groupId>com.netflix.netflix-commons</groupId>
<artifactId>netflix-eventbus</artifactId>
<version>0.3.0</version>
</dependency>
参考
- EventBus原理深度解析