文章目录
- 一、Spring事件
- 二、实现Spring事件
- 1、自定义事件
- 2、事件监听器
- 2.1 实现ApplicationListener接口
- 2.2 @EventListener
- 2.3 @TransactionalEventListener
- 3、事件发布
- 4、异步使用
- 三、EventBus
- 1、事件模式
- 2、EventBus三要素
- 3、同步事件
- 3.1 定义事件类
- 3.2 定义事件监听
- 3.3 测试
- 4、异步事件
- 4.1 定义事件
- 4.2 定义事件监听
- 4.3 测试
- 四、EventBus和Spring Event区别
一、Spring事件
Spring Event(Application Event)其实就是一个观察者设计模式,一个 Bean 处理完成任务后希望通知其它 Bean 或者说一个 Bean 想观察监听另一个Bean 的行为。
Spring的事件(Application Event)为Bean和Bean之间的消息同步提供了支持。当一个Bean处理完成一个任务之后,希望另外一个Bean知道并能做相应的处理,这时我们就需要让另外一个Bean监听当前Bean所发生的事件
Spring的事件需要遵循如下流程:
- 自定义事件,继承ApplicationEvent
- 定义事件监听器
- 使用容器发布事件
二、实现Spring事件
1、自定义事件
自定义一个事件类,继承ApplicationEvent。该事件可以被ApplicationContext通过publishEvent方法进行发送
public class DemoEvent extends ApplicationEvent {
private String msg;
public DemoEvent(Object source, String msg) {
super(source);
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
2、事件监听器
监听器有三种实现方式
- 实现
ApplicationListener
接口 - 使用
@EventListener
注解 - 使用
@TransactionalEventListener
注解
2.1 实现ApplicationListener接口
新建一个类实现 ApplicationListener
接口,并且重写 onApplicationEvent
方法,然后交给Spring管理
@Component
public class DemoListener implements ApplicationListener<DemoEvent> {
//实现ApplicationListener接口,并指定监听的事件类型
@Override
public void onApplicationEvent(DemoEvent event) { //使用onApplicationEvent方法对消息进行接受处理
String msg = event.getMsg();
System.out.println("DemoListener获取到了监听消息:" + msg);
}
}
2.2 @EventListener
将处理事件的方法使用 @EventListener
注解标记,此时 Spring将创建一个ApplicationListener
的bean对象,使用给定的方法处理事件,参数可以指定处理的事件类型,以及处理条件。
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {
@AliasFor("classes") Class<?>[] value() default {};
@AliasFor("value") Class<?>[] classes() default {};
String condition() default "";
}
@Component
public class DemoListener {
@EventListener(DemoEvent.class)
public void sendMsg(DemoEvent event) {
String msg = event.getMsg();
System.out.println("DemoListener获取到了监听消息:" + msg);
}
}
2.3 @TransactionalEventListener
@EventListener
和 @TransactionalEventListener
都是 Spring提供的注解,用于处理事件。主要区别在于处理事件的时间和事务的关联性。
@EventListener
:可以应用于任何方法,使得该方法成为一个事件监听器。当一个事件被发布时,所有标记为@EventListener
的方法都会被调用,无论当前是否存在一个活动的事务。 使用@EventListener
注解的方法可能在事务提交之前或之后被调用。@TransactionalEventListener
:该注解允许更精细地控制事件监听器在事务处理过程中的执行时机。@TransactionalEventListener
默认在当前事务提交后才处理事件(TransactionPhase.AFTER_COMMIT
),这可以确保事件处理器只在事务成功提交后才被调用。也可以通过 phase 属性来改变事件处理的时机,例如在事务开始前、事务提交前、事务提交后或者事务回滚
@Component
public class DemoListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, value = DemoEvent.class)
public void messageListener(DemoEvent event) {
String msg = event.getMsg();
System.out.println("DemoListener获取到了监听消息:" + msg);
}
}
3、事件发布
通过spring上下文对象ApplicationContext
的publishEvent
方法即可发布事件
@Component
public class DemoPublisher {
@Autowired
private ApplicationContext applicationContext; //注入ApplicationContext用来发布事件
public void publish(String msg) {
applicationContext.publishEvent(new DemoEvent(this, msg)); // 使用ApplicationContext对象的publishEvent发布事件
}
}
4、异步使用
如果要开启异步,需要在启动类增加 @EnableAsync
注解,Listener 类需要开启异步的方法增加 @Async
注解
@Async
的原理是通过 Spring AOP 动态代理 的方式来实现的。Spring 容器启动初始化bean时,判断类中是否使用了@Async
注解,如果使用了则为其创建切入点和切入点处理器,根据切入点创建代理,在线程调用@Async
注解标注的方法时,会调用代理,执行切入点处理器invoke
方法,将方法的执行提交给线程池中的另外一个线程来处理,从而实现了异步执行。
三、EventBus
- EventBus是一个轻量级的发布/订阅模式的应用模式,相比于各种 MQ 中间件更加简洁、轻量,它可以在单体非分布式的小型应用模块内部使用。
- 我们也可以把它和 MQ 中间件结合起来使用,使用 EventBus 作为当前应用程序接收 MQ 消息的统一入口,然后应用内部基于 EventBus 进行分发订阅,以达到高内聚低耦合的目的(当应用内部需要消费多种不同 MQ 中间件消息时,不需要在当前应用的好多不同代码位置都编写 MQ 消费代码)
1、事件模式
EventBus
默认为同步调用,同一个 EventBus
中注册的多个订阅处理,再事件下发后是被总线串行逐个调用的,如果其中一个方法占用事件较长,则同一个 EventBus
中的其他事件处于等待状态,且发送消息事件的代码调用处也是同步调用等待的状态。同一个 EventBus
对象,不仅仅在同一个 post 调用中串行执行,在多次并发 post 调用时,多个 post 调用之间也是串行等待执行的关系
- 同步事件模式:同步事件模式下,事件的触发和事件的处理在同一个线程中同步处理
- 异步事件模式:异步事件模式下,事件的触发和事件的处理在不同的线程中,事件的处理在一个线程池中
2、EventBus三要素
Event
事件,它可以是任意类型Subscriber
事件订阅者,需要加上注解@Subscribe()
,并且指定线程模型,默认是POSTING
Publisher
事件的发布者。我们可以在任意线程里发布事件,调用post(Object)
方法即可
3、同步事件
3.1 定义事件类
public class MessageEvent {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "MessageEvent{" + "message='" + message + '\'' + '}';
}
}
3.2 定义事件监听
import com.google.common.eventbus.Subscribe;
public class MessageListener {
@Subscribe
public void listen(MessageEvent event) {
System.out.println(Thread.currentThread().getName() + " : " + event);
}
}
3.3 测试
import com.google.common.eventbus.EventBus;
public class SyncEventTest {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new MessageListener());
MessageEvent messageEvent = new MessageEvent();
messageEvent.setMessage("你好!");
for (int i = 0; i < 100; i++) {
eventBus.post(messageEvent);
}
}
}
测试结果
从结果可以看出,事件的触发和事件的处理都是在同一个线程中。
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
main : MessageEvent{message='你好!'}
......
4、异步事件
4.1 定义事件
public class MessageEvent {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "MessageEvent{" + "message='" + message + '\'' + '}';
}
}
4.2 定义事件监听
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
public class MessageListener {
/**
* 注解@AllowConcurrentEvents是用来标识当前订阅者是线程安全的
* Guava会对listener对象,遍历其带有@Subscribe注解的所有方法,然后对针对每一个listener对象和method方法,标识唯一一个订阅者
* 找到唯一识别的观察者后,会对该观察者进行包装wrap,包装成一个EventSubscriber对象,
* 对于没有@AllowConcurrentEvents注解的方法,会被包装成SynchronizedEventSubscriber,即同步订阅者对象。
* 同步订阅者对象在处理事件时是使用了synchronized,强同步锁!
* 总结:
* 如果当前观察者(method)是线程安全的thread-safe,建议增加注解@AllowConcurrentEvents,以减少同步开销。
* 对于使用的是非异步(AsyncEventBus),也建议增加@AllowConcurrentEvents,因为不需要进行同步。
*/
@AllowConcurrentEvents
@Subscribe
public void listen(MessageEvent event) {
System.out.println(Thread.currentThread().getName() + " : " + event);
}
}
4.3 测试
import com.google.common.eventbus.AsyncEventBus;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncEventTest {
private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();
private static final int ALIVE_TIME = 60;
private static final int CACHE_SIZE = 1000;
public static void main(String[] args) {
AsyncEventBus eventBus = new AsyncEventBus(new ThreadPoolExecutor(CORE_SIZE, CACHE_SIZE << 1, ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(CACHE_SIZE)));
eventBus.register(new MessageListener());
MessageEvent messageEvent = new MessageEvent();
messageEvent.setMessage("你好!");
for (int i = 0; i < 100; i++) {
eventBus.post(messageEvent);
}
}
}
测试结果
从结果可以看出,异步事件模式下,事件的触发和处理是在不同的线程中的,事件的处理是在单独的线程池中进行处理
pool-1-thread-2 : MessageEvent{message='你好!'}
pool-1-thread-3 : MessageEvent{message='你好!'}
pool-1-thread-4 : MessageEvent{message='你好!'}
pool-1-thread-5 : MessageEvent{message='你好!'}
pool-1-thread-6 : MessageEvent{message='你好!'}
pool-1-thread-7 : MessageEvent{message='你好!'}
pool-1-thread-8 : MessageEvent{message='你好!'}
pool-1-thread-9 : MessageEvent{message='你好!'}
pool-1-thread-10 : MessageEvent{message='你好!'}
.......
四、EventBus和Spring Event区别
项目 | 事件 | 发布者 | 发布方法 | 是否异步 | 监听者 | 注册方式 |
---|---|---|---|---|---|---|
EventBus | 任意对象 | EventBus | EventBus#post | 是 | 注解Subscribe的方法 | 手动注册EventBus#register |
Spring Event | 任意对象 | ApplicationEventPublisher | ApplicationEventPublisher#publishEvent | 支持同步异步 | 注解EventListener的方法 | 系统注册 |