Disruptor详解,Java高性能内存队列最优解

news2024/11/15 8:31:16

文章目录

  • 一、Disruptor介绍
    • 1、为什么要有Disruptor
    • 2、Disruptor介绍
    • 3、Disruptor的高性能设计
    • 4、RingBuffer数据结构
    • 5、等待策略
    • 6、Disruptor在日志框架中的应用
    • 7、术语
  • 二、Disruptor实战
    • 1、引入依赖
    • 2、Disruptor构造器
    • 3、入门实例
      • (1)Hello World
      • (2)单生产者单消费者模式
      • (3)单生产者多消费者模式
      • (4)多生产者多消费者模式
    • 4、场景应用
      • (1)使用EventProcessor消息处理器
      • (2)使用WorkerPool消息处理器
    • 5、复杂场景下使用

一、Disruptor介绍

1、为什么要有Disruptor

juc包下阻塞队列的缺陷:
1) juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列
2)加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
3) 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)

2、Disruptor介绍

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Github:https://github.com/LMAX-Exchange/disruptor
官方学习网站:http://ifeve.com/disruptor-getting-started/

Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

3、Disruptor的高性能设计

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构
    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。
  • 元素位置定位
    数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
  • 无锁设计
    每个生产者或者消费者线程,会通过先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。整个过程通过原子变量CAS,保证操作的线程安全。
  • 利用缓存行填充解决了伪共享的问题
  • 实现了基于事件驱动的生产者消费者模型(观察者模式)
    消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费

4、RingBuffer数据结构

使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:
在这里插入图片描述

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉

5、等待策略

名称措施适用场景
BlockingWaitStrategy加锁CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy自旋通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy自旋 + yield + 自定义策略CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy自旋 + yield + sleep性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy加锁,有超时限制CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy自旋 + yield + 自旋性能和CPU资源之间有很好的折中。延迟比较均匀

6、Disruptor在日志框架中的应用

Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优。该特性源自于Log4j 2的异步模式采用了Disruptor来处理。 在Log4j 2的配置文件中可以配置WaitStrategy,默认是Timeout策略。
在这里插入图片描述
loggers all async采用的是Disruptor,而Async Appender采用的是ArrayBlockingQueue队列。

由图可见,单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。

7、术语

RingBuffer: 被看作Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。

Sequence: Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前为AtomicLong类的特性。

Sequencer: 这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。

SequenceBarrier: 由Sequencer生成,并且包含了已经发布的Sequence的引用,这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑。

WaitStrategy:决定一个消费者将如何等待生产者将Event置入Disruptor。Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。

EventProcessor:主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。

EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口。 Producer:由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。

WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。

WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker之间移交 。

LifecycleAware:当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。

二、Disruptor实战

1、引入依赖

<!-- disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.4</version>
</dependency>

2、Disruptor构造器

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)

eventFactory -在环缓冲区中创建事件的工厂。
ringBufferSize -环形缓冲区的大小,必须是2的幂。
threadFactory——一个为处理器创建线程的threadFactory。
producerType——用于环形缓冲区的声明策略。
waitStrategy -用于环缓冲区的等待策略。

3、入门实例

(1)Hello World

在Disruptor中,我们想实现hello world 需要如下几步骤:
第一:建立一个Event类
第二:建立一个工厂Event类,用于创建Event类实例对象
第三:需要有一个监听事件类,用于处理数据(Event类)
第四:我们需要进行测试代码编写。实例化Disruptor实例,配置一系列参数。然后我们对Disruptor实例绑定监听事件类,接受并处理数据。
第五:在Disruptor中,真正存储数据的核心叫做RingBuffer,我们通过Disruptor实例拿到它,然后把数据生产出来,把数据加入到RingBuffer的实例对象中即可。

//1、真正要生产的对象
public class LongEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
import com.lmax.disruptor.EventFactory;

// 2、需要让disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。
public class LongEventFactory implements EventFactory {
    @Override
    public Object newInstance() {
        return new LongEvent();
    }
}
import com.lmax.disruptor.EventHandler;

//3、我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:
public class LongEventHandler implements EventHandler<LongEvent> {
     @Override
     public void onEvent(LongEvent longEvent, long l, boolean  b) throws Exception {
           //消费逻辑
           System.out.println("consumer" + longEvent.getValue());
     }
}
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**
 * 很明显的是:当用一个简单队列来发布事件的时候会牵涉更多的细节,这是因为事件对象还需要预先创建。
 * 发布事件最少需要两步:获取下一个事件槽并发布事件(发布事件的时候要使用try/finnally保证事件一定会被发布)。
 * 如果我们使用RingBuffer.next()获取一个事件槽,那么一定要发布对应的事件。
 * 如果不能发布事件,那么就会引起Disruptor状态的混乱。
 * 尤其是在多个事件生产者的情况下会导致事件消费者失速,从而不得不重启应用才能会恢复。
 * <B>系统名称:</B><BR>
 * <B>模块名称:</B><BR>
 * <B>中文类名:</B><BR>
 * <B>概要说明:</B><BR>
 */
//4、这是一个生产者
public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * onData用来发布事件,每调用一次就发布一次事件
     * 它的参数会用过事件传递给消费者
     */
    public void onData(ByteBuffer bb) {
        //1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
            LongEvent event = ringBuffer.get(sequence);
            //3.获取要通过事件传递的业务数据
            event.setValue(bb.getLong(0));
        } finally {
            //4.发布事件,发布后才能消费
            //注意,最后的 ringBuffer.publish 方法必须包含在  finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 5、编码
public class LongEventMain {
     public static void main(String[] args) throws Exception {
           //创建缓冲池
           ExecutorService executor =  Executors.newCachedThreadPool();
           //创建工厂
           LongEventFactory factory = new LongEventFactory();
           //创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
           int ringBufferSize = 1024 * 1024; //
           
           //创建disruptor
           //1.第一个参数为工厂类对象,用于创建一个个的LongEvent,LongEvent是实际的消费数据
           //2.第二个参数为缓冲区
           //3.第三个参数为线程池
           //4.第四个参数为ProducerType.SINGLE(表示生产者只有一个)和ProducerType.MULTY(表示有多个生产者)
           //5.第五个参数是一种策略
           /**
           //BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
           WaitStrategy BLOCKING_WAIT = new  BlockingWaitStrategy();
           //SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
           WaitStrategy SLEEPING_WAIT = new  SleepingWaitStrategy();
           //YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
           WaitStrategy YIELDING_WAIT = new  YieldingWaitStrategy();
            */
           Disruptor<LongEvent> disruptor =
                     new Disruptor<LongEvent>(factory,  ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
           // 连接消费事件方法
           disruptor.handleEventsWith(new LongEventHandler());
           
           // 启动
           disruptor.start();
           
           //Disruptor 的事件发布过程是一个两阶段提交的过程:
           //发布事件
           //使用该方法获得具体存放数据的容器ringbuffer(环形结构)
           RingBuffer<LongEvent> ringBuffer =  disruptor.getRingBuffer();
           //把容器传入生产者
           LongEventProducer producer = new  LongEventProducer(ringBuffer);
           //LongEventProducerWithTranslator producer = new  LongEventProducerWithTranslator(ringBuffer);
           ByteBuffer byteBuffer = ByteBuffer.allocate(8);//传值用的,不用太在意
           for(long l = 0; l<100; l++){
                byteBuffer.putLong(0, l);
                producer.onData(byteBuffer);
                Thread.sleep(1000);
           }
           
           disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
           executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;      
     }
}
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**
 * Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer,
 * 所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event  Translator来发布事件
 * <B>系统名称:</B><BR>
 * <B>模块名称:</B><BR>
 * <B>中文类名:</B><BR>
 * <B>概要说明:</B><BR>
 */
// 其他:生产者也可以这样写
public class LongEventProducerWithTranslator {
    //一个translator可以看做一个事件初始化器,publicEvent方法会调用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
                    event.setValue(buffer.getLong(0));
                }
            };

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer buffer) {
        ringBuffer.publishEvent(TRANSLATOR, buffer);
    }
}

(2)单生产者单消费者模式

/**
 * 1、消息载体(事件)
 */
public class OrderEvent {

    private long value;
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

import com.lmax.disruptor.EventFactory;

/**
 * 2、事件工厂
 */
public class OrderEventFactory implements EventFactory<OrderEvent> {

    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

import com.lmax.disruptor.RingBuffer;
import com.disruptor.event.OrderEvent;

/**
 * 3、消息(事件)生产者
 */
public class OrderEventProducer {
    //事件队列
    private final RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long value,String name) {
        // 获取事件队列 的下一个槽
        long sequence = ringBuffer.next();
        try {
            //获取消息载体(事件)
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 写入消息数据
            orderEvent.setValue(value);
            orderEvent.setName(name);
        } catch (Exception e) {
            // TODO  异常处理
            e.printStackTrace();
        } finally {
            System.out.println("生产者"+ Thread.currentThread().getName()
                    +"发送数据value:"+value+",name:"+name);
            //发布事件
            ringBuffer.publish(sequence);
        }
    }
}

import com.disruptor.event.OrderEvent;
import com.lmax.disruptor.EventHandler;

/**
 * 4、消费者
 */
public class OrderEventHandler implements EventHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        // TODO 消费逻辑
        System.out.println("EventHandler 消费者"+ Thread.currentThread().getName()
                +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    }

}

import com.disruptor.consumer.OrderEventHandler;
import com.disruptor.event.OrderEvent;
import com.disruptor.producer.OrderEventProducer;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;


import java.util.concurrent.Executors;

/**
 * 5、编码
 */
public class DisruptorDemo {

    public static void main(String[] args) throws Exception {

        //创建disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE, //单生产者
                new YieldingWaitStrategy()  //等待策略
        );

        //设置消费者用于处理RingBuffer的事件
        disruptor.handleEventsWith(new OrderEventHandler());
        //设置多消费者,消息会被重复消费
        //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
        //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
        //disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

        //启动disruptor
        disruptor.start();

        //创建ringbuffer容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        //创建生产者
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        // 发送消息
        for(int i=0;i<100;i++){
            eventProducer.onData(i,"Fox"+i);
        }

        disruptor.shutdown();

    }
}

(3)单生产者多消费者模式

如果消费者是多个,只需要在调用 handleEventsWith 方法时将多个消费者传递进去。

//设置多消费者,消息会被重复消费
disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());

上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下,只会被一个消费者消费,那么需要调用 handleEventsWithWorkerPool 方法。

//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

注意:消费者要实现WorkHandler接口

public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者"+ Thread.currentThread().getName()
                +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    }

    @Override
    public void onEvent(OrderEvent event) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者"+ Thread.currentThread().getName()
                +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    }
}

(4)多生产者多消费者模式

在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。

import com.disruptor.consumer.OrderEventHandler;
import com.disruptor.event.OrderEvent;
import com.disruptor.event.OrderEventFactory;
import com.disruptor.producer.OrderEventProducer;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;


import java.util.concurrent.Executors;

/**
 * 5、编码
 */
public class DisruptorDemo2 {

    public static void main(String[] args) throws Exception {

        //创建disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.MULTI, //多生产者
                new YieldingWaitStrategy()  //等待策略
        );
        
        //设置消费者用于处理RingBuffer的事件
        //disruptor.handleEventsWith(new OrderEventHandler());
        //设置多消费者,消息会被重复消费
        //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
        //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

        //启动disruptor
        disruptor.start();

        //创建ringbuffer容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();

        new Thread(()->{
            //创建生产者
            OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
            // 发送消息
            for(int i=0;i<100;i++){
                eventProducer.onData(i,"Fox"+i);
            }
        },"producer1").start();

        new Thread(()->{
            //创建生产者
            OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
            // 发送消息
            for(int i=0;i<100;i++){
                eventProducer.onData(i,"monkey"+i);
            }
        },"producer2").start();


        //disruptor.shutdown();

    }
}

4、场景应用

(1)使用EventProcessor消息处理器

import java.util.concurrent.atomic.AtomicInteger;

// 1、消息
public class Trade {

    private String id;//ID
    private String name;
    private double price;//金额
    private AtomicInteger count = new AtomicInteger(0);

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public AtomicInteger getCount() {
        return count;
    }

    public void setCount(AtomicInteger count) {
        this.count = count;
    }
}  
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

import java.util.UUID;

//消费者
public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  //根据具体需要实现之一即可
       
    @Override  
    public void onEvent(Trade event, long sequence, boolean  endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //这里做具体的消费逻辑  
        event.setId(UUID.randomUUID().toString());//简单生成下ID
        System.out.println(event.getId());  
    }  
}  
import com.lmax.disruptor.*;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main1 {
   
     public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        /*
         * createSingleProducer创建一个单生产者的RingBuffer,
         * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。
         * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
         * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
         */  
        final RingBuffer<Trade> ringBuffer =  RingBuffer.createSingleProducer(new EventFactory<Trade>() {
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());
        
        //创建线程池  
        ExecutorService executors =  Executors.newFixedThreadPool(THREAD_NUMBERS);
        
        //创建SequenceBarrier  
        SequenceBarrier sequenceBarrier =  ringBuffer.newBarrier();
          
        //创建消息处理器  
        BatchEventProcessor<Trade> transProcessor = new  BatchEventProcessor<Trade>(
                ringBuffer, sequenceBarrier, new TradeHandler());  
          
        //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略
         ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //把消息处理器提交到线程池  
        executors.submit(transProcessor);  
        
        //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
          
        Future<?> future= executors.submit(new Callable<Void>()  {
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for(int i=0;i<10;i++){  
                    seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
                     ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入  数据
                    ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
                }  
                return null;  
            }  
        });
        
        future.get();//等待生产者结束  
        Thread.sleep(1000);//等上1秒,等消费都处理完成  
        transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
        executors.shutdown();//终止线程  
    }  
}  

(2)使用WorkerPool消息处理器

import com.lmax.disruptor.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main2 {  
    public static void main(String[] args) throws  InterruptedException {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        
        EventFactory<Trade> eventFactory = new  EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade();  
            }  
        };  
        
        RingBuffer<Trade> ringBuffer =  RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
          
        SequenceBarrier sequenceBarrier =  ringBuffer.newBarrier();  
          
        ExecutorService executor =  Executors.newFixedThreadPool(THREAD_NUMBERS);  
          
        WorkHandler<Trade> handler = new TradeHandler();  
        WorkerPool<Trade> workerPool = new  WorkerPool<Trade>(ringBuffer, sequenceBarrier, new  IgnoreExceptionHandler(), handler);  
          
        workerPool.start(executor);  
          
        //下面这个生产8个数据 这里其实应该换成生产者
        for(int i=0;i<8;i++){  
            long seq=ringBuffer.next();  
            ringBuffer.get(seq).setPrice(Math.random()*9999);  
            ringBuffer.publish(seq);  
        }  
          
        Thread.sleep(1000);  
        workerPool.halt();  
        executor.shutdown();  
    }  
}  

5、复杂场景下使用

在复杂场景下使用RingBuffer(希望P1生产的数据给C1、C2并行执行,最后C1、C2执行结束后C3执行)
这种场景必须使用disruptor了。
在这里插入图片描述
在这里插入图片描述

import java.util.concurrent.atomic.AtomicInteger;

// 1、数据
public class Trade {  
	
	private String id;//ID  
	private String name;
	private double price;//金额  
	private AtomicInteger count = new AtomicInteger(0);
	
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	public AtomicInteger getCount() {
		return count;
	}
	public void setCount(AtomicInteger count) {
		this.count = count;
	} 
	  
	  
}  
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
// 2、生产者
public class TradePublisher implements Runnable {  
	
    Disruptor<Trade> disruptor;  
    private CountDownLatch latch;  
    
    private static int LOOP=10;//模拟百万次交易的发生  
  
    public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {  
    	TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
        for(int i=0;i<LOOP;i++){  
            disruptor.publishEvent(tradeTransloator);  
        }  
        latch.countDown();  
    }  
      
}

class TradeEventTranslator implements EventTranslator<Trade>{  
    
	private Random random=new Random();  
    
	@Override  
    public void translateTo(Trade event, long sequence) {  
        this.generateTrade(event);  
    }  
    
	private Trade generateTrade(Trade trade){  
        trade.setPrice(random.nextDouble()*9999);  
        return trade;  
    }  
	
}  
// 3、以下是消费者
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler1: set name");
    	event.setName("h1");
    	Thread.sleep(1000);
    }  
}  

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
    	System.out.println("handler2: set price");
    	event.setPrice(17.0);
    	Thread.sleep(1000);
    }  
      
}  

import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
    	System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.toString());
    }  
}

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler4: get name : " + event.getName());
    	event.setName(event.getName() + "h4");
    }  
}  

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler5: get price : " + event.getPrice());
    	event.setPrice(event.getPrice() + 3.0);
    }  
}  
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 编码
public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
    	long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
        //菱形操作
        /**
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<Trade> handlerGroup = 
        		disruptor.handleEventsWith(new Handler1(), new Handler2());
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 
        handlerGroup.then(new Handler3());
        */
        
        //顺序操作
        /**
        disruptor.handleEventsWith(new Handler1()).
        	handleEventsWith(new Handler2()).
        	handleEventsWith(new Handler3());
        */
        
        //六边形操作. 
        /**
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        */

        
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1311242.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MATLAB 绘制伯德图之将幅频特性和相频特性分开绘制方法

幅频和相频特性分别在两个图窗&#xff0c;不在一起方便保存&#xff0c;无需再裁剪 clear; close all; k 1; numH 1; denH [1,k]; sysH tf(numH,denH); w logspace(-2,2);[mag, phase] bode(sysH,w);% 幅频特性 loglog(w,squeeze(mag));grid on; % 相频特性 semilogx(…

在springboot中引入参数校验

一、概要 一般我们判断前端传过来的参数&#xff0c;需要对某些值进行判断&#xff0c;是否满足条件。 而springboot相关的参数校验注解&#xff0c;可以解决我们这个问题。 二、快速开始 首先&#xff0c;我用的springboot版本是 3.1.5 引入参数校验相关依赖 <!--1…

数据之美:零售业的变革之道

数据可视化能够为零售业带来令人瞩目的变化。随着零售业务的发展&#xff0c;数据可视化成为了洞察市场、优化运营并提升客户体验的强大工具。下面我就以可视化从业者的视角出发&#xff0c;简单分析一下数据可视化为零售业可能带来的改变。 数据可视化让零售商深入了解消费者行…

邮政快递物流查询,分析筛选出提前签收件

批量查询邮政快递单号的物流信息&#xff0c;将提前签收件分析筛选出来。 所需工具&#xff1a; 一个【快递批量查询高手】软件 邮政快递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;并登录 步骤2&#xff1a;点击主界面左上角…

从 AST 到代码生成:代码背后的秘密花园(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

JS基础之执行上下文

JS基础之执行上下文 执行上下文顺序执行可执行代码执行上下文栈回顾上文 执行上下文 顺序执行 写个JavaScript的开发者都会有个直观的印象&#xff0c;那就是顺序执行&#xff1a; var foo function(){console.log(foo1) } foo(); //foo1 var foo function(){console.log(…

ES分词查询

全文检索介绍 全文检索的发展过程&#xff1a; 数据库使用SQL语句&#xff1a;select * from table where data like “%检索内容%”出现lucene全文检索工具&#xff08;缺点&#xff1a;暴露的接口相对复杂&#xff0c;且没有效率&#xff09;出现分布式检索服务框架solr&am…

Unity 控制刚体的移动与旋转的方法

在场景创建一个Cube,并添加刚体&#xff0c;如图&#xff1a; 编写脚本&#xff1a; using System.Collections; using System.Collections.Generic; using UnityEngine;[RequireComponent(typeof(Rigidbody))] public class RibRotate : MonoBehaviour {//private Vector3 mo…

计算机如何看待内存

计算机如何看待内存&#xff1b; 对象在内存中如何表示&#xff0c;如何操纵对象&#xff1b;

国产数据库适配-达梦(DM)

1、通用性 达梦数据库管理系统兼容多种硬件体系&#xff0c;可运行于X86、X64、SPARC、POWER等硬件体系之上。DM各种平台上的数据存储结构和消息通信结构完全一致&#xff0c;使得DM各种组件在不同的硬件平台上具有一致的使用特性。 达梦数据库管理系统产品实现了平台无关性&…

智能优化算法应用:基于蝠鲼觅食算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于蝠鲼觅食算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于蝠鲼觅食算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.蝠鲼觅食算法4.实验参数设定5.算法结果6.…

Knowledge Distillation from A Stronger Teacher(NeurIPS 2022)论文解读

paper&#xff1a;Knowledge Distillation from A Stronger Teacher official implementation&#xff1a;https://github.com/hunto/dist_kd 前言 知识蒸馏通过将教师的知识传递给学生来增强学生模型的性能&#xff0c;我们自然会想到&#xff0c;是否教师的性能越强&…

vue的slot插槽详解

目录 一、基本用法 在上面的例子中&#xff0c;我们在子组件中定义了一个插槽&#xff0c;然后在父组件中使用标签&#xff0c;并在标签内部放置了一个 标签作为插槽的内容。当父组件被渲染时&#xff0c;插槽的内容将被替换为实际传入的内容。 二、具名插槽 在上面的例子…

Java集合--Map

1、Map集合概述 在Java的集合框架中&#xff0c;Map为双列集合&#xff0c;在Map中的元素是成对以<K,V>键值对的形式存在的&#xff0c;通过键可以找对所对应的值。Map接口有许多的实现类&#xff0c;各自都具有不同的性能和用途。常用的Map接口实现类有HashMap、Hashtab…

初识GroovyShell

文章目录 前言一、GroovyShell二、maven三、解决方案四、关键代码4.1 数据库配置表(pg)4.2 入参4.3 分页查询 总结 前言 项目背景&#xff1a;查询多个表的数据列表和详情&#xff0c;但不想创建过多的po、dao、resp等项目文件。 一、GroovyShell Apache Groovy是一种强大的…

关于ctf反序列化题的一些见解([MRCTF2020]Ezpop以及[NISACTF 2022]babyserialize)

这里对php反序列化做简单了解 在PHP中&#xff0c;序列化用于存储或传递 PHP 的值的过程中&#xff0c;同时不丢失其类型和结构。 serialize&#xff08;&#xff09; 函数序列化对象后&#xff0c;可以很方便的将它传递给其他需要它的地方&#xff0c;且其类型和结构不会改变…

Python FuckIt模块:代码的“不死鸟”

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在编程世界中&#xff0c;每个开发者都曾遇到过代码中的错误&#xff0c;有时这些错误可能让人崩溃。但是&#xff0c;有一天&#xff0c;听说了一个叫做"FuckIt"的模块&#xff0c;它声称可以帮助摆脱…

ASP.NET Core 8 在 Windows 上各种部署模型的性能测试

ASP.NET Core 8 在 Windows 上各种部署模型的性能测试 我们知道 Asp.net Core 在 windows 服务器上部署的方案有 4 种之多。这些部署方案对性能的影响一直以来都是靠经验。比如如果是部署在 IIS 下&#xff0c;那么 In Process 会比 Out Process 快&#xff1b;如果是 Self Hos…

计算机操作系统-第十六天

目录 线程的实现方式 用户级线程 内核级线程 多线程模型 一对一模型 多对多模型 多对多模型 本节思维导图 线程的实现方式 用户级线程 历史背景&#xff1a;早期操作系统只支持进程&#xff0c;不支持线程&#xff0c;当时的线程是由线程库实现的 本质&#xff1a;从…

zabbix简单介绍2

学习目标: 能够实现一个web页面的监测能够实现自动发现远程linux主机能够通过动作在发现主机后自动添加主机并链接模板能够创建一个模版并添加相应的元素(监控项,图形,触发器等)能够将主机或模板的配置实现导出和导入能够实现至少一种报警方式(邮件,微信等)能够通过zabbix_pro…