事件驱动模式
举个例子🌰
大部分软件或者APP都有会有会员系统,当我们注册为会员时,商家一般会把我们拉入会员群、给我们发优惠券、推送欢迎语什么的。
值得注意的是:
- 注册成功后才会产生后面的这些动作;
- 注册成功后的这些动作没有先后执行顺序之分;
- 注册成功后的这些动作的执行结果不能互相影响;
传统写法
public Boolean doRegisterVip(){
//1、注册会员
registerVip();
//2、入会员群
joinMembershipGroup();
//3、发优惠券
issueCoupons();
//4、推送消息
sendWelcomeMsg();
}
这样的写法将所有的动作都耦合在doRegisterVip方法中,首先执行效率低下,其次耦合度太高,最后不好扩展。那么如何优化这种逻辑呢?
事件驱动模式原理介绍🍓
Spring的事件驱动模型由三部分组成:
事件:用户可自定义事件类和相关属性及行为来表述事件特征,Spring4.2之后定义事件不需要再显式继承ApplicationEvent类,直接定义一个bean即可,Spring会自动通过PayloadApplicationEvent来包装事件。
事件发布者:在Spring中可通过ApplicationEventPublisher把事件发布出去,这样事件内容就可以被监听者消费处理。
事件监听者:ApplicationListener,监听发布事件,处理事件发生之后的后续操作。
原理图如下:
代码实现
1. 定义基本元素
事件发布者:EventEngine.java、EventEngineImpl.java
package com.example.event.config;
/**
* 事件引擎
*/
public interface EventEngine {
/**
* 发送事件
*
* @param event 事件
*/
void publishEvent(BizEvent event);
}
package com.example.event.config;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.springframework.util.CollectionUtils;
/**
* 事件引擎实现类
*/
public class EventEngineImpl implements EventEngine {
/**
* 异步执行器。也系统需要自行定义线程池
*/
private Executor bizListenerExecutor;
/**
* 是否异步,默认为false
*/
private boolean async;
/**
* 订阅端 KEY是TOPIC,VALUES是监听器集合
*/
private Map<String, List<BizEventListener>> bizSubscribers = new HashMap<>(16);
@Override
public void publishEvent(BizEvent event) {
List<BizEventListener> listeners = bizSubscribers.get(event.getTopic());
if (CollectionUtils.isEmpty(listeners)) {
return;
}
for (BizEventListener bizEventListener : listeners) {
if (bizEventListener.decide(event)) {
//异步执行的话,放入线程池
if (async) {
bizListenerExecutor.execute(new EventSubscriber(bizEventListener, event));
} else {
bizEventListener.onEvent(event);
}
}
}
}
/**
* Setter method for property <tt>bizListenerExecutor</tt>.
*
* @param bizListenerExecutor value to be assigned to property bizListenerExecutor
*/
public void setBizListenerExecutor(Executor bizListenerExecutor) {
this.bizListenerExecutor = bizListenerExecutor;
}
/**
* Setter method for property <tt>bizSubscribers</tt>.
*
* @param bizSubscribers value to be assigned to property bizSubscribers
*/
public void setBizSubscribers(Map<String, List<BizEventListener>> bizSubscribers) {
this.bizSubscribers = bizSubscribers;
}
/**
* Setter method for property <tt>async</tt>.
*
* @param async value to be assigned to property async
*/
public void setAsync(boolean async) {
this.async = async;
}
}
事件:BizEvent.java
package com.example.event.config;
import java.util.EventObject;
/**
* 业务事件
*/
public class BizEvent extends EventObject {
/**
* Topic
*/
private final String topic;
/**
* 业务id
*/
private final String bizId;
/**
* 数据
*/
private final Object data;
/**
* @param topic 事件topic,用于区分事件类型
* @param bizId 业务ID,标识这一次的调用
* @param data 事件传输对象
*/
public BizEvent(String topic, String bizId, Object data) {
super(data);
this.topic = topic;
this.bizId = bizId;
this.data = data;
}
/**
* Getter method for property <tt>topic</tt>.
*
* @return property value of topic
*/
public String getTopic() {
return topic;
}
/**
* Getter method for property <tt>id</tt>.
*
* @return property value of id
*/
public String getBizId() {
return bizId;
}
/**
* Getter method for property <tt>data</tt>.
*
* @return property value of data
*/
public Object getData() {
return data;
}
}
事件监听者:EventSubscriber.java
package com.example.event.config;
/**
* 事件监听者。注意:此时已经没有线程上下文,如果需要请修改构造函数,显示复制上下文信息
*/
public class EventSubscriber implements Runnable {
/**
* 业务监听器
**/
private BizEventListener bizEventListener;
/**
* 业务事件
*/
private BizEvent bizEvent;
/**
* @param bizEventListener 事件监听者
* @param bizEvent 事件
*/
public EventSubscriber(BizEventListener bizEventListener, BizEvent bizEvent) {
super();
this.bizEventListener = bizEventListener;
this.bizEvent = bizEvent;
}
@Override
public void run() {
bizEventListener.onEvent(bizEvent);
}
}
2. 其他组件
业务事件监听器:BizEventListener.java
package com.example.event.config;
import java.util.EventListener;
/**
* 业务事件监听器
*
*/
public interface BizEventListener extends EventListener {
/**
* 是否执行事件
*
* @param event 事件
* @return
*/
public boolean decide(BizEvent event);
/**
* 执行事件
*
* @param event 事件
*/
public void onEvent(BizEvent event);
}
事件引擎topic:EventEngineTopic.java
package com.example.event.config;
/**
* 事件引擎topic,用于区分事件类型
*/
public class EventEngineTopic {
/**
* 入会员群
*/
public static final String JOIN_MEMBERSHIP_GROUP = "joinMembershipGroup";
/**
* 发优惠券
*/
public static final String ISSUE_COUPONS = "issueCoupons";
/**
* 推送消息
*/
public static final String SEND_WELCOME_MSG = "sendWelcomeMsg";
}
3. 监听器实现
优惠券处理器:CouponsHandlerListener.java
package com.example.event.listener;
import com.example.event.config.BizEvent;
import com.example.event.config.BizEventListener;
import org.springframework.stereotype.Component;
/**
* 优惠券处理器
*/
@Component
public class CouponsHandlerListener implements BizEventListener {
@Override
public boolean decide(BizEvent event) {
return true;
}
@Override
public void onEvent(BizEvent event) {
System.out.println("优惠券处理器:十折优惠券已发放");
}
}
会员群处理器:MembershipHandlerListener.java
package com.example.event.listener;
import com.example.event.config.BizEvent;
import com.example.event.config.BizEventListener;
import org.springframework.stereotype.Component;
/**
* 会员群处理器
*/
@Component
public class MembershipHandlerListener implements BizEventListener {
@Override
public boolean decide(BizEvent event) {
return true;
}
@Override
public void onEvent(BizEvent event) {
System.out.println("会员群处理器:您已成功加入会员群");
}
}
消息推送处理器:MsgHandlerListener.java
package com.example.event.listener;
import com.example.event.config.BizEvent;
import com.example.event.config.BizEventListener;
import org.springframework.stereotype.Component;
/**
* 消息推送处理器
*/
@Component
public class MsgHandlerListener implements BizEventListener {
@Override
public boolean decide(BizEvent event) {
return true;
}
@Override
public void onEvent(BizEvent event) {
System.out.println("消息推送处理器:欢迎成为会员!!!");
}
}
事件驱动引擎配置:EventEngineConfig.java
package com.example.event.listener;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.example.event.config.BizEventListener;
import com.example.event.config.EventEngine;
import com.example.event.config.EventEngineImpl;
import com.example.event.config.EventEngineTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
/**
* 事件驱动引擎配置
*/
@Configuration
public class EventEngineConfig {
/**
* 线程池异步处理事件
*/
private static final Executor EXECUTOR = new ThreadPoolExecutor(20, 50, 10, TimeUnit.MINUTES,
new LinkedBlockingQueue(500), new CustomizableThreadFactory("EVENT_ENGINE_POOL"));
@Bean("eventEngineJob")
public EventEngine initJobEngine(CouponsHandlerListener couponsHandlerListener,
MembershipHandlerListener membershipHandlerListener,
MsgHandlerListener msgHandlerListener) {
Map<String, List<BizEventListener>> bizEvenListenerMap = new HashMap<>();
//注册优惠券事件
bizEvenListenerMap.put(EventEngineTopic.ISSUE_COUPONS, Arrays.asList(couponsHandlerListener));
//注册会员群事件
bizEvenListenerMap.put(EventEngineTopic.JOIN_MEMBERSHIP_GROUP, Arrays.asList(membershipHandlerListener));
//注册消息推送事件
bizEvenListenerMap.put(EventEngineTopic.SEND_WELCOME_MSG, Arrays.asList(msgHandlerListener));
EventEngineImpl eventEngine = new EventEngineImpl();
eventEngine.setBizSubscribers(bizEvenListenerMap);
eventEngine.setAsync(true);
eventEngine.setBizListenerExecutor(EXECUTOR);
return eventEngine;
}
}
4. 测试类
TestController.java
package com.example.event.controller;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Resource;
import com.example.event.config.BizEvent;
import com.example.event.config.EventEngine;
import com.example.event.config.EventEngineTopic;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 测试
*/
@RestController
@RequestMapping("/test")
public class TestController {
@Resource(name = "eventEngineJob")
private EventEngine eventEngine;
@GetMapping("/doRegisterVip")
public String doRegisterVip(@RequestParam(required = true) String userName,
@RequestParam(required = true) Integer age) {
Map<String, Object> paramMap = new HashMap<>(16);
paramMap.put("userName", userName);
paramMap.put("age", age);
//1、注册会员,这里不实现了
System.out.println("注册会员成功");
//2、入会员群
eventEngine.publishEvent(
new BizEvent(EventEngineTopic.JOIN_MEMBERSHIP_GROUP, UUID.randomUUID().toString(), paramMap));
//3、发优惠券
eventEngine.publishEvent(
new BizEvent(EventEngineTopic.ISSUE_COUPONS, UUID.randomUUID().toString(), paramMap));
//4、推送消息
eventEngine.publishEvent(
new BizEvent(EventEngineTopic.SEND_WELCOME_MSG, UUID.randomUUID().toString(), paramMap));
return "注册会员成功";
}
}
项目代码结构
调用接口
http://localhost:8080/test/doRegisterVip?userName=zhangsan&age=28