Part A 分片控制器
1. 整体思路
-
和lab3A一样,shardctler也是一个服务,由客户端调用。这个服务建立在raft集群上,保证容错。
-
shardctler也应该保证线性一致性和重复请求的问题,因此也需要记录clientid和messageid。
-
shardctler保存了当前的分片信息,称为配置
- Num:当前配置号
- Shards:每一个分片对应的副本组id—gid
- Groups:每个组(gid)对应哪些服务器
type Config struct { Num int // config number Shards [NShards]int // shard -> gid [gid1, gid2, gid3,...] Groups map[int][]string // gid -> servers[] {gid1:[x1,y1,z1], gid2:[x2,y2,z2], ...} }
-
shardcltr包含4个操作,Join、Leave、Move、Query
- Join:添加组 { gid : servers, …},新配置应尽可能将分片均匀地分配到整组组中,并应移动尽可能少的分片以实现该目标
- Leave:移除组,并将该组上的分片均匀分配给其他组
- Move:转移分片**,**一些组可能比其他组负载更多,因此需要移动分片来平衡负载
- Query:查询当前的Config
-
一些需要注意的地方
- Join和Leave都涉及到对Groups 的修改,因此需要重新调整Shards(这里涉及到负载均衡),因为有可能gid已经没有对应的服务器了,因此分片也不应该由该gid管理
- 除了Qeury,其他三种操作都涉及到修改config,因此需要添加新的config
2 流程
- 客户端对shardctler发出请求
- shardctler通知raft Start()
- raft执行AppendEntries,通知其他raft节点日志复制
- raft收到过半确认后,通知shardctler执行请求 applyCh管道
- shardctler执行 收到信号(实际上shardctler会在applyRoutine这个协程上一直等applyCh),根据clientid,messageid判断是否执行过该条指令,没有执行过就执行
- shardctler执行完后,通知 可以 返回结果给客户端 broadcastCh管道
3 一些细节
- 通常来说,分片比组多,相当于每个组都会负责多个分片。
- Join操作
- 向当前Config中新增一些组
- 新增后,需要调整shard,保证负载均衡
- Leave操作
- 移除一些组
- 移除后,需要调整shard,保证负载均衡
- 如何实现负载均衡?
-
每当Join、Leave后,Groups都会发生变化,因此需要重新调整Shard。由于分片比组多,因此每个组都会负责多个分片。为了让服务器实现均衡,通过shard%len(gids)找到,shard对应的gid,这样,每个组负责的shard就是尽可能平衡的
sort.Ints(gids) // 假设:gids = [1,2,3,4] for shard := range config.Shards { // 0-9 config.Shards[shard] = gids[shard%len(gids)] // 负载均衡算法,shard_0映射到gids[0], shard_1映射到gids[1], ..., shard_5映射到gids[1], ..., }
-
4 问题
- 如何实现负载均衡?
-
方案一:取模映射 O(nlogn),但是移动的并不是最少
-
通过shard%len(gids)找到,shard对应的gid,这样,每个组负责的shard就是尽可能平衡的
sort.Ints(gids) // 排序后,才能保证移动最少 for shard := range config.Shards { // 0-9 config.Shards[shard] = gids[shard%len(gids)] // 负载均衡算法,shard_0映射到gids[0], shard_1映射到gids[1], ..., shard_5映射到gids[1], ..., }
-
-
方案二:直接移动 O(n^2),移动的次数最少
-
需要一个map,记录当前的gid有多少个shard
gid_num_shard_map{ 1:5 2:5 } minNum = 5; maxNum = 5;
-
循环让拥有最多shard的Group分一个给拥有最少的Group,直到maxNum - minNm=1
-
-
Part B 多集群KV存储—Multi-Raft
1 整体思路
-
更新配置
-
什么时候会更新配置??
- 当此时分片没有分配对应组时(shard[i]=0),kv需要询问分配控制器,最新的配置
-
代码:作为一个协程单独跑,定时检查是否要更新
func (kv *ShardKV) configurationRoutine() { for !kv.killed() { kv.mu.Lock() defaultShard := true // 默认的切片,就是初始化的时候,此时切片还没分配组 for _, shard := range kv.state { if shard.Status != Default { defaultShard = false break } } num := kv.config.Num kv.mu.Unlock() _, isLeader := kv.rf.GetState() if isLeader && defaultShard { // 询问分配控制器,当前配置的下一个配置 nextConfig := kv.clerk.Query(num + 1) nextNum := nextConfig.Num // 如果获取了新配置,需要通知kv,改变当前的配置,因为这是一个命令,需要raft进行日志复制,然后再执行 if num+1 == nextNum { command := Configuration{ Config: nextConfig, } index, _, isLeader := kv.rf.Start(command) if isLeader { // 等到kv执行了更新config请求,或者超时才会返回 kv.consensus(index) } } } time.Sleep(100 * time.Millisecond) } }
-
得到新配置后,需要处理一下两种情况
- 设置Pull状态:新配置shard属于kv && 旧配置shard分片不属于kv,也就是说,此后,kv应该负责当前分片,但是数据库中(kv.state)中没有数据,此时需要拉取过来.
- 设置WaitDelete状态:新配置shard不属于kv,旧配置shard属于kv,也就是说,此后kv不应该负责当前分片,但数据库中(kv.state)有数据,此时需要删除。但也不是马上删除,而是在删除完对方的数据后 **DeleteShardRPC()**返回,再删除自己的
-
-
拉取数据
-
什么时候才会拉取成为拉取状态?
- 新配置shard属于kv && 旧配置shard分片不属于kv,也就是说,此后,kv应该负责当前分片,但是数据库中(kv.state)中没有数据,此时需要拉取过来
-
更新配置后,并不表示所属分片可以立刻对外提供服务,还需要等待在上一个版本的Config中不属于自身的shard从它之前所属的Group中迁移到本Group
-
拉取数据的方法应该怎么进行,同步?异步?
- 不能同步,因为如果在更新日志的时候同步阻塞整个协程,会影响其他的对外请求
- 不能异步,leader 可能会在 新配置之后 到 新数据被异步拉取到并提交日志之前宕机,而 follower 虽然会apply 配置但是不会去拉数据,这样这些数据将永远无法被更新。
- 在更新配置中,我们只根据情况改变状态
for shard, gid := range nextConfig.Shards { targetGid := kv.config.Shards[shard] // 当前配置shard分片的gid // 新配置shard属于kv && 旧配置shard分片不属于kv,也就是说,此后,kv应该负责当前分片,但是数据库中(kv.state)中没有数据,此时需要拉取过来 if gid == kv.gid && targetGid != kv.gid && targetGid != 0 { kv.state[shard].Status = Pull } // 新配置shard不属于kv,旧配置shard属于kv,也就是说,此后kv不应该负责当前分片,但数据库中(kv.state)有数据,此时需要删除 if gid != kv.gid && targetGid == kv.gid && gid != 0 { kv.state[shard].Status = Push } }
-
在拉取数据,更新配置时,如何保证线性一致性,也就是如何让用户不会发现这个过程?
-
在拉取数据时,将kv的此时已经接受的客户端请求信息也给对方
// 将给当前kv发送过请求的client信息给对方,因为后续客户端可以再向对方请求数据,这样在更新配置时,也能保证线性一致性 client := make(map[int64]int) for clientId, messageId := range kv.client { client[clientId] = messageId }
-
-
-
数据删除
- 什么时候要删除数据?
-
当拉取数据成功后,需要将通知数据来源方删除自己的数据,标记为Collection状态
// 把数据给 对应的Pull状态的分片 for shard := range state { if kv.state[shard].Status == Pull { for k, v := range state[shard].State { kv.state[shard].Put(k, v) } // 拉取数据后,成为Collection状态 kv.state[shard].Status = Collection } }
-
当对方删除完毕后,**DeleteShardRPC()**返回,此时还要删除自己WaitDelete状态的数据
-
- 什么时候要删除数据?
2 流程
- 初始下,共有10个分片,3个shardkv集群组,1个分片控制器集群
- 客户端通过key映射得到分片,然后更具配置,找到对应的集群组
- 集群组初始化下还没有配置,发出更新配置请求 configurationRoutine()
- 更新配置后,如果新配置shard属于kv && 旧配置shard分片不属于kv,需要拉取数据 pullShardRoutine()
- 拉取数据后,还需要让数据来源方删除shard对应的数据 deleteShardRoutine()
- KV数据库执行请求,返回数据给客户端
4 问题
-
为什么要使用多集群?
- 之前,只有一个集群时,所有请求都由leader负责,当数据量大后,请求增多,leader面临的压力非常大,请求响应的时间也会延长,这种情况下,简单增加机器并不会由性能提升。因此可以将数据分开存储到不同集群,将不同请求引流到不同集群,降低单一集群的压力。
-
如何保证一个集群组丢失了一个分片,需要立即停止向该分片中的键提供请求?
-
每次kv在响应请求前,都会通过staleShard(shard int),判断当前shard是不是自己负责
/*准备响应Get请求*/ kv.mu.Lock() if kv.staleShard(shard) { response.Err = ErrWrongGroup kv.mu.Unlock() return } if method == "Get" { response.Value = kv.state[shard].Get(key) } kv.mu.Unlock() /*准备响应Put、Append请求*/ if !kv.staleShard(shard) && kv.client[clientId] < messageId { switch method { case "Put": kv.state[shard].Put(key, value) case "Append": kv.state[shard].Append(key, value) } kv.client[clientId] = messageId }
-
-
如何保证在分片移动后的线性一致性?
- kv在拉取数据时,对端也会把client信息返回,这样当前kv就有了当前客户的信息,如果该客户端发送了一个已经执行过的请求,将不再执行。
-
如何实现在配置更改时,对于那些没移动的分片,可以正常响应给客户端呢?
- 只要是Collection和Default状态的切片,都认为是拥有此分片的
- Collection只会在拉取到数据后被设置
- Default就是拥有这些切片
- staleShard会去判断是否拥有当前切片,只要拥有,就可以对外提供服务