#Lab3A - KVRaft without log compaction
- I. Source
- II. My Code
- III. Motivation
- IV. Solution
- S1 - client请求
- S2 - server回应
- V. Result
I. Source
- MIT-6.824 2020 课程官网
- Lab3: KVRaft 实验主页
- simviso 精品付费翻译 MIT 6.824 课程
- Paper - Raft extended version
II. My Code
- source code 的 Gitee 地址
- Lab3A: KVRaft without log compaction的 Gitee 地址
课程官网提供的 Lab 代码下载地址,我没有访问成功,于是我从 Github 其他用户那里 clone 到干净的源码,有需要可以访问我的 Gitee 获取
III. Motivation
KV Service 所要完成的任务,即是实现一个简易的 kv 存储数据库,满足用户的增删改查需求。比如,client 向 service 发起 Put(x, 1) 请求,希望将数据库中的变量 x 更新为 1,以便下一次的 Get(x) 请求时能获取最新的数据
我们都知道,这样的需求在单机情况下的数据库是可以完成的,在这里为什么还要用到分布式存储的思想呢?这是因为,我们希望服务器是能够有更好的容错性,我们不太愿意看到鸡蛋都放在一个篮子里的情况,如果那个篮子出了意外,那将是毁灭性的灾难
我们提出将同样的数据存储在多台机器上的设想,这样更保险,即使 primary service 宕机了,也不影响对外提供稳定的服务,因为会有 secondary service 顶替 primary 的位置,更多的原理请移步 Lab2B: Log Replication
这就是分布式数据库所要解决的主要问题
IV. Solution
先看一下 KV Service 的工作流程,大致如下,
每台 KV Service 的上层是一张 kv 表,也就是数据库;下层是 Raft 节点,用来同步日志。需要注意的是,client 发来的请求,无论是 Get 还是 Put/Append,在 Raft 节点看来,都是一个日志条目(Log Entry),这些请求只是 kv 层的术语
图中已给出例子,client 向集群中的 primary service 发起 Get 请求,service 收到请求后不是立刻就更新 kv 表,而是先将请求转换为日志条目,进而追加至日志向量中,对应图中步骤 2 的 appendEntry。之后就进入了 Raft 层,即向集群中同步日志,对应步骤 3 的 sync
待 Raft 层的 leader 收到了过半 followers 的确切回复之后,通过图中 4 的管道 applyCh 告知 kv 层日志同步成功的结果。kv 层在知道已成功同步条目之后,就放心大胆地更新数据库了
更新完了,就可以回复 client 了。在具体实现中,我是通过图中 5 的管道 resultCh 实现的,通过 Channel 手段来实现同步,即是 service 在更新完数据库之后,会向 resultCh 中写入一个日志条目。Get 或 Put/Append 此类的 RPC Handler 协程若发现 resultCh 已有写入,就会对 client 展开回复工作
S1 - client请求
传统意义上,是 client 请求 service,很简单的逻辑。但是 Lab3: KVRaft 却多此一举,偏要整出一个 Clerk 办事员的角色。个人认为,属于脱裤子放屁,多此一举
可以将 Clerk 当成 Client 看待,每个 Client 都会生成一个 Clerk,由这个 Clerk 全权办理请求事宜,定义如下,
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
leaderId int /* Raft 集群中谁是 leader */
clntId int64 /* client 的编号 */
cmdId int /* 该 client 的第几条命令 */
}
其中的 leaderId
记录了集群 leader 的编号,以便 clerk 下次能够快速找到 primary service;clntId
是 client 的编号,理论上来说应该是唯一的;cmdId
是该 client 的第几条命令。之后在 kvraft/client.go
中完善 MakeClerk()
,
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
ck.leaderId = 0
ck.clntId = nrand()
ck.cmdId = 0
return ck
}
其中,第 6 行的 client 编号赋值用的是 6.824-golabs-2020 自带的 nrand()
,功能是随机生成一个很大的数,从概率上来讲这些数字很少会发生重复的情况,进而达到所要求的 client 编号 “唯一” 的效果。这只是退而求其次的方法,更好的做法应该是通过一个全局的变量,递增地去分配唯一的 id,或者类似的方法
之后,就是 kvraft/client.go:Get()
了,它就是所谓的 Get 请求,即要求 service 回复 key 键所对应的 value 值,
func (ck *Clerk) Get(key string) string {
args := GetArgs{
Key: key,
CmdId: ck.cmdId,
ClntId: ck.clntId,
}
ck.cmdId++ /* 命令编号永远递增 */
leaderId := ck.leaderId
// You will have to modify this function.
for {
reply := GetReply{}
DPrintf("[%v->%v]: key: %v, cmdId: %v in Clerk's Get", ck.clntId, leaderId, key, ck.cmdId)
ok := ck.servers[leaderId].Call("KVServer.Get", &args, &reply)
if !ok {
DPrintf("%v's %v: req server not ok, timeout in Clerk's Get", ck.clntId, ck.cmdId)
leaderId = (leaderId + 1) % len(ck.servers)
continue
}
switch reply.Err {
case OK:
DPrintf("get k: %v, v: %v in Clerk's GET", key, reply.Value)
ck.leaderId = leaderId
return reply.Value
case ErrNoKey:
DPrintf("get err no key: %v in Clerk's GET", key)
ck.leaderId = leaderId
return "" /* 查无此条记录,返回空字符串 */
case ErrWrongLeader:
DPrintf("get wrong leader in Clerk's GET")
leaderId = (leaderId + 1) % len(ck.servers)
}
}
}
其中,第 7 行的命令编号在请求之后应该是要递增的,这样才能标记该条命令是独一无二的,且让 service 感知到该条命令是否有被执行过,具体在 S2 - server 回应 展开讲解
然后,就是无尽的循环,直到 service 回复之后方才退出,回复的结果有三种,其一是成功了,对应着 OK;其二是查无此键,对应着 ErrNoKey;最后一种是 ErrWrongLeader,这说明 clerk 找错人啦,需要换一家门敲敲。前两种情况是可以顺利回复 client 的,因为不管找没找到,这都算是结果;而最后一种情况表明 clerk 还需继续努力
换一家门敲敲,体现在代码中就是,
leaderId = (leaderId + 1) % len(ck.servers)
即集群中该节点的下一个节点。记得要在回复 client 时更新 leaderId
,以便 client 下一次能够快速找到集群的 primary service
如果向指明的 leader 发送 RPC 不成功,那么就换一个节点再次发送,
if !ok {
DPrintf("%v's %v: req server not ok, timeout in Clerk's Get", ck.clntId, ck.cmdId)
leaderId = (leaderId + 1) % len(ck.servers)
continue
}
为什么要换一个节点再次发送呢?而不是单纯的重试呢?即依旧向指明的 leader 发送 RPC。因为指明的 leader 可能掉线了,所谓的 leader 是上一次请求时响应我的 leader,时间久了,集群中的角色可能发生了变化,这是再正常不过的事
我们不能只认死理吧,不能只盯着上次指定的 leader。如果这家门敲不开,那就换一家。同样 PutAppend()
也是一个道理,
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
args := PutAppendArgs{
Key: key,
Value: value,
Op: op,
ClntId: ck.clntId,
CmdId: ck.cmdId,
}
ck.cmdId++ /* 命令编号永远递增 */
leaderId := ck.leaderId
for {
reply := PutAppendReply{}
DPrintf("[%v->%v]: key: %v, value: %v, Op: %v, cmdId: %v in Clerk's PutAppend", ck.clntId, leaderId, key, value, op, ck.cmdId)
ok := ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply)
if !ok {
DPrintf("%v's %v: req server not ok, timeout in Clerk's PutAppend", ck.clntId, ck.cmdId)
leaderId = (leaderId + 1) % len(ck.servers)
continue
}
switch reply.Err {
case OK:
DPrintf("put append key: %v, value: %v ok, leader: %v in Clerk's PutAppend", key, value, leaderId)
ck.leaderId = leaderId
return
case ErrWrongLeader:
DPrintf("put append wrong leader: %v in Clerk's PutAppend", leaderId)
leaderId = (leaderId + 1) % len(ck.servers)
}
}
}
我就不再赘述了,可以对比 Get()
请求,悟出其中的道理
S2 - server回应
首先,就是定义操作集 Op,
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
ClntId int64
CmdId int
Key string
Value string
Kind string
}
ClntId
让 server 知道这条命令由哪个 client 发来的,cmdId
标记命令的标号,然后就是键值和类型。其后的 KVServer 结构体非常重要,
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
maxraftstate int // snapshot if log grows this big
// Your definitions here.
db map[string]string
ack map[int64]int /* 第 int64 位 client 已经执行到第 int 条命令了 */
results map[int]chan Op /* KV 层与 client 的接口 */
}
db
就是 IV. Solution 中提及的 kv 表,ack
类似于 TCP 三次握手中的确认机制,大意就是检查发来的请求是否已过期,如果命令是最新发来的,那么就去状态机执行;反正,则拒绝
这里有个疑问,发来的请求为什么会过期呢?因为网络情况太不稳定了,举个例子,client A 向 service 发送 cmd1 请求,但是由于网络阻塞(具体可能因为报文分组转发出现问题)的情况,导致 cmd1 根本没有到达 service,client A 苦等好一阵子之后也没有收到回复。于是,它又将 cmd1 中同样的请求封装成 cmd2,再次发送给 service,这次运气比较好,service 收到请求后并成功回复。此时,恰巧网络中的阻塞情况得到了改善,原本堵车的 cmd1请求因分组转发的线路畅通了,到达了 service。但是 service 一瞧 cmd1 的请求版本号低于刚才回应过的 cmd2,认为 cmd1 是过期的请求,不予理会
其中,判断 cmd1 版本号低于 cmd2 的手段就是通过 ack
来实现的。我们先从 client 的 Get 请求出发,按照流程进行分析,
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
entry := Op{
ClntId: args.ClntId,
CmdId: args.CmdId,
Key: args.Key,
Kind: "Get",
}
ok := kv.appendEntry2Log(entry)
if !ok {
reply.Err = ErrWrongLeader
DPrintf("wrong leader in serv %v in Get", kv.me)
return
}
kv.mu.Lock()
v, exist := kv.db[args.Key]
kv.mu.Unlock()
if !exist {
reply.Err = ErrNoKey
return
}
reply.Err = OK
reply.Value = v
}
此类的 RPC 方法也叫 RPC Handler,它是在有 Client 请求时才会被调用,也是个协程。之前提到过,无论是 Get 还是 Put/Append 请求,传到 Raft 层统一当成日志条目来处理,这里我将其封装成私有函数 appendEntry2Log
,定义如下,
func (kv *KVServer) appendEntry2Log(entry Op) bool {
idx, _, isLeader := kv.rf.Start(entry)
if !isLeader {
DPrintf("serv %v not leader, can't appendEntry2Log in appendEntry2Log", kv.me)
return false
}
kv.mu.Lock()
ch, ok := kv.results[idx] /* idx 是线性递增的,跟 clntId 没有关系 */
if !ok {
ch = make(chan Op, 1)
kv.results[idx] = ch
}
kv.mu.Unlock()
/* 等待 Raft 集群同步该条命令 */
select {
case op := <-ch:
return entry == op
case <-time.After(time.Millisecond * ReplyTimeOut):
return false
}
}
它会首先确定一下该节点是否为 leader,如果不是 leader,那就另当别论了。然后,就进入了等待环节,等待 Raft 集群同步该条目。集群同步好之后会通知该协程的,具体是通过 Channel 机制,主协程向刚分配的 kv.results[idx]
中写入数据,告知已同步该条目且已成功应用状态机,可回复 client
其中,定义的 ReplyTimeOut 为 1000,这主要是为了防止超时情况的发生。Put/Append 请求和这个类似,
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
op := Op{
ClntId: args.ClntId,
CmdId: args.CmdId,
Key: args.Key,
Value: args.Value,
Kind: args.Op,
}
ok := kv.appendEntry2Log(op)
if !ok {
reply.Err = ErrWrongLeader
} else {
reply.Err = OK
}
}
另外,对每一次的 map 操作,无论读写都要加锁,不然会出现 fatal error: concurrent map read and map write
主协程的工作,其实也不多,就是等待日志条目同步,完了在核验一下请求是否过期。如果是最新的请求,那么就应用到状态机中,更新 kv 表,
func (kv *KVServer) loop() {
for !kv.killed() {
msg := <-kv.applyCh /* Raft 集群已同步 */
op := msg.Command.(Op) /* 将 Command 空接口部分强制转换为 Op*/
idx := msg.CommandIndex /* 这是第几条命令 */
kv.mu.Lock()
/* 准备将该命令应用到状态机 */
if kv.isUp2Date(op.ClntId, op.CmdId) { /* 不执行过期的命令 */
kv.updateDB(op)
kv.ack[op.ClntId] = op.CmdId /* ack 跟踪最新的命令编号 */
}
/* 回应 client,即继续 Get 或 PutAppend 当中的流程 */
ch, ok := kv.results[idx]
if ok { /* RPC Handler 已经准备好读取已同步的命令了 */
select {
case <-kv.results[idx]:
default:
}
ch <- op
}
kv.mu.Unlock()
}
}
更新完了,就发消息告诉 RPC Handler 可以回复 client 了。其中,检验请求是否过期通过 isUp2Date()
实现的,
func (kv *KVServer) isUp2Date(clntId int64, cmdId int) bool {
oldCmdId, ok := kv.ack[clntId]
if ok {
return oldCmdId < cmdId
}
return true
}
比较一下版本号,旧的就不执行它。更新 kv 表是通过 updateDB()
完成的,
func (kv *KVServer) updateDB(args Op) {
switch args.Kind {
case "Put":
kv.db[args.Key] = args.Value
case "Append":
kv.db[args.Key] += args.Value
}
}
这里只需要考虑 Put 和 Append 两种情况即可,因为 Get 并不更改状态。另外,别忘了在更新完状态机之后,维护一下 ack
表
最后,在 kvraft/server.go:StartKVServer()
中初始化三个重要的数据结构,以及启动主协程,
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
...
// You may need initialization code here.
kv.db = make(map[string]string)
kv.ack = make(map[int64]int)
kv.results = make(map[int]chan Op)
go kv.loop()
return kv
}
至此,梳理完 KVRaft without log compaction 的整套流程
V. Result
golang 比较麻烦,它有 GOPATH 模式,也有 GOMODULE 模式,6.824-golabs-2020 采用的是 GOPATH,所以在运行之前,需要将 golang 默认的 GOMODULE 关掉,
$ export GO111MODULE="off"
随后,就可以进入 src/kvraft
中开始运行测试程序,
$ go test -run 3A
仅此一次的测试远远不够,可以通过 shell 循环,让测试跑个两百次就差不多了
$ for i in {1..200}; go test -run 3A
这样,如果还没错误,那应该是真的通过了。分布式的很多 bug 需要通过反复模拟才能复现出来的,它不像单线程程序那样,永远是幂等的情况。也可以用我写的脚本 test_3a.py,
import os
ntests = 200
nfails = 0
noks = 0
if __name__ == "__main__":
for i in range(ntests):
print("*************ROUND " + str(i+1) + "/" + str(ntests) + "*************")
filename = "out" + str(i+1)
os.system("go test -run 3A | tee " + filename)
with open(filename) as f:
if 'FAIL' in f.read():
nfails += 1
print("✖️fails, " + str(nfails) + "/" + str(ntests))
continue
else:
noks += 1
print("✔️ok, " + str(noks) + "/" + str(ntests))
os.system("rm " + filename)
我已经跑过两百次,无一 FAIL