Background
主要是完成一个可以根据group数量,动态调整shard所属的group的分布式kv键值引擎。其中shard->group的配置由shardctrler集群来管理,底层也是通过raft group来容错(分布式嘛)
然后这个shardkv就是要完成的是根据shardctrler管理的Config,来更新拉取和gc shard kv。
拉取Config
我们记录了lastconfig和currentconfig。因为我们的分片转移和切片gc要依赖旧的和新的config。
然后这个只允许config.Num递增,并且要求切片都是serving状态,也就是要完成了这一轮的切片转移和gc才行。保障了整个过程的完整性。
func (kv *ShardKV) applyConfiguration(nextConfig *shardctrler.Config) *CommandResponce {
if nextConfig.Num == kv.currentConfig.Num+1 {
DPrintf("Node{%v} group{%v} config apply success Num{%v} shards{%v}", kv.me, kv.gid, nextConfig.Num, nextConfig.Shards)
kv.updateShardStatus(nextConfig)
kv.lastconfig = kv.currentConfig
kv.currentConfig = *nextConfig
return &CommandResponce{"", OK}
}
DPrintf("Node{%v} group{%v} config apply fail Num{%v} {%v}", kv.me, kv.gid, nextConfig.Num, kv.currentConfig.Num+1)
return &CommandResponce{"", ErrOutDated}
}
分片拉取过程
本实验的设计就是通过一个拉取协程和一个gc协程作为背景协程来做处理。applyConfig时我们先把切片标记一下,如果一个切片的最新gid是当前gid,并且旧版本没有,那么说明我们要到别的组里面拉取,所以我们把这个切片所属的状态机状态标记为Pulling。同理,新分片不属于目前group但旧分片属于,所以我们要将其状态设为BePulling.
func (kv *ShardKV) updateShardStatus(config *shardctrler.Config) {
for i := 0; i < shardctrler.NShards; i++ {
if config.Shards[i] == kv.gid && kv.currentConfig.Shards[i] != kv.gid {
if kv.currentConfig.Shards[i] != 0 {
kv.stateMachines[i].Status = Pulling
}
}
if config.Shards[i] != kv.gid && kv.currentConfig.Shards[i] == kv.gid {
if config.Shards[i] != 0 {
kv.stateMachines[i].Status = BePulling
}
}
}
}
拉取,通过一个协程来定时远程拉取,我们要旧的配置上的servers,因为我们是处于过渡期,然后我们要拉取完所有我切片才能往下,所以就用到了waitgroup
func (kv *ShardKV) migrationAction() {
kv.mu.Lock()
g2s := kv.getShardIDsByStatus(Pulling)
var wg sync.WaitGroup
for gid, shardsIDs := range g2s {
DPrintf("Node{%v} start pulltask", kv.me)
wg.Add(1)
go func(servers []string, configNum int, shardIDs []int) {
defer wg.Done()
pullTaskRequest := ShardOperationRequest{configNum, shardIDs}
for _, server := range servers {
var pullTaskResponse ShardOperationResponce
srv := kv.make_end(server)
if srv.Call("ShardKV.GetShardsData", &pullTaskRequest, &pullTaskResponse) && pullTaskResponse.Err == OK {
DPrintf("Node{%v} get pulltaskResponce", kv.me)
kv.Execute(NewInsertSHardsCommand(&pullTaskResponse), &CommandResponce{})
}
}
}(kv.lastconfig.Groups[gid], kv.currentConfig.Num, shardsIDs)
}
kv.mu.Unlock()
wg.Wait()
}
拉取别的group时,我们要拉取kv存储,和clientinformation。同时我们不阻塞写入的话,我们应该应用kv的话是插入kv,而不是直接赋值。然后把把他标记为gcing,等待gcing协程来处理,同时也保障了serving状态才是默认状态,简单说,在waitgroup下,我们可以完成一轮migration和gc。
func (kv *ShardKV) applyInsertShards(shardsInfo *ShardOperationResponce) *CommandResponce {
if shardsInfo.ConfigNum == kv.currentConfig.Num {
for shardId, shardData := range shardsInfo.Shards {
shard := kv.stateMachines[shardId]
if shard.Status == Pulling {
for key, value := range shardData {
shard.KV[key] = value
}
shard.Status = Gcing
} else {
DPrintf("Node{%v} encounters duplicated shards insert", kv.me)
break
}
}
for clientId, client_info := range shardsInfo.ClientInformation {
if info, ok := kv.clientInfomation[clientId]; !ok || info.CommandId < client_info.CommandId {
kv.clientInfomation[clientId] = client_info
}
}
return &CommandResponce{OK, ""}
}
DPrintf("Node{%v} rejects outdated shards", kv.me)
return &CommandResponce{ErrOutDated, ""}
}
gc协程同理,我们来看看gc的shardkv状态转移吧。把之前的gcing转换成serving,然后把要清除的清除。在删除分片的 handler 中,首先仅可由 leader 处理该请求,其次如果发现请求中的配置版本小于本地的版本,那说明该请求已经执行过,否则本地的 config 也无法增大,此时直接返回 OK 即可,否则在本地提交一个删除分片的日志。
for _, shardId := range shardsInfo.ShardIDs {
shard := kv.stateMachines[shardId]
if shard.Status == Gcing {
shard.Status = Serving
} else if shard.Status == BePulling {
kv.stateMachines[shardId] = NewShard()
} else {
DPrintf("Node{%v} encounters duplicated deletion", kv.me)
break
}
}
shardkv状态变化图
同时做了空日志优化,因为提交必须以当前任期的日志,有时候可能会被卡住,所以我们起一个协程定期做一个空日志,也可以加速提交。
分布式的探讨
还是通过raft日志来确保大多数同样,所以上面每个协程的行为都是通过raft日志传播,applier协程的applychan来确保大多数接收了,然后应用,然后对于上层快照保存的kv的哪些元素------------看哪些元素是command命名被应用后被修改的,这样快照传过去,就相当于command被应用了。所以这一个lab,我们要多保留currentconfig和lastconfig。
正确性证明
你可能怀疑,如果gc协程先执行了,那么Bepulling的那个将被拉取的kv不是被清空了吗,其实不然。下面的代码。
实际上我们gc协程是在拉取端,所以我们发现其转为了gcing(实际上gcing状态还是挺重要的)
然后对远端(非拉取端的raft group里面的节点)的bepulling(因为拉取端是gcing,也就是远端被拉取了,可以放心删除远端了)清空,变serving,对本地gcing转serving, i然后才能向shardctrler拉取下一个config。还有一点我们对于是不是gcing的group to shards是上一个版本 lastconfig的,而kv.statemachine是这个上个版本往下个版本过度的,所以我们改为gcing,然后得到gid ,shardsId。实际上是上一个config的,所以我们可以找到bepulling。对于改变我们的gcing到serving的话,实际意义上,我们用的是拉取端的gcaction 对于shardId执行的。因为我们有两个地方都有kv.Execute。上面的是调用本地的gcing---->serving。
RPC对远端的才是bepulling------》清空,serving,如下面
有人问,那不是本地执行了多次execute吗,其实不然,因为我们每次rpc都是group to shards。
实际上本地gcing转serving,实际上远程一样是分多次的。实际上还是多次了,因为我们这个有多个servers,还是多次,但是只有当对面服务器是leader时才行,所以就一次execute。
一些之前的误解
newchannel----》notifychann是和commandId配合,一个notify_chan只用一次,用完可以马上删除,减少内存占用。
还可以参考:
GitHub - 1797818494/MIT6.824: to build a distributed system based on raft, mapReduce and so on using go
MIT6.824-2021/lab4.md at master · OneSizeFitsQuorum/MIT6.824-2021 · GitHub