在linux系统当中存在很多独占性的资源,他在同一个时间只能被一个进程使用。常见的有打印机、内存或者系统内部表现等资源。如果打印机同时被两个进程使用,打印结果就会混乱输出结果;如果一个内存资源被多个进程同时修改,进程实际的执行结果就没有办法被保证。因此,操作系统中会通过几种锁(原子变量,关中断,信号量,自旋锁)授权某一个进程排他性的访问某一种资源。
在很多应用中,需要一个进程排他性的访问的若干资源而不是一个资源。如果进程A写一片内存位置的A资源,进程B进程写一片内存位置B的资源。在进程B释放内存B的之前,B同时也申请写内存A位置的资源,A进程这个时候也没有释放A位置资源,同时请求内存B位置资源,这种情况下变回出现死锁问题,两个进程都会阻塞在这一种状态中。
总结,关于锁需要掌握linux内核中常见的锁资源使用和实现原理,同时能够在使用过程中学习避免死锁的一些方法。
1. 资源
计算机系统当中资源有硬件和软件区分,其概念是指随着时间的推移,必须能够获得、使用以及释放的任何东西。资源分为可抢占和不可抢占两种情况。具体某一项内容是抢占还是非抢占资源是根据具体上下文来确定的。比方说,在很多情况下,我们通过资源的划分,例如内存分布情况,可以将内存资源做成非抢占资源;但是在另外一些情况,比方说某个设备资源映射的内存区域,需要使用设备时就必须实用这片内存,这种情况下这个内存资源就是不可抢占资源。对于抢占资源我们通过锁临时对数据区域进行保护,当然也就会有出现死锁的可能性。锁 - linux内核锁(零)_生活需要深度的博客-CSDN博客_linux 内核锁计算机系统当中资源有硬件和软件区分,其概念是指随着时间的推移,必须能够获得、使用以及释放的任何东西。资源分为可抢占和不可抢占两种情况。具体某一项内容是抢占还是非抢占资源是根据具体上下文来确定的。比方说,在很多情况下,我们通过资源的划分,例如内存分布情况,可以将内存资源做成非抢占资源;但是在另外一些情况,比方说某个设备资源映射的内存区域,需要使用设备时就必须实用这片内存,这种情况下这个内存资源就是不可抢占资源。对于抢占资源我们通过锁临时对数据区域进行保护,当然也就会有出现死锁的可能性。
1.1. 资源访问流程
使用资源需要时间顺序抽象表示为:1) 请求资源;2) 使用资源;3)释放资源。当一个进程请求资源失败的情况下会重复这样的小循环:请求(失败),休眠,在请求。至于多次请求失败以后怎么处理决定于具体的系统或者业务设计者,可以在多次请求以后直接退出,也可以让请求者处于睡眠状态,一段时间以后再次唤醒。
2. 锁
2.1. 原子操作必要性
下面代码描述一个线程中的函数和中断处理函数,它们对同一个全局变量执行加 1 操作:
int a = 0;
void interrupt_handle()
{
a++;
}
void thread_func()
{
a++;
}
对于ARM处理器,经过编译器翻译后是{ ldr r0,=addr; add r0, r0, 1; str r0, [addr]}。这种才做在单进程无中断情况下是没有任何问题,单如果如上面粒子存在中断情况下,可能导致结果不确定。thread_func 函数还没运行完第 2 条指令时,中断就来了。CPU 转而处理中断,也就是开始运行 interrupt_handle 函数,这个函数运行完 a=1,CPU 还会回去继续运行第 3 条指令,此时 a 依然是 1,这显然是错的。下面来看一下表格,你就明白了。显然在 t2 时刻发生了中断,导致了 t2 到 t4 运行了 interrupt_handle 函数,t5 时刻 thread_func 又恢复运行,导致 interrupt_handle 函数中 a 的操作丢失,因此出错。
在单核无中断情况下资源都比较简单,在单核有中断的情况下,我们可以通过关闭中断方式实现资源的原子访问。但是,现在CPU都是多核了,关闭多个CPU的中断显然是不合适的。因此,原子操作,自旋锁,信号量这些类型的锁资源就出现了。
今天谈谈linux中常见并发访问的保护机制设计原理。为什么要写这篇文章呢?其实想帮助自己及读者更深入的了解背后的原理(据可靠消息,锁的实现经常出现在笔试环节。既可以考察面试者对锁的原理的理解,又可以考察面试者编程技能)。我们抛开linux中汇编代码。用C语言为大家呈现背后实现的原理。同时,文章中的代码都没有考虑并发情况(例如某些操作需要原子性,或者数据需要保护等)。
注:部分代码都是根据ARM64架构汇编代码翻译成C语言并经过精简(例如:spin lock、read-write lock)。也有部分代码实现是为了呈现背后设计的原理自己编写的,而不是精简linux中实现的代码(例如mutex)。
自旋锁(spin lock)
自旋锁是linux中使用非常频繁的锁,原理简单。当进程A申请锁成功后,进程B申请锁就会失败,但是不会调度,原地自旋。就在原地转到天昏地暗只为等到进程A释放锁。由于不会睡眠和调度的特性,在中断上下文中,数据的保护一般都是选择自旋锁。如果有多个进程去申请锁。当第一个申请锁成功的线程在释放的时候,其他进程是竞争的关系。因此是一种不公平。所以现在的linux采用的是排队机制。先到先得。谁先申请,谁就先得到锁。
【文章福利】小编推荐自己的Linux内核技术交流群:【977878001】整理一些个人觉得比较好得学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!前100进群领取,额外赠送一份价值699的内核资料包(含视频教程、电子书、实战项目及代码)
内核资料直通车:Linux内核源码技术学习路线+视频教程代码资料
学习直通车:Linux内核源码/内存调优/文件系统/进程管理/设备驱动/网络协议栈
原理
举个例子,大家应该都去过银行办业务吧。银行的办事大厅一般会有几个窗口同步进行。今天很不巧,只有一个窗口提供服务。现在的银行服务都是采用取号排队,叫号服务的方式。当你去银行办理业务的时候,首先会去取号机器领取小票,上面写着你排多少号。然后你就可以排队等待了。一般还会有个显示屏,上面会显示一个数字(例如:"请xxx号到1号窗口办理"),代表当前可以被服务顾客的排队号码。每办理完一个顾客的业务,显示屏上面的数字都会增加1。等待的顾客都会对比自己手上写的编号和显示屏上面是否一致,如果一致的话,就可以去前台办理业务了。现在早上刚开业,顾客A是今天的第一个顾客,去取号机器领取0号(next计数)小票,然后看到显示屏上显示0(owner计数),顾客A就知道现在轮到自己办理业务了。顾客A到前台办理业务(持有锁)中,顾客B来了。同样,顾客B去取号机器拿到1号(next计数)小票。然后乖乖的坐在旁边等候。顾客A依然在办理业务中,此时顾客C也来了。顾客C去取号机器拿到2号(next计数)小票。顾客C也乖乖的找个座位继续等待。终于,顾客A的业务办完了(释放锁)。然后,显示屏上面显示1(owner计数)。顾客B和C都对比显示屏上面的数字和自己手中小票的数字是否相等。顾客B终于可以办理业务了(持有锁)。顾客C依然等待中。顾客B的业务办完了(释放锁)。然后,显示屏上面显示2(owner计数)。顾客C终于开始办理业务(持有锁)。顾客C的业务办完了(释放锁)。3个顾客都办完了业务离开了。只留下一个银行柜台服务员。最终,显示屏上面显示3(owner计数)。取号机器的下一个排队号也是3号(next计数)。无人办理业务(锁是释放状态)。
linux中针对每一个spin lock会有两个计数。分别是next和owner(初始值为0)。进程A申请锁时,会判断next和owner的值是否相等。如果相等就代表锁可以申请成功,否则原地自旋。直到owner和next的值相等才会退出自旋。假设进程A申请锁成功,然后会next加1。此时owner值为0,next值为1。进程B也申请锁,保存next得值到局部变量tmp(tmp = 1)中。由于next和owner值不相等,因此原地自旋读取owner的值,判断owner和tmp是否相等,直到相等退出自旋状态。当然next的值还是加1,变成2。进程A释放锁,此时会将owner的值加1,那么此时B进程的owner和tmp的值都是1,因此B进程获得锁。当B进程释放锁后,同样会将owner的值加1。最后owner和next都等于2,代表没有进程持有锁。next就是一个记录申请锁的次数,而owner是持有锁进程的计数值。
实现
我们首先定义描述自旋锁的结构体arch_spinlock_t。
typedef struct {
union {
unsigned int slock;
struct __raw_tickets {
unsigned short owner;
unsigned short next;
} tickets;
};
} arch_spinlock_t;
如上面的原理描述,我们需要两个计数,分别是owner和next。slock所占内存区域覆盖owner和next(据说C语言学好的都能看得懂)。下面实现申请锁操作 arch_spin_lock。
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
arch_spinlock_t old_lock;
old_lock.slock = lock->slock; /* 1 */
lock->tickets.next++; /* 2 */
while (old_lock.tickets.next != old_lock.tickets.owner) { /* 3 */
wfe(); /* 4 */
old_lock.tickets.owner = lock->tickets.owner; /* 5 */
}
}
- 继续上面的举例。顾客从取号机器得到排队号。
- 取号机器更新下个顾客将要拿到的排队号。
- 看一下显示屏,判断是否轮到自己了。
- wfe()函数是指ARM64架构的WFE(wait for event)汇编指令。WFE是让ARM核进入低功耗模式的指令。当进程拿不到锁的时候,原地自旋不如cpu睡眠。节能。睡下去之后,什么时候醒来呢?就是等到持有锁的进程释放的时候,醒过来判断是否可以持有锁。如果不能获得锁,继续睡眠即可。这里就相当于顾客先小憩一会,等到广播下一位排队者的时候,醒来看看是不是自己。
- 前台已经为上一个顾客办理完成业务,剩下排队的顾客都要抬头看一下显示屏是不是轮到自己了。
释放锁的操作就非常简单了。还记得上面银行办理业务的例子吗?释放锁的操作仅仅是显示屏上面的排队号加1。我们仅仅需要将owner计数加1即可。arch_spin_unlock实现如下。
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
lock->tickets.owner++;
sev();
}
sev()函数是指ARM64架构的SEV汇编指令。当进程无法获取锁的时候会使用WFE指令使CPU睡眠。现在释放锁了,自然要唤醒所有睡眠的CPU醒来检查自己是不是可以获取锁。
信号量(semaphore)
信号量(semaphore)是进程间通信处理同步互斥的机制。是在多线程环境下使用的一种措施,它负责协调各个进程,以保证他们能够正确、合理的使用公共资源。 它和spin lock最大的不同之处就是:无法获取信号量的进程可以睡眠,因此会导致系统调度。
原理
信号量一般可以用来标记可用资源的个数。老规矩,还是举个例子。假设图书馆有2本《C语言从入门到放弃》书籍。A同学想学C语言,于是发现这本书特别的好。于是就去学校的图书馆借书,A同学成功的从图书馆借走一本。这时,A同学室友B同学发现A同学竟然在偷偷的学习武功秘籍(C语言)。于是,B同学也去借一本。此时,图书馆已经没有书了。C同学也想借这本书,可能是这本书太火了。图书馆管理员告诉C同学,图书馆这本书都被借走了。如果有同学换回来,会第一时间通知你。于是,管理员就把C同学的信息登记先来,以备后续通知C同学来借书。所以,C同学只能悲伤的走了(如果是自旋锁的原理的话,那么C同学将会端个小板凳坐在图书馆,一直要等到A同学或者B同学还书并借走)。
实现
为了记录可用资源的数量,我们肯定需要一个count计数,标记当前可用资源数量。当然还要一个可以像图书管理员一样的笔记本功能。用来记录等待借书的同学。所以,一个双向链表即可。因此只需要一个count计数和等待进程的链表头即可。描述信号量的结构体如下。
struct semaphore {
unsigned int count;
struct list_head wait_list;
};
在linux中,每个进程就相当于是每个借书的同学。通知一个同学,就相当于唤醒这个进程。因此,我们还需要一个结构体记录当前的进程信息(task_struct)。
struct semaphore_waiter {
struct list_head list;
struct task_struct *task;
};
struct semaphore_waiter的list成员是当进程无法获取信号量的时候挂入semaphore的wait_list成员。task成员就是记录后续被唤醒的进程信息。
一切准备就绪,现在就可以实现信号量的申请函数。
void down(struct semaphore *sem)
{
struct semaphore_waiter waiter;
if (sem->count > 0) {
sem->count--; /* 1 */
return;
}
waiter.task = current; /* 2 */
list_add_tail(&waiter.list, &sem->wait_list); /* 2 */
schedule(); /* 3 */
}
- 如果信号量标记的资源还有剩余,自然可以成功获取信号量。只需要递减可用资源计数。
- 既然无法获取信号量,就需要将当前进程挂入信号量的等待队列链表上。
- schedule()主要是触发任务调度的示意函数,主动让出CPU使用权。在让出之前,需要将当前进程从运行队列上移除。
释放信号的实现也是比较简单。实现如下。
void up(struct semaphore *sem)
{
struct semaphore_waiter waiter;
if (list_empty(&sem->wait_list)) {
sem->count++; /* 1 */
return;
}
waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
list_del(&waiter->list); /* 2 */
wake_up_process(waiter->task); /* 2 */
}
- 如果等待链表没有进程,那么自然只需要增加资源计数。
- 从等待进程链表头取出第一个进程,并从链表上移除。然后就是唤醒该进程。
读写锁(read-write lock)
不管是自旋锁还是信号量在同一时间只能有一个进程进入临界区。对于有些情况,我们是可以区分读写操作的。因此,我们希望对于读操作的进程可以并发进行。对于写操作只限于一个进程进入临界区。而这种同步机制就是读写锁。读写锁一般具有以下几种性质。
- 同一时间有且仅有一个写进程进入临界区。
- 在没有写进程进入临界区的时候,同时可以有多个读进程进入临界区。
- 读进程和写进程不可以同时进入临界区。
读写锁有两种,一种是信号量类型,另一种是spin lock类型。下面以spin lock类型讲解。
原理
老规矩,还是举个例子理解读写锁。我绞尽脑汁才想到一个比较贴切的例子。这个例子来源于生活。我发现公司一般都会有保洁阿姨打扫厕所。如果以男厕所为例的话,我觉得男士进入厕所就相当于读者进入临界区。因为可以有多个男士进厕所。而保洁阿姨进入男士厕所就相当于写者进入临界区。假设A男士发现保洁阿姨不在打扫厕所,就进入厕所。随后B和C同时也进入厕所。然后保洁阿姨准备打扫厕所,发现有男士在厕所里面,因此只能在门口等待。ABC都离开了厕所。保洁阿姨迅速进入厕所打扫。然后D男士去上厕所,发现保洁阿姨在里面。灰溜溜的出来了在门口等着。现在体会到了写者(保洁阿姨)具有排他性,读者(男士)可以并发进入临界区了吧。
既然我们允许多个读者进入临界区,因此我们需要一个计数统计读者的个数。同时,由于写者永远只存在一个进入临界区,因此只需要一个bit标记是否有写进程进入临界区。所以,我们可以将两个计数合二为一。只需要1个unsigned int类型即可。最高位(bit31)代表是否有写者进入临界区,低31位(0~30bit)统计读者个数。
+----+-------------------------------------------------+
| 31 | 30 0 |
+----+-------------------------------------------------+
| |
| +----> [0:30] Read Thread Counter
+-------------------------> [31] Write Thread Counter