目录
- 一、简介
- 二、代码
- 2.1 依赖
- 2.2 角色介绍
- 2.3 事件类
- 2.4 生产者
- 2.5 消费者
- 2.6 启动Disruptor
- 2.7 测试
- 源码
一、简介
Disruptor
是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。由于其高性能,获得了很多大奖。
在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。
从数据结构上来看,Disruptor 是一个支持 生产者 -> 消费者
模式的 环形队列
。能够在 无锁 的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后消费次序。
二、代码
2.1 依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
2.2 角色介绍
Event :事件类:生产者和消费者之间进行交换的数据被称为事件(Event)。
Producer: 生产者,用于发布事件。
Consumer :消费者(实现EventHandler接口):用于处理事件。
Disruptor通过事件工厂EventFactory在RingBuffer中预创建事件Event的实例。
一个事件实例Event类似于一个数据槽。
生产者Producer发布Publish之前,先从Ringbuffer中获取一个事件Event实例。
然后生产者Producer向事件Event实例中填充数据,然后再发布到RingBuffer中。
最后由消费者Consumer获取事件Event实例并读取实例中的数据。
2.3 事件类
/**
* @Author: LiuShihao
* @Date: 2022/11/23 14:39
* @Desc: 定义事件类:生产者和消费者之间进行交换的数据
*/
public class LogEvent {
//事件类工厂:引用new方法
public static final EventFactory<LogEvent> FACTORY = LogEvent::new;
private String data;
private Instant timestamp;
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public Instant getTimestamp() {
return timestamp;
}
public void setTimestamp(Instant timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "LogEvent{" +
"data='" + data + '\'' +
", timestamp=" + timestamp +
'}';
}
}
2.4 生产者
/**
* @Author: LiuShihao
* @Date: 2022/11/23 14:39
* @Desc: 生产者类:用于发布事件。
*/
public class MyProducer {
//RingBuffer
private final RingBuffer<LogEvent> ringBuffer;
//有参构造
public MyProducer(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 发布事件
* @param eventObject
*/
public void publish(LogEvent eventObject) {
boolean isPublished = ringBuffer.tryPublishEvent((event, sequence) -> {
event.setTimestamp(Instant.now());
event.setData(eventObject.getData());
});
if (!isPublished) {
System.err.println(Thread.currentThread().getName()+" - "+Thread.currentThread().getId() + " producer Failed to publish!");
}
}
}
2.5 消费者
/**
* @Author: LiuShihao
* @Date: 2022/11/23 14:39
* @Desc: 消费者类:接收事件,实现EventHandler接口
*/
public class MyConsumer implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("consumer:"+event);
Thread.sleep(3000);
}
}
2.6 启动Disruptor
Disruptor的构造参数有5个:
- EventFactory:事件工厂类,用于生产事件。
- ringBufferSize:环形缓冲区的大小,必须是2的次幂。
- threadFactory:线程工厂,用于创建线程。
- ProducerType:事件生产者策略(单线程和多线程)。
- WaitStrategy:等待策略。
通过disruptor.handleEventsWith();
方法设置消费者,方法内可以传入一个或者多个消费者。
Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(
LogEvent.FACTORY,
2,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(myConsumer);
disruptor.start();
MyProducer myProducer = new MyProducer(disruptor.getRingBuffer());
2.7 测试
/**
* @Author: LiuShihao
* @Date: 2022/11/23 14:38
* @Desc:
*/
public class Main {
public static void main(String[] args) {
MyConsumer myConsumer = new MyConsumer();
Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(
LogEvent.FACTORY,
2,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(myConsumer);
disruptor.start();
MyProducer myProducer = new MyProducer(disruptor.getRingBuffer());
for (int i = 0; i < 10; i++) {
new Thread(()->{
LogEvent logEvent = LogEvent.FACTORY.newInstance();
logEvent.setData(Thread.currentThread().getName());
myProducer.publish(logEvent);
}).start();
}
}
}
源码
https://github.com/Liu-Shihao/disruptor-demo.git
参考文章:
https://juejin.cn/post/6844904020973191181
https://juejin.cn/post/6844903976924610574
https://blog.51cto.com/u_15185289/3313032