介绍
EventBus 是 Guava 的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus 是一个非常优雅和简单的解决方案,我们不用创建复杂的类和接口层次结构。
Java案例
引入依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
定义消息类
/**
 * Event payload used by the plain-Java example.
 *
 * <p>The Lombok annotations supply the accessors and the single-argument constructor
 * that the publisher calls ({@code new TestEvent("200")}).
 */
@Data
@AllArgsConstructor
public class TestEvent {
    /** Text carried by the event; private so it is only reachable through the generated accessors. */
    private String message;
}
定义接收者
/**
 * Listener for the plain-Java example. It declares one {@code @Subscribe} handler per
 * event type it is interested in: {@link String} and {@link TestEvent}.
 */
@Slf4j
public class DevmEventListener {

    /**
     * Handles {@link String} events by logging a greeting.
     *
     * @param greetInfo the posted string
     */
    @Subscribe
    public void greet(String greetInfo) {
        log.info("hello world {}", greetInfo);
    }

    /**
     * Handles {@link TestEvent} events by logging the event.
     *
     * @param testEvent the posted event
     */
    @Subscribe
    public void handleTestEvent(TestEvent testEvent) {
        log.info("信息处理={}", testEvent);
    }
}
发送消息
/** Entry point of the plain-Java example: registers one listener and posts two events. */
public class SimpleMain {

    /**
     * Creates a bus named {@code "test"}, registers a {@link DevmEventListener} on it and
     * posts one {@link TestEvent} followed by one {@link String}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventBus bus = new EventBus("test");
        bus.register(new DevmEventListener());

        // One event per handler type declared by the listener.
        bus.post(new TestEvent("200"));
        bus.post("charles");
    }
}
SpringBoot示例
定义注解
/**
 * Marker annotation for classes that act as event-bus listeners.
 *
 * <p>It is retained at runtime so that annotated types can be looked up reflectively,
 * and it may only be placed on types.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EventBusListener {
}
定义事件
/**
 * Event carrying the short URL that was accessed.
 * Instances are created through the Lombok-generated builder.
 */
@Builder
@Data
public class ViewEvent {
    /** The short URL this event refers to. */
    private String shortUrl;
}
事件监听者
/**
 * Spring-managed listener for {@link ViewEvent}.
 *
 * <p>The {@link EventBusListener} marker is what lets the bus registration code find this bean.
 */
@Component
@Slf4j
@EventBusListener
public class ViewEventListener {

    /**
     * Logs the short URL carried by the event.
     *
     * @param event the posted view event
     */
    @Subscribe
    public void viewed(ViewEvent event) {
        // Parameterized logging instead of string concatenation: same output, and the
        // message is only assembled when INFO is enabled.
        log.info("url被访问{}", event.getShortUrl());
    }
}
定义 EventBusCenter:获取所有带有 @EventBusListener 注解的 bean,将它们注册到 AsyncEventBus 和 EventBus,并且提供发布事件的接口。
/**
 * Central access point for the application's event buses.
 *
 * <p>Owns one synchronous and one asynchronous bus, registers every bean annotated with
 * {@link EventBusListener} on both of them after construction, and exposes methods for
 * posting events to either bus.
 */
@Component
public class EventBusCenter implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    /** Bus for synchronous events. */
    private final EventBus syncEventBus = new EventBus();

    /**
     * Bus for asynchronous events, backed by a cached thread pool.
     * NOTE(review): nothing in this class shuts the pool down - confirm whether a
     * shutdown hook is needed for this application.
     */
    private final AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());

    /**
     * Posts an event on the synchronous bus.
     *
     * @param event the event to publish
     */
    public void postSync(Object event) {
        syncEventBus.post(event);
    }

    /**
     * Posts an event on the asynchronous bus.
     *
     * @param event the event to publish
     */
    public void postAsync(Object event) {
        asyncEventBus.post(event);
    }

    /**
     * Looks up every bean annotated with {@link EventBusListener} and registers it as a
     * listener on both buses, so a listener receives events regardless of which post
     * method the publisher uses.
     */
    @PostConstruct
    public void init() {
        // Iterate the map's values directly; no intermediate list is needed.
        for (Object listener : applicationContext.getBeansWithAnnotation(EventBusListener.class).values()) {
            asyncEventBus.register(listener);
            syncEventBus.register(listener);
        }
    }

    /**
     * Stores the application context so that {@link #init()} can query it.
     *
     * @param applicationContext the context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
发送消息
/** REST controller for the Spring Boot example; publishes a {@link ViewEvent} on request. */
@RestController
@Slf4j
public class TestController {

    @Autowired
    private EventBusCenter eventBusCenter;

    /** Handles {@code GET /t1} by posting a view event for the short URL "hh" on the asynchronous bus. */
    @GetMapping("/t1")
    public void t1() {
        ViewEvent viewed = ViewEvent.builder()
                .shortUrl("hh")
                .build();
        eventBusCenter.postAsync(viewed);
    }
}
框架示例
- 引入依赖
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>eventbus-aop-starter</artifactId>
<version>2.0.15</version>
</dependency>
- 启动类添加注解
@EnableEventBus
- 添加监听者
/**
 * Subscriber for {@link String} events.
 *
 * <p>Declared with {@code async = false}, so it is registered on the synchronous controller.
 */
@EventBus(async = false)
@Service
@Slf4j
public class MySubscriber1 {

    /**
     * Logs each received string event.
     *
     * @param event the posted string
     */
    @Subscribe
    public void subscribe(String event) {
        // The message previously claimed asynchronous, child-thread delivery, which
        // contradicted the async = false declaration on this class.
        log.info("主线程接收同步事件 - {},String类型", event);
    }
}
- 发布事件
/** REST controller for the framework example; publishes string events through the controller factory. */
@RestController
@Slf4j
public class TestController {

    // Injection is optional, so the factory may be absent at runtime.
    @Autowired(required = false)
    private EventControllerFactory eventControllerFactory;

    /**
     * Handles {@code GET /publish}: posts one string event on the default asynchronous
     * controller and one on the default synchronous controller. Does nothing when the
     * factory was not injected.
     */
    @GetMapping("/publish")
    public void publish() {
        if (eventControllerFactory == null) {
            return;
        }

        log.info("发送事件...");

        // Asynchronous mode (the default): the dispatched event is received on a child thread.
        eventControllerFactory.getAsyncController().post("ASync Event String Format");

        // Synchronous mode: the dispatched event is received on the main thread.
        // The identifier passed to eventControllerFactory.getSyncController(identifier) must match
        // the @EnableEventBus parameter, otherwise the event is not received.
        // NOTE(review): the identifier is read from the subscriber's @EventBus annotation in
        // EventBeanPostProcessor - confirm which annotation this note refers to.
        eventControllerFactory.getSyncController().post("Sync Event String Format");
    }
}
原理解析
在eventbus-aop-starter
包中有spring.factories
com.nepxion.eventbus.annotation.EnableEventBus=\
com.nepxion.eventbus.configuration.EventConfiguration
EventConfiguration 注入了 EventControllerFactory、EventBeanPostProcessor 和 EventContextClosedHandler。
/**
 * Spring configuration that declares the event-bus infrastructure beans:
 * the controller factory, the bean post-processor and the context-closed handler.
 */
@Configuration
public class EventConfiguration {
// ... (remaining members are elided in this excerpt)
/** Declares the factory that hands out event controllers. */
@Bean
public EventControllerFactory eventControllerFactory() {
return new EventControllerFactory();
}
/** Declares the post-processor bean ({@link EventBeanPostProcessor}). */
@Bean
public EventBeanPostProcessor eventBeanPostProcessor() {
return new EventBeanPostProcessor();
}
/** Declares the {@link EventContextClosedHandler} bean. */
@Bean
public EventContextClosedHandler eventContextClosedHandler() {
return new EventContextClosedHandler();
}
}
EventBeanPostProcessor 是后置处理器:获取所有带有 @EventBus 注解的监听者,并将其注册到对应的 EventController 所包装的 EventBus 中。
/**
 * Bean post-processor that registers every bean whose class carries the {@code @EventBus}
 * annotation on the controller selected by the annotation's identifier and async flag.
 */
public class EventBeanPostProcessor implements BeanPostProcessor {

    @Autowired
    private EventControllerFactory eventControllerFactory;

    /** No-op before initialization; the bean is returned unchanged. */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Registers the bean as a subscriber when its class is annotated with {@code @EventBus}.
     *
     * @param bean     the initialized bean
     * @param beanName the bean's name (unused)
     * @return the same bean instance
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // A single lookup doubles as the presence check: null means "not annotated".
        EventBus marker = bean.getClass().getAnnotation(EventBus.class);
        if (marker != null) {
            String identifier = marker.identifier();
            boolean async = marker.async();
            eventControllerFactory.getController(identifier, async).register(bean);
        }
        return bean;
    }
}
EventControllerFactory 默认获取的是 identifier=EventConstant.SHARED_CONTROLLER 对应的 EventController:先从 ConcurrentHashMap 获取,没有就创建。如果是异步的,创建的是 AsyncEventBus。
public final class EventControllerFactory {
@Autowired
private ThreadPoolFactory threadPoolFactory;
private volatile Map<String, EventController> syncControllerMap = new ConcurrentHashMap<String, EventController>();
private volatile Map<String, EventController> asyncControllerMap = new ConcurrentHashMap<String, EventController>();
public EventController getAsyncController() {
return getAsyncController(EventConstant.SHARED_CONTROLLER);
}
public EventController getAsyncController(String identifier) {
return getController(identifier, true);
}
public EventController getController(String identifier, boolean async) {
return getController(identifier, async ? EventType.ASYNC : EventType.SYNC);
}
public EventController getController(String identifier, EventType type) {
switch (type) {
case SYNC:
EventController syncEventController = syncControllerMap.get(identifier);
if (syncEventController == null) {
EventController newEventController = createSyncController(identifier);
syncEventController = syncControllerMap.putIfAbsent(identifier, newEventController);
if (syncEventController == null) {
syncEventController = newEventController;
}
}
return syncEventController;
case ASYNC:
EventController asyncEventController = asyncControllerMap.get(identifier);
if (asyncEventController == null) {
EventController newEventController = createAsyncController(identifier, threadPoolFactory.getThreadPoolExecutor(identifier));
asyncEventController = asyncControllerMap.putIfAbsent(identifier, newEventController);
if (asyncEventController == null) {
asyncEventController = newEventController;
}
}
return asyncEventController;
}
return null;
}
public EventController createSyncController(String identifier) {
return new EventControllerImpl(new EventBus(identifier));
}
public EventController createAsyncController(String identifier, Executor executor) {
return new EventControllerImpl(new AsyncEventBus(identifier, executor));
}
EventControllerImpl 实现了 EventController,可以注册监听器、发布事件。
/**
 * {@link EventController} implementation that delegates every operation to a wrapped
 * {@link EventBus}.
 */
public class EventControllerImpl implements EventController {

    /** The wrapped bus; assigned once in the constructor and never replaced. */
    private final EventBus eventBus;

    /**
     * Creates a controller around the given bus.
     *
     * @param eventBus the bus to delegate to
     */
    public EventControllerImpl(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * Registers a subscriber on the wrapped bus.
     *
     * @param object the subscriber to register
     */
    @Override
    public void register(Object object) {
        eventBus.register(object);
    }

    /**
     * Removes a subscriber from the wrapped bus.
     *
     * @param object the subscriber to unregister
     */
    @Override
    public void unregister(Object object) {
        eventBus.unregister(object);
    }

    /**
     * Posts an event on the wrapped bus.
     *
     * @param event the event to publish
     */
    @Override
    public void post(Object event) {
        eventBus.post(event);
    }
}