MIT 6.5840(6.824) Lab 4:Fault-tolerant Key/Value Service 设计实现

news2024/9/27 15:19:29

1 实验要求

本实验旨在利用lab 3中的Raft库,构建一个具备容错能力的键值存储服务。服务将作为一个复制状态机,由多个服务器组成,各服务器通过Raft协议同步数据库状态。即使在部分故障或网络隔离的情况下,只要大多数服务器正常,服务仍需继续响应客户端请求。在lab 4完成后,你将实现图中Raft交互的所有部分(Clerk、Service和Raft)。

客户端通过Clerk与键值服务交互,发送RPC请求,支持Put、Append和Get三种操作。服务需确保这些操作线性化,如果逐个调用,这些方法应表现得好像系统只有一个状态副本,每个调用都应观察到前序调用序列对状态的修改。对于并发调用,返回值和最终状态必须与操作按某种顺序逐个执行时相同。如果调用在时间上重叠,则认为是并发调用。

为单一服务器提供线性化相对容易,但如果服务是复制的,则较为困难,因为所有服务器必须为并发请求选择相同的执行顺序,避免使用过时的状态回复客户端,并在故障恢复时以保留所有确认的客户端更新为前提。Raft 作者的博士论文的第 6.3 小节介绍了如何实现线性化语义,在知乎上也有关于这方面的讨论,可以参考 dragonboat 作者的回答。

实验分为两个阶段:A阶段实现基于Raft的键值服务,不使用快照;B阶段则集成快照功能,优化日志管理。

我的实验代码仓库:https://github.com/HeZephyr/MIT6.5840/tree/main/src/kvraft,已通过压力测试,代码严格遵守上述按要求实现。

注意:下述所贴代码为了简洁以及分块,进行了一定程度的删减,如果需要复现,可以前往仓库。

2 实验设计

2.1 思路

lab4需要我们基于lab3实现的Raft,实现一个可用的KV服务,这意味着我们需要保证线性一致性(要求从外部观察者的角度来看,所有操作都按照某个全局顺序执行,并且结果与这些操作按该顺序串行执行的结果相同)。尽管 Raft 共识算法本身支持线性化语义,但要真正保证线性化语义在整个系统中生效,仍然需要上层服务的配合。

例如,在下面这张图中:x初始值为0,client1发送put请求(x,1),client2发送put请求(x,2),并在put请求前后发送get请求,此时如果put请求因为超时不断重发,如果在client2的put请求之后才被应用,则导致最后client2读到的是1,RaftKV的结果也是1,这就违背了线性一致性。

image-20240822144442922

这是因为当客户端向服务端提交command时,服务端在Raft层中同步、提交并应用后,客户端因为没有收到请求回复,会重试此操作,这种重试机制会导致相同的命令被执行多次。注意,这里讨论的都是写请求,因为读请求不会改变系统状态,可以重复执行多次。

为了解决重复执行命令导致线性一致性破坏的问题,Raft 作者提出了一种解决方案:客户端为每个命令分配一个唯一的序列号。状态机会记录每个客户端的最新序列号及其对应的执行结果。如果一个命令的序列号已经被处理过,则系统会直接返回先前的结果,而不会重新执行该命令。这样可以确保每个命令只被应用到状态机一次,避免了重复执行可能带来的线性一致性问题。

在这个lab中,我们可以按照如下机制具体实现:

  1. 客户端命令唯一化:每个客户端发送给服务端的每个command请求都携带一个由ClientIdCommandId组成的二元组。ClientId是客户端的唯一标识符,CommandId是一个递增的整数,用于唯一标识客户端发出的每一个命令。
  2. 服务器端状态记录:在服务器端,维护一个映射表,这个映射表以ClientId作为主键,其值是一个结构体包含:
    • 最近执行的来自该客户端的CommandId
    • 对应的命令执行结果。
  3. 重复命令检测与处理
    • 当一个新命令到达时,首先检查映射表中是否存在对应的ClientId条目。
    • 如果存在,则比较新命令的CommandId与映射表中记录的CommandId
      • 如果新命令的CommandId小于或等于记录的CommandId,则说明这是一个重复命令,服务器可以直接返回之前存储的结果。
      • 如果新命令的CommandId大于记录的CommandId,则说明这是新的命令,服务器应该正常处理这个命令,并更新映射表中对应ClientIdCommandId及结果。
    • 如果不存在对应的ClientId条目,则将此命令视为首次出现的命令进行处理,并添加一个新的条目到映射表中。

2.2 lab4A:无快照

整体的时序图如下所示:

image-20240829224033039

2.2.1 客户端

对于客户端,需要有(clientId, commandId)来标识唯一命令,对于clientId,通过lab提供的随机数生成器nrand生成即可,对于commandId,可以采用递增的方式进行管理。这意味着每当客户端发送一个新的命令时,commandId都会递增一次,从而确保每个命令都有一个唯一的标识符,这样也需要保证如果这条命令没处理完(请求的server不是leader或者请求超时)需重复执行的时候,不能改变commandId。

type CommandArgs struct {
	Key       string
	Value     string
	Op        OpType
	ClientId  int64
	CommandId int64
}

type CommandReply struct {
	Err   Err
	Value string
}

type Clerk struct {
	servers []*labrpc.ClientEnd
	leaderId  int
	clientId  int64
	commandId int64
}


func (ck *Clerk) Get(key string) string {
	return ck.ExecuteCommand(&CommandArgs{Key: key, Op: OpGet})
}

func (ck *Clerk) Put(key string, value string) {
	ck.ExecuteCommand(&CommandArgs{Key: key, Value: value, Op: OpPut})
}

func (ck *Clerk) Append(key string, value string) {
	ck.ExecuteCommand(&CommandArgs{Key: key, Value: value, Op: OpAppend})
}

func (ck *Clerk) ExecuteCommand(args *CommandArgs) string {
	args.ClientId, args.CommandId = ck.clientId, ck.commandId
	for {
		reply := new(CommandReply)
		if !ck.servers[ck.leaderId].Call("KVServer.ExecuteCommand", args, reply) || reply.Err == ErrWrongLeader || reply.Err == ErrTimeout {
			ck.leaderId = (ck.leaderId + 1) % len(ck.servers)
			continue
		}
		ck.commandId += 1
		return reply.Value
	}
}
2.2.2 服务端

KVServer结构体被设计成一个基于Raft一致性协议实现的键值存储服务。为了确保客户端请求的幂等性,并且能够正确地处理来自客户端的重复请求,lastOperations映射表用于跟踪每个客户端(由clientId标识)的最后已应用的commandId以及相应的reply。这使得服务器能够在接收到重复请求时返回之前的结果而无需再次执行相同的命令。

状态机stateMachine在此处被实现为内存中的键值对存储MemoryKV,这意味着所有的键值对数据都保存在内存中,这对于快速读写操作是非常有效的,但可能不是持久化存储的最佳选择,因为如果服务器重启或崩溃,所有数据都会丢失。

lastApplied字段被用来记录最后应用到状态机的日志条目的索引,以此来避免处理那些已经被应用过的过期日志条目。

notifyChs是一个映射,它的键是日志条目的索引,值是一个channel。用于通知Raft的处理结果(机即复制到大多数副本并且应用到状态机之后)。

type KVServer struct {
	mu      sync.RWMutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big
	lastApplied  int //record the last applied index to avoid duplicate apply

	stateMachine   KVStateMachine
	lastOperations map[int64]OperationContext
	notifyChs      map[int]chan *CommandReply
}

type KVStateMachine interface {
	Get(key string) (string, Err)
	Put(key, value string) Err
	Append(key, value string) Err
}

type OperationContext struct {
	MaxAppliedCommandId int64
	LastReply           *CommandReply
}

ExecuteCommandRPC实现如下,这段首先检查是否不是Get请求且为重复的命令,如果是则返回上次的结果,否则通过Raft的Start方法复制并应用日志,如果Start方法返回结果告知当前server不是Leader,则返回ErrWrongLeader,否则,去注册一个channel去阻塞等待执行结果(因为Start返回只是代表日志被复制到大多数节点中,有没有应用还不知道),这个执行结果由applier协程push。

func (kv *KVServer) ExecuteCommand(args *CommandArgs, reply *CommandReply) {
	kv.mu.RLock()
	if args.Op != OpGet && kv.isDuplicatedCommand(args.ClientId, args.CommandId) {
		lastReply := kv.lastOperations[args.ClientId].LastReply
		reply.Value, reply.Err = lastReply.Value, lastReply.Err
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()
	index, _, isLeader := kv.rf.Start(Command{args})
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	kv.mu.Lock()
	ch := kv.getNotifyCh(index)
	kv.mu.Unlock()

	select {
	case result := <-ch:
		reply.Value, reply.Err = result.Value, result.Err
	case <-time.After(ExecuteTimeout):
		reply.Err = ErrTimeout
	}
	go func() {
		kv.mu.Lock()
		kv.deleteNotifyCh(index)
		kv.mu.Unlock()
	}()
}
2.2.3 applier

applier协程实现如下,主要是监控applyCh,根据Raft的应用结果来进行响应处理,需要注意的就是检测是否为重复的命令,如果不是,则需要应用到状态机,并保存最近的响应结果。最后,如果当前节点是领导者,并且该日志条目属于当前任期,则通知相关的客户端。

func (kv *KVServer) applier() {
	for kv.killed() == false {
		select {
		case message := <-kv.applyCh:
			DPrintf("{Node %v} tries to apply message %v", kv.rf.GetId(), message)
			if message.CommandValid {
				kv.mu.Lock()
				if message.CommandIndex <= kv.lastApplied {
					DPrintf("{Node %v} discards outdated message %v because a newer snapshot which lastApplied is %v has been restored", kv.rf.GetId(), message, kv.lastApplied)
					kv.mu.Unlock()
					continue
				}
				kv.lastApplied = message.CommandIndex

				reply := new(CommandReply)
				command := message.Command.(Command) // type assertion
				if command.Op != OpGet && kv.isDuplicatedCommand(command.ClientId, command.CommandId) {
					DPrintf("{Node %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v", kv.rf.GetId(), message, kv.lastOperations[command.ClientId], command.ClientId)
					reply = kv.lastOperations[command.ClientId].LastReply
				} else {
					reply = kv.applyLogToStateMachine(command)
					if command.Op != OpGet {
						kv.lastOperations[command.ClientId] = OperationContext{
							MaxAppliedCommandId: command.CommandId,
							LastReply:           reply,
						}
					}
				}

				// just notify related channel for currentTerm's log when node is leader
				if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
					ch := kv.getNotifyCh(message.CommandIndex)
					ch <- reply
				}
				kv.mu.Unlock()
			}
		}
	}
}

2.2 lab4B:有快照

实现了lab4A,lab4B就好做了,只需要修改applier,每次应用了command之后,都需要检查是否达到maxraftstate,如果达到,则调用snapshot来制作快照,需要注意,快照中,不仅需要保存状态机的状态,还需要包含用来去重的lastOperations,这也是为了防止应用快照后的节点成为leader后,由于没有lastOperations导致重复执行命令。

然后,applyCh中还有Leader发来的快照,我们需要进行验证,如果有效,则需要更新相应的状态,具体实现代码如下:

func (kv *KVServer) applier() {
	for kv.killed() == false {
		select {
		case message := <-kv.applyCh:
			DPrintf("{Node %v} tries to apply message %v", kv.rf.GetId(), message)
			if message.CommandValid {
				kv.mu.Lock()
				if message.CommandIndex <= kv.lastApplied {
					DPrintf("{Node %v} discards outdated message %v because a newer snapshot which lastApplied is %v has been restored", kv.rf.GetId(), message, kv.lastApplied)
					kv.mu.Unlock()
					continue
				}
				kv.lastApplied = message.CommandIndex

				reply := new(CommandReply)
				command := message.Command.(Command) // type assertion
				if command.Op != OpGet && kv.isDuplicatedCommand(command.ClientId, command.CommandId) {
					DPrintf("{Node %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v", kv.rf.GetId(), message, kv.lastOperations[command.ClientId], command.ClientId)
					reply = kv.lastOperations[command.ClientId].LastReply
				} else {
					reply = kv.applyLogToStateMachine(command)
					if command.Op != OpGet {
						kv.lastOperations[command.ClientId] = OperationContext{
							MaxAppliedCommandId: command.CommandId,
							LastReply:           reply,
						}
					}
				}

				// just notify related channel for currentTerm's log when node is leader
				if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
					ch := kv.getNotifyCh(message.CommandIndex)
					ch <- reply
				}
				if kv.needSnapshot() {
					kv.takeSnapshot(message.CommandIndex)
				}

				kv.mu.Unlock()
			} else if message.SnapshotValid {
				kv.mu.Lock()
				if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
					kv.restoreStateFromSnapshot(message.Snapshot)
					kv.lastApplied = message.SnapshotIndex
				}
				kv.mu.Unlock()
			} else {
				panic(fmt.Sprintf("Invalid ApplyMsg %v", message))
			}
		}
	}
}

3 压测结果

网上提供了一个测试脚本,功能强大。我的压测结果如下所示:

image-20240820095009785

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2089365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

通过MindOpt APL建模求解组合优化问题中的常见问题:图着色问题

组合优化问题&#xff1a;图着色问题 通过MindOpt APL建模求解组合优化问题中的常见问题&#xff1a;图着色问题 1. 背景知识 1.1. 组合优化问题 在之前发布的《组合优化问题&#xff1a;装箱问题》中&#xff0c;我们讲解了什么是组合优化&#xff08;Combinatorial Optimi…

【练习1】数字统计

题目&#xff1a; 分析&#xff1a; 枚举法数字拆分&#xff08;tmp % 10&#xff0c;tmp / 10&#xff09; 代码实现&#xff1a; public class Main {public static void main(String[] args) {Scanner in new Scanner(System.in);int l in.nextInt(), r in.nextInt();…

【2024 CCF编程能力等级认证(GESP)Python 】一级大纲

目录 1. 背景2. 考核知识块3. 考核内容3.1 计算机基础知识3.2 编程规范3.3 基础语法3.4 数据类型3.5 三大基本结构3.6 运算符3.7 模块导入与输入输出3.8 Turtle绘图4. 考核目标5. 题型分布6. 考试时长7. 认证时间与报名8. 政策与福利9. GESP一级认证形式 1. 背景 官网&#xff…

11 对话模型微调

提问&#xff1a;其实我一直觉的数据是最费事的一个&#xff0c;现在都是使用别人的数据&#xff0c;如果对于实际场景中那么我们该如何获取处理数据呢&#xff01; 1 数据处理&#xff1b; 2 模型选择&#xff0c;调参数&#xff1b; 数据 llm-wizard/alpaca-gpt4-data-zh …

简单的二叉树问题——二叉树的最大深度

给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3示例 2&#xff1a; 输入&#xff1a;root [1,null,2] 输出…

【二叉树进阶】--- 前中后序遍历非递归

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; 算法Journey 本篇博客我们将来了解有关二叉树前中后序遍历的非递归版本。 &#x1f3e0; 前序遍历 要迭代非递归实现二叉树的前序遍历&#xff0c;首先还…

【pgAdmin4】创建/删除:数据库Database和数据库表Table

目录 0.环境 1.简介 2.详细步骤 1&#xff09;创建数据库 法一&#xff1a;UI界面创建 法二&#xff1a;sql语句创建数据库 2&#xff09;创建数据库表 查看数据库表 查看数据库表内容 法一&#xff1a;UI界面创建数据库表 法二&#xff1a;sql语句创建数据库表 3&…

C_09_字符操作

字符串相关函数 头文件都是 string.h 概述&#xff1a; 由系统提供的由字符串处理的函数 属于库函数 所属头文件 string.h 1 strlen 测量字符串长度 语法&#xff1a; size_t strlen(const char *s);参数&#xff1a;要测量的字符串返回值&#xff1a;长度注意:测量的长度…

Screenshot Software,截屏软件

一.截屏软件 1.1 自带的一些截屏软件 &#xff08;1&#xff09;微信 &#xff08;2&#xff09;QQ 有一次无意中测试了下&#xff0c;截屏软件的截屏质量&#xff0c;发现对于同一个页面截全屏&#xff0c;微信截的屏质量相对于 win 自带的截图软件还要好的&#xff0c;所以…

Elasticsearch 开放推理 API 增加了对 Anthropic 的 Claude 的支持

作者&#xff1a;来自 Elastic Jonathan Buttner 我们很高兴地宣布 Elasticsearch Open Inference API 的最新功能&#xff1a;集成 Anthropic 的 Claude。这项功能使 Elastic 用户能够直接连接到 Anthropic 平台&#xff0c;并使用 Claude 3.5 Sonnet 等大型语言模型来构建 Ge…

Java 应用服务器有哪些?

应用服务器 Java 应用服务器是专门用来运行基于Java技术的Web应用程序的服务器。 这些服务器支持Java EE&#xff08;Java Platform, Enterprise Edition&#xff09;规范&#xff0c;提供了多种服务&#xff0c;如事务管理、Java Naming and Directory Interface (JNDI)、数…

机器之心 | 五倍吞吐量,性能全面包围Transformer:新架构Mamba引爆AI圈

本文来源公众号“机器之心”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;五倍吞吐量&#xff0c;性能全面包围Transformer&#xff1a;新架构Mamba引爆AI圈 屹立不倒的 Transformer 迎来了一个强劲竞争者。 在别的领域&#xff…

探索 HarmonyOS NEXT Developer Beta6,开启创新应用

华为的鸿蒙 NEXT 的发展越来越快 HarmonyOS NEXT Developer Beta6 版本的推出&#xff0c;为开发者们带来了全新的机遇和挑战。这个版本不仅拥有诸多令人振奋的新特性&#xff0c;还提供了丰富的新增资源和精彩的开发者活动。 新特性方面&#xff0c;DevEco Studio NEXT Dev…

算法练习题06:leetcode793每日温度

单调栈解法 class Solution {public int[] dailyTemperatures(int[] temperatures) {int length temperatures.length;int[] ans new int[length];Stack<Integer> stack new Stack<>();for(int i 0;i<length;i){int temperature temperatures[i];while(!…

系统功能性能优化:从问题定位到解决方案的系统性分析

引言 在现代软件系统中&#xff0c;性能优化是确保系统稳定、响应迅速和资源高效利用的关键。面对复杂的系统架构和业务逻辑&#xff0c;进行性能优化往往需要遵循一系列系统性的步骤&#xff0c;以确保问题被准确识别&#xff0c;解决方案被有效实施。以下是一套专业的系统功…

Linux下的使用字符设备驱动框架编写ADC驱动 ——MQ-4传感器

ADC的原理 ADC 的作用&#xff1a;模拟信号转换为数字信号 模拟信号一般是指连续变化的电压信号&#xff0c;其数值在一定范围内变化。 而数字信号是由一系列离散的数字表示&#xff0c; 只能取有限的值&#xff0c;通常以二进制形式表示。 ADC通常由一个采样保持电路、一个…

C++(Qt)-GIS开发-QGraphicsView显示瓦片地图简单示例2

C(Qt)-GIS开发-QGraphicsView显示瓦片地图简单示例2 文章目录 C(Qt)-GIS开发-QGraphicsView显示瓦片地图简单示例21、概述2、实现效果3、主要代码4、源码地址 更多精彩内容&#x1f449;个人内容分类汇总 &#x1f448;&#x1f449;GIS开发 &#x1f448; 1、概述 支持多线程…

Android 事件分发:为什么有时候会出现事件冲突?事件的顺序是如何的?出现事件冲突如何解决呢?比如为什么左右可以滑动,而上下却不行?

目录&#xff1a; 一、为什么要学习事件呢&#xff1f; 1.在开发复杂的应用时&#xff0c;经常需要处理复杂的用户交互逻辑。学习事件分发机制可以帮助你更好地控制事件的传递和处理流程&#xff0c;从而解决一些复杂的交互问题&#xff0c;如滑动冲突、点击穿透等。 2.面试需…

NLP笔记:BLEU

1 介绍 bleu是一种文本评估算法&#xff0c;它是用来评估机器翻译跟专业人工翻译之间的对应关系核心思想就是机器翻译越接近专业人工翻译&#xff0c;质量就越好&#xff0c;经过bleu算法得出的分数可以作为机器翻译质量的一个指标 2 BLEU原理 2.1 N-gram BLEU采用了N-gram…

NLP(三):词向量

自然语言处理&#xff0c;处理的是自然的需要&#xff0c;通过分词后得到我们想要的词&#xff0c;但是不可能直接把这种自然语言传递给计算机来理解。这时候就有一个概念叫词向量&#xff0c;用来表示词的特征向量或表征。 一&#xff0c;词向量的表示 词向量的表示主要有两…