文章目录
- Pre
- 设计
- Code
- Bus接口
- 自定义注解 @Subscribe
- 同步EventBus
- 异步EventBus
- Subscriber注册表Registry
- Event广播Dispatcher
- 测试
- 简单的Subscriber
- 同步Event Bus
- 异步Event Bus
- 小结
Pre
我们在日常的工作中,都会使用到MQ这种组件, 某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息。
如图所示【消息中间件的消息订阅与发布】
消息中间件的核心作用是提供系统之间的异步消息处理机制。它可以在一个系统完成操作后,通过提交消息到消息中间件,触发其他依赖系统的后续处理,而不需要等待后续处理完全结束。
使用消息中间件的好处有:
- 提高系统处理效率,系统之间可以异步并行处理
- 降低系统耦合,通过消息进行解耦
- 提高系统故障隔离能力,一个系统故障不会影响其他系统
今天我们来实现一个Java进程内部的消息中间件Event Bus,它可以用于进程内不同组件之间的异步消息通信。
设计
- Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event
- register方法用来注册Event接收者(Subscriber)接受响应事件
- EventBus采用同步的方式推送Event
- AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event
- Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法用注解@Subscribe来标识。
- Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber
Code
Bus接口
package com.artisan.busevent.intf;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
* @desc: Bus接口定义了EventBus的所有使用方法
*/
public interface Bus {
/**
* 将某个对象注册到Bus上,从此之后该类就成为Subscriber了
*/
void register(Object subscriber);
/**
* 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息
*/
void unregister(Object subscriber);
/**
* 提交Event到默认的topic
*/
void post(Object event);
/**
* 提交Event到指定的topic
*/
void post(Object Event, String topic);
/**
* 关闭该bus
*/
void close();
/**
* 返回Bus的名称标识
*/
String getBusName();
}
Bus接口中定义了注册topic的方法和Event发送的方法,具体如下。
-
register(Object subscriber)
:将某个对象实例注册给Event Bus -
unregister(Object subscriber
):取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除 -
post(Object event)
:提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic -
post(Object Event,String topic)
:提交Event的同时指定了topic -
close()
:销毁该Event Bus -
getBusName()
:返回该Event Bus的名称
自定义注解 @Subscribe
注册对象给Event Bus的时候需要指定接收消息时的回调方法,采用注解的方式进行Event回调
package com.artisan.busevent.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
String topic() default "default-topic";
}
@Subscribe
要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic(default-topic
)
同步EventBus
同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式.
package com.artisan.busevent.impl;
import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventExceptionHandler;
import java.util.concurrent.Executor;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class EventBus implements Bus {
/**
* 用于维护Subscriber的注册表
*/
private final Registry registry = new Registry();
/**
* Event Bus的名字
*/
private String busName;
/**
* 默认的Event Bus的名字
*/
private final static String DEFAULT_BUS_NAME = "default";
/**
* 默认的topic的名字
*/
private final static String DEFAULT_TOPIC = "default-topic";
/**
* 用于分发广播消息到各个Subscriber的类
*/
private final Dispatcher dispatcher;
public EventBus() {
this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
/**
* @param busName
*/
public EventBus(String busName) {
this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {
this.busName = busName;
this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);
}
public EventBus(EventExceptionHandler exceptionHandler) {
this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
/**
* 将注册Subscriber的动作直接委托给Registry
*
* @param subscriber
*/
@Override
public void register(Object subscriber) {
this.registry.bind(subscriber);
}
/**
* 解除注册同样委托给Registry
*
* @param subscriber
*/
@Override
public void unregister(Object subscriber) {
this.registry.unbind(subscriber);
}
/**
* 提交Event到默认的topic
*
* @param event
*/
@Override
public void post(Object event) {
this.post(event, DEFAULT_TOPIC);
}
/**
* 提交Event到指定的topic,具体的动作是由Dispatcher来完成的
*
* @param event
* @param topic
*/
@Override
public void post(Object event, String topic) {
this.dispatcher.dispatch(this, registry, event, topic);
}
/**
* 关闭销毁Bus
*/
@Override
public void close() {
this.dispatcher.close();
}
/**
* 返回Bus的名称
*
* @return
*/
@Override
public String getBusName() {
return this.busName;
}
}
-
EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。
-
registry和unregister都是通过Subscriber注册表来完成的。
-
Event的提交则是由Dispatcher来完成的。
-
Executor是使用JDK中的Executor接口,自定义的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。
异步EventBus
如果想要使用异步的方式进行推送,可使用EventBus的子类AsyncEventBus 。
异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可
package com.artisan.busevent.impl;
import com.artisan.busevent.intf.EventExceptionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class AsyncEventBus extends EventBus {
AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
super(busName, exceptionHandler, executor);
}
public AsyncEventBus(String busName, ThreadPoolExecutor executor) {
this(busName, null, executor);
}
public AsyncEventBus(ThreadPoolExecutor executor) {
this("default-async", null, executor);
}
public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
this("default-async", exceptionHandler, executor);
}
}
重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor 。
Subscriber注册表Registry
注册表维护了topic和subscriber之间的关系,当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口
package com.artisan.busevent.impl;
import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.relations.Subscriber;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
class Registry {
/**
* 存储Subscriber集合和topic之间关系的map
*/
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();
public void bind(Object subscriber) {
//获取Subscriber Object的方法集合然后进行绑定
List<Method> subscribeMethods = getSubscribeMethods(subscriber);
subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));
}
public void unbind(Object subscriber) {
//unbind为了提高速度,只对Subscriber进行失效操作
subscriberContainer.forEach((key, queue) ->
queue.forEach(s ->
{
if (s.getSubscribeObject() == subscriber) {
s.setDisable(true);
}
}));
}
public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {
return subscriberContainer.get(topic);
}
private void tierSubscriber(Object subscriber, Method method) {
final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
String topic = subscribe.topic();
//当某topic没有Subscriber Queue的时候创建一个
subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());
//创建一个Subscriber并且加入Subscriber列表中
subscriberContainer.get(topic).add(new Subscriber(subscriber, method));
}
private List<Method> getSubscribeMethods(Object subscriber) {
final List<Method> methods = new ArrayList<>();
Class<?> temp = subscriber.getClass();
//不断获取当前类和父类的所有@Subscribe方法
while (temp != null) {
//获取所有的方法
Method[] declaredMethods = temp.getDeclaredMethods();
//只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法
Arrays.stream(declaredMethods)
.filter(m -> m.isAnnotationPresent(Subscribe.class)
&& m.getParameterCount() == 1
&& m.getModifiers() == Modifier.PUBLIC)
.forEach(methods::add);
temp = temp.getSuperclass();
}
return methods;
}
}
由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类,所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic),同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息,比如
/**
*非常普通的对象
*/
public class SimpleObject
{
/**
*subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数
*/
@Subscribe(topic = "artisan-topic")
public void test2(Integer x)
{
}
@Subscribe(topic = "test-topic")
public void test3(Integer x)
{
}
}
SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event
Event广播Dispatcher
Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法.
package com.artisan.busevent.impl;
import com.artisan.busevent.relations.Subscriber;
import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventContext;
import com.artisan.busevent.intf.EventExceptionHandler;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class Dispatcher {
private final Executor executorService;
private final EventExceptionHandler exceptionHandler;
public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;
public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;
private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {
this.executorService = executorService;
this.exceptionHandler = exceptionHandler;
}
public void dispatch(Bus bus, Registry registry, Object event, String topic) {
//根据topic获取所有的Subscriber列表
ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);
if (null == subscribers) {
if (exceptionHandler != null) {
exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),
new BaseEventContext(bus.getBusName(), null, event));
}
return;
}
//遍历所有的方法,并且通过反射的方式进行方法调用
subscribers.stream()
.filter(subscriber -> !subscriber.isDisable())
.filter(subscriber ->
{
Method subscribeMethod = subscriber.getSubscribeMethod();
Class<?> aClass = subscribeMethod.getParameterTypes()[0];
return (aClass.isAssignableFrom(event.getClass()));
}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
}
private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {
Method subscribeMethod = subscriber.getSubscribeMethod();
Object subscribeObject = subscriber.getSubscribeObject();
executorService.execute(() ->
{
try {
subscribeMethod.invoke(subscribeObject, event);
} catch (Exception e) {
if (null != exceptionHandler) {
exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));
}
}
});
}
public void close() {
if (executorService instanceof ExecutorService) {
((ExecutorService) executorService).shutdown();
}
}
static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {
return new Dispatcher(executor, exceptionHandler);
}
static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
}
static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
}
/**
* 顺序执行的ExecutorService
*/
private static class SeqExecutorService implements Executor {
private final static SeqExecutorService INSTANCE = new SeqExecutorService();
@Override
public void execute(Runnable command) {
command.run();
}
}
/**
* 每个线程负责一次消息推送
*/
private static class PreThreadExecutorService implements Executor {
private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
/**
* 默认的EventContext实现
*/
private static class BaseEventContext implements EventContext {
private final String eventBusName;
private final Subscriber subscriber;
private final Object event;
private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {
this.eventBusName = eventBusName;
this.subscriber = subscriber;
this.event = event;
}
@Override
public String getSource() {
return this.eventBusName;
}
@Override
public Object getSubscriber() {
return subscriber != null ? subscriber.getSubscribeObject() : null;
}
@Override
public Method getSubscribe() {
return subscriber != null ? subscriber.getSubscribeMethod() : null;
}
@Override
public Object getEvent() {
return this.event;
}
}
}
在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类,其主要是实现了Executor接口和EventContent接口。
测试
简单的Subscriber
package com.artisan.busevent.consumer;
import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.entity.Artisan;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Slf4j
public class SubscriberA {
/**
* 消费默认主题的数据 ,接受的数据类型为 String类型
*
* @param message
*/
@SneakyThrows
@Subscribe
public void consumerDefaultTopic(String message) {
log.info("consumerDefaultTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getName());
log.info("SubscriberA-->consumerDefaultTopic(模拟执行5秒)-->收到Message(字符串)--> {}", message);
TimeUnit.SECONDS.sleep(5);
log.info("-----------------------consumerDefaultTopic OVER---------------------------------");
}
/**
* 消费 test 的数据 , 接受的数据类型 为 artisan对象
*
* @param artisan
*/
@SneakyThrows
@Subscribe(topic = "test")
public void consumerSpecTopic(Artisan artisan) {
log.info("consumerSpecTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getName());
log.info("SubscriberA-->consumerSpecTopic(模拟执行10秒)-->收到Message-(对象)-> {}", artisan);
TimeUnit.SECONDS.sleep(10);
log.info("-----------------------consumerSpecTopic OVER---------------------------------");
}
}
package com.artisan.busevent.consumer;
import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.entity.Artisan;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Slf4j
public class SubscriberB {
/**
* 消费默认主题的数据
* @param message
*/
@SneakyThrows
@Subscribe
public void consumerDefaultTopic(String message) {
log.info("consumerDefaultTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getName());
log.info("SubscriberB-->consumerDefaultTopic(模拟执行3秒)-->收到Message(字符串)--> {}", message);
TimeUnit.SECONDS.sleep(3);
log.info("-----------------------consumerDefaultTopic OVER---------------------------------");
}
/**
* 消费 test 主题的数据
* @param artisan
*/
@SneakyThrows
@Subscribe(topic = "test")
public void consumerSpecTopic(Artisan artisan) {
log.info("consumerSpecTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getName());
log.info("SubscriberB-->consumerSpecTopic(模拟执行5秒)-->收到Message(对象)--> {} ", artisan);
TimeUnit.SECONDS.sleep(5);
log.info("-----------------------consumerSpecTopic OVER---------------------------------");
}
}
package com.artisan.busevent.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Data
@NoArgsConstructor
@AllArgsConstructor()
public class Artisan {
private String name;
private Integer age;
private List<String> hobbies;
}
同步Event Bus
package com.artisan.busevent.producer;
import com.artisan.busevent.consumer.SubscriberA;
import com.artisan.busevent.consumer.SubscriberB;
import com.artisan.busevent.entity.Artisan;
import com.artisan.busevent.impl.EventBus;
import com.artisan.busevent.intf.Bus;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Slf4j
public class EventBus_SyncTest {
@Test
public void testSync() {
Bus bus = new EventBus("TestBus");
bus.register(new SubscriberA());
bus.register(new SubscriberB());
log.info("--------默认 主题 ----");
// 默认主题
bus.post("Hello 小工匠");
log.info("--------test 主题 ----");
// 指定topic
bus.post(new Artisan("artisan", 18, Arrays.asList("JAVA", "AIGC")), "test");
}
}
异步Event Bus
package com.artisan.busevent.producer;
import com.artisan.busevent.consumer.SubscriberA;
import com.artisan.busevent.consumer.SubscriberB;
import com.artisan.busevent.entity.Artisan;
import com.artisan.busevent.impl.AsyncEventBus;
import com.artisan.busevent.impl.EventBus;
import com.artisan.busevent.intf.Bus;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Slf4j
public class EventBus_AsyncTest {
@Test
public void testSync() throws InterruptedException {
Bus bus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
bus.register(new SubscriberA());
bus.register(new SubscriberB());
log.info("--------默认 主题 ----");
// 默认主题
bus.post("Hello 小工匠");
log.info("--------test 主题 ----");
// 指定topic
bus.post(new Artisan("artisan", 18, Arrays.asList("JAVA", "AIGC")), "test");
TimeUnit.SECONDS.sleep(20);
}
}
小结
EventBus有点类似于GOF设计模式中的监听者模式,但是EventBus提供的功能更加强大,使用起来也更加灵活,EventBus中的Subscriber不需要继承任何类或者实现任何接口,在使用EventBus时只需要持有Bus的引用即可。
在EventBus的设计中有三个非常重要的角色(Bus、Registry和Dispatcher),
- Bus主要提供给外部使用的操作方法,
- Registry注册表用来整理记录所有注册在EventBus上的Subscriber,
- Dispatcher主要负责对Subscriber消息进行推送(用反射的方式执行方法),但是考虑到程序的灵活性,Dispatcher方法中又提供了Executor的多态方式。