Disruptor框架简介
Disruptor框架内部核心的数据结构是Ring Buffer,Ring Buffer是一个环形的数组,Disruptor框架以Ring Buffer为核心实现了异步事件处理的高性能架构;JDK的BlockingQueue相信大家都用过,其是一个阻塞队列,内部通过锁机制实现生产者和消费者之间线程的同步。跟BlockingQueue一样,Disruptor框架也是围绕Ring Buffer实现生产者和消费者之间数据的交换,只不过Disruptor框架性能更高,笔者曾经在同样的环境下拿Disruptor框架跟ArrayBlockingQueue做过性能测试,Disruptor框架处理数据的性能比ArrayBlockingQueue的快几倍。
Disruptor
框架性能为什么会更好呢?其有以下特点:
- 预加载内存可以理解为使用了内存池;
- 无锁化
- 单线程写
- 消除伪共享
- 使用内存屏障
- 序号栅栏机制
相关概念
-
Disruptor
:是使用Disruptor框架的核心类,持有RingBuffer、消费者线程池、消费者集合ConsumerRepository和消费者异常处理器ExceptionHandler等引用; -
Ring Buffer
: RingBuffer处于Disruptor框架的中心位置,其是一个环形数组,环形数组的对象采用预加载机制创建且能重用,是生产者和消费者之间交换数据的桥梁,其持有Sequencer的引用; -
Sequencer
: Sequencer是Disruptor框架的核心,实现了所有并发算法,用于生产者和消费者之间快速、正确地传递数据,其有两个实现类SingleProducerSequencer和MultiProducerSequencer。 -
Sequence
:Sequence被用来标识Ring Buffer和消费者Event Processor的处理进度,每个消费者Event Processor和Ring Buffer本身都分别维护了一个Sequence,支持并发操作和顺序写,其也通过填充缓存行的方式来消除伪共享从而提高性能。 -
Sequence Barrier
:Sequence Barrier即为序号屏障,通过追踪生产者的cursorSequence和每个消费者( EventProcessor)的sequence的方式来协调生产者和消费者之间的数据交换进度,其实现类ProcessingSequenceBarrier持有的WaitStrategy等待策略类是实现序号屏障的核心。 -
Wait Strategy
:Wait Strategy是决定消费者如何等待生产者的策略方式,当消费者消费速度过快时,此时是不是要让消费者等待下,此时消费者等待是通过锁的方式实现还是无锁的方式实现呢? -
Event Processor
:Event Processor可以理解为消费者线程,该线程会一直从Ring Buffer获取数据来消费数据,其有两个核心实现类:BatchEventProcessor和WorkProcessor。 -
Event Handler
:Event Handler可以理解为消费者实现业务逻辑的Handler,被BatchEventProcessor类引用,在BatchEventProcessor线程的死循环中不断从Ring Buffer获取数据供Event Handler消费。 -
Producer
:生产者,一般用RingBuffer.publishEvent来生产数据。
快速入门
MQManager
启用Disruptor
,返回RingBuffer
实例。
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<MessageModel> messageModelRingBuffer() {
//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
ExecutorService executor = Executors.newFixedThreadPool(2);
//指定事件工厂
HelloEventFactory factory = new HelloEventFactory();
//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
int bufferSize = 1024 * 256;
//单线程模式,获取额外的性能
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
ProducerType.SINGLE, new BlockingWaitStrategy());
//设置事件业务处理器---消费者
disruptor.handleEventsWith(new HelloEventHandler());
// 启动disruptor线程
disruptor.start();
//获取ringbuffer环,用于接取生产者生产的事件
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
}
MessageModel
消息实体类
@Data
public class MessageModel {
private String message;
}
工厂类
public class HelloEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
消息处理器
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
@Override
public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
try {
log.info("消费者处理消息开始");
if (event != null) {
log.info("消费者消费的信息是:{}",event);
}
} catch (Exception e) {
log.info("消费者处理消息失败");
}
log.info("消费者处理消息结束");
}
}
消息发送
@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//获取下一个Event槽的下标
long sequence = messageModelRingBuffer.next();
try {
//给Event填充数据
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息队列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//发布Event,激活观察者去消费,将sequence传递给改消费者
//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}