模块封装
shenyu-disruptor
定义了DisruptorProvider
、DisruptorProviderManage
、DataEvent
、QueueConsumerFactory
、DisrutporThreadFactory
等一系列通用接口
该模块的搭建了一个disruptor的初始化框架,
DisruptorProviderManage
提供Disruptor的初始化,可以在初始化是自定义参数,而初始化参数中,包含消费者工厂,初始化会将消费者工厂放置到QueueConsumer
的成员变量当中,有QueueConsumer
进行消息的侦听,一旦有消息,则由消费者工厂QueueConsumerFactory
创建QueueConsumerExecutor
进行消息的处理,QueueConsumerExecutor
可以拿到消息,是具体的操作。而在DisruptorProviderManage
对象中,成员变量provide是此次初始化的disruptor的生产者,由此provider进行消息的发布。
所以,这个模块是对disruptor的通用封装,可以使用任何类型的数据,外界使用该模块需要进行的操作是,继承QueueConsumerExecutor
其executor方法用来写具体的逻辑操作,实现QueueConsumerFactory
接口,用来创建自己的实现的QueueConsumerExecutor
,将工厂类用做DisruptorProviderManage
的构造参数,获得对象,之后调用DisruptorProviderManage
对象的start方法进行disruptor的初始化,disruptor便启动了,启动之后,就可以正常使用disruptor了,之后发布消息,则使用DisruptorProviderManage
对象获取provider,进行消息的发布和disruptor的关闭。
项目启动
RegisterClientServerDisruptorPublisher#start
,启动DisruptorProviderManage
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
factory.addSubscribers(new ApiDocExecutorSubscriber(shenyuClientRegisterService));
providerManage = new DisruptorProviderManage<>(factory);
providerManage.startup();
}
DisruptorProviderManage#startup(boolean)
,初始化Disruptor
配置。
public void startup(final boolean isOrderly) {
OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
int newConsumerSize = this.consumerSize;
EventFactory<DataEvent<T>> eventFactory;
if (isOrderly) {
newConsumerSize = 1;
eventFactory = new OrderlyDisruptorEventFactory<>();
} else {
eventFactory = new DisruptorEventFactory<>();
}
Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
size,
DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
ProducerType.MULTI,
new BlockingWaitStrategy());
@SuppressWarnings("all")
QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
for (int i = 0; i < newConsumerSize; i++) {
consumers[i] = new QueueConsumer<>(executor, consumerFactory);
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
disruptor.start();
RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
}
发布事件
ShenyuClientRegisterEventPublisher#publishEvent
,发布事件
public void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
provider.onData(data);
}
DisruptorProvider#onData
,调用ringBuffer
处理数据
public void onData(final T data) {
if (isOrderly) {
throw new IllegalArgumentException("The current provider is of orderly type. Please use onOrderlyData() method.");
}
try {
ringBuffer.publishEvent(translatorOneArg, data);
} catch (Exception ex) {
logger.error("ex", ex);
}
}
QueueConsumer#onEvent
,处理数据
@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
ThreadPoolExecutor executor = orderly(t);
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
executor.execute(queueConsumerExecutor);
}
}
创建QueueConsumerExecutor
,获取所有的getSubscribers
,进行分组。
@Override
public QueueConsumerExecutor<Collection<DataTypeParent>> create() {
Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> maps = getSubscribers()
.stream()
.map(e -> (ExecutorTypeSubscriber<DataTypeParent>) e)
.collect(Collectors.toMap(ExecutorTypeSubscriber::getType, Function.identity()));
return new RegisterServerConsumerExecutor(maps);
}
处理事件
RegisterServerConsumerExecutor#run
,线程执行,获取对应的ExecutorSubscriber
,调用executor
@Override
public void run() {
Collection<DataTypeParent> results = getData()
.stream()
.filter(this::isValidData)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
selectExecutor(results).executor(results);
}
private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
final Optional<DataTypeParent> first = list.stream().findFirst();
return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
}
相关博客
-
【开源项目】Disruptor框架介绍及快速入门
-
【源码解析】Disruptor框架的源码解析