MIT 6.S081 Lab Eight -- Lock
- 引言
- locks
- Memory allocator(moderate)
- 代码解析
- Buffer cache(hard)
- 代码解析
- 可选的挑战练习
引言
本文为 MIT 6.S081 2020 操作系统 实验八解析。
MIT 6.S081课程前置基础参考: 基于RISC-V搭建操作系统系列
locks
在本实验中,您将获得重新设计代码以提高并行性的经验。多核机器上并行性差的一个常见症状是频繁的锁争用。提高并行性通常涉及更改数据结构和锁定策略以减少争用。您将对xv6内存分配器和块缓存执行此操作。
Attention
在编写代码之前,请确保阅读xv6手册中的以下部分:
- 第6章:《锁》和相应的代码。
- 第3.5节:《代码:物理内存分配》
- 第8.1节至第8.3节:《概述》、《Buffer cache层》和《代码:Buffer cache》
要开始本实验,请将代码切换到lock分支
$ git fetch
$ git checkout lock
$ make clean
Memory allocator(moderate)
程序user/kalloctest.c强调了xv6的内存分配器:三个进程增长和缩小地址空间,导致对kalloc
和kfree
的多次调用。kalloc
和kfree
获得kmem.lock
。
kalloctest
打印(作为“#fetch-and-add”)在acquire
中由于尝试获取另一个内核已经持有的锁而进行的循环迭代次数,如kmem
锁和一些其他锁。acquire
中的循环迭代次数是锁争用的粗略度量。完成实验前,kalloctest
的输出与此类似:
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: bcache: #fetch-and-add 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: proc: #fetch-and-add 23737 #acquire() 130718
lock: virtio_disk: #fetch-and-add 11159 #acquire() 114
lock: proc: #fetch-and-add 5937 #acquire() 130786
lock: proc: #fetch-and-add 4080 #acquire() 130786
tot= 83375
test1 FAIL
acquire
为每个锁维护要获取该锁的acquire
调用计数,以及acquire
中循环尝试但未能设置锁的次数。kalloctest
调用一个系统调用,使内核打印kmem
和bcache
锁(这是本实验的重点)以及5个最有具竞争的锁的计数。如果存在锁争用,则acquire
循环迭代的次数将很大。系统调用返回kmem
和bcache
锁的循环迭代次数之和。
对于本实验,您必须使用具有多个内核的专用空载机器。如果你使用一台正在做其他事情的机器,kalloctest
打印的计数将毫无意义。你可以使用专用的Athena 工作站或你自己的笔记本电脑,但不要使用拨号机。
kalloctest
中锁争用的根本原因是kalloc()
有一个空闲列表,由一个锁保护:
要消除锁争用,您必须重新设计内存分配器,以避免使用单个锁和列表。基本思想是为每个CPU维护一个空闲列表,每个列表都有自己的锁。因为每个CPU将在不同的列表上运行,不同CPU上的分配和释放可以并行运行。主要的挑战将是处理一个CPU的空闲列表为空,而另一个CPU的列表有空闲内存的情况;在这种情况下,一个CPU必须“窃取”另一个CPU空闲列表的一部分。窃取可能会引入锁争用,但这种情况希望不会经常发生。
YOUR JOB
- 您的工作是实现每个CPU的空闲列表,并在CPU的空闲列表为空时进行窃取。所有锁的命名必须以“
kmem
”开头。也就是说,您应该为每个锁调用initlock
,并传递一个以“kmem
”开头的名称。运行kalloctest
以查看您的实现是否减少了锁争用。要检查它是否仍然可以分配所有内存,请运行usertests sbrkmuch
。您的输出将与下面所示的类似,在kmem
锁上的争用总数将大大减少,尽管具体的数字会有所不同。确保usertests
中的所有测试都通过。评分应该表明考试通过。
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 42843
lock: kmem: #fetch-and-add 0 #acquire() 198674
lock: kmem: #fetch-and-add 0 #acquire() 191534
lock: bcache: #fetch-and-add 0 #acquire() 1242
--- top 5 contended locks:
lock: proc: #fetch-and-add 43861 #acquire() 117281
lock: virtio_disk: #fetch-and-add 5347 #acquire() 114
lock: proc: #fetch-and-add 4856 #acquire() 117312
lock: proc: #fetch-and-add 4168 #acquire() 117316
lock: proc: #fetch-and-add 2797 #acquire() 117266
tot= 0
test1 OK
start test2
total free number of pages: 32499 (out of 32768)
.....
test2 OK
$ usertests sbrkmuch
usertests starting
test sbrkmuch: OK
ALL TESTS PASSED
$ usertests
...
ALL TESTS PASSED
$
提示:
- 您可以使用kernel/param.h中的常量
NCPU
- 让
freerange
将所有可用内存分配给运行freerange
的CPU。 - 函数
cpuid
返回当前的核心编号,但只有在中断关闭时调用它并使用其结果才是安全的。您应该使用push_off()
和pop_off()
来关闭和打开中断。 - 看看kernel/sprintf.c中的
snprintf
函数,了解字符串如何进行格式化。尽管可以将所有锁命名为“kmem
”。
代码解析
本实验完成的任务是为每个CPU都维护一个空闲列表,初始时将所有的空闲内存分配到某个CPU,此后各个CPU需要内存时,如果当前CPU的空闲列表上没有,则窃取其他CPU的。例如,所有的空闲内存初始分配到CPU0,当CPU1需要内存时就会窃取CPU0的,而使用完成后就挂在CPU1的空闲列表,此后CPU1再次需要内存时就可以从自己的空闲列表中取。
(1). 将kmem
定义为一个数组,包含NCPU
个元素,即每个CPU对应一个
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];
(2). 修改kinit
,为所有锁初始化以“kmem”开头的名称,该函数只会被一个CPU调用,freerange
调用kfree
将所有空闲内存挂在该CPU的空闲列表上
void
kinit()
{
char lockname[8];
for(int i = 0;i < NCPU; i++) {
snprintf(lockname, sizeof(lockname), "kmem_%d", i);
initlock(&kmem[i].lock, lockname);
}
freerange(end, (void*)PHYSTOP);
}
(3). 修改kfree
,使用cpuid()
和它返回的结果时必须关中断,否则如果当前任务因为时间片到期切换到其他CPU上运行,那么先前获取的cpuId就不正确了
// Free the page of physical memory pointed at by v,
// which normally should have been returned by a
// call to kalloc(). (The exception is when
// initializing the allocator; see kinit above.)
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
push_off(); // 关中断
int id = cpuid();
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
pop_off(); //开中断
}
(4). 修改kalloc
,使得在当前CPU的空闲列表没有可分配内存时窃取其他内存的
// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
struct run *r;
push_off();// 关中断
int id = cpuid();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
else {
int antid; // another id
// 遍历所有CPU的空闲列表
for(antid = 0; antid < NCPU; ++antid) {
if(antid == id)
continue;
acquire(&kmem[antid].lock);
r = kmem[antid].freelist;
if(r) {
kmem[antid].freelist = r->next;
release(&kmem[antid].lock);
break;
}
release(&kmem[antid].lock);
}
}
release(&kmem[id].lock);
pop_off(); //开中断
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
Buffer cache(hard)
这一半作业独立于前一半;不管你是否完成了前半部分,你都可以完成这半部分(并通过测试)。
如果多个进程密集地使用文件系统,它们可能会争夺bcache.lock
,它保护kernel/bio.c中的磁盘块缓存。bcachetest
创建多个进程,这些进程重复读取不同的文件,以便在bcache.lock
上生成争用;(在完成本实验之前)其输出如下所示:
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 33035
lock: bcache: #fetch-and-add 16142 #acquire() 65978
--- top 5 contended locks:
lock: virtio_disk: #fetch-and-add 162870 #acquire() 1188
lock: proc: #fetch-and-add 51936 #acquire() 73732
lock: bcache: #fetch-and-add 16142 #acquire() 65978
lock: uart: #fetch-and-add 7505 #acquire() 117
lock: proc: #fetch-and-add 6937 #acquire() 73420
tot= 16142
test0: FAIL
start test1
test1 OK
您可能会看到不同的输出,但bcache
锁的acquire
循环迭代次数将很高。如果查看kernel/bio.c中的代码,您将看到bcache.lock
保护已缓存的块缓冲区的列表、每个块缓冲区中的引用计数(b->refcnt
)以及缓存块的标识(b->dev
和b->blockno
)。
YOUR JOB
- 修改块缓存,以便在运行
bcachetest
时,bcache(buffer cache的缩写)中所有锁的acquire
循环迭代次数接近于零。- 理想情况下,块缓存中涉及的所有锁的计数总和应为零,但只要总和小于500就可以。修改
bget
和brelse
,以便bcache中不同块的并发查找和释放不太可能在锁上发生冲突(例如,不必全部等待bcache.lock
)。你必须保护每个块最多缓存一个副本的不变量。完成后,您的输出应该与下面显示的类似(尽管不完全相同)。确保usertests
仍然通过。完成后,make grade
应该通过所有测试。
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 32954
lock: kmem: #fetch-and-add 0 #acquire() 75
lock: kmem: #fetch-and-add 0 #acquire() 73
lock: bcache: #fetch-and-add 0 #acquire() 85
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4159
lock: bcache.bucket: #fetch-and-add 0 #acquire() 2118
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4274
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4326
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6334
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6321
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6704
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6696
lock: bcache.bucket: #fetch-and-add 0 #acquire() 7757
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6199
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4136
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4136
lock: bcache.bucket: #fetch-and-add 0 #acquire() 2123
--- top 5 contended locks:
lock: virtio_disk: #fetch-and-add 158235 #acquire() 1193
lock: proc: #fetch-and-add 117563 #acquire() 3708493
lock: proc: #fetch-and-add 65921 #acquire() 3710254
lock: proc: #fetch-and-add 44090 #acquire() 3708607
lock: proc: #fetch-and-add 43252 #acquire() 3708521
tot= 128
test0: OK
start test1
test1 OK
$ usertests
...
ALL TESTS PASSED
$
请将你所有的锁以“bcache
”开头进行命名。也就是说,您应该为每个锁调用initlock
,并传递一个以“bcache
”开头的名称。
减少块缓存中的争用比kalloc
更复杂,因为bcache缓冲区真正的在进程(以及CPU)之间共享。对于kalloc
,可以通过给每个CPU设置自己的分配器来消除大部分争用;这对块缓存不起作用。我们建议您使用每个哈希桶都有一个锁的哈希表在缓存中查找块号。
在您的解决方案中,以下是一些存在锁冲突但可以接受的情形:
- 当两个进程同时使用相同的块号时。
bcachetest test0
始终不会这样做。 - 当两个进程同时在cache中未命中时,需要找到一个未使用的块进行替换。
bcachetest test0
始终不会这样做。 - 在你用来划分块和锁的方案中某些块可能会发生冲突,当两个进程同时使用冲突的块时。例如,如果两个进程使用的块,其块号散列到哈希表中相同的槽。
bcachetest test0
可能会执行此操作,具体取决于您的设计,但您应该尝试调整方案的细节以避免冲突(例如,更改哈希表的大小)。
bcachetest
的test1
使用的块比缓冲区更多,并且执行大量文件系统代码路径。
提示:
- 请阅读xv6手册中对块缓存的描述(第8.1-8.3节)。
- 可以使用固定数量的散列桶,而不动态调整哈希表的大小。使用素数个存储桶(例如13)来降低散列冲突的可能性。
- 在哈希表中搜索缓冲区并在找不到缓冲区时为该缓冲区分配条目必须是原子的。
- 删除保存了所有缓冲区的列表(
bcache.head
等),改为标记上次使用时间的时间戳缓冲区(即使用kernel/trap.c中的ticks
)。通过此更改,brelse
不需要获取bcache锁,并且bget
可以根据时间戳选择最近使用最少的块。 - 可以在
bget
中串行化回收(即bget
中的一部分:当缓存中的查找未命中时,它选择要复用的缓冲区)。 - 在某些情况下,您的解决方案可能需要持有两个锁;例如,在回收过程中,您可能需要持有bcache锁和每个bucket(散列桶)一个锁。确保避免死锁。
- 替换块时,您可能会将
struct buf
从一个bucket移动到另一个bucket,因为新块散列到不同的bucket。您可能会遇到一个棘手的情况:新块可能会散列到与旧块相同的bucket中。在这种情况下,请确保避免死锁。 - 一些调试技巧:实现bucket锁,但将全局
bcache.lock
的acquire
/release
保留在bget
的开头/结尾,以串行化代码。一旦您确定它在没有竞争条件的情况下是正确的,请移除全局锁并处理并发性问题。您还可以运行make CPUS=1 qemu
以使用一个内核进行测试。
代码解析
这个实验的目的是将缓冲区的分配与回收并行化以提高效率。
(1). 定义哈希桶结构,并在bcache
中删除全局缓冲区链表,改为使用素数个散列桶
#define NBUCKET 13
#define HASH(id) (id % NBUCKET)
struct hashbuf {
struct buf head; // 头节点
struct spinlock lock; // 锁
};
struct {
struct buf buf[NBUF];
struct hashbuf buckets[NBUCKET]; // 散列桶
} bcache;
(2). 在binit
中:
- 初始化散列桶的锁
- 将所有散列桶的
head->prev
、head->next
都指向自身表示为空 - 将所有的缓冲区挂载到
bucket[0]
桶上,代码如下
void
binit(void) {
struct buf* b;
char lockname[16];
for(int i = 0; i < NBUCKET; ++i) {
// 初始化散列桶的自旋锁
snprintf(lockname, sizeof(lockname), "bcache_%d", i);
initlock(&bcache.buckets[i].lock, lockname);
// 初始化散列桶的头节点
bcache.buckets[i].head.prev = &bcache.buckets[i].head;
bcache.buckets[i].head.next = &bcache.buckets[i].head;
}
// Create linked list of buffers
for(b = bcache.buf; b < bcache.buf + NBUF; b++) {
// 利用头插法初始化缓冲区列表,全部放到散列桶0上
b->next = bcache.buckets[0].head.next;
b->prev = &bcache.buckets[0].head;
initsleeplock(&b->lock, "buffer");
bcache.buckets[0].head.next->prev = b;
bcache.buckets[0].head.next = b;
}
}
(3). 在buf.h中增加新字段timestamp
,这里来理解一下这个字段的用途:在原始方案中,每次brelse
都将被释放的缓冲区挂载到链表头,禀明这个缓冲区最近刚刚被使用过,在bget
中分配时从链表尾向前查找,这样符合条件的第一个就是最久未使用的。而在提示中建议使用时间戳作为LRU判定的法则,这样我们就无需在brelse
中进行头插法更改结点位置
struct buf {
...
...
uint timestamp; // 时间戳
};
(4). 更改brelse
,不再获取全局锁
void
brelse(struct buf* b) {
if(!holdingsleep(&b->lock))
panic("brelse");
int bid = HASH(b->blockno);
releasesleep(&b->lock);
acquire(&bcache.buckets[bid].lock);
b->refcnt--;
// 更新时间戳
// 由于LRU改为使用时间戳判定,不再需要头插法
acquire(&tickslock);
b->timestamp = ticks;
release(&tickslock);
release(&bcache.buckets[bid].lock);
}
(5). 更改bget
,当没有找到指定的缓冲区时进行分配,分配方式是优先从当前列表遍历,找到一个没有引用且timestamp
最小的缓冲区,如果没有就申请下一个桶的锁,并遍历该桶,找到后将该缓冲区从原来的桶移动到当前桶中,最多将所有桶都遍历完。在代码中要注意锁的释放
static struct buf*
bget(uint dev, uint blockno) {
struct buf* b;
int bid = HASH(blockno);
acquire(&bcache.buckets[bid].lock);
// Is the block already cached? -- 现在当前桶中寻找
for(b = bcache.buckets[bid].head.next; b != &bcache.buckets[bid].head; b = b->next) {
if(b->dev == dev && b->blockno == blockno) {
b->refcnt++;
// 记录使用时间戳
acquire(&tickslock);
b->timestamp = ticks;
release(&tickslock);
release(&bcache.buckets[bid].lock);
acquiresleep(&b->lock);
return b;
}
}
// Not cached. 当前buf还没有缓存
b = 0;
struct buf* tmp;
// Recycle the least recently used (LRU) unused buffer.
// 从当前散列桶开始查找
for(int i = bid, cycle = 0; cycle != NBUCKET; i = (i + 1) % NBUCKET) {
++cycle;
// 如果遍历到当前散列桶,则不重新获取锁
if(i != bid) {
if(!holding(&bcache.buckets[i].lock))
acquire(&bcache.buckets[i].lock);
else
continue;
}
for(tmp = bcache.buckets[i].head.next; tmp != &bcache.buckets[i].head; tmp = tmp->next)
// 使用时间戳进行LRU算法,而不是根据结点在链表中的位置
if(tmp->refcnt == 0 && (b == 0 || tmp->timestamp < b->timestamp))
b = tmp;
if(b) {
// 如果是从其他散列桶窃取的,则将其以头插法插入到当前桶
if(i != bid) {
b->next->prev = b->prev;
b->prev->next = b->next;
release(&bcache.buckets[i].lock);
b->next = bcache.buckets[bid].head.next;
b->prev = &bcache.buckets[bid].head;
bcache.buckets[bid].head.next->prev = b;
bcache.buckets[bid].head.next = b;
}
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
acquire(&tickslock);
b->timestamp = ticks;
release(&tickslock);
release(&bcache.buckets[bid].lock);
acquiresleep(&b->lock);
return b;
} else {
// 在当前散列桶中未找到,则直接释放锁
if(i != bid)
release(&bcache.buckets[i].lock);
}
}
panic("bget: no buffers");
}
bget中重新分配需要持有两个锁,如果桶a持有自己的锁,再申请桶b的锁,与此同时如果桶b持有自己的锁,再申请桶a的锁就会造成死锁!因此代码中使用了if(!holding(&bcache.bucket[i].lock))来进行检查。此外,代码优先从自己的桶中获取缓冲区,如果自身没有依次向后查找这样的方式也尽可能地避免了前面的情况。
在bget中搜索缓冲区并在找不到缓冲区时为该缓冲区分配条目必须是原子的!在提示中说bget如果未找到而进行分配的操作可以是串行化的,也就是说多个CPU中未找到,应当串行的执行分配,同时还应当避免死锁。
- 下面给出一种错误的处理方式:
// 前半部分查找缓冲区的代码
// Not cached
release(&bcache.buckets[bid].lock);
acquire(&bcache.lock);
acquire(&bcache.buckets[bid].lock);
// 后半部分分配缓冲区的代码
- 这段代码中先释放了散列桶的锁之后再重新获取,之所以这样做是为了让所有代码都保证申请锁的顺序:先获取整个缓冲区的大锁再获取散列桶的小锁,这样才能避免死锁。但是这样做却破坏了程序执行的原子性。
- 在release桶的锁并重新acquire的这段时间,另一个CPU可能也以相同的参数调用了bget,也发现没有该缓冲区并想要执行分配。最终的结果是一个磁盘块对应了两个缓冲区,破坏了最重要的不变量,即每个块最多缓存一个副本。这样会导致usertests中的manywrites测试报错:panic: freeing free block
(6). bpin和bunpin函数更正
void
bpin(struct buf* b) {
int bid = HASH(b->blockno);
acquire(&bcache.buckets[bid].lock);
b->refcnt++;
release(&bcache.buckets[bid].lock);
}
void
bunpin(struct buf* b) {
int bid = HASH(b->blockno);
acquire(&bcache.buckets[bid].lock);
b->refcnt--;
release(&bcache.buckets[bid].lock);
}
可选的挑战练习
在buffer cache中进行无锁查找。提示:使用gcc的__sync_*函数。您如何证明自己的实现是正确的?