文章目录
一、Disruptor介绍
1、为什么要有Disruptor
juc包下阻塞队列的缺陷:
1) juc下的队列大部分采用加ReentrantLock锁
方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列
。
2)加锁的方式通常会严重影响性能
。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
3) 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)
。
2、Disruptor介绍
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
Github:https://github.com/LMAX-Exchange/disruptor
官方学习网站:http://ifeve.com/disruptor-getting-started/
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。
Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
3、Disruptor的高性能设计
Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。- 元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。 无锁设计
每个生产者或者消费者线程,会通过先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。整个过程通过原子变量CAS,保证操作的线程安全。利用缓存行填充解决了伪共享的问题
- 实现了基于事件驱动的生产者消费者模型(观察者模式)
消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费
4、RingBuffer数据结构
使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组
。除数组外还有一个序列号(sequence),用以指向下一个可用的元素
,供生产者与消费者使用。原理图如下所示:
- Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
- 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉
5、等待策略
名称 | 措施 | 适用场景 |
---|---|---|
BlockingWaitStrategy | 加锁 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
BusySpinWaitStrategy | 自旋 | 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定义策略 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU资源之间有很好的折中。延迟不均匀 |
TimeoutBlockingWaitStrategy | 加锁,有超时限制 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU资源之间有很好的折中。延迟比较均匀 |
6、Disruptor在日志框架中的应用
Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优
。该特性源自于Log4j 2的异步模式采用了Disruptor来处理。 在Log4j 2的配置文件中可以配置WaitStrategy,默认是Timeout策略。
loggers all async采用的是Disruptor,而Async Appender采用的是ArrayBlockingQueue队列。
由图可见,单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。
7、术语
RingBuffer: 被看作Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。
Sequence: Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前为AtomicLong类的特性。
Sequencer: 这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。
SequenceBarrier: 由Sequencer生成,并且包含了已经发布的Sequence的引用,这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑。
WaitStrategy:决定一个消费者将如何等待生产者将Event置入Disruptor。Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。
EventProcessor:主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。
EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口。 Producer:由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。
WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。
WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker之间移交 。
LifecycleAware:当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。
二、Disruptor实战
1、引入依赖
<!-- disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
2、Disruptor构造器
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
eventFactory -在环缓冲区中创建事件的工厂。
ringBufferSize -环形缓冲区的大小,必须是2的幂。
threadFactory——一个为处理器创建线程的threadFactory。
producerType——用于环形缓冲区的声明策略。
waitStrategy -用于环缓冲区的等待策略。
3、入门实例
(1)Hello World
在Disruptor中,我们想实现hello world 需要如下几步骤:
第一:建立一个Event类
第二:建立一个工厂Event类,用于创建Event类实例对象
第三:需要有一个监听事件类,用于处理数据(Event类)
第四:我们需要进行测试代码编写。实例化Disruptor实例,配置一系列参数。然后我们对Disruptor实例绑定监听事件类,接受并处理数据。
第五:在Disruptor中,真正存储数据的核心叫做RingBuffer,我们通过Disruptor实例拿到它,然后把数据生产出来,把数据加入到RingBuffer的实例对象中即可。
//1、真正要生产的对象
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
import com.lmax.disruptor.EventFactory;
// 2、需要让disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。
public class LongEventFactory implements EventFactory {
@Override
public Object newInstance() {
return new LongEvent();
}
}
import com.lmax.disruptor.EventHandler;
//3、我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
//消费逻辑
System.out.println("consumer" + longEvent.getValue());
}
}
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
/**
* 很明显的是:当用一个简单队列来发布事件的时候会牵涉更多的细节,这是因为事件对象还需要预先创建。
* 发布事件最少需要两步:获取下一个事件槽并发布事件(发布事件的时候要使用try/finnally保证事件一定会被发布)。
* 如果我们使用RingBuffer.next()获取一个事件槽,那么一定要发布对应的事件。
* 如果不能发布事件,那么就会引起Disruptor状态的混乱。
* 尤其是在多个事件生产者的情况下会导致事件消费者失速,从而不得不重启应用才能会恢复。
* <B>系统名称:</B><BR>
* <B>模块名称:</B><BR>
* <B>中文类名:</B><BR>
* <B>概要说明:</B><BR>
*/
//4、这是一个生产者
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* onData用来发布事件,每调用一次就发布一次事件
* 它的参数会用过事件传递给消费者
*/
public void onData(ByteBuffer bb) {
//1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
LongEvent event = ringBuffer.get(sequence);
//3.获取要通过事件传递的业务数据
event.setValue(bb.getLong(0));
} finally {
//4.发布事件,发布后才能消费
//注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 5、编码
public class LongEventMain {
public static void main(String[] args) throws Exception {
//创建缓冲池
ExecutorService executor = Executors.newCachedThreadPool();
//创建工厂
LongEventFactory factory = new LongEventFactory();
//创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
int ringBufferSize = 1024 * 1024; //
//创建disruptor
//1.第一个参数为工厂类对象,用于创建一个个的LongEvent,LongEvent是实际的消费数据
//2.第二个参数为缓冲区
//3.第三个参数为线程池
//4.第四个参数为ProducerType.SINGLE(表示生产者只有一个)和ProducerType.MULTY(表示有多个生产者)
//5.第五个参数是一种策略
/**
//BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
//SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
//YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
*/
Disruptor<LongEvent> disruptor =
new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 连接消费事件方法
disruptor.handleEventsWith(new LongEventHandler());
// 启动
disruptor.start();
//Disruptor 的事件发布过程是一个两阶段提交的过程:
//发布事件
//使用该方法获得具体存放数据的容器ringbuffer(环形结构)
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//把容器传入生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);//传值用的,不用太在意
for(long l = 0; l<100; l++){
byteBuffer.putLong(0, l);
producer.onData(byteBuffer);
Thread.sleep(1000);
}
disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
}
}
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
/**
* Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer,
* 所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件
* <B>系统名称:</B><BR>
* <B>模块名称:</B><BR>
* <B>中文类名:</B><BR>
* <B>概要说明:</B><BR>
*/
// 其他:生产者也可以这样写
public class LongEventProducerWithTranslator {
//一个translator可以看做一个事件初始化器,publicEvent方法会调用它
//填充Event
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
event.setValue(buffer.getLong(0));
}
};
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buffer) {
ringBuffer.publishEvent(TRANSLATOR, buffer);
}
}
(2)单生产者单消费者模式
/**
* 1、消息载体(事件)
*/
public class OrderEvent {
private long value;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
import com.lmax.disruptor.EventFactory;
/**
* 2、事件工厂
*/
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
import com.lmax.disruptor.RingBuffer;
import com.disruptor.event.OrderEvent;
/**
* 3、消息(事件)生产者
*/
public class OrderEventProducer {
//事件队列
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long value,String name) {
// 获取事件队列 的下一个槽
long sequence = ringBuffer.next();
try {
//获取消息载体(事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 写入消息数据
orderEvent.setValue(value);
orderEvent.setName(name);
} catch (Exception e) {
// TODO 异常处理
e.printStackTrace();
} finally {
System.out.println("生产者"+ Thread.currentThread().getName()
+"发送数据value:"+value+",name:"+name);
//发布事件
ringBuffer.publish(sequence);
}
}
}
import com.disruptor.event.OrderEvent;
import com.lmax.disruptor.EventHandler;
/**
* 4、消费者
*/
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO 消费逻辑
System.out.println("EventHandler 消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
}
import com.disruptor.consumer.OrderEventHandler;
import com.disruptor.event.OrderEvent;
import com.disruptor.producer.OrderEventProducer;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;
/**
* 5、编码
*/
public class DisruptorDemo {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.SINGLE, //单生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWith(new OrderEventHandler());
//设置多消费者,消息会被重复消费
//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
//disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
//启动disruptor
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"Fox"+i);
}
disruptor.shutdown();
}
}
(3)单生产者多消费者模式
如果消费者是多个,只需要在调用 handleEventsWith 方法时将多个消费者传递进去。
//设置多消费者,消息会被重复消费
disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下,只会被一个消费者消费,那么需要调用 handleEventsWithWorkerPool 方法。
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
注意:消费者要实现WorkHandler接口
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
@Override
public void onEvent(OrderEvent event) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
}
(4)多生产者多消费者模式
在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。
import com.disruptor.consumer.OrderEventHandler;
import com.disruptor.event.OrderEvent;
import com.disruptor.event.OrderEventFactory;
import com.disruptor.producer.OrderEventProducer;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;
/**
* 5、编码
*/
public class DisruptorDemo2 {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.MULTI, //多生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
//disruptor.handleEventsWith(new OrderEventHandler());
//设置多消费者,消息会被重复消费
//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
//启动disruptor
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(()->{
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"Fox"+i);
}
},"producer1").start();
new Thread(()->{
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"monkey"+i);
}
},"producer2").start();
//disruptor.shutdown();
}
}
4、场景应用
(1)使用EventProcessor消息处理器
import java.util.concurrent.atomic.AtomicInteger;
// 1、消息
public class Trade {
private String id;//ID
private String name;
private double price;//金额
private AtomicInteger count = new AtomicInteger(0);
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
}
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import java.util.UUID;
//消费者
public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { //根据具体需要实现之一即可
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
//这里做具体的消费逻辑
event.setId(UUID.randomUUID().toString());//简单生成下ID
System.out.println(event.getId());
}
}
import com.lmax.disruptor.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main1 {
public static void main(String[] args) throws Exception {
int BUFFER_SIZE=1024;
int THREAD_NUMBERS=4;
/*
* createSingleProducer创建一个单生产者的RingBuffer,
* 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。
* 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
* 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
*/
final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {
@Override
public Trade newInstance() {
return new Trade();
}
}, BUFFER_SIZE, new YieldingWaitStrategy());
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建消息处理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());
//这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executors.submit(transProcessor);
//如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类
Future<?> future= executors.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
long seq;
for(int i=0;i<10;i++){
seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块
ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据
ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
}
return null;
}
});
future.get();//等待生产者结束
Thread.sleep(1000);//等上1秒,等消费都处理完成
transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
executors.shutdown();//终止线程
}
}
(2)使用WorkerPool消息处理器
import com.lmax.disruptor.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main2 {
public static void main(String[] args) throws InterruptedException {
int BUFFER_SIZE=1024;
int THREAD_NUMBERS=4;
EventFactory<Trade> eventFactory = new EventFactory<Trade>() {
public Trade newInstance() {
return new Trade();
}
};
RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);
WorkHandler<Trade> handler = new TradeHandler();
WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);
workerPool.start(executor);
//下面这个生产8个数据 这里其实应该换成生产者
for(int i=0;i<8;i++){
long seq=ringBuffer.next();
ringBuffer.get(seq).setPrice(Math.random()*9999);
ringBuffer.publish(seq);
}
Thread.sleep(1000);
workerPool.halt();
executor.shutdown();
}
}
5、复杂场景下使用
在复杂场景下使用RingBuffer(希望P1生产的数据给C1、C2并行执行,最后C1、C2执行结束后C3执行)
这种场景必须使用disruptor了。
import java.util.concurrent.atomic.AtomicInteger;
// 1、数据
public class Trade {
private String id;//ID
private String name;
private double price;//金额
private AtomicInteger count = new AtomicInteger(0);
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
}
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
// 2、生产者
public class TradePublisher implements Runnable {
Disruptor<Trade> disruptor;
private CountDownLatch latch;
private static int LOOP=10;//模拟百万次交易的发生
public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {
this.disruptor=disruptor;
this.latch=latch;
}
@Override
public void run() {
TradeEventTranslator tradeTransloator = new TradeEventTranslator();
for(int i=0;i<LOOP;i++){
disruptor.publishEvent(tradeTransloator);
}
latch.countDown();
}
}
class TradeEventTranslator implements EventTranslator<Trade>{
private Random random=new Random();
@Override
public void translateTo(Trade event, long sequence) {
this.generateTrade(event);
}
private Trade generateTrade(Trade trade){
trade.setPrice(random.nextDouble()*9999);
return trade;
}
}
// 3、以下是消费者
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
System.out.println("handler1: set name");
event.setName("h1");
Thread.sleep(1000);
}
}
import com.lmax.disruptor.EventHandler;
public class Handler2 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler2: set price");
event.setPrice(17.0);
Thread.sleep(1000);
}
}
import com.lmax.disruptor.EventHandler;
public class Handler3 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.toString());
}
}
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
System.out.println("handler4: get name : " + event.getName());
event.setName(event.getName() + "h4");
}
}
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
System.out.println("handler5: get price : " + event.getPrice());
event.setPrice(event.getPrice() + 3.0);
}
}
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 编码
public class Main {
public static void main(String[] args) throws InterruptedException {
long beginTime=System.currentTimeMillis();
int bufferSize=1024;
ExecutorService executor=Executors.newFixedThreadPool(8);
Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {
@Override
public Trade newInstance() {
return new Trade();
}
}, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
//菱形操作
/**
//使用disruptor创建消费者组C1,C2
EventHandlerGroup<Trade> handlerGroup =
disruptor.handleEventsWith(new Handler1(), new Handler2());
//声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3
handlerGroup.then(new Handler3());
*/
//顺序操作
/**
disruptor.handleEventsWith(new Handler1()).
handleEventsWith(new Handler2()).
handleEventsWith(new Handler3());
*/
//六边形操作.
/**
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h2);
disruptor.after(h1).handleEventsWith(h4);
disruptor.after(h2).handleEventsWith(h5);
disruptor.after(h4, h5).handleEventsWith(h3);
*/
disruptor.start();//启动
CountDownLatch latch=new CountDownLatch(1);
//生产者准备
executor.submit(new TradePublisher(latch, disruptor));
latch.await();//等待生产者完事.
disruptor.shutdown();
executor.shutdown();
System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
}
}