一 disruptor为什么快的核心原理
- 属性填充:通过添加额外的无用信息,避免伪共享问题
- 什么是共享内存
- 在系统内存中,我们的数据存在于cpu缓存中,cpu缓存的基础缓存单位为 cache line,通常cache line的大小为64字节,每个cache line指向一个主内存的地址,已知java中一个long类型占用内存8byte 那么一个缓存行就可以缓存8个long类型值,当我们访问其中一个值时,cpu会将当前缓存行都加载到内存中,此时我们可以快速的访问其他7个long类型数据不用在cpu缓存中重新查找,这就是共享内存。
- 什么是cache line
- 为了解决计算机系统中主内存与cpu之间运行速度差问题,会在cpu与主内存之间添加一级或多级高速缓冲存储器(cache)。这个cache一般是被集成在cpu内部,所以也叫cpu cache
- cache内部按行存储,同时缓存行也是cpu跟主内存交换数据的基本单位
- 什么是伪共享
- 当我们要访问某一变量时,cpu首先会查看cache内部是否有缓存,如果有则读取变量所在缓存行数据,然后对其进行读写操作,而每个缓存行只能同时被一个线程读写,此时如果有其他线程也对此缓存行进行读写,则会大大降低缓存行性能,这个并发对缓存行的读写行为称为伪共享。
- 如何避免伪共享产生
- 让每个变量独占一个缓存行,对空余位置进行填充
- 已知每个缓存行长度为64,每个long类型占8位,那么在存储long类型元素同时存储7个其他无用的long类型,达到让变量独占一行的目的
- disruptor 代码
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; } public class Sequence extends RhsPadding{ static final long INITIAL_VALUE=-1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET; static { UNSAFE= Util.getUnsafe(); try { VALUE_OFFSET=UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); }catch (final Exception e){ throw new RuntimeException(e); } } public Sequence(){ this(INITIAL_VALUE); } public Sequence(final long initialValue){ UNSAFE.putOrderedLong(this,VALUE_OFFSET,initialValue); } }
public class VolatileData extends RhsPadding { // 占用 8个字节 +48 + 对象头 = 64字节 //需要操作的数据 volatile long value; public VolatileData() { } public VolatileData(long defValue) { value = defValue; } public long accumulationAdd() { //因为单线程操作不需要加锁 return ++value; } public long getValue() { return value; } }
- 在disruptor中有一个重要的成员sequece,在生产者和消费者中都有一个独立的sequence,它在ringbuffer中标识着写入或消费位置。
- 什么是共享内存
- 无锁的设计:采用CAS无锁方式,保证线程的安全性
- 锁机制产生的问题
- 多线程对临界资源的锁标记获取会对cpu资源产生过多的占用。
- 加锁/解锁/唤醒等待锁线程/阻塞 等过程中会导致上下文切换和调度延迟,尤其是在进行上下文切换时会导致cpu缓存的指令和数据失效需要重新加载,此操作会导致性能问题的产生。
- 如果一个优先级高的线程等待一个优先级低的线程释放锁会导致导致优先级反转,引起性能问题。
- cas
- 如果目标引用值=预期值,则将值修改为目标值
-
if (a = 1) { a = b; return b; } else { return a; }
- disruptor的无锁设计
- 生产者多线程do while 获取下一个可用地址,如果不可用则重新进行获取。
-
do{ current = cursor.get(); next = current + n; if (!hasAvailableCapacity(gatingSequences, n, current)) { throw InsufficientCapacityException.INSTANCE; } } while (!cursor.compareAndSet(current, next)); //next 类比于ArrayBlockQueue的数组索引index return next;
- 锁机制产生的问题
- 引入环形的数组结构:数组元素不会被回收,避免频繁的GC
- 什么是环形数组
- 通过下标访问数组内元素,当下标达到数组最大值是将下标归0,重新开始新一轮
- 为什么使用环形数组
- 可以有效避免垃圾回收,降低对象回收和重新分配内存过程产生的消耗
- 数组通过下标访问数据,访问效率更高
- 数组数据存在于一块连续的内存地址中,访问更高效
- 对数组中数据采用数据覆盖的方式,避免jvm执行gc
- 什么是环形数组
- 元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行递增
二 等待策略
Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升
- blockingWaitStrategy
- 默认策略,在blockingWaitStrategy内部使用锁和condition来控制线程的唤醒。
- 它是最低效的策略,但其对cpu的消耗最小,并且在各种不同环境部署时,能提供更加一致性的性能表现。
- sleepingwaitStrategy
- 它的性能表现跟BlockingWaitStrategy差不多,对cpu的消耗差不多,但其对生产者线程的影响最小,通过使用lockSupport.parkNanos(1)实现循环等待,适合用于异步日志蕾丝的场景。
- yieldingWaitStrategy
- 它是可以使用在低延迟系统的策略之一,它将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yieId() 以允许其他排队的线程巡行。在要求极高性能且事件处理数小于cpu逻辑核心数的场景中,推荐使用此策略
- busySpinWaitStrategy
- 性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于cpu逻辑核心数的场景中,推荐使用此策略
- phasedBackoffWaitStrategy
- 自旋+yieId+自定义策略,适合用于cpu资源紧缺,吞吐量和延迟不重要的场景。
三 Disruptor的工作原理。
Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(上面的seq),那么这个是如何保证生产的消息不会覆盖没有消费掉的消息呢。
在Disruptor中生产者分为单生产者和多生产者,而消费者并没有区分。
单生产者情况下,就是普通的生产者向RingBuffer中放置数据,消费者获取最大可消费的位置,并进行消费。而多生产者时候,又多出了一个跟RingBuffer同样大小的Buffer,称为AvailableBuffer。
在多生产者中,每个生产者首先通过CAS竞争获取可以写的空间,然后再进行慢慢往里放数据,如果正好这个时候消费者要消费数据,那么每个消费者都需要获取最大可消费的下标,这个下标是在AvailableBuffer进行获取得到的最长连续的序列下标
例:当前我要写入2个数据,那么在ringbuffer中判断当前next后面是否有两个位置(通过cas判断),如果有则将next后移两位,当前两个数据位置由当前这个线程使用,其他线程从next后面在进行获取可写入位置。