文章目录
- EventBus简介
- DDD领域事件架构简析
- 快速入门
- pom依赖
- bean配置
- Publisher
- Subscriber
- 设计原理
- Publisher
- Subscriber
事件总线(EventBus),设计初衷是解耦系统模块,将系统中的各类业务操作抽象为事件模型,我们把产生事件的部分称之为事件的发送者(Publisher),消费事件的部分称之为订阅者(Subcriber)。
EventBus简介
EventBus是一个publisher/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的通信。
作为一个消息总线主要有三个组成部分:
- 事件(Event):可以是任意类型的对象。通过事件的发布者将事件进行传递。
- 事件订阅者(Subscriber):接收特定的事件。
- 事件发布者(Publisher):用于通知 Subscriber 有事件发生。可以在任意线程任 意位置发送事件。
DDD领域事件架构简析
- 领域事件是领域驱动设计(Domain Driven Design)中的一个概念,它用来表示领域中发生的事件,目的是捕获我们所建模的领域中所发生过的各类事件
- 领域事件忽略不相关的领域活动,明确业务需求,使得业务开发者只关注和跟踪有业务需要的事件或希望被通知的事件。
- 结合领域事件和EventBus,目前GM2的EventBus架构模型
快速入门
pom依赖
<!-- <gm2-common.version>1.0.2-SNAPSHOT</gm2-common.version> -->
<dependency>
<groupId>com.dzg.gm2.common</groupId>
<artifactId>gm2-ddd-event</artifactId>
<version>${gm2-common.version}</version>
</dependency>
bean配置
@Configuration
@EnableEvent
public class EventConfiguration {
@Bean("asyncEventBus")
public EventBus asyncEventBus() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10000), new NameThreadFactory("async-event-bus-thread"),
new ThreadPoolExecutor.CallerRunsPolicy())
DefaultEventBus eventBus = new DefaultEventBus(threadPoolExecutor);
eventBus.setOrder(2);
return eventBus;
}
}
Publisher
@Service
public class DemoDomainService {
@Resource
private EventCenter eventCenter;
public Result<SeaOrder> commitOrderDraft(SeaOrder orderDraft, UserOperatorContext operator) {
Result<SeaOrder> seaOrderResult = super.commitOrderDraft(orderDraft, operator);
if (orderDraft.getOrderId() != null) {
// 发布事件
eventCenter.publish(CommitOrderDraftDomainEvent.builder().order(orderDraft).build());
}
return seaOrderResult;
}
}
Subscriber
-- 基于@TransactionalEventListener 将领域事件绑定在事务提交之后执行
@EventHandlers(eventBus = EventConstant.TRANSACTION_EVENT_BUS)
public class DemoDomainServiceEventHandler {
@UseEventHandler
public void commitOrderDraft(CommitOrderDraftDomainEvent event) {
}
}
-- 同步处理事件
@EventHandlers(eventBus = EventConstant.DEFAULT_EVENT_BUS)
public class DemoDomainServiceEventHandler {
@UseEventHandler
public void commitOrderDraft(CommitOrderDraftDomainEvent event) {
}
}
-- 异步处理事件
@EventHandlers(eventBus = "asyncEventBus")
public class DemoDomainServiceEventHandler {
@UseEventHandler
public void commitOrderDraft(CommitOrderDraftDomainEvent event) {
}
}
-- MQ处理事件 RocketMqEvent里面需要指订Topic与Tag
@EventHandlers(eventBus = EventConstant.ROCKET_EVENT_BUS)
public class DemoDomainServiceEventHandler {
@UseEventHandler
public void commitOrderDraft(RocketMqEvent event) {
}
}
设计原理
Publisher
- 在启动容器时,gm2-event库里的InitEventBusPostProcessor会对spring管理的bean进行扫描,如果发现有bean实现了接口EventBus时,就会将bean注册到EventCenter,代码如下(只显示主要逻辑)
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof EventBus) {
eventCenter.register((EventBus)bean);
}
return bean;
}
Subscriber
- 在启动容器时,gm2-event库里的EventHandlersPostProcessor会对spring管理的bean进行扫描,如果发现有bean类上面有EventHandlers注解时,,就会将进一步判断该类下面是否有标有UseEventHandler注解的方法,代码如下(只显示主要逻辑)
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
EventHandlers eventHandlers = AnnotationUtils.findAnnotation(bean.getClass(), EventHandlers.class);
if (Objects.nonNull(eventHandlers)) {
ImmutableList<Method> annotatedMethods = EventHandlerRegistry.getAnnotatedMethods(bean.getClass());
for (Method method : annotatedMethods) {
UseEventHandler useEventHandler = AnnotationUtils.findAnnotation(method, UseEventHandler.class);
if (Objects.nonNull(useEventHandler)) {
// 事件名称和 evnet bus名称
String eventBusName = useEventHandler.eventBus();
if (StringUtils.isEmpty(eventBusName)) {
eventBusName = eventHandlers.eventBus();
}
if (StringUtils.isEmpty(eventBusName)) {
eventBusName = EventConstant.DEFAULT_EVENT_BUS;
}
EventBus eventBus = (EventBus)this.applicationContext.getBean(eventBusName);
eventBus.register(new MethodEventHandlerWrapper(bean, method, method.getParameterTypes()[0]),
method.getParameterTypes()[0]);
}
}
}
return bean;
}
- 在启动容器时,gm2-event库里的UseEventHandlerPostProcessor会对spring管理的bean进行扫描,如果发现有bean类上面有UseEventHandler注解时,就会将bean注册到EventCenter,代码如下(只显示主要逻辑)
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
UseEventHandler useEventHandler = AnnotationUtils.findAnnotation(bean.getClass(), UseEventHandler.class);
if (Objects.nonNull(useEventHandler)) {
if (bean instanceof EventHandler) {
// 注解定义的bus名称和eventName
String eventBusName = useEventHandler.eventBus();
if (StringUtils.isEmpty(eventBusName)) {
eventBusName = EventConstant.DEFAULT_EVENT_BUS;
}
EventBus eventBus = (EventBus) this.applicationContext.getBean(eventBusName);
eventBus.register((EventHandler) bean, getActualType((EventHandler)bean));
}
}
return bean;
}