MpscArrayQueue是一个固定大小的环形数组队列,继承自ConcurrentCircularArrayQueue
MpscArrayQueue的特点:
- 环形队列
- 底层数据结构为数组
- 有界
看一下MpscArrayQueue的属性(填充类除外)---
//生产者索引
private volatile long producerIndex;
//生产者边界
private volatile long producerLimit;
//消费者索引
private volatile long consumerIndex;
//继承自父类属性
//下标运算符
protected final long mask;
//消息存储数组
protected final E[] buffer;
首先,系统理解一下MpscArrayQueue的实现,MpscArrayQueue是一个循环数组队列,支持多生产者并发提交消息,重点依赖三个参数----pIndex,cIndex,producerLimit,看一下这三个参数在MpscArrayQueue中的作用,然后一步步深入他们是如何保证Mpsc运行机制的--
上图是普通情况下,cIndex,pIndex和producerlimit的关系,如图可以很容易得理解这三个参数的意义-
- cIndex表示消费者的消费索引,当消费者调用poll()方法出队的时候,获取到的便是cIndex索引指向的消息元素
- pIndex表示的是生产者的生产索引,当生产者调用offer方法入队的时候,将新的消息写入到pIndex指向的位置;
- producerlimit表示的队列边界,或者说pIndex和producerlimit之间表示的是队列空白位置,而cIndex和pIndex之间表示的是队列已经写入的位置
- 他们三者的关系也就很清晰了---cIndex<pIndex,pIndex<producerlimit----但是这里要注意的是,这三者的关系是在有界数组中的位置关系,而实际上MpscArrayQueue使用的是循环数组,所以,这三者的关系重新定义为:cIndex在pIndex位置之前,pIndex 在producerlimit位置之前,而producerlimit一般位于cIndex的前一个位置;
- 这里重点是为了解释逻辑上他们三者之间的位置关系,并非实际实现上的数组Index大小关系,另外,由于MpscArrayQueue的无锁实现,实际上在多线程并发的情况下有可能会短暂的出现这三者逻辑位置错误的情况,但是MpscArrayQueue通过自旋和内存屏障保证了最终数据一致性,以及生产消费之间的逻辑正确性,具体实现继续往下看;
由上图我们大概可以理解MpscArrayQueue的工作机制:
- 初始化时,pIndex指向数组头,也就是array[0]的位置,而producerlimit指向数组尾的位置----array[length-1],而cIndex可以理解为空,因为此时队列中没有消息数据;
- 生产消息---每次生产者提交消息的时候先获取当前pIndex的值,判断pIndex是否大于producerlimit,
- 如果pIndex小于等于producerlimit,则表明当前队列中还有空闲位置,则在pIndex的位置上写入新消息,并将pIndex自增;
- 如果此时pIndex大于producerlimit则说明队列已满,返回错误即可
- 消费消息----这里消费者先获取Index的值,并将cindex加一,然后判断是否大于pIndex
- 如果小于等于pIndex,则说明该位置上存在可消费的消息,则获取该消息的值即可
- 如果大于pIndex,说明消息队列此时是空的,返回错误即可
- 数组循环----数组循环主要体现在对producerlimit的维护上,主要在于一点:
- 当producerlimit到达数组尾的时候,这时候要判断数组头到cIndex之间是否有空闲位置,这个逻辑在概念上其实很好理解-----也就是说,每次消费者消费之后,要想办法及时将producerlimit的位置更新到cIndex的前一个位置,从而保证了Index可以循环使用数组中的空闲位置,如图--
当消费者对val1出队之后,此时producerlimit应该指向原来val1 的位置,也就是数组的头,从而保证生产者在数组满的时候可以从数组头的空闲位置生产消息;
上边主要从概念上解释了循环数组队列的生产消费实现,以及cIndex,pIndex,producerlimit这三个参数之间的逻辑关系,下边看一下MpscArrayQueue具体是怎么实现队列循环以及保证多生产者线程并发安全的--
先看一下三个参数的初始化----由于填充和继承的关系,略去了无关代码,只体现具体初始化的逻辑
- producerlimit初始化为数组容量capacity
- cIndex和pIndex初始化为0----没有显式初始化赋值;
然后看一下offer方法---
- 首先获取mask和producerlimit的值,mask可以理解为一个固定值,在队列初始化的时候就指定了mask值的大小,主要用来计算位移偏移量和producerlimit的值,这里的数学逻辑稍后解释
然后进入do-while循环,注意这里是do-while,先判断producerlimit和pIndex的大小关系,然后更新producerlimit的值,while循环的条件是cas自旋更新抢占pIndex的值是否正确看一下代码
---
do
{//注意这里的do-While循环,循环体内主要获取并更新producerlimit,然后cas自旋获取pIndex锁,获取成功后再一次更新producerlimit;
//这也是官方代码设计巧妙的一点,正常逻辑在生产者提交消息之前首先抢占PIndex的索引即可,但是这里更新producerlimit保证了在抢占pindex前pindex是有效的
//获取producer index
pIndex = lvProducerIndex();
if (pIndex >= producerLimit)
{
//获取consumer index
final long cIndex = lvConsumerIndex();
//计算producer limit
producerLimit = cIndex + mask + 1;
if (pIndex >= producerLimit)
{
return false;
}
else
{
//将生产者限制更新为下一个指数,我们必须重新检查消费者指数这很激烈,但竞争是良性的
//这里的意思是---上边的if判断成运行到这里之后,可能有其他线程在此处并发修改了producerlimit,所以这里producerlimit的set是有竞争的,是线程不安全的,
// 但是这种不安全最终对消息的提交和生产不会造成并发错误---这里有一个非常巧妙的设计,下边解释里会详细解释
soProducerLimit(producerLimit);//这里对producerlimit的修改也是lazyset---底层调用了Unsafe类的putLongVolatile()方法
}
}
}
while (!casProducerIndex(pIndex, pIndex + 1));
这个循环可以理解为循环获取最新pIndex和producerlimit的值,然后比较二者的大小,最后cas抢占pIndex索引位置;当然producerlimit计算出新值之后要lazyset写回,这里有两点需要注意--
- 为什么使用do-while循环----这里主要是为了保证在抢占索引之前,这个索引是有效的,即pIndex<=producerlimit;否则就发生错误了,正常的while循环逻辑--
while(true){//死循环,自旋尝试抢占pIndex pIndex = lvProducerIndex(); if (pIndex >= producerLimit) { final long cIndex = lvConsumerIndex(); producerLimit = cIndex + mask + 1; if (pIndex >= producerLimit) { return false; } else { soProducerLimit(producerLimit); } } if(casProducerIndex(pIndex, pIndex + 1)) break; }
- 为什么第一次判断pIndex>=producerlimit之后还要重新判断一次而不是直接退出循环----这里就是MpscArrayQueue的核心点之一----多生产者的实现了,非常关键!!!!(另外一个核心点就是循环数组的实现)
- 首先看一下第一次判断的producerlimit来自哪里----第一次producerlimit是在循环开始之前获取到的值,在循环开始之后判断第一次pIndex>=producerlimit直接退出是没有问题的---逻辑上不会产生错误,此时代码应该是这样子的--
long producerLimit = lvProducerLimit(); while(true){//死循环,自旋尝试抢占pIndex //注意这里pIndex必须在循环内更新,每次抢占失败说明pIndex被其他线程更新了,所以要重新获取pIndex; pIndex = lvProducerIndex(); if (pIndex >= producerLimit) { return false; } if(casProducerIndex(pIndex, pIndex + 1)) break; }
- 这个逻辑下,假设循环开始的时候producerlimit的值是10,pIndex第一次获取到的值是5,这时有两种情况--
- 在pIndex为8的时候抢占成功了,开始执行数组写入的逻辑,没有问题,
- 在pIndex为10的时候都没有抢占陈工,也就是走了return false的逻辑,生产者此次提交消息失败了,只能等待下次提交;
- 但是这里有个问题就是,在cas自旋的时候,producerlimit的值可能会更新(这里先不考虑在哪里更新的问题),所以当pIndex从5抢占到10这个过程中,上面的逻辑没有办法及时获取到最新的producerlimit的值,只能用循环开始前获取到的producerlimit的值做判断---这里逻辑上是没有问题的,producerlimit只会往后移,我们拿到的值不会产生错误;
- 所以为了解决3中的问题就应该将producerlimit每次在循环内更新,所以代码变成了这样--
while(true){//死循环,自旋尝试抢占pIndex //注意这里pIndex必须在循环内更新,每次抢占失败说明pIndex被其他线程更新了,所以要重新获取pIndex; pIndex = lvProducerIndex(); //重新计算producerlimit的值 final long cIndex = lvConsumerIndex(); producerLimit = cIndex + mask + 1; if (pIndex >= producerLimit) { return false; } soProducerLimit(producerLimit); if(casProducerIndex(pIndex, pIndex + 1)) break; }
- 这样的话逻辑上就完全没有问题了,每次cas自旋都能获取到最新的pIndex和producerlimit的值,判断队列是否满了,队列没满的情况下就写入;否则失败;
- 但是这样处理的话会有另一个问题---性能问题,每次pIndex和producerlimit判断的时候都要重新计算producer的值,而producerlimit是volatile修饰的,读写都会加上很多内存屏障,另外producerlimit本身表示的是队列可写容量,在可写容量没满的情况下我们其实不需要获取或者更新最新的producerlimit的值,比如在上述情况1中,当cas自旋到8的时候就抢占成功了,这时我们保证了8是可以用的(producerlimit只会后移),但实际上producerlimit更新到了15还是20本次消息提交是不需要关心的,为此做的内存操作显然是没有太大必要的,我们只需要一开始的一个缓存值就可以了;为了保证更新的问题,只有当当前线程一直到producerlimit的时候都没有对pIndex抢占成功才需要重新计算producerlimit,然后判断是否需要继续自旋;
- 所以问题又回到了MpscArrayQueue中的实现---一开始获取一个producerlimit的快照,当快照容量内的pIndex都抢占失败之后才获取计算新的producerlimit的值,同时更新这个新的producerlimit的值-----这样一来,每个生产者对producerlimit的值的获取和更新都变成了懒加载机制----真的很巧妙的实现!!!!
- 这里再采用一下官方的注释--
"使用consumer index的缓存视图,可能在循环中更新"
- 首先看一下第一次判断的producerlimit来自哪里----第一次producerlimit是在循环开始之前获取到的值,在循环开始之后判断第一次pIndex>=producerlimit直接退出是没有问题的---逻辑上不会产生错误,此时代码应该是这样子的--
- 然后再看看关于producerlimit的更新的问题----producerlimit真的是一直后移的吗?这里直接贴出来官方注释------"将producerlimit更新为下一个指数,我们必须重新检查消费者指数这很激烈,但竞争是良性的"
- 这里其实说明了确实会产生竞争,也就是说多个生产者线程并发更新producerlimit会产生不一致问题,怎么理解,比如两个线程A和B都拿到了producerlimit的快照值,且都在快容量内没有抢占pIndex成功,除了线程AB之外还有其他线程抢占pIndex,此时AheB都会计算新的producerlimit值,先看一下producerlimit的计算逻辑--
final long cIndex = lvConsumerIndex(); producerLimit = cIndex + mask + 1;
可以看出来producerlimit的值依赖于mask和cIndex的值,而mask的值在队列初始化的时候指定并在运行过程中不会改变,唯一会改变的值是cIndex,这里我们认为volatile修饰的cIndex值的修改对A和B线程都是立即可见的,A和B会拿到真实的cIndex的值,另外假设A和B获取cIndex的中间cIndex的值发生了变化,最终导致A和B拿到的cIndex的值是递增的,最后计算出来的producerlimit的值当然也是递增的,这里假设A计算出来的producerlimit的值是10,B计算出来的producerlimit的值是11,再看一下producerlimit的写回逻辑--
soProducerLimit(producerLimit);
这里-底层调用了Unsafe类的putLongVolatile()方法,也就是lazyset写回(其实与lazyset没有关系,CPU的线程调度也会发生先后问题,因为本身没有做同步机制),所以理论上producerlimit在AB都更新完成之后应该变为11,但是实际上并不能保证这一点,有可能A在B之前获取到的cIndex的值,并完成producerlimit的计算,但是在写回之前CPU调度A写回之前先执行了线程B,先将11写回了producerlimit,之后才调度线程A将10写回了producerlimit,这样导致producerlimit的最终值变成了10而不是11,这确实在MpscArrayQueue的实现中是有可能发生的,但是这里为什么官方说这种竞争是良性的呢?其实读到这里就已经发现了----每次pIndex的值小于producerlimit的时候重新计算producerlimit的值,这时候只要保证pIndex的值是递增的就不会影响到生产者消息的提交,只是多了一次producerlimit的重新计算而已;这里的设计也体现了cas同步的一些基本思路;而对于pIndex的值使用了cas操作,不会导致多个线程获取到同一个pIndex的情况---如果多个线程获取到同一个pIndex的时候,才会真正产生多线程并发问题;
- 这里其实说明了确实会产生竞争,也就是说多个生产者线程并发更新producerlimit会产生不一致问题,怎么理解,比如两个线程A和B都拿到了producerlimit的快照值,且都在快容量内没有抢占pIndex成功,除了线程AB之外还有其他线程抢占pIndex,此时AheB都会计算新的producerlimit值,先看一下producerlimit的计算逻辑--
至此,就分析完了MpscArrayQueue中对于多生产这线程并发提交的实现的分析,其实总结起来就是一句话---通过cas抢占pIndex来完成多线程并发提交的问题;而producerlimit主要是为了实现环形数组的实现,环形数主要通过对数组长度取模来计算的--实际实现是对length-1按位与来代替取模计算;
环形数组的实现依赖mask;首先看一下mask的定义;
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
mask = actualCapacity - 1;
首先将容量近似为定义的参数的下一个2的n次幂的值,比如我们传入的值为3,则队列实际长度为4,这样与capacity-1计算出来的mask进行&运算相当于取模;
整体看一下相关代码及注释---
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
//官方注释--- 使用consumer index的缓存视图可能在循环中更新---
// 这句话怎么理解--producer limit计算方式--
// producerLimit = cIndex + mask + 1
//可以看出producer limit的值跟mask和consumer limit有关,
//首先看一下mask的值--
// int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
// mask = actualCapacity - 1;
//可以看出mask的值是跟capacity有关,而容量是我们初始化队列的时候就定义好的值,所以运行过程中producerlimit的值的变化取决于consumer index,
//而这里获取到producerlimit的值之后才进入cas自旋更新pIndex,所以在自旋的过程中consumerindex的值可能会发生改变,导致producerlimit的值发生改变
//所以这里在自旋开始前获取到的producer limit的值是源于cIndex的一个缓存值--在自旋成功后该值未必是正确的--这也导致了自旋成功后第二次判断producerlimit的值;
//其实这里可以先不获取producerlimit的值,自旋跟新pIndex的值之后再获取,而官方这里提前获取可能是为了优化性能---断定这里cIndex被修改的可能性小(因为是单消费者)
final long mask = this.mask;
long producerLimit = lvProducerLimit();
long pIndex;
do
{//注意这里的do-While循环,循环体内主要获取并更新producerlimit,然后cas自旋获取pIndex锁,获取成功后再一次更新producerlimit;
//这也是官方代码设计巧妙的一点,正常逻辑在生产者提交消息之前首先抢占PIndex的索引即可,但是这里更新producerlimit保证了在抢占pindex前pindex是有效的
//获取producer index
pIndex = lvProducerIndex();
if (pIndex >= producerLimit)
{
//获取consumer index
final long cIndex = lvConsumerIndex();
//计算producer limit
producerLimit = cIndex + mask + 1;
if (pIndex >= producerLimit)
{
return false;
}
else
{
//将生产者限制更新为下一个指数,我们必须重新检查消费者指数这很激烈,但竞争是良性的
//这里的意思是---上边的if判断成运行到这里之后,可能有其他线程在此处并发修改了producerlimit,所以这里producerlimit的set是有竞争的,是线程不安全的,
// 但是这种不安全最终对消息的提交和生产不会造成并发错误---这里有一个非常巧妙的设计,下边解释里会详细解释
soProducerLimit(producerLimit);//这里对producerlimit的修改也是lazyset---底层调用了Unsafe类的putLongVolatile()方法
}
}
}
while (!casProducerIndex(pIndex, pIndex + 1));
/*注意:新的pindex在数组中的元素之前可见。如果我们依赖poll()的索引可见性,我们将需要处理元素不可见的情况。
* ---这句官方注释怎么理解:
* 多生产者为了实现生产者并发生产消息,每个生产者在抢占到Pindex之后会先将pindex暴露出去,提供其他的生产者抢占,之后才对具体的消息进行lazyset,这里就有一个问题了,消费者也能看到这个pIndex,所以消费者想
* 消费这个pIndex对应的消息的时候有可能这个时候生产者还未实际进行写入,或者写入不可见,所以在消费者poll的时候要处理这中情况
* */
// Won CAS, move on to storing
final long offset = calcCircularRefElementOffset(pIndex, mask);
// REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
//REF_ARRAY_BASE表示数组初始位置,相当于0,REF_ELEMENT_SHIFT为2,这里左移两位相当于×4
//重点看一下index&mask, index 表示当前生产者线程获取到的index,mask的计算方式--
// int actualCapacity = Pow2.roundToPowerOfTwo(capacity)---从中可以找到下一个二次幂的值。
//返回下一个2的正幂,如果是2的幂,则返回该值。负值映射到1。
// mask = actualCapacity - 1;
soRefElement(buffer, offset, e);
return true; // AWESOME :)
}