Disruptor 是苹国外厂本易公司LMAX开发的一个高件能列,研发的初夷是解决内存队列的延识问顾在性能测试中发现竟然与10操作处于同样的数量级),基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCn演讲后,获得了业界关注,201年,企业应用软件专家Martin Fower专门撰写长文介绍。同年它还获得了Oradle官方的Duke大奖。目前,包括Apache StomCame、 L0g4 2在内的很多知名项目都应用了Disrupior以获取高性能。注意,这里所说的队列是系统内部的内存队列,而不是Kaka这样的分布式队列。
前两篇介绍了Disruptor,【数据结构】Disruptor环形数组无锁并发框架阅读_wenchun001的博客-CSDN博客
【并发编程】ShenyuAdmin里面数据同步用到的无锁环形队列LMAX Disruptor并发框架_wenchun001的博客-CSDN博客
今天开始依次从引用包到编码步骤说明如下
引用依赖
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
</dependencies>
1.构建消息载体(事件)
单生产者单消费者模式 1,创建Event(消息载体/事件)和EventFactory (事件工厂) 2,创建 OrderEvent类,这个类将会被放入环形队列中作为消息内容。创建OrderEventFactory类,用于创建OrderEvent 事件
@Data
public class OrderEvent2 {
private long value;
private String name:
}
public class OrderEventFactory implements EventFactory<orderEvent> {
@Override
public OrderEvent newInstance()
{
return new OrderEvent();
}
}
2.构建生产者
创建 OrderEventProducer 类,它将作为生产者使 用
public class OrderEventProducer {
//事件队列
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer)( this,ringBuffer =ringBuffer;}
public void onData(long value,String name) {
// 获取事件队列 的下一个槽
long sequence = ringBuffer.next();
try {
//获取消息 (事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 写入消息数据
orderEvent.setValue(value):
orderEvent.setName(name);
}catch (Exception e){
//异常
}finally {
//发布事件
rringBuffer.publish(sequence);
}
}
}
3.构建消费者
4.生产消息,消费消息的测试
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory()ringBufferSize:124 * 124
Executors.defaultThreadFactory(), ProducerType.SINGLE,//单生产者
new YieldingwaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
disruptor.handleEventswith(new OrderEventHandler());
//设置多消费者,消息会被重复消费
//disruptor.handleEventswith(new OrderEventHandler(),new OrderEventHandler());
//设置多消费者 消费者要实现workHandLer接口,一条消息只会被一个消费者消费
//disruptor.handleEventsWithworkerPool(new OrderEventHandler(), new OrderEventHandler());
//启动disruptor
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 发送消息
for(int i=0;i<10;i++){
eventProduceronData(i, "消息"+1);
}
disruptor.shutdown();
}
多生产者的案例
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory()ringBufferSize:124 * 124
Executors.defaultThreadFactory(), ProducerType.MULIT,//多生产者
new YieldingwaitStrategy() //等待策略
);
消费者优先级模式
在实际场景中,我们通常会因为业务逻而形成一条消费链,比如一个消息必须由 消费者A->消费者B->消费者C 的顺序依次进行消费。在配置消费者时,可以通过.then 方法去实现顺序消费。
I disruptor.handleEventswith(new OrderEventHandler())
then(new OrderEventHandler())
then(new OrderEventHandler());
handleEventsWith 与 handleEventsWithworkerPool 都是支持hen 的,它们可以结合使用。比可以按照消费者A 消费者B 消费者C)->消费者D 的消费项序
1 disruptor.handleEventswith(new OrderEventHandler())
thenHandleEventsWithworkerPool(new OrderEventHandler(), new OrderEventHandler())
then(new OrderEventHandler());