MIT 6.S081 教材第七章内容 -- 调度 -- 下
- 引言
- 调度
- 多路复用
- 代码:上下文切换
- 代码:调度
- 代码:mycpu和myproc
- sleep与wakeup
- 代码:sleep和wakeup
- 代码:Pipes
- 代码:wait, exit和kill
- 真实世界
- 练习
引言
MIT 6.S081 2020 操作系统
本文为MIT 6.S081课程第七章教材内容翻译加整理。
本课程前置知识主要涉及:
- C语言(建议阅读C程序语言设计—第二版)
- RISC-V汇编
- 推荐阅读: 程序员的自我修养-装载,链接与库
前面两节整理了调度小节课程上所讲内容,本节将对应教材章节内容进行整理(相关代码可能不会给出,大家可以参考前面两节配合食用)。
调度
任何操作系统都可能运行比CPU数量更多的进程,所以需要一个进程间分时共享CPU的方案。这种共享最好对用户进程透明。一种常见的方法是,通过将进程多路复用到硬件CPU上,使每个进程产生一种错觉,即它有自己的虚拟CPU。本章解释了XV6如何实现这种多路复用。
多路复用
Xv6通过在两种情况下将每个CPU从一个进程切换到另一个进程来实现多路复用(Multiplexing):
- 第一:当进程等待设备或管道I/O完成,或等待子进程退出,或在
sleep
系统调用中等待时,xv6使用睡眠(sleep)和唤醒(wakeup)机制切换。 - 第二:xv6周期性地强制切换以处理长时间计算而不睡眠的进程。
这种多路复用产生了每个进程都有自己的CPU的错觉,就像xv6使用内存分配器和硬件页表来产生每个进程都有自己内存的错觉一样。
实现多路复用带来了一些挑战:
- 首先,如何从一个进程切换到另一个进程?尽管上下文切换的思想很简单,但它的实现是xv6中最不透明的代码之一。
- 第二,如何以对用户进程透明的方式强制切换?Xv6使用标准技术,通过定时器中断驱动上下文切换。
- 第三,许多CPU可能同时在进程之间切换,使用一个用锁方案来避免争用是很有必要的。
- 第四,进程退出时必须释放进程的内存以及其他资源,但它不能自己完成所有这一切,因为(例如)它不能在仍然使用自己内核栈的情况下释放它。
- 第五,多核机器的每个核心必须记住它正在执行哪个进程,以便系统调用正确影响对应进程的内核状态。
- 最后,
sleep
允许一个进程放弃CPU,wakeup
允许另一个进程唤醒第一个进程。需要小心避免导致唤醒通知丢失的竞争。
Xv6试图尽可能简单地解决这些问题,但结果代码很复杂。
代码:上下文切换
图7.1概述了从一个用户进程(旧进程)切换到另一个用户进程(新进程)所涉及的步骤:
- 一个到旧进程内核线程的用户-内核转换(系统调用或中断)
- 一个到当前CPU调度程序线程的上下文切换
- 一个到新进程内核线程的上下文切换
- 以及一个返回到用户级进程的陷阱。
调度程序在旧进程的内核栈上执行是不安全的:
- 其他一些核心可能会唤醒进程并运行它,而在两个不同的核心上使用同一个栈将是一场灾难,因此xv6调度程序在每个CPU上都有一个专用线程(保存寄存器和栈)。
多核情况下,如果同时有多个进程都由任务切换需求,那么就会产生两个不同核心上运行的调度程序使用同一个栈的问题。
在本节中,我们将研究在内核线程和调度程序线程之间切换的机制。
从一个线程切换到另一个线程需要保存旧线程的CPU寄存器,并恢复新线程先前保存的寄存器;栈指针和程序计数器被保存和恢复的事实意味着CPU将切换栈和执行中的代码。
函数swtch
为内核线程切换执行保存和恢复操作。swtch
对线程没有直接的了解;它只是保存和恢复寄存器集,称为上下文(contexts)。当某个进程要放弃CPU时,该进程的内核线程调用swtch
来保存自己的上下文并返回到调度程序的上下文。每个上下文都包含在一个struct context
(kernel/proc.h:2)中,这个结构体本身包含在一个进程的struct proc
或一个CPU的struct cpu
中。Swtch
接受两个参数:struct context *old
和struct context *new
。它将当前寄存器保存在old
中,从new
中加载寄存器,然后返回。
让我们跟随一个进程通过swtch
进入调度程序。我们在第4章中看到,中断结束时的一种可能性是usertrap
调用了yield
。依次地:
Yield
调用sched
,sched
调用swtch
将当前上下文保存在p->context
中,并切换到先前保存在cpu->scheduler
(kernel/proc.c:517)中的调度程序上下文。
注:当前版本的XV6中调度程序上下文是
cpu->context
Swtch
(kernel/swtch.S:3)只保存被调用方保存的寄存器(callee-saved registers);调用方保存的寄存器(caller-saved registers)通过调用C代码保存在栈上(如果需要)。Swtch
知道struct context
中每个寄存器字段的偏移量。它不保存程序计数器。但swtch
保存ra
寄存器,该寄存器保存调用swtch
的返回地址。现在,swtch
从新进程的上下文中恢复寄存器,该上下文保存前一个swtch
保存的寄存器值。当swtch
返回时,它返回到由ra
寄存器指定的指令,即新线程以前调用swtch
的指令。另外,它在新线程的栈上返回。
注:关于callee-saved registers和caller-saved registers请回看视频课程LEC5以及文档《Calling Convention》
在我们的示例中,sched
调用swtch
切换到cpu->scheduler
,即每个CPU的调度程序上下文。调度程序上下文之前通过scheduler
对swtch
(kernel/proc.c:475)的调用进行了保存。当我们追踪swtch
到返回时,他返回到scheduler
而不是sched
,并且它的栈指针指向当前CPU的调用程序栈(scheduler stack)。
以cc程序切换到ls程序为例,且ls此前运行过:
- XV6将cc程序的内核线程的内核寄存器保存在一个context对象中
- 因为要切换到ls程序的内核线程,那么ls 程序现在的状态必然是RUNABLE ,表明ls程序之前运行了一半。这同时也意味着:
- ls程序的用户空间状态已经保存在了对应的trapframe中
- ls程序的内核线程对应的内核寄存器已经保存在对应的context对象中
- 所以接下来,XV6会恢复ls程序的内核线程的context对象,也就是恢复内核线程的寄存器。
- 之后ls会继续在它的内核线程栈上,完成它的中断处理程序
- 恢复ls程序的trapframe中的用户进程状态,返回到用户空间的ls程序中
- 最后恢复执行ls
代码:调度
上一节介绍了swtch
的底层细节;现在,让我们以swtch
为给定对象,检查从一个进程的内核线程通过调度程序切换到另一个进程的情况。调度器(scheduler)以每个CPU上一个特殊线程的形式存在,每个线程都运行scheduler
函数。此函数负责选择下一个要运行的进程。
想要放弃CPU的进程必须先获得自己的进程锁p->lock
,并释放它持有的任何其他锁,更新自己的状态(p->state
),然后调用sched
。Yield
(kernel/proc.c:515)遵循这个约定,sleep
和exit
也遵循这个约定,我们将在后面进行研究。Sched
对这些条件再次进行检查(kernel/proc.c:499-504),并检查这些条件的隐含条件:
- 由于锁被持有,中断应该被禁用。
最后,sched
调用swtch
将当前上下文保存在p->context
中,并切换到cpu->scheduler
中的调度程序上下文。Swtch
在调度程序的栈上返回,就像是scheduler
的swtch
返回一样。scheduler
继续for
循环,找到要运行的进程,切换到该进程,重复循环。
我们刚刚看到,xv6在对swtch
的调用中持有p->lock
:
swtch
的调用者必须已经持有了锁,并且锁的控制权传递给切换到的代码。
这种约定在锁上是不寻常的;通常,获取锁的线程还负责释放锁,这使得对正确性进行推理更加容易。对于上下文切换,有必要打破这个惯例,因为p->lock
保护进程state
和context
字段上的不变量,而这些不变量在swtch
中执行时不成立。如果在swtch
期间没有保持p->lock
,可能会出现一个问题:
- 在
yield
将其状态设置为RUNNABLE
之后,但在swtch
使其停止使用自己的内核栈之前,另一个CPU可能会决定运行该进程。结果将是两个CPU在同一栈上运行,这不可能是正确的。
内核线程总是在sched
中放弃其CPU,并总是切换到调度程序中的同一位置,而调度程序(几乎)总是切换到以前调用sched
的某个内核线程。因此,如果要打印xv6切换线程处的行号,将观察到以下简单模式:(kernel/proc.c:475),(kernel/proc.c:509),(kernel/proc.c:475),(kernel/proc.c:509)等等。在两个线程之间进行这种样式化切换的过程有时被称为协程(coroutines);在本例中,sched
和scheduler
是彼此的协同程序。
存在一种情况使得调度程序对swtch
的调用没有以sched
结束。一个新进程第一次被调度时,它从forkret
(kernel/proc.c:527)开始。Forkret
存在以释放p->lock
;否则,新进程可以从usertrapret
开始。
这部分内容在前面两节都给出了源码解读,不清楚的可以回顾前面两小节
scheduler
(kernel/proc.c:457)运行一个简单的循环:
- 找到要运行的进程,运行它直到它让步,然后重复循环。
scheduler
在进程表上循环查找可运行的进程,该进程具有p->state == RUNNABLE
。一旦找到一个进程,它将设置CPU当前进程变量c->proc
,将该进程标记为RUNINING
,然后调用swtch
开始运行它(kernel/proc.c:470-475)。
考虑调度代码结构的一种方法是,它为每个进程强制维持一个不变量的集合,并在这些不变量不成立时持有p->lock
。
- 其中一个不变量是:如果进程是
RUNNING
状态,计时器中断的yield
必须能够安全地从进程中切换出去;这意味着CPU寄存器必须保存进程的寄存器值(即swtch
没有将它们移动到context
中),并且c->proc
必须指向进程。 - 另一个不变量是:如果进程是
RUNNABLE
状态,空闲CPU的调度程序必须安全地运行它;这意味着p->context
必须保存进程的寄存器(即,它们实际上不在实际寄存器中),没有CPU在进程的内核栈上执行,并且没有CPU的c->proc
引用进程。
请注意,在保持p->lock
时,这些属性通常不成立。
维护上述不变量是xv6经常在一个线程中获取p->lock
并在另一个线程中释放它的原因,例如在yield
中获取并在scheduler
中释放。一旦yield
开始修改一个RUNNING
进程的状态为RUNNABLE
,锁必须保持被持有状态,直到不变量恢复:最早的正确释放点是scheduler
(在其自身栈上运行)清除c->proc
之后。类似地,一旦scheduler
开始将RUNNABLE
进程转换为RUNNING
,在内核线程完全运行之前(在swtch
之后,例如在yield
中)绝不能释放锁。
p->lock
还保护其他东西:exit
和wait
之间的相互作用,避免丢失wakeup
的机制(参见第7.5节),以及避免一个进程退出和其他进程读写其状态之间的争用(例如,exit
系统调用查看p->pid
并设置p->killed
(kernel/proc.c:611))。为了清晰起见,也许为了性能起见,有必要考虑一下p->lock
的不同功能是否可以拆分。
代码:mycpu和myproc
Xv6通常需要指向当前进程的proc
结构体的指针。在单处理器系统上,可以有一个指向当前proc
的全局变量。但这不能用于多核系统,因为每个核执行的进程不同。解决这个问题的方法是基于每个核心都有自己的寄存器集,从而使用其中一个寄存器来帮助查找每个核心的信息。
Xv6为每个CPU维护一个struct cpu
,它记录当前在该CPU上运行的进程(如果有的话),为CPU的调度线程保存寄存器,以及管理中断禁用所需的嵌套自旋锁的计数。函数mycpu
(kernel/proc.c:60)返回一个指向当前CPU的struct cpu
的指针。RISC-V给它的CPU编号,给每个CPU一个hartid
。Xv6确保每个CPU的hartid
在内核中存储在该CPU的tp
寄存器中。这允许mycpu
使用tp
对一个cpu结构体数组(即cpus
数组,kernel/proc.c:9)进行索引,以找到正确的那个。
确保CPU的tp
始终保存CPU的hartid
有点麻烦。mstart
在CPU启动次序的早期设置tp
寄存器,此时仍处于机器模式(kernel/start.c:46)。因为用户进程可能会修改tp
,usertrapret
在蹦床页面(trampoline page)中保存tp
。最后,uservec
在从用户空间(kernel/trampoline.S:70)进入内核时恢复保存的tp
。编译器保证永远不会使用tp
寄存器。如果RISC-V允许xv6直接读取当前hartid会更方便,
但这只允许在机器模式下,而不允许在管理模式下。
cpuid
和mycpu
的返回值很脆弱:如果定时器中断并导致线程让步(yield),然后移动到另一个CPU,以前返回的值将不再正确。为了避免这个问题,xv6要求调用者禁用中断,并且只有在使用完返回的struct cpu
后才重新启用。
函数myproc
(kernel/proc.c:68)返回当前CPU上运行进程struct proc
的指针。myproc
禁用中断,调用mycpu
,从struct cpu
中取出当前进程指针(c->proc
),然后启用中断。即使启用中断,myproc
的返回值也可以安全使用:如果计时器中断将调用进程移动到另一个CPU,其struct proc
指针不会改变。
// Must be called with interrupts disabled,
// to prevent race with process being moved
// to a different CPU.
int
cpuid()
{
int id = r_tp();
return id;
}
// Return this CPU's cpu struct.
// Interrupts must be disabled.
struct cpu*
mycpu(void) {
int id = cpuid();
struct cpu *c = &cpus[id];
return c;
}
// Return the current struct proc *, or zero if none.
struct proc*
myproc(void) {
push_off();
struct cpu *c = mycpu();
struct proc *p = c->proc;
pop_off();
return p;
}
函数调用者调用mycpu函数获取当前cpu对应结构体时,需要确保此时全局中断时关闭的状态,并且在使用完毕返回的cpu后,才会开启全局中断,例如: myproc函数实现中那样。
例如: sched函数也会调用mycpu,但是sched函数是执行在全局中断关闭的前提下的。
sleep与wakeup
调度和锁有助于隐藏一个进程对另一个进程的存在,但到目前为止,我们还没有帮助进程进行有意交互的抽象。为解决这个问题已经发明了许多机制。Xv6使用了一种称为sleep
和wakeup
的方法,它允许一个进程在等待事件时休眠,而另一个进程在事件发生后将其唤醒。睡眠和唤醒通常被称为序列协调(sequence coordination)或条件同步机制(conditional synchronization mechanisms)。
为了说明,让我们考虑一个称为信号量(semaphore)的同步机制,它可以协调生产者和消费者。信号量维护一个计数并提供两个操作。“V”操作(对于生产者)增加计数。“P”操作(对于使用者)等待计数为非零,然后递减并返回。如果只有一个生产者线程和一个消费者线程,并且它们在不同的CPU上执行,并且编译器没有进行过积极的优化,那么此实现将是正确的:
struct semaphore {
struct spinlock lock;
int count;
};
void V(struct semaphore* s) {
acquire(&s->lock);
s->count += 1;
release(&s->lock);
}
void P(struct semaphore* s) {
while (s->count == 0)
;
acquire(&s->lock);
s->count -= 1;
release(&s->lock);
}
上面的实现代价昂贵。如果生产者很少采取行动,消费者将把大部分时间花在while
循环中,希望得到非零计数。消费者的CPU可以找到比通过反复轮询s->count
繁忙等待更有成效的工作。要避免繁忙等待,消费者需要一种方法来释放CPU,并且只有在V
增加计数后才能恢复。
这是朝着这个方向迈出的一步,尽管我们将看到这是不够的。让我们想象一对调用,sleep
和wakeup
,工作流程如下。Sleep(chan)
在任意值chan
上睡眠,称为等待通道(wait channel)。Sleep
将调用进程置于睡眠状态,释放CPU用于其他工作。Wakeup(chan)
唤醒所有在chan
上睡眠的进程(如果有),使其sleep
调用返回。如果没有进程在chan
上等待,则wakeup
不执行任何操作。我们可以将信号量实现更改为使用sleep
和wakeup
(更改的行添加了注释):
void V(struct semaphore* s) {
acquire(&s->lock);
s->count += 1;
wakeup(s); // !pay attention
release(&s->lock);
}
void P(struct semaphore* s) {
while (s->count == 0)
sleep(s); // !pay attention
acquire(&s->lock);
s->count -= 1;
release(&s->lock);
}
P
现在放弃CPU而不是自旋,这很好。然而,事实证明,使用此接口设计sleep
和wakeup
而不遭受所谓的丢失唤醒(lost wake-up)问题并非易事。假设P
在第9行发现s->count==0
。当P
在第9行和第10行之间时,V
在另一个CPU上运行:它将s->count
更改为非零,并调用wakeup
,这样就不会发现进程处于休眠状态,因此不会执行任何操作。现在P继续在第10行执行:它调用sleep
并进入睡眠。这会导致一个问题:P
正在休眠,等待调用V
,而V
已经被调用。除非我们运气好,生产者再次呼叫V
,否则消费者将永远等待,即使count
为非零。
这个问题的根源是V
在错误的时刻运行,违反了P
仅在s->count==0
时才休眠的不变量。保护不变量的一种不正确的方法是将锁的获取(下面以黄色突出显示)移动到P
中,以便其检查count
和调用sleep
是原子的:
void V(struct semaphore* s) {
acquire(&s->lock);
s->count += 1;
wakeup(s);
release(&s->lock);
}
void P(struct semaphore* s) {
acquire(&s->lock); // !pay attention
while (s->count == 0)
sleep(s);
s->count -= 1;
release(&s->lock);
}
人们可能希望这个版本的P
能够避免丢失唤醒,因为锁阻止V
在第10行和第11行之间执行。它确实这样做了,但它会导致死锁:P
在睡眠时持有锁,因此V
将永远阻塞等待锁。
我们将通过更改sleep
的接口来修复前面的方案:调用方必须将条件锁(condition lock)传递给sleep,以便在调用进程被标记为asleep并在睡眠通道上等待后sleep
可以释放锁。如果有一个并发的V
操作,锁将强制它在P
将自己置于睡眠状态前一直等待,因此wakeup
将找到睡眠的消费者并将其唤醒。一旦消费者再次醒来,sleep
会在返回前重新获得锁。我们新的正确的sleep/wakeup
方案可用如下(更改以黄色突出显示):
void V(struct semaphore* s) {
acquire(&s->lock);
s->count += 1;
wakeup(s);
release(&s->lock);
}
void P(struct semaphore* s) {
acquire(&s->lock);
while (s->count == 0)
sleep(s, &s->lock); // !pay attention
s->count -= 1;
release(&s->lock);
}
P
持有s->lock
的事实阻止V
在P
检查s->count
和调用sleep
之间试图唤醒它。然而请注意,我们需要sleep
释放s->lock
并使消费者进程进入睡眠状态的操作是原子的。
为了防止lose wakeup现象产生,我们需要将对条件变量是否成立的判断移到加锁的区域中,同时我们需要在进入sleep前释放锁,再被wakeup后,首先尝试去获取锁。
代码:sleep和wakeup
让我们看看sleep
(kernel/proc.c:548)和wakeup
(kernel/proc.c:582)的实现。其基本思想是让sleep
将当前进程标记为SLEEPING
,然后调用sched
释放CPU;wakeup
查找在给定等待通道上休眠的进程,并将其标记为RUNNABLE
。sleep
和wakeup
的调用者可以使用任何相互间方便的数字作为通道。Xv6通常使用等待过程中涉及的内核数据结构的地址。
sleep
获得p->lock
(kernel/proc.c:559)。要进入睡眠的进程现在同时持有p->lock
和lk
。在调用者(示例中为P
)中持有lk
是必要的:它确保没有其他进程(在示例中指一个运行的V
)可以启动wakeup(chan)
调用。既然sleep
持有p->lock
,那么释放lk
是安全的:其他进程可能会启动对wakeup(chan)
的调用,但是wakeup
将等待获取p->lock
,因此将等待sleep
把进程置于睡眠状态的完成,以防止wakeup
错过sleep
。
还有一个小问题:如果lk
和p->lock
是同一个锁,那么如果sleep
试图获取p->lock
就会自身死锁。但是,如果调用sleep
的进程已经持有p->lock
,那么它不需要做更多的事情来避免错过并发的wakeup
。当wait
(kernel/proc.c:582)持有p->lock
调用sleep
时,就会出现这种情况。
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
//如果`lk`和`p->lock`是同一个锁,那么如果`sleep`试图获取`p->lock`就会自身死锁
if(lk != &p->lock){ //DOC: sleeplock0
acquire(&p->lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &p->lock){
release(&p->lock);
acquire(lk);
}
}
由于sleep
只持有p->lock
而无其他锁,它可以通过记录睡眠通道、将进程状态更改为SLEEPING
并调用sched
(kernel/proc.c:564-567)将进程置于睡眠状态。过一会儿,我们就会明白为什么在进程被标记为SLEEPING
之前不将p->lock
释放(由scheduler
)是至关重要的。
在某个时刻,一个进程将获取条件锁,设置睡眠者正在等待的条件,并调用wakeup(chan)
。在持有状态锁时调用wakeup
非常重要。wakeup
遍历进程表(kernel/proc.c:582)。它获取它所检查的每个进程的p->lock
,这既是因为它可能会操纵该进程的状态,也是因为p->lock
确保sleep
和wakeup
不会彼此错过。当wakeup
发现一个SLEEPING
的进程且chan
相匹配时,它会将该进程的状态更改为RUNNABLE
。调度器下次运行时,将看到进程已准备好运行。
// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
注:严格地说,
wakeup
只需跟在acquire
之后就足够了(也就是说,可以在release
之后调用wakeup
)
为什么sleep
和wakeup
的用锁规则能确保睡眠进程不会错过唤醒?
- 休眠进程从检查条件之前的某处到标记为休眠之后的某处,要么持有条件锁,要么持有其自身的
p->lock
或同时持有两者。调用wakeup
的进程在wakeup
的循环中同时持有这两个锁。因此,要么唤醒器(waker)在消费者线程检查条件之前使条件为真;要么唤醒器的wakeup
在睡眠线程标记为SLEEPING
后对其进行严格检查。然后wakeup
将看到睡眠进程并将其唤醒(除非有其他东西首先将其唤醒)。
有时,多个进程在同一个通道上睡眠:
- 例如,多个进程读取同一个管道。一个单独的
wakeup
调用就能把他们全部唤醒。其中一个将首先运行并获取与sleep
一同调用的锁,并且(在管道例子中)读取在管道中等待的任何数据。尽管被唤醒,其他进程将发现没有要读取的数据。从他们的角度来看,醒来是“虚假的”,他们必须再次睡眠。因此,在检查条件的循环中总是调用sleep
。
如果两次使用sleep/wakeup
时意外选择了相同的通道,则不会造成任何伤害:它们将看到虚假的唤醒,但如上所述的循环将容忍此问题。sleep/wakeup
的魅力在于它既轻量级(不需要创建特殊的数据结构来充当睡眠通道),又提供了一层抽象(调用者不需要知道他们正在与哪个特定进程进行交互)。
代码:Pipes
使用睡眠和唤醒来同步生产者和消费者的一个更复杂的例子是xv6的管道实现。我们在第1章中看到了管道接口:写入管道一端的字节被复制到内核缓冲区,然后可以从管道的另一端读取。以后的章节将研究围绕管道的文件描述符支持,但现在让我们看看pipewrite
和piperead
的实现。
#define PIPESIZE 512
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
每个管道都由一个struct pipe
表示,其中包含一个锁lock
和一个数据缓冲区data
。字段nread
和nwrite
统计从缓冲区读取和写入缓冲区的总字节数。缓冲区是环形的:在buf[PIPESIZE-1]
之后写入的下一个字节是buf[0]
。而计数不是环形。此约定允许实现区分完整缓冲区(nwrite==nread+PIPESIZE
)和空缓冲区(nwrite==nread
),但这意味着对缓冲区的索引必须使用buf[nread%PIPESIZE]
,而不仅仅是buf[nread]
(对于nwrite
也是如此)。
让我们假设对piperead
和pipewrite
的调用同时发生在两个不同的CPU上。
int
pipewrite(struct pipe *pi, uint64 addr, int n)
{
int i;
char ch;
struct proc *pr = myproc();
acquire(&pi->lock);
for(i = 0; i < n; i++){
while(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full
if(pi->readopen == 0 || pr->killed){
release(&pi->lock);
return -1;
}
wakeup(&pi->nread);
sleep(&pi->nwrite, &pi->lock);
}
if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
break;
pi->data[pi->nwrite++ % PIPESIZE] = ch;
}
wakeup(&pi->nread);
release(&pi->lock);
return i;
}
Pipewrite
(kernel/pipe.c:77)从获取管道锁开始,它保护计数、数据及其相关不变量。Piperead
(kernel/pipe.c:103)然后也尝试获取锁,但无法实现。它在acquire
(kernel/spinlock.c:22)中旋转等待锁。当piperead
等待时,pipewrite
遍历被写入的字节(addr[0..n-1]
),依次将每个字节添加到管道中(kernel/pipe.c:95)。在这个循环中缓冲区可能会被填满(kernel/pipe.c:85)。在这种情况下,pipewrite
调用wakeup
来提醒所有处于睡眠状态的读进程缓冲区中有数据等待,然后在&pi->nwrite
上睡眠,等待读进程从缓冲区中取出一些字节。作为使pipewrite
进程进入睡眠状态的一部分,Sleep
释放pi->lock
。
int
piperead(struct pipe *pi, uint64 addr, int n)
{
int i;
struct proc *pr = myproc();
char ch;
acquire(&pi->lock);
while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty
if(pr->killed){
release(&pi->lock);
return -1;
}
sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(pi->nread == pi->nwrite)
break;
ch = pi->data[pi->nread++ % PIPESIZE];
if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
break;
}
wakeup(&pi->nwrite); //DOC: piperead-wakeup
release(&pi->lock);
return i;
}
现在pi->lock
可用,piperead
设法获取它并进入其临界区域:它发现pi->nread != pi->nwrite
(kernel/pipe.c:110)(pipewrite
进入睡眠状态是因为pi->nwrite == pi->nread+PIPESIZE
(kernel/pipe.c:85)),因此它进入for
循环,从管道中复制数据(kernel/pipe.c:117),并根据复制的字节数增加nread
。那些读出的字节就可供写入,因此piperead
调用wakeup
(kernel/pipe.c:124)返回之前唤醒所有休眠的写进程。Wakeup
寻找一个在&pi->nwrite
上休眠的进程,该进程正在运行pipewrite
,但在缓冲区填满时停止。它将该进程标记为RUNNABLE
。
管道代码为读者和写者使用单独的睡眠通道(pi->nread
和pi->nwrite
);这可能会使系统在有许多读者和写者等待同一管道这种不太可能的情况下更加高效。管道代码在检查休眠条件的循环中休眠;如果有多个读者或写者,那么除了第一个醒来的进程之外,所有进程都会看到条件仍然错误,并再次睡眠。
代码:wait, exit和kill
Sleep
和wakeup
可用于多种等待。第一章介绍的一个有趣的例子是子进程exit
和父进程wait
之间的交互。在子进程死亡时,父进程可能已经在wait
中休眠,或者正在做其他事情;在后一种情况下,随后的wait
调用必须观察到子进程的死亡,可能是在子进程调用exit
后很久。xv6记录子进程终止直到wait
观察到它的方式是让exit
将调用方置于ZOMBIE
状态,在那里它一直保持到父进程的wait
注意到它,将子进程的状态更改为UNUSED
,复制子进程的exit
状态码,并将子进程ID返回给父进程。如果父进程在子进程之前退出,则父进程将子进程交给init
进程,init
进程将永久调用wait
;因此,每个子进程退出后都有一个父进程进行清理。主要的实现挑战是父级和子级wait
和exit
,以及exit
和exit
之间可能存在竞争和死锁。
// Wait for a child process to exit and return its pid.
// Return -1 if this process has no children.
int
wait(uint64 addr)
{
struct proc *np;
int havekids, pid;
struct proc *p = myproc();
// hold p->lock for the whole time to avoid lost
// wakeups from a child's exit().
acquire(&p->lock);
for(;;){
// Scan through table looking for exited children.
havekids = 0;
for(np = proc; np < &proc[NPROC]; np++){
// this code uses np->parent without holding np->lock.
// acquiring the lock first would cause a deadlock,
// since np might be an ancestor, and we already hold p->lock.
if(np->parent == p){
// np->parent can't change between the check and the acquire()
// because only the parent changes it, and we're the parent.
acquire(&np->lock);
havekids = 1;
if(np->state == ZOMBIE){
// Found one.
pid = np->pid;
if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate,
sizeof(np->xstate)) < 0) {
release(&np->lock);
release(&p->lock);
return -1;
}
freeproc(np);
release(&np->lock);
release(&p->lock);
return pid;
}
release(&np->lock);
}
}
// No point waiting if we don't have any children.
if(!havekids || p->killed){
release(&p->lock);
return -1;
}
// Wait for a child to exit.
sleep(p, &p->lock); //DOC: wait-sleep
}
}
Wait
使用调用进程的p->lock
作为条件锁,以避免丢失唤醒,并在开始时获取该锁(kernel/proc.c:398)。然后它扫描进程表。如果它发现一个子进程处于ZOMBIE
状态,它将释放该子进程的资源及其proc
结构体,将该子进程的退出状态码复制到提供给wait
的地址(如果不是0),并返回该子进程的进程ID。如果wait
找到子进程但没有子进程退出,它将调用sleep
以等待其中一个退出(kernel/proc.c:445),然后再次扫描。
这里,sleep
中释放的条件锁是等待进程的p->lock
,这是上面提到的特例。注意,wait
通常持有两个锁:它在试图获得任何子进程的锁之前先获得自己的锁;因此,整个xv6都必须遵守相同的锁定顺序(父级,然后是子级),以避免死锁。
Wait
查看每个进程的np->parent
以查找其子进程。它使用np->parent
而不持有np->lock
,这违反了通常的规则,即共享变量必须受到锁的保护。np
可能是当前进程的祖先,在这种情况下,获取np->lock
可能会导致死锁,因为这将违反上述顺序。这种情况下无锁检查np->parent
似乎是安全的:进程的parent
字段仅由其父进程更改,因此如果np->parent==p
为true
,除非当前流程更改它,否则该值无法被更改。
// Exit the current process. Does not return.
// An exited process remains in the zombie state
// until its parent calls wait().
void
exit(int status)
{
struct proc *p = myproc();
if(p == initproc)
panic("init exiting");
// Close all open files.
for(int fd = 0; fd < NOFILE; fd++){
if(p->ofile[fd]){
struct file *f = p->ofile[fd];
fileclose(f);
p->ofile[fd] = 0;
}
}
begin_op();
iput(p->cwd);
end_op();
p->cwd = 0;
// we might re-parent a child to init. we can't be precise about
// waking up init, since we can't acquire its lock once we've
// acquired any other proc lock. so wake up init whether that's
// necessary or not. init may miss this wakeup, but that seems
// harmless.
acquire(&initproc->lock);
wakeup1(initproc);
release(&initproc->lock);
// grab a copy of p->parent, to ensure that we unlock the same
// parent we locked. in case our parent gives us away to init while
// we're waiting for the parent lock. we may then race with an
// exiting parent, but the result will be a harmless spurious wakeup
// to a dead or wrong process; proc structs are never re-allocated
// as anything else.
acquire(&p->lock);
struct proc *original_parent = p->parent;
release(&p->lock);
// we need the parent's lock in order to wake it up from wait().
// the parent-then-child rule says we have to lock it first.
acquire(&original_parent->lock);
acquire(&p->lock);
// Give any children to init.
reparent(p);
// Parent might be sleeping in wait().
wakeup1(original_parent);
p->xstate = status;
p->state = ZOMBIE;
release(&original_parent->lock);
// Jump into the scheduler, never to return.
sched();
panic("zombie exit");
}
Exit
(kernel/proc.c:333)记录退出状态码,释放一些资源,将所有子进程提供给init
进程,在父进程处于等待状态时唤醒父进程,将调用方标记为僵尸进程(zombie),并永久地让出CPU。最后的顺序有点棘手。退出进程必须在将其状态设置为ZOMBIE
并唤醒父进程时持有其父进程的锁,因为父进程的锁是防止在wait
中丢失唤醒的条件锁。子级还必须持有自己的p->lock
,否则父级可能会看到它处于ZOMBIE
状态,并在它仍运行时释放它。锁获取顺序对于避免死锁很重要:因为wait
先获取父锁再获取子锁,所以exit
必须使用相同的顺序。
// Wake up p if it is sleeping in wait(); used by exit().
// Caller must hold p->lock.
static void
wakeup1(struct proc *p)
{
if(!holding(&p->lock))
panic("wakeup1");
if(p->chan == p && p->state == SLEEPING) {
p->state = RUNNABLE;
}
}
Exit
调用一个专门的唤醒函数wakeup1
,该函数仅唤醒父进程,且父进程必须正在wait
中休眠(kernel/proc.c:598)。在将自身状态设置为ZOMBIE
之前,子进程唤醒父进程可能看起来不正确,但这是安全的:虽然wakeup1
可能会导致父进程运行,但wait
中的循环在scheduler
释放子进程的p->lock
之前无法检查子进程,所以wait
在exit
将其状态设置为ZOMBIE
(kernel/proc.c:386)之前不能查看退出进程。
// Kill the process with the given pid.
// The victim won't exit until it tries to return
// to user space (see usertrap() in trap.c).
int
kill(int pid)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++){
acquire(&p->lock);
if(p->pid == pid){
p->killed = 1;
if(p->state == SLEEPING){
// Wake process from sleep().
p->state = RUNNABLE;
}
release(&p->lock);
return 0;
}
release(&p->lock);
}
return -1;
}
exit
允许进程自行终止,而kill
(kernel/proc.c:611)允许一个进程请求另一个进程终止。对于kill
来说,直接销毁受害者进程(即要杀死的进程)太复杂了,因为受害者可能在另一个CPU上执行,也许是在更新内核数据结构的敏感序列中间。因此,kill
的工作量很小:它只是设置受害者的p->killed
,如果它正在睡眠,则唤醒它。受害者进程终将进入或离开内核,此时,如果设置了p->killed
,usertrap
中的代码将调用exit
。如果受害者在用户空间中运行,它将很快通过进行系统调用或由于计时器(或其他设备)中断而进入内核。
如果受害者进程在sleep
中,kill
对wakeup
的调用将导致受害者从sleep
中返回。这存在潜在的危险,因为等待的条件可能不为真。但是,xv6对sleep
的调用总是封装在while
循环中,该循环在sleep
返回后重新测试条件。一些对sleep
的调用还在循环中测试p->killed
,如果它被设置,则放弃当前活动。只有在这种放弃是正确的情况下才能这样做。例如,如果设置了killed
标志,则管道读写代码返回;最终代码将返回到陷阱,陷阱将再次检查标志并退出。
一些XV6的sleep
循环不检查p->killed
,因为代码在应该是原子操作的多步系统调用的中间。virtio驱动程序(kernel/virtio_disk.c:242)就是一个例子:它不检查p->killed
,因为一个磁盘操作可能是文件系统保持正确状态所需的一组写入操作之一。等待磁盘I/O时被杀死的进程将不会退出,直到它完成当前系统调用并且usertrap
看到killed
标志
真实世界
xv6调度器实现了一个简单的调度策略:它依次运行每个进程。这一策略被称为轮询调度(round robin)。真实的操作系统实施更复杂的策略,例如,允许进程具有优先级。其思想是调度器将优先选择可运行的高优先级进程,而不是可运行的低优先级进程。这些策略可能变得很复杂,因为常常存在相互竞争的目标:例如,操作系统可能希望保证公平性和高吞吐量。此外,复杂的策略可能会导致意外的交互,例如优先级反转(priority inversion)和航队(convoys)。当低优先级进程和高优先级进程共享一个锁时,可能会发生优先级反转,当低优先级进程持有该锁时,可能会阻止高优先级进程前进。当许多高优先级进程正在等待一个获得共享锁的低优先级进程时,可能会形成一个长的等待进程航队;一旦航队形成,它可以持续很长时间。为了避免此类问题,在复杂的调度器中需要额外的机制。
睡眠和唤醒是一种简单有效的同步方法,但还有很多其他方法。所有这些问题中的第一个挑战是避免我们在本章开头看到的“丢失唤醒”问题。原始Unix内核的sleep
只是禁用了中断,这就足够了,因为Unix运行在单CPU系统上。因为xv6在多处理器上运行,所以它为sleep
添加了一个显式锁。FreeBSD的msleep
采用了同样的方法。Plan 9的sleep
使用一个回调函数,该函数在马上睡眠时获取调度锁,并在运行中持有;该函数用于在最后时刻检查睡眠条件,以避免丢失唤醒。Linux内核的sleep
使用一个显式的进程队列,称为等待队列,而不是等待通道;队列有自己内部的锁。
在wakeup
中扫描整个进程列表以查找具有匹配chan
的进程效率低下。一个更好的解决方案是用一个数据结构替换sleep
和wakeup
中的chan
,该数据结构包含在该结构上休眠的进程列表,例如Linux的等待队列。Plan 9的sleep
和wakeup
将该结构称为集结点(rendezvous point)或Rendez。许多线程库引用与条件变量相同的结构;在这种情况下,sleep
和wakeup
操作称为wait
和signal
。所有这些机制都有一个共同的特点:睡眠条件受到某种在睡眠过程中原子级释放的锁的保护。
wakeup
的实现会唤醒在特定通道上等待的所有进程,可能有许多进程在等待该特定通道。操作系统将安排所有这些进程,它们将竞相检查睡眠条件。进程的这种行为有时被称为惊群效应(thundering herd),最好避免。大多数条件变量都有两个用于唤醒的原语:signal
用于唤醒一个进程;broadcast
用于唤醒所有等待进程。
信号量(Semaphores)通常用于同步。计数count通常对应于管道缓冲区中可用的字节数或进程具有的僵尸子进程数。使用显式计数作为抽象的一部分可以避免“丢失唤醒”问题:使用显式计数记录已经发生wakeup
的次数。计数还避免了虚假唤醒和惊群效应问题。
终止进程并清理它们在xv6中引入了很多复杂性。在大多数操作系统中甚至更复杂,因为,例如,受害者进程可能在内核深处休眠,而展开其栈空间需要非常仔细的编程。许多操作系统使用显式异常处理机制(如longjmp
)来展开栈。此外,还有其他事件可能导致睡眠进程被唤醒,即使它等待的事件尚未发生。例如,当一个Unix进程处于休眠状态时,另一个进程可能会向它发送一个signal
。在这种情况下,进程将从中断的系统调用返回,返回值为-1,错误代码设置为EINTR
。应用程序可以检查这些值并决定执行什么操作。Xv6不支持信号,因此不会出现这种复杂性。
Xv6对kill
的支持并不完全令人满意:有一些sleep
循环可能应该检查p->killed
。一个相关的问题是,即使对于检查p->killed
的sleep
循环,sleep
和kill
之间也存在竞争;后者可能会设置p->killed
,并试图在受害者的循环检查p->killed
之后但在调用sleep
之前尝试唤醒受害者。如果出现此问题,受害者将不会注意到p->killed
,直到其等待的条件发生。这可能比正常情况要晚一点(例如,当virtio驱动程序返回受害者正在等待的磁盘块时)或永远不会发生(例如,如果受害者正在等待来自控制台的输入,但用户没有键入任何输入)。
注:上节中说到kill的工作方式,
kill
设置p->killed
,如果遇到进程正在休眠,则会唤醒它,此后在usertrap
中检测p->killed
,并使进程退出而如果像上面说的,在检查
p->killed
之后调用sleep
之前唤醒受害者进程,那么接下来执行sleep
就会导致进程无法进入内核,无法在usertrap
中退出,而必须等待所需事件的发生再次唤醒
一个实际的操作系统将在固定时间内使用空闲列表找到自由的proc
结构体,而不是allocproc
中的线性时间搜索;xv6使用线性扫描是为了简单起见。
练习
Sleep
必须检查lk != &p->lock
来避免死锁(kernel/proc.c:558-561). 假设通过将
if(lk != &p->lock) {
acquire(&p->lock);
release(lk);
}
替换为
release(lk);
acquire(&p->lock);
来消除特殊情况,这样做将会破坏sleep
。是如何破坏的呢?
- 大多数进程清理可以通过
exit
或wait
来完成。事实证明,必须是exit
作为关闭打开的文件的那个。为什么?答案涉及管道。 - 在xv6中实现信号量而不使用
sleep
和wakeup
(但可以使用自旋锁)。用信号量取代xv6中sleep
和wakeup
的使用。判断结果。 - 修复上面提到的
kill
和sleep
之间的竞争,这样在受害者的sleep
循环检查p->killed
之后但在调用sleep
之前发生的kill
会导致受害者放弃当前系统调用。 - 设计一个计划,使每个睡眠循环检查
p->killed
,这样,例如,virtio驱动程序中的一个进程可以在被另一个进程终止时从while循环快速返回。 - 修改xv6,使其在从一个进程的内核线程切换到另一个线程时仅使用一次上下文切换,而不是通过调度器线程进行切换。屈服(yield)线程需要选择下一个线程本身并调用
swtch
。挑战在于:防止多个内核意外执行同一个线程;获得正确的锁;避免死锁。 - 修改xv6的调度程序,以便在没有进程可运行时使用RISC-V的
WFI
(wait for interrupt,等待中断)指令。尽量确保在任何时候有可运行的进程等待运行时,没有核心在WFI
中暂停。 - 锁
p->lock
保护许多不变量,当查看受p->lock
保护的特定xv6代码段时,可能很难确定保护的是哪个不变量。通过将p->lock
拆分为多个锁,设计一个更清晰的计划。