循环队列
概述
在优化系统性能时,我们通常需要分析一个单线程程序各模块的功能和性能,然后将这些模块拆分到多个线程中并行执行。而多个线程之间需要加入缓存以实现线程间的通信。如图1所示:
为方便进程间通信,通常需要采用循环队列的数据结构来实现生产者-消费者模式。在图1中,对缓存1而言,线程1为生产者,将经由线程1处理后的数据写入缓存1,线程2为消费者从缓存1中读取数据进行后续处理。同理对缓存2而言,线程2为生产者而线程3为消费者。对于线程间的循环队列有如下关键的操作步骤:
-
空间分配
为队列分配一定大小的空间,在初始化时执行该操作。
-
获取写空间
获取队列的写空间大小。
-
获取读空间
获取队列的读空间大小。
-
写队列
向环队列中写入一定长度的数据,由生产者线程执行该操作。
-
读队列
从队列中读取一定长度的数据,由消费者线程执行该操作。
-
循环
当读写到对尾时循环到对头。
在图1的架构中,读写队列将是一个非常高频的操作,所以读写队列的开销至关重要。下面将重点阐述如何实现一个循环队列,以及如何实现一个高性能的循环队列。
普通循环队列
基本思路
首先,我们需要设计一个结构来管理循环队列,这个结构需要包含以下元素:
-
队列指针
指向循环队列的缓存buffer。
-
队列大小
表示循环队列的大小(单位:字节)。
-
队列写入位置
写队列时,从写入位置向后写入数据。写入完成后需要修改写入位置。
-
队列读取位置
读队列时,从读取位置向后读取数据。读取完成后需要修改读取位置。
-
互斥锁
由于可能存在多个线程操作队列,修改队列的读写位置,所以需要互斥锁进行线程同步(注意不是读写锁)。
循环队列的结构及各元素如图2所示。采用常规算法,完成读写队列的操作后,都会修改队列的读写位置,当读写位置到达队尾后将读写位置重置到队头,完成循环操作。如此循环队列会呈现出如图3所示的两种状态。
- 队列读取位置在队列写入位置前
- 队列读取位置在队列写入位置后
如此,在实际读写队列前就需要判断队列读取位置和写入位置的先后顺序,以此来决定读写空间的获取方式以及数据读写的方式。同时还有非常关键的一点,在图3所示的两种状态下,都有可能出现读写位置相等的情况。状态1读写位置相等表示队列为空,状态2读写位置相等表示队列为满。必须事先规定读写位置相同时表示何种状态,假设读写位置相同时表示队列为空,那么写入位置就不能追上读取位置,在写入操作进行空间判断时就需要在所需空间的基础上加上至少一个字节的的占位符。下面我们来看看如何实现循环队列的几个关键步骤。
空间分配
循环队列的空间分配非常简单,可以直接按照指定长度调用new
或malloc
分配一段内存空间,然后将内存地址赋值给队列指针即可。
获取写空间
根据队列的不同状态,获取写空间的流程如图4所示。
-
判断读取位置是否小于等于写入位置。
-
是,则属于状态1,写空间 = 队列长度 - 写入位置 + 读取位置。
-
否,则属于状态2,写空间 = 读取位置 - 写入位置。
获取读空间
根据队列的不同状态,获取读空间的流程如图5所示。
1.判断读取位置是否小于等于写入位置。
2.是,则属于状态1,读空间 = 写入位置 - 读取位置。
3.否,则属于状态2,读空间 = 写入位置 + 队列长度 - 读取位置。
写队列
根据队列的不同状态,写队列的流程如图6所示。
-
判断读取位置是否小于等于写入位置。
-
是,则属于状态1,先从写入位置写到队列结尾,再从队列起始位置写入剩余数据。
-
否,则属于状态2,从写入位置向后写入指定长度的数据。
-
更新写入位置。
读队列
根据队列的不同状态,读队列的流程如图7所示。
-
判断读取位置是否小于等于写入位置。
-
是,则属于状态1,从读取位置直接向后读取指定长度。
-
否,则属于状态2,先从队列读取位置读取到队列结尾,再从队列起始位置读剩余数据。
-
更新读取位置。
循环
当读写到队尾时,需要将读写位置重置到队列头,完成循环操作。循环操作的常规实现方式为取余。所以读写位置的修改方式通常如下:
读位置 = (读位置 + 读取长度) % 队列大小
写位置 = (写位置 + 写入长度) % 队列大小
锁
我们来回顾一下整个队列的读写流程,可以分为如下步骤:
- 判断读写空间
- 获取读写位置
- 实际读写操作
- 更新读写位置
这四个步骤中,前三个步骤会使用读写位置,最后一个步骤会更新读写位置。步骤3会读写队列,所以在并行环境下,这4个步骤需要串行执行。即在开始执行前加互斥锁结束执行后解锁,具体如下:
- 加锁
- 判断读写空间
- 获取读写位置
- 实际读写操作
- 更新读写位置
- 解锁
扩展
在上述6个步骤中,实际读写队列相对比较耗时,所以为了提高并发性,通常有一种优化方式,就是采用如下流程:
- 加锁
- 判断读写空间
- 获取读写位置
- 更新读写位置
- 返回原始读写位置
- 解锁
- 实际读写操作(使用原始读写位置)
先更新读写位置,然后解锁,解锁后再完成读写操作。由于更新读写位置本身是串行的,所以更新读写位置后,不同线程读写的是队列中的不同位置,所以不存在并发性问题(PostgreSQL写XLOG就是采用的这样的思想)。但有一点需要注意,一旦更新了读取位置,就说明队列中的这段数据已经被读取,内容可以覆盖,而实际上可能并没有读取;同理,一旦更新了写入位置,就说明队列中的这段数据已经写入,内容可以被读取,而实际上并没有写入。所以需要在读写操作的时候做相应处理(PostgreSQL的
WALInsertLockUpdateInsertingAt
就是用于处理类似问题的)。
高性能循环队列—kfifo
明白了普通循环队列的实现原理后,我们来看看大神们是如何实现循环队列的,该循环队列的实现来源于Linux内核kfifo的实现。
kfifo的定义文件: kernel/kfifo.c
kfifo的头文件: include/linux/kfifo.h
在讲解源代码之前,我们先来看看前面实现的普通循环队列存在的问题以及可以优化的地方。
锁
在普通循环队列中,读写队列的整个流程都需要加锁,而实际是否真的有必要呢?假设当前场景下只有两个线程,一个读队列,一个写队列,那么还需要加锁么?答案是不需要!因为读操作只会修改读取位置,写操作只会修改写入位置,读写位置就不会存在多线程并发修改的情况。同时读写线程读写的是队列的不同位置,所以并发读写也不会存在并发性问题。所以在只有一个读线程和一个写线程读写队列的场景下,读写操作无需加锁,而这样的场景是非常常见的单生产者,单消费者模式。
多生产者/消费者
如果生产者或消费者不止一个,那就必须要加锁了。有相关文档会介绍采用CAS来实现多生产者\消费者的循环队列。CAS其实就是自旋锁的底层实现,所以CAS也让是一种轻量级的锁,而不是真正意义上的无锁。
循环
在普通循环队列中,循环操作是通过取余来实现的。然而,如果队列大小为2的n次幂那么取余操作就可以采用&运算来代替。即如果b为2的n次幂,那么a % b
等价于a & (b - 1)
。由于&运算会比取余运算快很多,所以可以采用&运算来替代取余运算。
空间分配
由于&运算替代取余运算的前提是队列大小为2的n次幂,所以在分配空间前需要将指定的空间大小向上圆整为2的n次幂。
写队列
在普通循环队列中,写循环队列存在两种状态,两种状态下的写入流程不同。但两种状态的写入流程可以归一为状态1的写入流程,不论为状态1还是状态2都可以按照状态1的流程执行写入,即:
-
从队列写入位置写入到队列结尾处。
-
从队列起始位置向后写入剩余数据。
归一化之后 ,如果缓存为状态2,那么在执行完步骤1后就不可能还有剩余数据了,所以步骤2自然就不会执行。归一化之后有两大好处:
- 逻辑更加清晰、简单。
- 省略了if判断。没有if判断的代码可以更好的利用CPU流水线,性能更佳(不会面临分支预测以及预测错误的代价)。
注意
写队列操作是在计算完写入空间大小之后执行的,能执行写队列操作说明队列里的空间是足够写入数据的。也就是说如果队列状态为状态2,那么读取位置-写入位置的大小足够写入数据,所以从队列写入位置到队列结尾处一定有足够的空间可以写入数据,反之就是计算空间大小时计算错了。
读队列
同理,读队列时也可以将两种状态的读取流程归一为状态2的读取流程,即:
-
从缓存读取位置读取到缓存结尾处。
-
从缓存起始位置读取剩余数据。
获取写空间
在普通队列中,我们总是在**修改写入位置的同时进行取余操作,完成循环。**即:
写位置 = (写位置 + 写入长度) % 队列大小
这也是为什么队列会呈现出两种状态的根本原因。如果在修改写入位置时只是简单的递增写入位置而不进行取余操作,那么队列就只会存在状态1这一种情况,那么写空间的大小也只有一种计算方式:
写空间 = 队列大小 -(写入位置 - 读取位置)=> 队列大小 - 写入位置 + 读取位置
如此我们将写空间的获取流程也进行了归一化,从而避免了条件判断。
获取读空间
同理,对于读位置的修改,我们也可以简单的递增读位置而不进行区域操作,那么读空间的大小也只有一种计算方式:
读空间 = 写入位置 - 读取位置
优化后的读写流程
由于我们不会在修改读写位置的同时进行取余操作,那么对于读写循环的实现就需要放在实际读写时进行。优化后的读写全流程如图8、图9所示。
无符号整数回绕特性
由于在修改读写位置时,我们只是简单的递增读写位置而不取余,所以看起来队列的状态永远都是状态1。单读写位置单调递增,最终会超过数据类型所能表达的上限发生溢出。关于这一点,可以利用无符号数的回绕机制来解决:一个无符号数超过它的上限时会回绕到0。假设一个无符号数的上限为max,假设有一个大于max的数value,那么value等于value - max。有了这样一个机制,无符号数溢出时不会影响读写空间的计算(而实际读写时由于做了取余操作所以不存在溢出)。
以写入空间的计算方式为例进行说明:
写空间 = 队列大小 - 写入位置 + 读取位置
设无符号整数的最大值为max,假设此时写入位置发生了溢出,那么写空间为:
写空间 = 队列大小 -(写入位置 - max)+ 读取位置 => 队列大小 - 写入位置 + max + 读取位置 => (队列大小 - 写入位置 + 读取位置) + max
这个值显然大于max也会发生溢出,溢出后为:
溢出结果 =(缓存大小 - 缓存写入位置 + 缓存读取位置) + max - max => 缓存大小 - 缓存写入位置 + 缓存读取位置
所以由于无符号数的回绕特性,无符号数溢出时并不影响读写空间的运算方式。
kfifo代码解读
结构体设计
struct kfifo {
unsigned char *buffer; /* the buffer holding the data 队列指针*/
unsigned int size; /* the size of the allocated buffer 队列大小*/
unsigned int in; /* data is added at offset (in % size) 队列读取位置*/
unsigned int out; /* data is extracted from off. (out % size) 队列写入位置*/
spinlock_t *lock; /* protects concurrent modifications 自旋锁,存在多个读写线程时使用*/
};
空间分配
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
* 向上圆整为2的n次幂
*/
if (size & (size - 1)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
//实际分配空间
buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
//分配并初始化kfifo结构体,给kfifo结构体各元素赋初值
ret = kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
return ret;
}
写队列
/**
* __kfifo_put - puts some data into the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most @len bytes from the @buffer into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//计算写入空间:队列大小 - 写入位置 + 读取位置
len = min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensure that we sample the fifo->out index -before- we
* start putting bytes into the kfifo.
*
* 内存屏障(全屏障)
*/
smp_mb();
/* first put the data starting from fifo->in to buffer end */
/* 从队列写入位置(mod队列大小)写入到队列结尾处 */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
/* 从队列起始位置向后写入剩余数据 */
memcpy(fifo->buffer, buffer + l, len - l);
/*
* Ensure that we add the bytes to the kfifo -before-
* we update the fifo->in index.
*
* 内存屏障(写屏障)
*/
smp_wmb();
//单调递增写入位置
fifo->in += len;
return len;
}
读队列
/**
* __kfifo_get - gets some data from the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most @len bytes from the FIFO into the
* @buffer and returns the number of copied bytes.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//计算读取空间:写入位置 - 读取位置
len = min(len, fifo->in - fifo->out);
/*
* Ensure that we sample the fifo->in index -before- we
* start removing bytes from the kfifo.
*
* 内存屏障(读屏障)
*/
smp_rmb();
/* first get the data from fifo->out until the end of the buffer */
/* 从缓存读取位置(mod队列大小)读取到缓存结尾处 */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
/* 从缓存起始位置读取剩余数据 */
memcpy(buffer + l, fifo->buffer, len - l);
/*
* Ensure that we remove the bytes from the kfifo -before-
* we update the fifo->out index.
*
* 内存屏障(全屏障)
*/
smp_mb();
//单调递增读取位置
fifo->out += len;
return len;
}
关于内存屏障
在读写缓存时虽然不需要使用锁,但需要使用内存屏障来防止CPU优化带来的并发性问题。关于内存屏障以及在kfifo中的作用,详见《内存屏障与volatile(C语言版)》