MIT 6.S081 教材第六章内容 -- 锁 -- 上
- 引言
- 锁
- 竞态条件
- 代码:使用锁
- 死锁和锁排序
- 锁和中断处理函数
- 指令和内存访问排序
- 睡眠锁
- 真实世界
- 思考
引言
MIT 6.S081 2020 操作系统
本文为MIT 6.S081课程第六章教材内容翻译加整理。
本课程前置知识主要涉及:
- C语言(建议阅读C程序语言设计—第二版)
- RISC-V汇编
- 推荐阅读: 程序员的自我修养-装载,链接与库
锁
大多数内核,包括xv6,交错执行多个活动。交错的一个来源是多处理器硬件:计算机的多个CPU之间独立执行,如xv6的RISC-V。多个处理器共享物理内存,xv6利用共享(sharing)来维护所有CPU进行读写的数据结构。这种共享增加了一种可能性,即一个CPU读取数据结构,而另一个CPU正在更新它,甚至多个CPU同时更新相同的数据;如果不仔细设计,这种并行访问可能会产生不正确的结果或损坏数据结构。即使在单处理器上,内核也可能在许多线程之间切换CPU,导致它们的执行交错。最后,如果中断发生在错误的时间,设备中断处理程序修改与某些可中断代码相同的数据,可能导致数据损坏。单词并发(concurrency)是指由于多处理器并行、线程切换或中断,多个指令流交错的情况。
内核中充满了并发访问数据(concurrently-accessed data)。例如:
- 两个CPU可以同时调用
kalloc
,从而从空闲列表的头部弹出。
内核设计者希望允许大量的并发,因为这样可通过并行性提高性能,并提高响应能力。然而,结果是,尽管存在这种并发性,内核设
计者还是花费了大量的精力来使其正确运行。有许多方法可以得到正确的代码,有些方法比其他方法更容易。以并发下的正确性为目标的策略和支持它们的抽象称为并发控制技术(concurrency control techniques)。
Xv6使用了许多并发控制技术,这取决于不同的情况。本章重点介绍了一种广泛使用的技术:锁。锁提供了互斥,确保一次只有一个CPU可以持有锁。如果程序员将每个共享数据项关联一个锁,并且代码在使用一个数据项时总是持有相关联的锁,那么该项一次将只被一个CPU使用。在这种情况下,我们说锁保护数据项。尽管锁是一种易于理解的并发控制机制,但锁的缺点是它们会扼杀性能,因为它们会串行化并发操作。
本章的其余部分解释了为什么xv6需要锁,xv6如何实现它们,以及如何使用它们。
竞态条件
作为我们为什么需要锁的一个例子,考虑两个进程在两个不同的CPU上调用wait
。wait
会等待直到当前进程的某个子进程退出为止,子进程退出时会释放其内存。因此,在每个CPU上,内核将调用kfree
来释放子进程的页面。内核分配器维护一个链接列表:
kalloc()
(kernel/kalloc.c:69) 从空闲页面列表中取出(pop)一个内存页面;kfree()
(kernel/kalloc.c:47) 将一个内存页面添加(push)到空闲列表上。
为了获得最佳性能,我们可能希望两个父进程的kfree
可以并行执行,而不必等待另一个进程,但是考虑到xv6的kfree
实现,这将导致错误。
图6.1更详细地说明了这项设定:链表位于两个CPU共享的内存中,这两个CPU使用load
和store
指令操作链表。
(实际上,每个处理器都有cache,但从概念上讲,多处理器系统的行为就像所有CPU共享一块单独的内存一样)
如果没有并发请求,您可能以如下方式实现列表push操作:
struct element {
int data;
struct element *next;
};
struct element *list = 0;
void
push(int data)
{
struct element *l;
l = malloc(sizeof *l);
l->data = data;
l->next = list;
list = l;
}
如果存在隔离性,那么这个实现是正确的。但是,如果多个副本并发执行,代码就会出错。如果两个CPU同时执行push
,如图6.1所示,两个CPU都可能在执行第16行之前执行第15行,这会导致如图6.2所示的不正确的结果。然后会有两个类型为element
的列表元素使用next
指针设置为list
的前一个值。当两次执行位于第16行的对list
的赋值时,第二次赋值将覆盖第一次赋值;第一次赋值中涉及的元素将丢失。
第16行丢失的更新是竞态条件(race condition)的一个例子。竞态条件是指多个进程读写某些共享数据(至少有一个访问是写入)的情况。竞争通常包含bug,要么丢失更新(如果访问是写入的),要么读取未完成更新的数据结构。竞争的结果取决于进程在处理器运行的确切时机以及内存系统如何排序它们的内存操作,这可能会使竞争引起的错误难以复现和调试。例如,在调试push
时添加printf
语句可能会改变执行的时间,从而使竞争消失。
避免竞争的通常方法是使用锁。锁确保互斥,这样一次只有一个CPU可以执行push
中敏感的代码行;这使得上述情况不可能发生。上面代码的正确上锁版本只添加了几行(用黄色突出显示):
struct element {
int data;
struct element *next;
};
struct element *list = 0;
struct lock listlock;
void
push(int data)
{
struct element *l;
l = malloc(sizeof *l);
l->data = data;
acquire(&listlock);
l->next = list;
list = l;
release(&listlock);
}
acquire
和release
之间的指令序列通常被称为临界区域(critical section)。锁的作用通常被称为保护list
。
当我们说锁保护数据时,我们实际上是指锁保护适用于数据的某些不变量集合。不变量是跨操作维护的数据结构的属性。通常,操作的正确行为取决于操作开始时不变量是否为真。操作可能暂时违反不变量,但必须在完成之前重新建立它们。
- 例如,在链表的例子中,不变量是
list
指向列表中的第一个元素,以及每个元素的next
字段指向下一个元素。 push
的实现暂时违反了这个不变量:在第17行,l->next
指向list
(注:则此时list
不再指向列表中的第一个元素,即违反了不变量),但是list
还没有指向l
(在第18行重新建立)。- 我们上面检查的竞态条件发生了,因为第二个CPU执行了依赖于列表不变量的代码,而这些代码(暂时)被违反了。
- 正确使用锁可以确保每次只有一个CPU可以对临界区域中的数据结构进行操作,因此当数据结构的不变量不成立时,将没有其他CPU对数据结构执行操作。
您可以将锁视为串行化(serializing)并发的临界区域,以便同时只有一个进程在运行这部分代码,从而维护不变量(假设临界区域设定了正确的隔离性)。您还可以将由同一锁保护的临界区域视为彼此之间的原子,即彼此之间只能看到之前临界区域的完整更改集,而永远看不到部分完成的更新。
尽管正确使用锁可以改正不正确的代码,但锁限制了性能。
- 例如,如果两个进程并发调用
kfree
,锁将串行化这两个调用,我们在不同的CPU上运行它们没有任何好处。
如果多个进程同时想要相同的锁或者锁经历了争用,则称之为发生冲突(conflict)。内核设计中的一个主要挑战是避免锁争用。Xv6为此几乎没做任何工作,但是复杂的内核会精心设计数据结构和算法来避免锁的争用。在链表示例中,内核可能会为每个CPU维护一个空闲列表,并且只有当CPU的列表为空并且必须从另一个CPU挪用内存时才会触及另一个CPU的空闲列表。其他用例可能需要更复杂的设计。
锁的位置对性能也很重要。
- 例如,在
push
中把acquire
的位置提前也是正确的:将acquire
移动到第13行之前完全没问题。但这样对malloc
的调用也会被串行化,从而降低了性能。下面的《使用锁》一节提供了一些关于在哪里插入acquire
和release
调用的指导方针。
代码:使用锁
Xv6在许多地方使用锁来避免竞争条件(race conditions)。如上所述,kalloc
(kernel/kalloc.c:69)和kfree
(kernel/kalloc.c:47)就是一个很好的例子。尝试练习1和练习2,看看如果这些函数省略了锁会发生什么。你可能会发现很难触发不正确的行为,这表明很难可靠地测试代码是否经历了锁错误和竞争后被释放。xv6有一些竞争是有可能发生的。
使用锁的一个困难部分是决定要使用多少锁,以及每个锁应该保护哪些数据和不变量。有几个基本原则。
- 首先,任何时候可以被一个CPU写入,同时又可以被另一个CPU读写的变量,都应该使用锁来防止两个操作重叠。
- 其次,请记住锁保护不变量(invariants):如果一个不变量涉及多个内存位置,通常所有这些位置都需要由一个锁来保护,以确保不变量不被改变。
上面的规则说什么时候需要锁,但没有说什么时候不需要锁。为了提高效率,不要向太多地方上锁是很重要的,因为锁会降低并行性。如果并行性不重要,那么可以安排只拥有一个线程,而不用担心锁。一个简单的内核可以在多处理器上做到这一点,方法是拥有一个锁,这个锁必须在进入内核时获得,并在退出内核时释放(尽管如管道读取或wait
的系统调用会带来问题)。许多单处理器操作系统已经被转换为使用这种方法在多处理器上运行,有时被称为“大内核锁(big kernel lock)”,但是这种方法牺牲了并行性:
- 一次只能有一个CPU运行在内核中。如果内核做一些繁重的计算,使用一组更细粒度的锁的集合会更有效率,这样内核就可以同时在多个处理器上执行。
作为粗粒度锁的一个例子,xv6的kalloc.c分配器有一个由单个锁保护的空闲列表。如果不同CPU上的多个进程试图同时分配页面,每个进程在获得锁之前将必须在acquire
中自旋等待。自旋会降低性能,因为它只是无用的等待。如果对锁的争夺浪费了很大一部分CPU时间,也许可以通过改变分配器的设计来提高性能,使其拥有多个空闲列表,每个列表都有自己的锁,以允许真正的并行分配。
作为细粒度锁定的一个例子,xv6对每个文件都有一个单独的锁,这样操作不同文件的进程通常可以不需等待彼此的锁而继续进行。文件锁的粒度可以进一步细化,以允许进程同时写入同一个文件的不同区域。最终的锁粒度决策需要由性能测试和复杂性考量来驱动。
在后面的章节解释xv6的每个部分时,他们将提到xv6使用锁来处理并发的例子。作为预览,表6.3列出了xv6中的所有锁。
锁 | 描述 |
---|---|
bcache.lock | 保护块缓冲区缓存项(block buffer cache entries)的分配 |
cons.lock | 串行化对控制台硬件的访问,避免混合输出 |
ftable.lock | 串行化文件表中文件结构体的分配 |
icache.lock | 保护索引结点缓存项(inode cache entries)的分配 |
vdisk_lock | 串行化对磁盘硬件和DMA描述符队列的访问 |
kmem.lock | 串行化内存分配 |
log.lock | 串行化事务日志操作 |
管道的pi->lock | 串行化每个管道的操作 |
pid_lock | 串行化next_pid的增量 |
进程的p->lock | 串行化进程状态的改变 |
tickslock | 串行化时钟计数操作 |
索引结点的 ip->lock | 串行化索引结点及其内容的操作 |
缓冲区的b->lock | 串行化每个块缓冲区的操作 |
死锁和锁排序
如果在内核中执行的代码路径必须同时持有数个锁,那么所有代码路径以相同的顺序获取这些锁是很重要的。如果它们不这样做,就有死锁的风险。
- 假设xv6中的两个代码路径需要锁A和B,但是代码路径1按照先A后B的顺序获取锁,另一个路径按照先B后A的顺序获取锁。
- 假设线程T1执行代码路径1并获取锁A,线程T2执行代码路径2并获取锁B。
- 接下来T1将尝试获取锁B,T2将尝试获取锁A。
- 两个获取都将无限期阻塞,因为在这两种情况下,另一个线程都持有所需的锁,并且不会释放它,直到它的获取返回。
为了避免这种死锁,所有代码路径必须以相同的顺序获取锁。全局锁获取顺序的需求意味着锁实际上是每个函数规范的一部分:调用者必须以一种使锁按照约定顺序被获取的方式调用函数。
由于sleep
的工作方式(见第7章),Xv6有许多包含每个进程的锁(每个struct proc
中的锁)在内的长度为2的锁顺序链。
- 例如,
consoleintr
(kernel/console.c:138)是处理键入字符的中断例程。当换行符到达时,任何等待控制台输入的进程都应该被唤醒。 - 为此,
consoleintr
在调用wakeup
时持有cons.lock
,wakeup
获取等待进程的锁以唤醒它。 - 因此,全局避免死锁的锁顺序包括必须在任何进程锁之前获取
cons.lock
的规则。
文件系统代码包含xv6最长的锁链。
- 例如,创建一个文件需要同时持有目录上的锁、新文件inode上的锁、磁盘块缓冲区上的锁、磁盘驱动程序的
vdisk_lock
和调用进程的p->lock
。为了避免死锁,文件系统代码总是按照前一句中提到的顺序获取锁。
遵守全局死锁避免的顺序可能会出人意料地困难。有时锁顺序与逻辑程序结构相冲突:
- 例如,也许代码模块M1调用模块M2,但是锁顺序要求在M1中的锁之前获取M2中的锁。
有时锁的身份是事先不知道的,也许是因为必须持有一个锁才能发现下一个要获取的锁的身份。
- 这种情况在文件系统中出现,因为它在路径名称中查找连续的组件,也在
wait
和exit
代码中出现,因为它们在进程表中寻找子进程。
最后,死锁的危险通常是对细粒度锁定方案的限制,因为更多的锁通常意味着更多的死锁可能性。避免死锁的需求通常是内核实现中的一个主要因素。
锁和中断处理函数
一些xv6自旋锁保护线程和中断处理程序共用的数据。
- 例如,
clockintr
定时器中断处理程序在增加ticks
(kernel/trap.c:163)的同时内核线程可能在sys_sleep
(kernel/sysproc.c:64)中读取ticks
。 - 锁
tickslock
串行化这两个访问。
自旋锁和中断的交互引发了潜在的危险。假设sys_sleep
持有tickslock
,并且它的CPU被计时器中断中断。clockintr
会尝试获取tickslock
,意识到它被持有后等待释放。在这种情况下,tickslock
永远不会被释放:只有sys_sleep
可以释放它,但是sys_sleep
直到clockintr
返回前不能继续运行。所以CPU会死锁,任何需要锁的代码也会冻结。
为了避免这种情况,如果一个自旋锁被中断处理程序所使用,那么CPU必须保证在启用中断的情况下永远不能持有该锁。Xv6更保守:
- 当CPU获取任何锁时,xv6总是禁用该CPU上的中断。中断仍然可能发生在其他CPU上,此时中断的
acquire
可以等待线程释放自旋锁;由于不在同一CPU上,不会造成死锁。 - 当CPU未持有自旋锁时,xv6重新启用中断;
它必须做一些记录来处理嵌套的临界区域。acquire
调用push_off
(kernel/spinlock.c:89) 并且release
调用pop_off
(kernel/spinlock.c:100)来跟踪当前CPU上锁的嵌套级别。当计数达到零时,pop_off
恢复最外层临界区域开始时存在的中断使能状态。intr_off
和intr_on
函数执行RISC-V指令分别用来禁用和启用当前CPU上的中断。
嵌套的临界区域这里指的不是可重入锁,而是说当前进程获取锁A后,在临界区中又尝试去获取锁B,在锁B的临界区中去尝试获取锁C这种情况。
- xv6提供的自旋锁结构体
//spinlock.h
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held? 是否处于锁定状态
// For debugging:
char *name; // Name of lock. 锁的名字,方便debug
struct cpu *cpu; // The cpu holding the lock. 持有锁的CPU
};
- xv6提供的CPU结构体
// Per-CPU state.
struct cpu {
struct proc *proc; // The process running on this cpu, or null. 正在cpu上跑的进程
struct context context; // swtch() here to enter scheduler(). 进程调度的上下文保存地址
int noff; // Depth of push_off() nesting. 重入次数
int intena; // Were interrupts enabled before push_off()? 重入前,中断是否开启
};
- 初始化锁
void
initlock(struct spinlock *lk, char *name)
{
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}
- 中断关闭加锁嵌套获取计数+1
void
push_off(void)
{
//执行push_off前的中断状态
int old = intr_get();
//关闭当前CPU上的中断
intr_off();
//只会在第一次最外层区域获取锁时进行记录
if(mycpu()->noff == 0)
mycpu()->intena = old;
//设置锁嵌套获取次数+1
mycpu()->noff += 1;
}
- 开中断加锁嵌套重入次数-1
void
pop_off(void)
{
struct cpu *c = mycpu();
//锁持有期间,当前CPU上的中断需要是关闭状态
if(intr_get())
panic("pop_off - interruptible");
//锁嵌套获取次数小于1,错误,至少应该为1
if(c->noff < 1)
panic("pop_off");
//锁嵌套获取次数-1
c->noff -= 1;
//锁彻底被释放的时候,打开中断
if(c->noff == 0 && c->intena)
intr_on();
}
- 判断当前CPU是否持有锁
// Check whether this cpu is holding the lock.
// Interrupts must be off.
int
holding(struct spinlock *lk)
{
int r;
r = (lk->locked && lk->cpu == mycpu());
return r;
}
- 获取锁
// Acquire the lock.
// Loops (spins) until the lock is acquired.
void
acquire(struct spinlock *lk)
{
// 锁嵌套获取次数+1,关中断
push_off(); // disable interrupts to avoid deadlock.
// 不支持可重入锁
if(holding(lk))
panic("acquire");
// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
// CAS自旋锁
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Record info about lock acquisition for holding() and debugging.
//记录持有锁的CPU
lk->cpu = mycpu();
}
- 释放锁
// Release the lock.
void
release(struct spinlock *lk)
{
//释放锁的前提是持有锁
if(!holding(lk))
panic("release");
//清空绑定关系
lk->cpu = 0;
// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);
//锁嵌套计数-1并且打开中断
pop_off();
}
严格的在设置lk->locked
(kernel/spinlock.c:28)之前让acquire
调用push_off
是很重要的。如果两者颠倒,会存在一个既持有锁又启用了中断的短暂窗口期,不幸的话定时器中断会使系统死锁。同样,只有在释放锁之后,release
才调用pop_off
也是很重要的(kernel/spinlock.c:66)。
尝试获取锁前,确保中断已经关闭,释放锁前,确保中断还未打开。
指令和内存访问排序
人们很自然地会想到程序是按照源代码语句出现的顺序执行的。然而,许多编译器和中央处理器为了获得更高的性能而不按顺序执行代码。如果一条指令需要许多周期才能完成,中央处理器可能会提前发出指令,这样它就可以与其他指令重叠,避免中央处理器停顿。
- 例如,中央处理器可能会注意到在顺序指令序列A和B中彼此不存在依赖。CPU也许首先启动指令B,或者是因为它的输入先于A的输入准备就绪,或者是为了重叠执行A和B。
- 编译器可以执行类似的重新排序,方法是在源代码中一条语句的指令发出之前,先发出另一条语句的指令。
编译器和CPU在重新排序时需要遵循一定规则,以确保它们不会改变正确编写的串行代码的结果。然而,规则确实允许重新排序后改变并发代码的结果,并且很容易导致多处理器上的不正确行为。CPU的排序规则称为内存模型(memory model)
。
例如,在push
的代码中,如果编译器或CPU将对应于第4行的存储指令移动到第6行release
后的某个地方,那将是一场灾难:
l = malloc(sizeof *l);
l->data = data;
acquire(&listlock);
l->next = list;
list = l;
release(&listlock);
如果发生这样的重新排序,将会有一个窗口期,另一个CPU可以获取锁并查看更新后的list
,但却看到一个未初始化的list->next
。
为了告诉硬件和编译器不要执行这样的重新排序,xv6在acquire
(kernel/spinlock.c:22) 和release
(kernel/spinlock.c:47)中都使用了__sync_synchronize()
。
__sync_synchronize()
是一个内存障碍:它告诉编译器和CPU不要跨障碍重新排序load
或store
指令。- 因为xv6在访问共享数据时使用了锁,xv6的
acquire
和release
中的障碍在几乎所有重要的情况下都会强制顺序执行。第9章讨论了一些例外。
睡眠锁
有时xv6需要长时间保持锁。
- 例如,文件系统(第8章)在磁盘上读写文件内容时保持文件锁定,这些磁盘操作可能需要几十毫秒。
如果另一个进程想要获取自旋锁,那么长时间保持自旋锁会导致获取进程在自旋时浪费很长时间的CPU。自旋锁的另一个缺点是,一个进程在持有自旋锁的同时不能让出(yield)CPU,然而我们希望持有锁的进程等待磁盘I/O的时候其他进程可以使用CPU。
持有自旋锁时让步是非法的,因为如果第二个线程试图获取自旋锁,就可能导致死锁:因为acquire
不会让出CPU,第二个线程的自旋可能会阻止第一个线程运行并释放锁。在持有锁时让步也违反了在持有自旋锁时中断必须关闭的要求。因此,我们想要一种锁,它在等待获取锁时让出CPU,并允许在持有锁时让步(以及中断)。
Xv6以睡眠锁(sleep-locks)的形式提供了这种锁。acquiresleep
(kernel/sleeplock.c:22) 在等待时让步CPU,使用的技术将在第7章中解释。在更高层次上,睡眠锁有一个被自旋锁保护的锁定字段,acquiresleep
对sleep
的调用原子地让出CPU并释放自旋锁。结果是其他线程可以在acquiresleep
等待时执行。
- xv6提供的睡眠锁结构体
// Long-term locks for processes
struct sleeplock {
uint locked; // Is the lock held? 锁是否被持有
struct spinlock lk; // spinlock protecting this sleep lock 自旋锁来保证locked读写的原子性
// For debugging:
char *name; // Name of lock.
int pid; // Process holding lock 持有锁的进程
};
- 初始化睡眠锁
void
initsleeplock(struct sleeplock *lk, char *name)
{
initlock(&lk->lk, "sleep lock");
lk->name = name;
lk->locked = 0;
lk->pid = 0;
}
- 当前进程是否持有睡眠锁
int
holdingsleep(struct sleeplock *lk)
{
int r;
// 使用睡眠锁内部的自旋锁保护对locked变量的读写
acquire(&lk->lk);
r = lk->locked && (lk->pid == myproc()->pid);
release(&lk->lk);
return r;
}
- 获取睡眠锁
void
acquiresleep(struct sleeplock *lk)
{
//使用内部的自旋锁保护对Locked变量的读写
acquire(&lk->lk);
// 被唤醒时,需要检查条件变量是否成立,不成立继续睡眠
while (lk->locked) {
// 在lk条件变量上进行睡眠
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}
- 将当前进程挂起在chan条件变量上
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
// 释放sleepLock内部的自旋锁,获取当前进程锁
if(lk != &p->lock){ //DOC: sleeplock0
acquire(&p->lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
// 设置当前进程所休眠的条件变量
p->chan = chan;
// 更改进程状态
p->state = SLEEPING;
// 让出CPU
sched();
// Tidy up.
// 唤醒后,首先清除睡眠的条件变量设置
p->chan = 0;
// Reacquire original lock.
// 获取sleeplock内部的自旋锁,同时释放进程lock
if(lk != &p->lock){
release(&p->lock);
acquire(lk);
}
}
- 释放睡眠锁
void
releasesleep(struct sleeplock *lk)
{
//使用内部的自旋锁保护对locked变量的读写
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
//唤醒睡眠在lk条件变量上的进程
wakeup(lk);
release(&lk->lk);
}
- 唤醒在chan条件变量上挂起的进程
// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {
// 因为需要更改进程状态,所以需要获取进程锁
acquire(&p->lock);
// 唤醒在chan条件变量上挂起的进程
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
因为睡眠锁保持中断使能,所以它们不能用在中断处理程序中。因为acquiresleep
可能会让出CPU,所以睡眠锁不能在自旋锁临界区域中使用(尽管自旋锁可以在睡眠锁临界区域中使用)。
因为等待会浪费CPU时间,所以自旋锁最适合短的临界区域;睡眠锁对于冗长的操作效果很好。
真实世界
尽管对并发原语和并行性进行了多年的研究,但使用锁进行编程仍然具有挑战性。通常最好将锁隐藏在更高级别的结构中,如同步队列,尽管xv6没有这样做。如果您使用锁进行编程,明智的做法是使用试图识别竞争条件(race conditions)的工具,因为很容易错过需要锁的不变量。
大多数操作系统都支持POSIX线程(Pthread),它允许一个用户进程在不同的CPU上同时运行几个线程。Pthread支持用户级锁(user-level locks)、障碍(barriers)等。支持Pthread需要操作系统的支持。
- 例如,应该是这样的情况,如果一个Pthread在系统调用中阻塞,同一进程的另一个Pthread应当能够在该CPU上运行。
- 另一个例子是,如果一个线程改变了其进程的地址空间(例如,映射或取消映射内存),内核必须安排运行同一进程下的线程的其他CPU更新其硬件页表,以反映地址空间的变化。
没有原子指令实现锁是可能的,但是代价昂贵,并且大多数操作系统使用原子指令。
如果许多CPU试图同时获取相同的锁,可能会付出昂贵的开销。如果一个CPU在其本地cache中缓存了一个锁,而另一个CPU必须获取该锁,那么更新保存该锁的cache行的原子指令必须将该行从一个CPU的cache移动到另一个CPU的cache中,并且可能会使cache行的任何其他副本无效。从另一个CPU的cache中获取cache行可能比从本地cache中获取一行的代价要高几个数量级。
为了避免与锁相关的开销,许多操作系统使用无锁的数据结构和算法。
- 例如,可以实现一个像本章开头那样的链表,在列表搜索期间不需要锁,并且使用一个原子指令在一个列表中插入一个条目。然而,无锁编程比有锁编程更复杂;例如,人们必须担心指令和内存重新排序。有锁编程已经很难了,所以xv6避免了无锁编程的额外复杂性。
思考
- 注释掉在
kalloc
中对acquire
和release
的调用。这似乎会给调用kalloc
的内核代码带来问题;你希望看到什么症状?当你运行xv6时,你看到这些症状了吗?运行usertests
时呢?如果你没有看到问题是为什么呢?看看你是否可以通过在kalloc
的临界区域插入虚拟循环来引发问题。 - 假设您将
kfree
中的锁注释掉(在kalloc
中恢复锁之后)。现在可能会出什么问题?kfree
中缺少锁比kalloc
中缺少锁的危害小吗? - 如果两个CPU同时调用
kalloc
,则其中一个不得不等待另一个,这对性能不利。修改kalloc.c以具有更多的并行性,这样不同CPU对kalloc
的同时调用就可以进行,而不需要相互等待。 - 使用POSIX线程编写一个并行程序,大多数操作系统都支持这种程序。例如,实现一个并行哈希表,并测量puts/gets的数量是否随着内核数量的增加而缩放。
- 在xv6中实现Pthread的一个子集。也就是说,实现一个用户级线程库,这样一个用户进程可以有1个以上的线程,并安排这些线程可以在不同的CPU上并行运行。想出一个正确处理线程发出阻塞系统调用并改变其共享地址空间的方案。