文章目录
- EDA 概述
- 初体验
- Event
- Event Handlers
- Event Loop
- 如何设计一个Event-Driven框架
- 同步EDA框架设计
- Message
- Channel
- Dynamic Router
- Event
- EventDispatcher
- 测试
- 同步EDA架构类图
- 异步EDA框架设计
- 抽象基类 AsyncChannel
- AsyncEventDispatcher 并发分发消息
- 测试
EDA 概述
EDA(Event-Driven Architecture)是一种实现组件之间松耦合、易扩展的架构方式。
一个最简单的EDA设计需要包含如下几个组件:
-
Events:需要被处理的数据。
-
Event Handlers:处理Events的方式方法。
-
Event Loop:维护Events和Event Handlers之间的交互流程。
举个例子 :
Event A将被Handler A处理,而Event B将被Handler B处理,这一切的分配都是由Event Loop所控制的。
Events是EDA中的重要角色,一个Event至少需要包含两个属性:类型和数据
- Event的类型决定了它会被哪个Handler处理
- Data 是在Handler中代加工的材料
初体验
Event
package com.artisan.eventdriven.event;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
* @desc:Event只包含了该Event所属的类型和所包含的数据
*/
public class Event {
private final String type;
private final String data;
public Event(String type, String data) {
this.type = type;
this.data = data;
}
public String getType() {
return type;
}
public String getData() {
return data;
}
}
Event Handlers
Event Handlers主要用于处理Event,比如一些filtering或者transforming数据的操作等,下面我们写两个比较简单的方法
/**
* 用于处理A 类型的Event
*
* @param e
*/
public static void handleEventA(Event e) {
System.out.println(e.getData().toLowerCase());
}
/**
* 用于处理B类型的Event
*
* @param e
*/
public static void handleEventB(Event e) {
System.out.println(e.getData().toUpperCase());
}
- handleEventA方法只是简单地将Event中的data进行了lowerCase之后的输出
- handleEventB方法也是足够的简单,直接将Event中的字符串数据变成大写进行了控制台输出
Event Loop
Event Loop处理接收到的所有Event,并且将它们分配给合适的Handler去处理
Event e;
while (!events.isEmpty()) {
//从消息队列中不断移除,根据不同的类型进行处理
e = events.remove();
switch (e.getType()) {
case "A":
handleEventA(e);
break;
case "B":
handleEventB(e);
break;
}
}
在EventLoop中,每一个Event都将从Queue中移除出去,通过类型匹配交给合适的Handler去处理。
package com.artisan.eventdriven.demo;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
import com.artisan.eventdriven.event.Event;
import java.util.LinkedList;
import java.util.Queue;
public class FooEventDrivenExample {
/**
* 用于处理A 类型的Event
*
* @param e
*/
public static void handleEventA(Event e) {
System.out.println(e.getData().toLowerCase());
}
/**
* 用于处理B类型的Event
*
* @param e
*/
public static void handleEventB(Event e) {
System.out.println(e.getData().toUpperCase());
}
public static void main(String[] args) {
Queue<Event> events = new LinkedList<>();
events.add(new Event("A", "Hello"));
events.add(new Event("A", "I am Event A"));
events.add(new Event("B", "I am Event B"));
events.add(new Event("B", "World"));
Event e;
while (!events.isEmpty()) {
//从消息队列中不断移除,根据不同的类型进行处理
e = events.remove();
switch (e.getType()) {
case "A":
handleEventA(e);
break;
case "B":
handleEventB(e);
break;
}
}
}
}
虽然这个EDA的设计足够简单,但是通过它我们可以感受到EDA中三个重要组件之间的交互关系。
如何设计一个Event-Driven框架
一个基于事件驱动的架构设计,总体来讲会涉及如下几个重要组件:
- 事件消息(Event)
- 针对该事件的具体处理器(Handler)
- 接受事件消息的通道(上个Demo中的queue)
- 以及对事件消息如何进行分配(Event Loop)
同步EDA框架设计
我们先设计开发一个高度抽象的同步EDA框架,后续再考虑增加异步功能
Message
在基于Message的系统中,每一个Event也可以被称为Message,Message是对Event更高一个层级的抽象,每一个Message都有一个特定的Type用于与对应的Handler做关联
package com.artisan.eda.intf;
public interface Message {
/**
* 返回Message的类型
*/
Class<? extends Message> getType();
}
Channel
第二个比较重要的概念就是Channels,Channel主要用于接受来自Event Loop分配的消息,每一个Channel负责处理一种类型的消息(当然这取决于我们对消息如何进行分配)
package com.artisan.eda.intf;
public interface Channel<E extends Message> {
/**
* dispatch方法用于负责Message的调度
*/
void dispatch(E message);
}
Dynamic Router
Router的作用类似于上面的Event Loop,其主要是帮助Event找到合适的Channel并且传送给它
package com.artisan.eda.intf;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public interface DynamicRouter<E extends Message> {
/**
* 针对每一种Message类型注册相关的Channel,只有找到合适的Channel该Message才会被处理
*/
void registerChannel(Class<? extends E> messageType,
Channel<? extends E> channel);
/**
* 为相应的Channel分配Message
*/
void dispatch(E message);
}
Router如何知道要将Message分配给哪个Channel呢?换句话说,Router需要了解到Channel的存在,因此registerChannel()方法的作用就是将相应的Channel注册给Router,dispatch方法则是根据Message的类型进行路由匹配。
Event
Event是对Message的一个最简单的实现,在以后的使用中,将Event直接作为其他Message的基类即可(这种做法有点类似于适配器模式)
package com.artisan.eda.event;
import com.artisan.eda.intf.Message;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class Event implements Message {
@Override
public Class<? extends Message> getType() {
return getClass();
}
}
EventDispatcher
EventDispatcher是对DynamicRouter的一个最基本的实现,适合在单线程的情况下进行使用,因此不需要考虑线程安全的问题
package com.artisan.eda.router;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
import com.artisan.eda.intf.Channel;
import com.artisan.eda.intf.DynamicRouter;
import com.artisan.eda.intf.Message;
import java.util.HashMap;
import java.util.Map;
/**
* EventDispatcher不是一个线程安全的类
* @author artisan
*/
public class EventDispatcher implements DynamicRouter<Message> {
/**
* 用于保存Channel和Message之间的关系
*/
private final Map<Class<? extends Message>, Channel> routerTable;
public EventDispatcher() {
//初始化RouterTable,但是在该实现中,我们使用HashMap作为路由表
this.routerTable = new HashMap<>();
}
@Override
public void dispatch(Message message) {
if (routerTable.containsKey(message.getType())) {
//直接获取对应的Channel处理Message
routerTable.get(message.getType()).dispatch(message);
} else
throw new MessageMatcherException("Can't match the channel for [" + message.getType() + "] type");
}
@Override
public void registerChannel(Class<? extends Message> messageType,
Channel<? extends Message> channel) {
this.routerTable.put(messageType, channel);
}
}
在EventDispatcher中有一个注册表routerTable,主要用于存放不同类型Message对应的Channel,如果没有与Message相对应的Channel,则会抛出无法匹配的异常。
package com.artisan.eda.exceptions;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class MessageMatcherException extends RuntimeException {
public MessageMatcherException(String message) {
super(message);
}
}
测试
package com.artisan.eventdriven.eda;
import com.artisan.eda.event.Event;
import com.artisan.eda.intf.Channel;
import com.artisan.eda.router.EventDispatcher;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EventDispatcherExample
{
/**
* InputEvent中定义了两个属性X和Y,主要用于在其他Channel中的运算
*/
static class InputEvent extends Event
{
private final int x;
private final int y;
public InputEvent(int x, int y)
{
this.x = x;
this.y = y;
}
public int getX()
{
return x;
}
public int getY()
{
return y;
}
}
/**
* 用于存放结果的Event
*/
static class ResultEvent extends Event
{
private final int result;
public ResultEvent(int result)
{
this.result = result;
}
public int getResult()
{
return result;
}
}
/**
* 处理ResultEvent的Handler(Channel),只是简单地将计算结果输出到控制台
*/
static class ResultEventHandler implements Channel<ResultEvent>
{
@Override
public void dispatch(ResultEvent message)
{
System.out.println("The result is:" + message.getResult());
}
}
/**
* InputEventHandler需要向Router发送Event,因此在构造的时候需要传入Dispatcher
*/
static class InputEventHandler implements Channel<InputEvent>
{
private final EventDispatcher dispatcher;
public InputEventHandler(EventDispatcher dispatcher)
{
this.dispatcher = dispatcher;
}
/**
*将计算的结果构造成新的Event提交给Router
*/
@Override
public void dispatch(InputEvent message)
{
System.out.printf("X:%d,Y:%d\n", message.getX(), message.getY());
int result = message.getX() + message.getY();
dispatcher.dispatch(new ResultEvent(result));
}
}
public static void main(String[] args)
{
//构造Router
EventDispatcher dispatcher = new EventDispatcher();
//将Event和Handler(Channel)的绑定关系注册到Dispatcher
dispatcher.registerChannel(InputEvent.class, new InputEventHandler(dispatcher));
dispatcher.registerChannel(ResultEvent.class, new ResultEventHandler());
dispatcher.dispatch(new InputEvent(1, 2));
}
}
由于所有的类都存放于一个文件中,因此看起来测试代码比较多,其实结构还是非常清晰的,
- InputEvent是一个Message,它包含了两个Int类型的属性,
- 而InputEventHandler是对InputEvent消息的处理,接收到了InputEvent消息之后,分别对X和Y进行相加操作,然后将结果封装成ResultEvent提交给EventDispatcher,
- ResultEvent相对比较简单,只包含了计算结果的属性,ResultEventHandler则将计算结果输出到控制台上。
通过上面这个例子的运行会发现,不同数据的处理过程之间根本无须知道彼此的存在,一切都由EventDispatcher这个Router来控制,它会给你想要的一切,这是一种稀疏耦合(松耦合)的设计
EDA的设计除了松耦合特性之外,扩展性也是非常强的,比如Channel非常容易扩展和替换,另外由于Dispatcher统一负责Event的调配,因此在消息通过Channel之前可以进行很多过滤、数据验证、权限控制、数据增强(Enhance)等工作。
同步EDA架构类图
异步EDA框架设计
上面的同步EDA框架,在应对高并发的情况下还是存在一些问题的,具体如下。
-
EventDispatcher不是线程安全的类,在多线程的情况下,registerChannel方法会引起数据不一致的问题。
-
就目前而言,我们实现的所有Channel都无法并发消费Message,比如InputEventHandler只能逐个处理Message,低延迟的消息处理还会导致Dispatcher出现积压。
抽象基类 AsyncChannel
我们继续对EDA框架进行扩充,使其可支持并发任务的执行,下面定义了一个新的AsyncChannel
作为基类,该类中提供了Message的并发处理能力。
package com.artisan.eda.async;
import com.artisan.eda.event.Event;
import com.artisan.eda.intf.Channel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public abstract class AsyncChannel implements Channel<Event> {
/**
* 在AsyncChannel 中将使用ExecutorService多线程的方式提交给Message
*/
private final ExecutorService executorService;
/**
* 默认构造函数,提供了CPU的核数×2的线程数量
*/
public AsyncChannel() {
this(Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors() * 2));
}
/**
* 用户自定义的ExecutorService
*
* @param executorService
*/
public AsyncChannel(ExecutorService executorService) {
this.executorService = executorService;
}
/**
* 重写dispatch方法,并且用final修饰,避免子类重写
*
* @param message
*/
@Override
public final void dispatch(Event message) {
executorService.submit(() -> this.handle(message));
}
/**
* 提供抽象方法,供子类实现具体的Message处理
*
* @param message
*/
protected abstract void handle(Event message);
/**
* 提供关闭ExecutorService的方法
*/
public void stop() {
if (null != executorService && !executorService.isShutdown())
executorService.shutdown();
}
}
- 为了防止子类在继承AsyncChannel基类的时候重写dispatch方法,用final关键字对其进行修饰(Template Method Design Pattern),
- handle方法用于子类对Message进行具体的处理,
- stop方法则用来停止ExecutorService。
AsyncEventDispatcher 并发分发消息
其次,还需要提供新的EventDispatcher类AsyncEventDispatcher负责以并发的方式dispatch Message,其中Event对应的Channel只能是AsyncChannel类型,并且也对外暴露了shutdown方法
package com.artisan.eda.router;
import com.artisan.eda.async.AsyncChannel;
import com.artisan.eda.event.Event;
import com.artisan.eda.exceptions.MessageMatcherException;
import com.artisan.eda.intf.Channel;
import com.artisan.eda.intf.DynamicRouter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class AsyncEventDispatcher implements DynamicRouter<Event> {
/**
* 使用线程安全的ConcurrentHashMap替换HashMap
*/
private final Map<Class<? extends Event>, AsyncChannel> routerTable;
public AsyncEventDispatcher() {
this.routerTable = new ConcurrentHashMap<>();
}
@Override
public void registerChannel(Class<? extends Event> messageType,
Channel<? extends Event> channel) {
//在AsyncEventDispatcher中,Channel必须是AsyncChannel类型
if (!(channel instanceof AsyncChannel)) {
throw new IllegalArgumentException("The channel must be AsyncChannel Type.");
}
this.routerTable.put(messageType, (AsyncChannel) channel);
}
@Override
public void dispatch(Event message) {
if (routerTable.containsKey(message.getType())) {
routerTable.get(message.getType()).dispatch(message);
} else {
throw new MessageMatcherException("Can't match the channel for ["
+ message.getType() + "] type");
}
}
public void shutdown() {
//关闭所有的Channel以释放资源
routerTable.values().forEach(AsyncChannel::stop);
}
}
在AsyncEventDispatcher中,routerTable使用线程安全的Map定义,在注册Channel的时候,如果其不是AsyncChannel的类型,则会抛出异常。
测试
package com.artisan.eventdriven.eda;
import com.artisan.eda.async.AsyncChannel;
import com.artisan.eda.event.Event;
import com.artisan.eda.router.AsyncEventDispatcher;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class AsyncEventDispatcherExample {
//主要用于处理InputEvent,但是需要继承AsyncChannel
static class AsyncInputEventHandler extends AsyncChannel {
private final AsyncEventDispatcher dispatcher;
AsyncInputEventHandler(AsyncEventDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
//不同于以同步的方式实现dispatch,异步的方式需要实现handle
@Override
protected void handle(Event message) {
EventDispatcherExample.InputEvent inputEvent =
(EventDispatcherExample.InputEvent) message;
System.out.printf("X:%d,Y:%d\n", inputEvent.getX(), inputEvent.getY());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = inputEvent.getX() + inputEvent.getY();
dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));
}
}
//主要用于处理InputEvent,但是需要继承AsyncChannel
static class AsyncResultEventHandler extends AsyncChannel {
@Override
protected void handle(Event message) {
EventDispatcherExample.ResultEvent resultEvent =
(EventDispatcherExample.ResultEvent) message;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The result is:" + resultEvent.getResult());
}
}
public static void main(String[] args) {
//定义AsyncEventDispatcher
AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();
//注册Event和Channel之间的关系
dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));
dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());
//提交需要处理的Message
dispatcher.dispatch(new EventDispatcherExample.InputEvent(1, 2));
}
}
当dispatcher分配一个Event的时候,如果执行非常缓慢也不会影响下一个Event被dispatch,这主要得益于我们采用了异步的处理方式(ExecutorService本身存在的任务队列可以允许异步提交一定数量级的数据)