Disruptor
初始化
初始化Disruptor
实例
// Single-producer mode (ProducerType.SINGLE), chosen for extra performance
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
ProducerType.SINGLE, new BlockingWaitStrategy());
创建RingBuffer
实例
/**
 * Creates a Disruptor by first building its ring buffer with
 * {@code RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy)}
 * and then delegating to the private (RingBuffer, Executor) constructor.
 *
 * <p>Annotated {@code @Deprecated}; the intended replacement is not shown in this excerpt.
 *
 * @param eventFactory   factory handed to the ring buffer
 * @param ringBufferSize size handed to the ring buffer
 * @param executor       executor stored for later use
 * @param producerType   selects which kind of ring buffer {@code RingBuffer.create} builds
 * @param waitStrategy   wait strategy handed to the ring buffer
 */
@Deprecated
public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
/**
 * Stores the supplied ring buffer and executor and initialises the remaining
 * state: an empty consumer repository, a "started" flag set to {@code false},
 * and a fresh exception-handler wrapper.
 *
 * @param ringBuffer ring buffer this instance operates on
 * @param executor   executor kept for running the consumers
 */
private Disruptor(RingBuffer<T> ringBuffer, Executor executor) {
    // Collaborators supplied by the caller.
    this.ringBuffer = ringBuffer;
    this.executor = executor;

    // Internally owned state.
    consumerRepository = new ConsumerRepository();
    started = new AtomicBoolean(false);
    exceptionHandler = new ExceptionHandlerWrapper();
}
RingBuffer#create
,根据producerType
来创建不同的Producer
。
/**
 * Builds a ring buffer whose flavour is selected by {@code producerType}.
 *
 * @param producerType {@code SINGLE} or {@code MULTI}
 * @param factory      factory passed through to the chosen creator
 * @param bufferSize   buffer size passed through to the chosen creator
 * @param waitStrategy wait strategy passed through to the chosen creator
 * @return the ring buffer returned by the single- or multi-producer creator
 * @throws IllegalStateException if {@code producerType} is neither {@code SINGLE} nor {@code MULTI}
 */
public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    if (producerType == ProducerType.SINGLE) {
        return createSingleProducer(factory, bufferSize, waitStrategy);
    }
    if (producerType == ProducerType.MULTI) {
        return createMultiProducer(factory, bufferSize, waitStrategy);
    }
    // Unknown constant; a null producerType fails here with a NullPointerException.
    throw new IllegalStateException(producerType.toString());
}
RingBuffer#createSingleProducer()
。创建SingleProducerSequencer
实例和RingBuffer
实例
/**
 * Creates a ring buffer backed by a {@code SingleProducerSequencer}.
 *
 * @param factory      event factory handed to the ring buffer
 * @param bufferSize   size handed to the sequencer
 * @param waitStrategy wait strategy handed to the sequencer
 * @return a new ring buffer using the freshly created sequencer
 */
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    // Parameterized instead of a raw type, so the result matches the declared
    // RingBuffer<E> return type without an unchecked conversion.
    return new RingBuffer<E>(factory, sequencer);
}
创建SingleProducerSequencer
实例
AbstractSequencer
是SingleProducerSequencer
的父类,初始化bufferSize
和waitStrategy
/**
 * Validates and stores the buffer size and wait strategy.
 *
 * @param bufferSize   must be at least 1 and a power of two
 * @param waitStrategy wait strategy to store
 * @throws IllegalArgumentException if {@code bufferSize} is below 1 or not a power of two
 */
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    // A positive int is a power of two exactly when a single bit is set.
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
}
创建SingleProducerSequencer
实例时还初始化了一个成员变量cursor
// Cursor sequence, created with an initial value of -1.
protected final Sequence cursor = new Sequence(-1L);
给cursor
赋值了一个Sequence
实例对象,Sequence
是标识RingBuffer
环形数组的下标,生产者和消费者也会各自维护自己的Sequence
。最重要的是,Sequence
通过填充CPU缓存行避免了伪共享带来的性能损耗
/**
 * A sequence counter built on top of the RhsPadding class.
 *
 * NOTE(review): the initializer for VALUE_OFFSET is not part of this excerpt;
 * presumably it is the Unsafe field offset of the inherited {@code value}
 * field. Confirm against the full source.
 */
public class Sequence extends RhsPadding {
// Initial value used for a new sequence.
static final long INITIAL_VALUE = -1L;
// Unsafe instance obtained from Util.getUnsafe().
private static final Unsafe UNSAFE = Util.getUnsafe();
// Offset passed to UNSAFE.putOrderedLong when writing the value.
private static final long VALUE_OFFSET;
/** Creates a sequence starting at -1. */
public Sequence() {
this(-1L);
}
/**
 * Creates a sequence starting at {@code initialValue}.
 * The value is written through UNSAFE.putOrderedLong at VALUE_OFFSET.
 */
public Sequence(long initialValue) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
}
/**
 * Adds seven unused long fields (p9..p15) below Value in the class hierarchy.
 * Per the accompanying notes, these fields exist as padding to avoid false
 * sharing; the actual memory layout is decided by the JVM.
 */
class RhsPadding extends Value {
protected long p9;
protected long p10;
protected long p11;
protected long p12;
protected long p13;
protected long p14;
protected long p15;
RhsPadding() {
}
}
/**
 * Holds the single volatile long payload, declared between the LhsPadding
 * superclass fields and the fields of any subclass.
 */
class Value extends LhsPadding {
// The actual value; volatile so plain reads and writes are visible across threads.
protected volatile long value;
Value() {
}
}
/**
 * Root of the padding hierarchy: declares seven unused long fields (p1..p7).
 * Per the accompanying notes, these fields exist as padding to avoid false
 * sharing; the actual memory layout is decided by the JVM.
 */
class LhsPadding {
protected long p1;
protected long p2;
protected long p3;
protected long p4;
protected long p5;
protected long p6;
protected long p7;
LhsPadding() {
}
}
创建RingBuffer
实例
RingBufferFields
为RingBuffer
父类,在创建RingBuffer
实例时,会为RingBuffer
的环形数组提前填充Event
对象,即内存池机制。
/**
 * Stores the sequencer, validates its buffer size, allocates the backing
 * array (buffer size plus {@code BUFFER_PAD} slots on each side) and
 * pre-fills it with events from the factory.
 *
 * @param eventFactory factory used to pre-populate every usable slot
 * @param sequencer    sequencer that supplies the buffer size
 * @throws IllegalArgumentException if the buffer size is below 1 or not a power of two
 */
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    // For a power-of-two size, size - 1 has all lower bits set.
    this.indexMask = (long) (bufferSize - 1);
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}
/**
 * Pre-populates the {@code bufferSize} usable slots of {@code entries},
 * starting at index {@code BUFFER_PAD}, with one new event per slot.
 *
 * @param eventFactory factory asked for one instance per slot
 */
private void fill(EventFactory<E> eventFactory) {
    int slot = BUFFER_PAD;
    for (int created = 0; created < bufferSize; created++) {
        entries[slot++] = eventFactory.newInstance();
    }
}
获取Unsafe
对象。
/**
 * Returns the Unsafe instance held in THE_UNSAFE.
 *
 * @return the value of THE_UNSAFE
 */
public static Unsafe getUnsafe() {
return THE_UNSAFE;
}
// Loads the Unsafe singleton by reflectively reading the private static
// "theUnsafe" field inside a privileged action; any failure is rethrown as a
// RuntimeException carrying the original cause.
static {
    try {
        THE_UNSAFE = AccessController.doPrivileged(new PrivilegedExceptionAction<Unsafe>() {
            @Override
            public Unsafe run() throws Exception {
                Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
                unsafeField.setAccessible(true);
                // Static field, so no receiver instance is needed.
                return (Unsafe) unsafeField.get(null);
            }
        });
    } catch (Exception e) {
        throw new RuntimeException("Unable to load unsafe", e);
    }
}
指定EventHandler
// Register the business event handler, i.e. the consumer
disruptor.handleEventsWith(new HelloEventHandler());
Disruptor#handleEventsWith()
。
- 有多少个
eventHandlers
就创建多少个BatchEventProcessor
实例(消费者),BatchEventProcessor
消费者其实就是一个实现Runnable
接口的任务实例,由线程池中的线程执行;
- 同一批次的每个
BatchEventProcessor
实例共用同一个SequenceBarrier
实例
/**
 * Registers the given handlers as event processors with no barrier sequences
 * (an empty Sequence array is passed to createEventProcessors).
 *
 * @param handlers handlers to register
 * @return the group returned by createEventProcessors
 */
public EventHandlerGroup<T> handleEventsWith(EventHandler<? super T>... handlers) {
return this.createEventProcessors(new Sequence[0], handlers);
}
/**
 * Creates one {@code BatchEventProcessor} per handler, all sharing a single
 * barrier built from {@code barrierSequences}, and registers each processor
 * in the consumer repository.
 *
 * @param barrierSequences sequences the shared barrier is created from
 * @param eventHandlers    handlers to wrap, one processor each
 * @return a group exposing the sequences of the newly created processors
 */
EventHandlerGroup<T> createEventProcessors(Sequence[] barrierSequences, EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    // One barrier instance is shared by every processor created in this call.
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0; i < eventHandlers.length; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        // Parameterized (not raw) so the assignment is checked by the compiler.
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    // Only touch the end-of-chain marks when at least one processor was added.
    if (processorSequences.length > 0) {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
AbstractSequencer#newBarrier
,序列器创建栅栏。
/**
 * Creates a ProcessingSequenceBarrier from this sequencer, its wait strategy,
 * its cursor and the given sequences.
 *
 * @param sequencesToTrack sequences passed to the barrier as its dependent sequences
 * @return the new barrier
 */
@Override
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
创建栅栏,将生产者序列器和cursorSequence
放在一起。
/**
 * Stores the sequencer, wait strategy and cursor, and selects the dependent
 * sequence: the cursor itself when no dependent sequences are supplied,
 * otherwise a {@code FixedSequenceGroup} wrapping them.
 *
 * @param sequencer          sequencer this barrier belongs to
 * @param waitStrategy       strategy used when waiting
 * @param cursorSequence     cursor sequence
 * @param dependentSequences sequences to depend on; may be empty
 */
public ProcessingSequenceBarrier(
    final Sequencer sequencer,
    final WaitStrategy waitStrategy,
    final Sequence cursorSequence,
    final Sequence[] dependentSequences)
{
    this.sequencer = sequencer;
    this.waitStrategy = waitStrategy;
    this.cursorSequence = cursorSequence;
    this.dependentSequence = (0 == dependentSequences.length)
        ? cursorSequence
        : new FixedSequenceGroup(dependentSequences);
}
Disruptor#after()
/**
 * Builds a handler group from the sequences that the consumer repository
 * holds for the given handlers, in the same order as the handlers.
 *
 * @param handlers handlers whose sequences are looked up
 * @return a group wrapping those sequences
 */
public EventHandlerGroup<T> after(final EventHandler<T>... handlers)
{
    final Sequence[] handlerSequences = new Sequence[handlers.length];
    int slot = 0;
    for (final EventHandler<T> handler : handlers)
    {
        handlerSequences[slot++] = consumerRepository.getSequenceFor(handler);
    }
    return new EventHandlerGroup<T>(this, consumerRepository, handlerSequences);
}
启动Disruptor
实例
dsl.Disruptor#start
/**
 * Adds the sequences returned by consumerRepository.getLastSequenceInChain(true)
 * to the ring buffer as gating sequences, then starts every registered
 * consumer on the executor.
 *
 * NOTE(review): the gating sequences are added before checkOnlyStartedOnce()
 * runs. If that check throws on a repeated call (presumed from its name;
 * confirm), the sequences will already have been added a second time.
 *
 * @return the ring buffer
 */
public RingBuffer<T> start()
{
final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
ringBuffer.addGatingSequences(gatingSequences);
checkOnlyStartedOnce();
// Hand each registered consumer to the executor.
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
EventProcessorInfo#start
,用线程池执行封装的eventprocessor
/**
 * Submits the wrapped event processor to the given executor.
 *
 * @param executor executor that will run the event processor
 */
public void start(final Executor executor)
{
executor.execute(eventprocessor);
}
生产者生产数据
RingBuffer#next()
/**
 * Delegates to the sequencer to obtain the next sequence.
 *
 * @return the value returned by sequencer.next()
 */
public long next()
{
return sequencer.next();
}
SingleProducerSequencer#next()
,单生产者获取sequence
/**
 * Claims a single sequence; equivalent to next(1).
 *
 * @return the claimed sequence
 */
@Override
public long next()
{
return next(1);
}
/**
 * Claims the next {@code n} sequences and returns the highest one claimed.
 * If claiming would move more than bufferSize ahead of the slowest gating
 * sequence, the call waits (parking for 1ns at a time) until it no longer does.
 *
 * @param n number of sequences to claim; must be at least 1
 * @return the highest claimed sequence
 * @throws IllegalArgumentException if {@code n} is less than 1
 * @see Sequencer#next(int)
 */
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
// Value returned by the previous claim (stored at the end of this method).
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
// The claim may only proceed once the minimum gating sequence has reached this point.
long wrapPoint = nextSequence - bufferSize;
// Minimum gating sequence remembered from the last time the slow path ran.
long cachedGatingSequence = this.cachedValue;
// Slow path: the cached minimum is not sufficient to prove the claim is safe.
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence;
// Re-read the minimum over gatingSequences until it reaches wrapPoint.
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
// Remember the fresh minimum so later claims can skip the slow path.
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
RingBuffer#publish()
/**
 * Publishes the given sequence by delegating to the sequencer.
 *
 * @param sequence sequence to publish
 */
@Override
public void publish(long sequence)
{
sequencer.publish(sequence);
}
SingleProducerSequencer#publish(long)
,将cursor更新为已发布的sequence,并唤醒阻塞等待的消费者线程。
/**
 * Sets the cursor to the given sequence and then asks the wait strategy to
 * signal any waiters.
 *
 * @param sequence sequence being published
 */
public void publish(long sequence)
{
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
消费者消费数据
BatchEventProcessor
,dataProvider
是ringBuffer
,sequenceBarrier
是ProcessingSequenceBarrier
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}
timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
BatchEventProcessor#run
,判断是否需要阻塞,使用事件处理器处理数据。
/**
 * Main loop of the processor: waits on the barrier for available sequences,
 * passes each event to the handler in order, and records progress in
 * {@code sequence}. The loop exits when an alert is raised while
 * {@code running} is false.
 *
 * @throws IllegalStateException if this processor is already running
 */
@Override
public void run()
{
// Only one caller may flip running from false to true.
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
// Resume from the entry after the last recorded sequence.
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// Handle everything up to availableSequence as one batch.
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
// The third argument is true only for the last event of the batch.
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// Progress is recorded once per batch, not once per event.
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
// An alert ends the loop only if running has been cleared; otherwise keep going.
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// Report the failure, then mark the failing sequence as processed and move past it.
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
ProcessingSequenceBarrier#waitFor
,调用等待策略。
/**
 * Checks for an alert, then waits via the wait strategy for {@code sequence}.
 * If the strategy returns a value below the requested sequence, that value is
 * returned as is; otherwise the result is narrowed through
 * {@code sequencer.getHighestPublishedSequence}.
 *
 * @param sequence sequence to wait for
 * @return the available sequence as described above
 */
public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();

    final long available = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    return (available < sequence)
        ? available
        : sequencer.getHighestPublishedSequence(sequence, available);
}
等待策略
BlockingWaitStrategy
/**
 * Wait strategy that blocks on a lock/condition pair until the cursor reaches
 * the requested sequence, then spins until the dependent sequence reaches it too.
 */
public final class BlockingWaitStrategy implements WaitStrategy
{
// Lock guarding the condition that waiters block on.
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
/**
 * Waits until both the cursor and the dependent sequence are at or beyond
 * {@code sequence}.
 *
 * @return the dependent sequence's value once it is at or beyond {@code sequence}
 */
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// Unlocked pre-check: skip the lock entirely when the cursor is already far enough.
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
// Re-check under the lock on every wake-up before awaiting again.
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// Spin, without blocking, until the dependent sequence catches up.
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
/** Wakes every thread awaiting the condition. */
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
}
协作者模式
Disruptor广播模式跟MQ
的广播模式功能是一样的,即生产者生产的消息会广播到每个消费者;不过相信大家在使用MQ
的过程中更多使用的是MQ
的协作者模式,即同一个消费者组的消费者共同消费生产者生产的消息。同样,Disruptor
也提供了协作者模式。
// 4. Build the worker pool for the cooperating multi-consumer mode
WorkerPool<LongEvent> workerPool = new WorkerPool<LongEvent>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
workHandlers);
// 5. Register the worker sequences as gating sequences: a barrier between the producer and the consumer group that keeps the producer from catching up with the slowest consumer
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());