代码基于:Kernel 6.6
0. 前言
读写信号量的原理与读写锁类似,读写信号量归根到底是 “信号量”,读写锁归根到底是 “自旋锁”,而信号量与自旋锁的区别一个可以睡眠,一个只能自旋。
读写信号量原理:
- 允许多个读者同时进入临界区;
- 读者与写者不能同时进入临界区(读者与写者互斥);
- 写者与写者不能同时进入临界区(写者与写者互斥);
1. struct rw_semaphore
include/linux/rwsem.h
struct rw_semaphore {
atomic_long_t count; //读写信号量的计数
atomic_long_t owner; //当写者成功获取锁时,owner会指向锁的持有者
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
struct optimistic_spin_queue osq; //乐观自旋
#endif
raw_spinlock_t wait_lock; //自旋锁,用于count值的互斥访问
struct list_head wait_list; //不能立即获取到信号量的访问者,都会加到等待队列中
...
};
1.1 count
读写信号量的计数,逻辑控制的核心变量,是个atomic 变量。
kernel/locking/rwsem.c
* On 64-bit architectures, the bit definitions of the count are:
*
* Bit 0 - writer locked bit
* Bit 1 - waiters present bit
* Bit 2 - lock handoff bit
* Bits 3-7 - reserved
* Bits 8-62 - 55-bit reader count
* Bit 63 - read fail bit
*
* On 32-bit architectures, the bit definitions of the count are:
*
* Bit 0 - writer locked bit
* Bit 1 - waiters present bit
* Bit 2 - lock handoff bit
* Bits 3-7 - reserved
* Bits 8-30 - 23-bit reader count
* Bit 31 - read fail bit
*/
#define RWSEM_WRITER_LOCKED (1UL << 0)
#define RWSEM_FLAG_WAITERS (1UL << 1)
#define RWSEM_FLAG_HANDOFF (1UL << 2)
#define RWSEM_FLAG_READFAIL (1UL << (BITS_PER_LONG - 1))
#define RWSEM_READER_SHIFT 8
#define RWSEM_READER_BIAS (1UL << RWSEM_READER_SHIFT)
#define RWSEM_READER_MASK (~(RWSEM_READER_BIAS - 1))
#define RWSEM_WRITER_MASK RWSEM_WRITER_LOCKED
#define RWSEM_LOCK_MASK (RWSEM_WRITER_MASK|RWSEM_READER_MASK)
#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL)
注意 bit1:RWSEM_FLAG_WAITERS
当前持锁的如果是reader,来了一个writer,持锁失败进入等待队列,那么 RWSEM_FLAG_WAITERS
为1。此时又有一个reader 来持锁,在快速路径中根据这个判断,持锁也会失败,进入慢速路径,再根据一些条件进行判断是否可以持锁。所以,reader 已经持锁的情况下,其他的reader 也不是无条件可以马上持锁的。
rwsem 在初始化的时候会将count 设为 RWSEM_UNLOCKED_VALUE
:
include/linux/rwsem.h
#define RWSEM_UNLOCKED_VALUE 0L
注意 reader cnt计数:RWSEM_READER_BIAS
reader 申请锁时,会计算 sem->count + RWSEM_READER_BIAS
*cntp = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count);
注意 READ_FAILED标记:RWSEM_READ_FAILED_MASK
while (!(tmp & RWSEM_READ_FAILED_MASK)) {
在 down_read
时,确认sem->count 是否处于READ_FAILED 状态:
- RWSEM_WRITER_LOCKED //有writer 申请锁
- RWSEM_FLAG_WAITERS //锁被writer持有
- RWSEM_FLAG_HANDOFF
- RWSEM_FLAG_READFAIL //reader cnt太多,溢出
1.2 owner
owner 有两个作用:
- 记录rwsem 被哪个task 持有。只有writer 持锁时,这个owner 才能正确表示持有者,而可能同时存在很多个reader,所以reader 持锁时,owner 不能正确表示持锁着,这也是锁传递不能对 reader 进行传递的原因;
- 因为task_struct 的地址是 L1_CACHE_BYTES 对齐的,所以其最低 2 位恒为0,这 2 位就可以用来描述一些状态信息。
来看下最低 2 位:
kernel/locking/rwsem.c
#define RWSEM_READER_OWNED (1UL << 0)
#define RWSEM_NONSPINNABLE (1UL << 1)
#define RWSEM_OWNER_FLAGS_MASK (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE)
当一个writer 获得rwsem,会将task_struct 指针存入owner,在 unlock时 清除owner;
1.2.1 RWSEM_READER_OWNED
当一个reader 获得rwsem,也会将task_struct 指针存入owner,但需要将 RWSEM_READER_OWNED
标志位置1. 在unlock 的时候所有的字段将基本保持不变,所以,对于一个空闲的rwsem 或 已经有reader-owned 的rwsem,owner 的值或许包含上一次获得rwsem 的 reader 的信息;
1.2.1 RWSEM_NONSPINNABLE
在读/写 的乐观自旋流程中,如果判断到 owner 设置了对应的 nonspinnable 标记,就不会进入乐观自旋。乐观自旋有很大概率把锁偷走。
在进入乐观自旋之前没有设置 nonspinnable,进入后被设置了,也会结束乐观自旋。
设置 RWSEM_NONSPINNABLE 的函数是 rwsem_set_nonspinnable
:
static inline void rwsem_set_nonspinnable(struct rw_semaphore *sem)
{
unsigned long owner = atomic_long_read(&sem->owner);
do {
if (!(owner & RWSEM_READER_OWNED)) //reader持锁时,才能设置
break;
if (owner & RWSEM_NONSPINNABLE) //已经设置了,pass
break;
} while (!atomic_long_try_cmpxchg(&sem->owner, &owner,
owner | RWSEM_NONSPINNABLE)); //尝试设置
}
调用 rwsem_set_nonspinnable
有三个地方:
- 在
rwsem_read_trylock
时,如果 reader cnt 过多溢出,设置nonspinnalbe; - 在
rwsem_optimistic_spin
时,如果自旋的是 writer 并且持锁的是reader,等待的时间超过自旋的timeout,则会设置 nonspinnable; - 在
__rwsem_set_reader_owned
时,设置新的owner,会将上一个owner 的nonspinnable 属性延续;
清除 RWSEM_NONSPINNABLE 的函数是 clear_nonspinnable
:
static inline void clear_nonspinnable(struct rw_semaphore *sem)
{
if (unlikely(rwsem_test_oflags(sem, RWSEM_NONSPINNABLE)))
atomic_long_andnot(RWSEM_NONSPINNABLE, &sem->owner);
}
调用 clear_nonspinnable
有两个地方:
- 在
_up_read
释放锁之后,如果只有 writer 持锁时,可以把这个标记清除; - 在
down_read
慢速路径中,如果没有持锁者,可以把这个标记清除,因为之前 writer spin 出现超时后,设置过nonspinnalbe;
1.3 osq(乐观自旋队列)
struct optimistic_spin_node {
struct optimistic_spin_node *next, *prev;
int locked; /* 1 if lock acquired */
int cpu; /* encoded CPU # + 1 value */
};
struct optimistic_spin_queue {
/*
* Stores an encoded value of the CPU # of the tail node in the queue.
* If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
*/
atomic_t tail;
};
Optimistic spin queue 翻译过来为乐观自旋队列,即形成一组处于自旋状态的任务队列。
和等待队列不同,osq 中的任务都是当前正在执行的任务。osq 并没有将这些任务的 task struct 形成队列结构,而是把 per-cpu 的MSC lock 对象串联形成队列。如下图:
虽然都是自旋,但是自旋方式并不一样。Osq队列中的头部节点是持有osq锁的,只有该任务处于对rwsem的owner进行乐观自旋的状态(我们称之rwsem乐观自旋)。Osq队列中的其他节点都是自旋在自己的mcs lock上(我们称之mcs乐观自旋)。当头部的mcs lock释放掉后(结束rwsem乐观自旋),它会将mcs lock传递给下一个节点,从而让spinner队列上的任务一个个的按顺序进入rwsem的乐观自旋,从而避免了cache-line bouncing带来的性能开销。
cache-line bouncing的理解:为了以较低的成本大幅提升性能,现代CPU都有cache。cpu cache已经发展到了三级缓存结构。其中L1和L2cache为每一个核独有,L3则全部核共享。为了保证全部的核看到正确的内存数据,一个核在写入本身的L1 cache后,CPU会执行Cache一致性算法把对应的cacheline同步到其余核,这个过程并不很快。当多个cpu上频繁修改某个字段时,这个字段所在的cacheline被不停地同步到其它的核上,就像在核间弹来弹去,这个现象就叫作cache bouncing。
2. 读写信号量初始化
2.1 静态初始化
include/linux/rwsem.h
#define __RWSEM_INITIALIZER(name) \
{ __RWSEM_COUNT_INIT(name), \
.owner = ATOMIC_LONG_INIT(0), \
__RWSEM_OPT_INIT(name) \
.wait_lock = __RAW_SPIN_LOCK_UNLOCKED(name.wait_lock), \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
__RWSEM_DEBUG_INIT(name) \
__RWSEM_DEP_MAP_INIT(name) }
#define DECLARE_RWSEM(name) \
struct rw_semaphore name = __RWSEM_INITIALIZER(name)
2.2 动态初始化
include/linux/rwsem.h
#define init_rwsem(sem) \
do { \
static struct lock_class_key __key; \
\
__init_rwsem((sem), #sem, &__key); \
} while (0)
kernel/locking/rwsem.c
void __init_rwsem(struct rw_semaphore *sem, const char *name,
struct lock_class_key *key)
{
atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE); //初始化count
raw_spin_lock_init(&sem->wait_lock); //初始化wait_lock
INIT_LIST_HEAD(&sem->wait_list); //初始化wait_list
atomic_long_set(&sem->owner, 0L); //初始化owner
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
osq_lock_init(&sem->osq); //初始化osq
#endif
}
EXPORT_SYMBOL(__init_rwsem);
3. down_read()
kernel/locking/rwsem.c
void __sched down_read(struct rw_semaphore *sem)
{
might_sleep();
rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
}
EXPORT_SYMBOL(down_read);
3.1 LOCK_CONTENDED()
include/linux/lockdep.h
#define LOCK_CONTENDED(_lock, try, lock) \
do { \
if (!try(_lock)) { \
lock_contended(&(_lock)->dep_map, _RET_IP_); \
lock(_lock); \
} \
lock_acquired(&(_lock)->dep_map, _RET_IP_); \
} while (0)
宏函数 LOCK_CONTENDED
主要目的是先进行 trylock,如果失败再进行 lock。
对应到 down_read() 函数,则会先调用 __down_read_trylock
,如果失败则调用 __down_read
函数。
3.2 __down_read_trylock()
kernel/locking/rwsem.c
static inline int __down_read_trylock(struct rw_semaphore *sem)
{
int ret = 0;
long tmp;
DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);
preempt_disable(); //关闭抢占
tmp = atomic_long_read(&sem->count); //读取sem->count
while (!(tmp & RWSEM_READ_FAILED_MASK)) { //确认是否有writer或handoff
//确认是否可以将reader cnt 计数+1
if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
tmp + RWSEM_READER_BIAS)) {
rwsem_set_reader_owned(sem);
ret = 1;
break;
}
}
preempt_enable(); //打开抢占
return ret;
}
首先会读取 sem->count,如果此时没有 writer、没有置handoff,reader cnt也没有溢出,则尝试将 sem->count 加上 RWSEM_READER_BIAS,即 reader cnt 计数+1.
atomic_long_try_cmpxchg_acquire
函数用以对reader cnt 计数+1,并调用 rwsem_set_reader_owned
函数配置 sem 的owner。
3.3 rwsem_set_reader_owned()
static inline void rwsem_set_reader_owned(struct rw_semaphore *sem)
{
__rwsem_set_reader_owned(sem, current);
}
static inline void __rwsem_set_reader_owned(struct rw_semaphore *sem,
struct task_struct *owner)
{
unsigned long val = (unsigned long)owner | RWSEM_READER_OWNED |
(atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE);
atomic_long_set(&sem->owner, val);
}
最新的owner 是current 线程,因为task_struct 的地址是 L1_CACHE_BYTES 对齐的,所以其最低 2 位恒为0,这 2 位就可以用来描述一些状态信息。
确认上一个owner即 sem->owner 中的 RWSEM_NONSPINNABLE 状态,将其延续到最新的 owner,并将最新的owner 置上 RWSEM_READER_OWNED 状态,标记reader 持有该锁。
最后将最新的owner 存入sem->owner
3.3 __down_read()
static __always_inline void __down_read(struct rw_semaphore *sem)
{
__down_read_common(sem, TASK_UNINTERRUPTIBLE);
}
3.4 __down_read_common()
static __always_inline int __down_read_common(struct rw_semaphore *sem, int state)
{
int ret = 0;
long count;
preempt_disable(); //关闭抢占
if (!rwsem_read_trylock(sem, &count)) { //确认READ_FAILED 标记
if (IS_ERR(rwsem_down_read_slowpath(sem, count, state))) {
ret = -EINTR;
goto out;
}
DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
}
out:
preempt_enable(); //打开抢占
return ret;
}
不是整个down_read 过程都是关闭抢占的,在 rwsem_down_read_slowpath
函数中在进入睡眠之前会打开抢占。
这里试图调用 rwsem_read_trylock
获得锁,需要注意在 reader cnt 溢出时,且是reader 持锁,会将sem 的owner 置上RWSEM_NONSPINNABLE。
如果调用 rwsem_read_trylock
没有获得锁,则进入慢速路径rwsem_down_read_slowpath
.
3.4.1 rwsem_read_trylock()
static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cntp)
{
*cntp = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count);
if (WARN_ON_ONCE(*cntp < 0))
rwsem_set_nonspinnable(sem);
if (!(*cntp & RWSEM_READ_FAILED_MASK)) {
rwsem_set_reader_owned(sem);
return true;
}
return false;
}
当一个reader 调用down_read 时,会经历两条路:
- __down_read_trylock
- __down_read
当通过 __down_read_trylock
无法获取到锁时,sem->count 是不会计数,而是调用__down_read
一直到这里尝试计数+1,并将+1后的值返回存入 *cntp 中.
atomic_long_add_return_acquire
是个原子操作,目的是将 sem->count 计数 +1,并将此时+1 后的结果返回(多线程时,需要返回此次reader申请后的count)。
3.5 rwsem_down_read_slowpath()
static struct rw_semaphore __sched *
rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int state)
{
long adjustment = -RWSEM_READER_BIAS;
long rcnt = (count >> RWSEM_READER_SHIFT); //当前reader申请时的获取reader cnt
struct rwsem_waiter waiter; //定义一个waiter
DEFINE_WAKE_Q(wake_q); //定义一个唤醒队列
/*
* 如果锁的owner是一个reader,那说明有个writer在等锁,不去尝试乐观自旋,防止writer饿死
*/
if ((atomic_long_read(&sem->owner) & RWSEM_READER_OWNED) &&
(rcnt > 1) && !(count & RWSEM_WRITER_LOCKED))
goto queue;
/*
* 当持锁的是一个writer,将该reader 加入乐观自旋
*/
if (!(count & (RWSEM_WRITER_LOCKED | RWSEM_FLAG_HANDOFF))) {
rwsem_set_reader_owned(sem);
lockevent_inc(rwsem_rlock_steal);
/*
* 如果当前reader 是第一个reader,唤醒等待队列里的其他的reader
*/
if ((rcnt == 1) && (count & RWSEM_FLAG_WAITERS)) {
raw_spin_lock_irq(&sem->wait_lock);
if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED,
&wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
}
return sem;
}
queue:
//waiter 初始化
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ; //
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
waiter.handoff_set = false;
raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list)) {
/*
* In case the wait queue is empty and the lock isn't owned
* by a writer, this reader can exit the slowpath and return
* immediately as its RWSEM_READER_BIAS has already been set
* in the count.
*/
if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
/* Provide lock ACQUIRE */
smp_acquire__after_ctrl_dep();
raw_spin_unlock_irq(&sem->wait_lock);
rwsem_set_reader_owned(sem);
lockevent_inc(rwsem_rlock_fast);
return sem;
}
adjustment += RWSEM_FLAG_WAITERS;
}
rwsem_add_waiter(sem, &waiter);
/* we're now waiting on the lock, but no longer actively locking */
count = atomic_long_add_return(adjustment, &sem->count);
rwsem_cond_wake_waiter(sem, count, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
if (!wake_q_empty(&wake_q))
wake_up_q(&wake_q);
trace_contention_begin(sem, LCB_F_READ);
/* wait to be given the lock */
for (;;) {
set_current_state(state);
if (!smp_load_acquire(&waiter.task)) {
/* Matches rwsem_mark_wake()'s smp_store_release(). */
break;
}
if (signal_pending_state(state, current)) {
raw_spin_lock_irq(&sem->wait_lock);
if (waiter.task)
goto out_nolock;
raw_spin_unlock_irq(&sem->wait_lock);
/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
break;
}
schedule_preempt_disabled();
lockevent_inc(rwsem_sleep_reader);
}
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock);
trace_contention_end(sem, 0);
return sem;
out_nolock:
rwsem_del_wake_waiter(sem, &waiter, &wake_q);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
trace_contention_end(sem, -EINTR);
return ERR_PTR(-EINTR);
}
未完待续。。。