利用Go语言模拟实现Raft协议

news2025/1/10 23:06:27

  近来学习到区块链,想要模拟实现 Raft 协议。但是发现网上教程很杂,或者说很多教程并不适合于新手从零开始进行实现。

  本文将从头开始复现个人模拟实现 Raft 的过程,完成后整个模拟后,读者应该学会 Go 语言的基本语法、Rpc 编程的基本概念与用法、简易 Raft 协议的过程。

  系统实现:本地 Raft 节点注册,Raft 节点的投票和选举,心跳监听,超时选举,Http监听,日志复制。

  Tips:本文是按照作者实现过程进行复现,因此如果想要学习可以通篇慢慢读下去,全篇大概1.2W字左右。如果需要直接的代码可以参考以下链接,每个部分都放在不同的文件夹中,理论上可以直接运行。不拒绝白嫖代码,但如果有用请点一个星

Gitee 地址:https://gitee.com/moisten-white/go-raft

GitHub 地址:https://github.com/huang-juanjuan/Go-Raft

文章目录

  • 一、Raft 关键算法
  • 二、创建 Raft 节点组
    • 2.1 Raft 节点的创建
    • 2.2 Rpc 服务注册
    • 2.3 Peers 的构建
    • 2.4 测试主函数
  • 三、投票功能的实现
    • 3.1 阻塞以及心跳检测功能
    • 3.2 判断能否成为 Candidate
    • 3.3 Candidate 进行选举
    • 3.4 节点回应投票
    • 3.5 Candidate 收集投票
    • 3.6 选举定时器
    • 3.7 测试主函数
  • 四、心跳检测
    • 4.1 Leader 发送心跳消息
    • 4.2 Follower 回应心跳消息
    • 4.3 心跳定时器
    • 4.4 测试结果
  • 五、创建日志
    • 5.1 监听 Http
    • 5.2 取消监听
    • 5.3 测试结果
  • 六、日志复制
    • 6.1 Leader 广播新日志
    • 6.2 Leader 广播日志复制命令
    • 6.3 Follower 回应日志复制
    • 6.4 Leader 更改日志复制命令
    • 6.5 Follower 复制日志
    • 6.6 测试结果
  • 七、日志提交
  • 八、小结

一、Raft 关键算法

  本文按照主要 9 个算法模拟实现 Raft 协议。

  这一步很关键,对于第一次尝试的初学者,最开始往往不知道从何处下手,这也是网上很多 Raft 协议模拟的文章的问题。许多文章按照整个 Raft 实现的思路,只有完成所有代码才能知道项目是否完成,这对初学者来说是非常不友好的。

  本文将按照本人实现的过程,分成若干个可以验证的部分进行讲解,读者可以同时完成每个一级标题后进行验证代码的正确性
在这里插入图片描述

二、创建 Raft 节点组

2.1 Raft 节点的创建

  初学者针对 Raft 实现时,应该针对当前需要一步步进行舔砖加瓦。对于 Raft 结构体的元素,一开始包含基本的 ID、Peers、currentRole 等即可,而 log 可以先用 []string 进行代替。log 具体的结构体等内容,后续再进行逐步添加。以下是个人最开始使用的 Raft 结构体,以及如何进行初始化,供参考:

// Raft 结构
type Raft struct {
mu sync.RWMutex // 互斥锁
id string       // 端口号

peers         []string // 所有节点的端口号
currentTerm   int      // 当前 Term 号
currentRole   Role     // 当前角色
currentLeader string   // 当前 Leader 的端口号
Logs          []string // 日志

votedFor     string   // 当前 Term 中给谁投了票
voteReceived []string // 收到的同意投票的端口号

sentLength  map[string]int // 每个节点日志复制的插入点
ackedLength map[string]int // 每个节点已接收的日志长度

electionTimer     *time.Timer  // 选举定时器
heartBeatTimer    *time.Ticker // 心跳计时器
lastHeartBeatTime int64        // 上次收到心跳的时间
timeout           int          // 心跳超时时间
}

// NewRaft 创建并初始化一个新的 Raft 实例
func NewRaft(id string) *Raft {
rf := &Raft{
	id:                id,
	peers:             []string{},
	currentTerm:       0,
	currentRole:       Follower,
	currentLeader:     "null",
	Logs:              []string{},
	votedFor:          "null",
	voteReceived:      []string{},
	sentLength:        make(map[string]int),
	ackedLength:       make(map[string]int),
	electionTimer:     time.NewTimer(time.Duration(rand.Intn(5000)+5000) * time.Millisecond),
	heartBeatTimer:    time.NewTicker(1000 * time.Millisecond),
	lastHeartBeatTime: time.Now().UnixMilli(),
	timeout:           5,
}
return rf
}

  先有了 Raft 的初始化,我们先进行的尝试是创建 Raft 节点,并使若干个节点之间能够相互更新,形成一个群组。在学习过程中,本人了解到的是 Raft 节点有一个上位节点作为中心节点,用于 Term 号的广播和维护 Peers 记录所有节点端口号。

  本人在此的思路是“使用命令行参数,在创建节点的时候指定节点的端口号,并指定中心节点的端口号,新节点通过和指定的中心节点通信创建自身 Peers,中心节点广播通知已有节点添加新节点”。这样便能在中心节点挂掉时,能够继续维护节点组的运行。同时这种方法对于初学者比较友好,省略了构建中心节点的不同属性、方法和接口等。

2.2 Rpc 服务注册

  为了能实现广播等功能,我们需要使用 Rpc 编程,对于初学者来说,这部分也是最难入门的一点。

  首先是如何理解 Rpc 编程,Rpc 全程 Remote Procedure Call,即“远程过程调用”。本人的理解是,把 Raft 节点想象成若干个物理上分散的节点,每个节点就像一个实体类,Rpc 函数就是类的方法,Rpc 就是连接到实体类,通过连接调用实体类的方法

下列代码中 client 就相当于实体类的一个引用,使用 client.call 调用 Raft 节点的 ReplyVote 函数。

client, err := rpc.DialHTTP("tcp", peer)
if err != nil {
	log.Println("Dialing error: ", err)
	rf.mu.Lock()
	rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)
	rf.mu.Unlock()
		return
}
var reply RequestVoteReply
err = client.Call("Raft.ReplyVote", args, &reply)
if err != nil {
	log.Println("RPC error: ", err)
	return
}

  本人在注册 Rpc 时也经历了非常长的时间去理解,此处借鉴了另一个人的用法,同时注册了 Http 请求的监听,便于之后的日志的创建。
借鉴地址:https://github.com/corgi-kx/blockchain_consensus_algorithm/tree/master/raft

// startRaftServer 启动一个基于 HTTP 的 RPC 服务器,用于处理 Raft 节点之间的 RPC 请求。
func startRaftServer(rf *Raft, port string) {
	// 将 Raft 实例 rf 注册为一个 RPC 服务,以便其他节点可以通过 RPC 调用 rf 的方法。
	err := rpc.Register(rf)
	if err != nil {
		log.Panic(err)
	}

	// 将 RPC 服务绑定到 HTTP 协议上,允许通过 HTTP 协议处理 RPC 调用。
	// 这会为 RPC 服务创建一个 "/rpc" 端点,并监听来自 HTTP 请求的 RPC 调用。
	rpc.HandleHTTP()

	// 在指定的端口上启动 TCP 监听器,以便接收来自其他 Raft 节点的连接请求。
	listener, err := net.Listen("tcp", ":"+port)
	if err != nil {
		log.Fatalf("Error starting server: %s", err)
	}

	// 使用 HTTP 服务器处理通过监听器接收到的连接。
	// nil 代表使用默认的 HTTP 请求多路复用器 (http.DefaultServeMux) 来分发请求。
	// 这个操作在一个新的 Go routine 中执行,以便服务器可以并行处理多个请求。
	go http.Serve(listener, nil)
}

  此处在讲解一下 go function() 的用法,本人的理解就是将 go routine 当做一个线程,go function() 就是启动了一个 function() 函数作为线程运行,此时原本调用 go function() 的函数即使退出也不影响 function() 的运行,除非整个程序的终止或满足退出条件。还可以通过信号机制进行管理,但是此项目并未使用。

2.3 Peers 的构建

  这一小节是这部分最关键的内容,即对于一个已有的节点组,新节点如何加入这个群组。按照上文提到的方法,关键在于设置一个中心节点,让新节点通过中心节点得到已有节点,再通过中心节点让所有已有节点能更新新节点。

  为此我们需要两个函数,一个用于已有节点进行更新,一个用于新节点的注册。

  首先是已有节点进行更新,即添加一个新节点的端口号,在本人的 Raft 结构体中可以看到 Peers 是一个 string 数组用于记录端口号,添加新节点的过程其实类似于数组的添加,但是需要注意的是避免重复添加的过程,因此实际上可以利用 map 进行记录

以下为添加节点的 Rpc 函数,对于 Rpc 函数我们需要注意的是,函数名前面的 (rf *Raft),表示这是一个 Raft 的函数。Rpc 函数的返回值也必须为 error,其参数也必须为两个指针类型,一个用于传参,一个可以用于记录返回值,因此通常将两个参数用结构体进行包装

// 定义请求和响应结构
type AddPeerArgs struct {
// 新节点的端口号
	NewPeer string
}

type AddPeerReply struct {
// 更新后的新群组 peers
	Peers []string
}

// Rpc 函数用于广播通知已有节点添加新节点
func (rf *Raft) AddPeer(args *AddPeerArgs, reply *AddPeerReply) error {
// 先对 rf 上锁,避免出现广播中 peers 改变导致出现重复添加的情况
	rf.mu.Lock()
// defer 表示函数退出时再执行解锁操作
	defer rf.mu.Unlock()

	// 遍历 peers 检查是否已经存在该 peer
	for _, peer := range rf.peers {
		if peer == args.NewPeer {
			fmt.Printf("Node %s: Peer %s already exists\n", rf.id, args.NewPeer)
			reply.Peers = rf.peers
			return nil
		}
	}

	// 如果不存在则添加
	rf.peers = append(rf.peers, args.NewPeer)
	reply.Peers = rf.peers

	// 打印更新后的 peers 列表,此处可以作为验证性打印,检查是否正确更新节点
	fmt.Printf("Node %s updated peers: %v\n", rf.id, rf.peers)
	return nil
}

  以下为新节点的注册函数,新节点将端口号包装后,调用中心节点的该函数进行注册,通过 reply 将已有的所有节点告知新节点进行注册,再将添加了新节点的端口号广播给已有节点进行更新。

// 该方法会将新节点添加到当前节点的 peers 列表中,并将新节点信息广播给其他节点。
func (rf *Raft) RegisterNode(args *AddPeerArgs, reply *AddPeerReply) error {
	rf.mu.Lock()  
	defer rf.mu.Unlock()  

	// 检查是否已经存在该 peer
	for _, peer := range rf.peers {
		if peer == args.NewPeer {
			fmt.Printf("Node %s: Peer %s already exists\n", rf.id, args.NewPeer)
			reply.Peers = rf.peers
			return nil
		}
	}
	// 如果不存在则添加
	rf.peers = append(rf.peers, args.NewPeer)
	// 打印当前节点的 peers 列表,用作调试
	fmt.Printf("Node %s updated peers: %v\n", rf.id, rf.peers)


	// 将新节点信息广播给当前节点的其他 peers
	for _, peer := range rf.peers {
		// 跳过新节点本身和当前节点
		if peer == args.NewPeer || peer == "localhost:"+rf.id {
			continue
		}
  
		// 尝试通过 RPC 连接到其他节点
		client, err := rpc.DialHTTP("tcp", peer)
		if err != nil {
			log.Println("Dialing:", err)  // 如果连接失败,记录错误并继续下一个节点
			continue
		}
		// 准备将新节点添加到其他节点的 peers 列表中
		addPeerArgs := &AddPeerArgs{NewPeer: args.NewPeer}
		var addPeerReply AddPeerReply
		// 调用其他节点的 AddPeer 方法,将新节点信息发送过去
		err = client.Call("Raft.AddPeer", addPeerArgs, &addPeerReply)
		if err != nil {
			log.Println("RPC error:", err)
		}
	}

	return nil  // 方法执行成功,返回 nil 表示没有错误
}

2.4 测试主函数

  在完成上面三个小节之后,我们的系统应该能基本做到构建一个 Raft 节点组,能实现新节点的添加,这部分可以通过如下的主函数进行测试。需要注意的是,在广播新节点端口号时,在端口号前添加了 “localhost:”,使本地节点上可以相互进行连接通信。

func main() {
	// 定义命令行参数:端口号和中心节点的地址
// flag 使用 import 导入
	portPtr := flag.String("port", "0", "端口号")
	centralNodePtr := flag.String("central", "localhost:8040", "中心节点(默认 localhost:8040 )")
	flag.Parse()  // 解析命令行参数

	// 检查是否提供了有效的端口号,如果端口号为 "0",程序退出
	if *portPtr == "0" {
		fmt.Println("请确认端口号")
		os.Exit(1)
	}

	// 创建一个新的 Raft 实例,使用提供的端口号
	rf := NewRaft(*portPtr)

	// 启动 Raft 服务器,监听 RPC 请求
	go startRaftServer(rf, *portPtr)

	// 尝试通过 RPC 连接到中心节点
	client, err := rpc.DialHTTP("tcp", *centralNodePtr)
	if err != nil {
		log.Fatalf("Error connecting to central node: %s", err)  // 如果连接失败,打印错误信息并退出程序
	}

	// 创建请求参数,将当前节点的信息传递给中心节点
	args := &AddPeerArgs{NewPeer: "localhost:" + *portPtr}
	var reply AddPeerReply

	// 调用中心节点的 RegisterNode 方法,将当前节点注册到集群中
	err = client.Call("Raft.RegisterNode", args, &reply)
	if err != nil {
		log.Fatalf("RPC error: %s", err)  // 如果调用失败,打印错误信息并退出程序
	}

	// 更新当前节点的 peers 列表,使用从中心节点获取的 peers 列表
	rf.mu.Lock()
	rf.peers = reply.Peers
	fmt.Printf("Node %s initial peers: %v\n", rf.id, rf.peers)
	rf.mu.Unlock()

	// 阻塞 main 函数,防止程序退出
	select {}
}

  在文件目录下使用 go build -o test.exe 后生成可执行文件,调用验证 Peers 能否正确更新,命令行参数的使用,以及是否会出现重复添加等情况。

在这里插入图片描述

三、投票功能的实现

  在这部分,我们将实现基本的投票功能。当然,由于心跳检测功能还不够完善,所以还不能算是一次完整的选举过程,但为了尽量每一部分都可以验证,本节还是非常有必要的。

3.1 阻塞以及心跳检测功能

  在投票功能中,一个 Follower 首先需要通过变成 Candidate 进行收集投票才能进一步变为 Leader,因此我们需要考虑什么情况下变成 Candidate。

在这里插入图片描述

  因此我们需要修改主函数,让 Raft 节点在创立后循环判断能否成为 Candidate;如果不能,是否收到心跳消息,这部分的伪代码如下:

// 循环的标签
Circle:
	for {
  if rf 能否变为 Candidate {
			// 成为候选人节点后,向其他节点请求选票进行选举
			rf 开始进行选举

			rf.mu.Lock()
			if rf 选举成功 {
				fmt.Printf("Node %s has become the leader\n", rf.id)
				rf.mu.Unlock()
				break
			} else {
				// 选举过程中失败,此时可能是已有更新的 Term 或是选举超时
	            // 如果出现更新的 Term,则下一次判断能否成为 Candidate 时退出循环
          // 如果选举超时或群组只有当前节点一个节点,则重复进行选举
				rf.mu.Unlock()
			}
		} else {
			// 变为 Candidate 失败
			break
		}
	}

	// 进行心跳检测
	for {
		// 5秒检测一次
		time.Sleep(5 second)
		rf.mu.Lock()

		if rf 不是领导者 && rf 收到过心跳消息 && 上一次心跳消息距离现在已超时 {
			fmt.Printf("心跳检测超时,已超过%d秒\n", rf.timeout)
			fmt.Println("即将重新开启选举")
			// 重新初始化 Raft 节点信息,准备变为 Candidate
   	goto Circle
		} else {
			rf.mu.Unlock()
		}
	}

3.2 判断能否成为 Candidate

  在上一小节中我们提到如何判断能否成为 Candidate,为此我们需要一个专门的函数进行处理。在这一步,我们需要使用随机退避原则处理一种会发生活锁现象的情况

  假设现有两个节点A、B,A 启动后变为 Candidate,随后 B 启动后变为 Candidate。由于开始收集投票时,节点首先会为自己投票,因此 A.VoteFor = A,B.VoteFor = B。对于A 发起的选举,B 已给自己投过票,因此拒绝投票,同理 A 也因此拒绝给 B 投票。于是 A、B 在成为 Candidate 的过程中都失败了,相同时间的心跳检测后,上述过程循环进行,也就产生了活锁。

  为解决矛盾,应该设置选举前,随机休眠一段时间然后重新开始选举,避免term号不停更新。此处需要注意的是休眠随机时间的位置,不可放在上锁之后。原因在于之后收集投票,为了保证安全也是要对数据上锁保护的,如果此时在上锁后进行休眠,则之后收集投票会被阻塞,导致收集投票时,节点已经成为 Candidate 了,休眠因此是无意义的。

// 修改节点为候选人状态,返回值不是 error,说明函数可以类比为 Raft 类的成员函数
// 可以使用rf.becomCandidate() 进行调用,可以访问 rf 结构体的所有成员
func (rf *Raft) becomeCandidate() bool {
	//休眠随机时间后,再开始成为候选人
	r := rand.Int63n(3000) + 1000
	time.Sleep(time.Duration(r) * time.Millisecond)

	rf.mu.Lock()
	defer rf.mu.Unlock()

// 只能从 Follwer 变为 Candidate,并且当前 Term 中没有 Leader 且节点未投过票
	if rf.currentRole == Follower && rf.currentLeader == "null" && rf.votedFor == "null" {
		rf.currentRole = Candidate
		rf.votedFor = rf.id
		rf.currentTerm += 1
		rf.voteReceived = append(rf.voteReceived, "localhost:"+rf.id)
		fmt.Println("本节点已变更为候选人状态")
		return true
	} else {
		return false
	}
}

3.3 Candidate 进行选举

  当节点从 Follower 变为 Candidate 后,即将开始进行选举。这个过程要对当前 Peers 中的所有节点收集投票,每收集一个节点的结果,就判断是否需要进行退出选举过程。可以看到

在这里我们需要使用一个锁的组进行控制,确保先完成选举投票过程,再进行下一步的过程。

// 收集投票信息结构体
type RequestVoteArgs struct {
	NodeId       string // 候选人的ID
	Term         int    // 发起请求节点的 Term
	LastLogIndex int    // 最新的日志索引
	LastLogTerm  int    // 最新日志的 Term
}

type RequestVoteReply struct {
	NodeId      string // Follower 的 ID
	Term        int    // 响应节点的 Term
	VoteGranted bool   // 是否同意投票
}

func (rf *Raft) startElection(args *RequestVoteArgs) {
	// 开始选举时,同时开始选举定时器
	rf.resetElectionTimer()
	fmt.Print("开始选举Leader\n")

	var wg sync.WaitGroup
	var mu sync.Mutex

	// 决定选举是否中断
	cancelled := false

	for i := 0; i < len(rf.peers); i++ {
		peer := rf.peers[i]
		if peer == "localhost:"+rf.id {
			continue
		}
  // 添加一把锁
		wg.Add(1)
		go func(peer string, index int) {
			// 退出时解开一把锁
      defer wg.Done()
			
      // 遇到更新的 Term 时,节点会从 Candidate 退出,此时结束选举
			if rf.currentRole != Candidate {
				return
			}
      // 调试信息打印
			fmt.Printf("节点 %s 向 %s 发起投票请求\n", rf.id, peer)

      // 连接到远程 Raft 节点
			client, err := rpc.DialHTTP("tcp", peer)
			if err != nil {
				log.Println("Dialing error: ", err)
				rf.mu.Lock()
          // 如果连接失败则从 Peers 中删除对应 peer
				rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)
				rf.mu.Unlock()
				return
			}
			var reply RequestVoteReply
      // 远程调用节点回应是否投票
			err = client.Call("Raft.ReplyVote", args, &reply)
			if err != nil {
				log.Println("RPC error: ", err)
				return
			}

      // 收到回应后进行收集投票的判断
			if rf.CollectVotes(&reply) {
				mu.Lock()
          // 当收集到过半票数时返回为 true,此时修改 cancelled 为 true 进行退出
				cancelled = true
				mu.Unlock()
				return
			} else {
				rf.mu.Lock()
          // 如果节点变为 Follower 说明已有更新的 Term,此时退出选举
				if rf.currentRole == Follower {
					fmt.Println("结束收集投票")
					rf.mu.Unlock()
					return
				} else {
              // 继续收集投票,此时不一定收集完了投票,所以不需要退出
					rf.mu.Unlock()
				}
			}
		}(peer, i)
	}

// 等待所有的锁都解开,即收集了所有的投票
// 收集所有投票也是为了避免收集了部分投票,但是余下的节点中存在更新的 Term
	wg.Wait()

	rf.mu.Lock()
	defer rf.mu.Unlock()
	// 无论是否选举成功都结束选举定时器
	rf.electionTimer.Stop()

	if cancelled {
		fmt.Println("选举成功")
		for _, follower := range rf.peers {
			if follower == "localhost:"+rf.id {
				continue
			}
			rf.sentLength[follower] = len(rf.Logs)
			rf.ackedLength[follower] = 0
			fmt.Println("replicatedLogs", follower)
		}
	} else {
		fmt.Println("选举失败")
		
  rf.currentRole = Follower
  // 如果选举中发现更新的 Term,则会立刻修改 currentLeader 和 votedFor
  // 这种情况下是不需要重新初始化当前节点的,否则会有更新的 Term,但是当前节点拒绝投票
  // 而后当前节点在下一轮又重新进入选举,重复上述过程,导致始终无法出现 Leader,出现活锁
		if rf.currentLeader == rf.id {
			rf.currentLeader = "null"
		}
		if rf.votedFor == rf.id {
			rf.votedFor = "null"
		}
		rf.voteReceived = []string{}
	}
}

3.4 节点回应投票

  在上一小节我们远程调用了 Peers 中节点的 ReplyVote 函数,相当于 Raft 算法中的 Raft-2,从这里我们开始将用到 log 结构体。

// log 结构体
type LogEntry struct {
	Term    int // 日志生成时的 Term号
	Index   int // 日志的序号
	Command string // 日志的内容
}

// Raf-2
func (rf *Raft) ReplyVote(args *RequestVoteArgs, reply *RequestVoteReply) error {
	rf.mu.Lock()  
	defer rf.mu.Unlock() 

	// 打印消息,表示收到来自其他节点的投票请求。
	fmt.Printf("节点 %s 收到来自 %s 的投票请求\n", rf.id, args.NodeId)

	// 确定当前节点最后一条日志的任期号。
	myLogTerm := 0
	if len(rf.Logs) > 0 {
		myLogTerm = rf.Logs[len(rf.Logs)-1].Term
	}

	// 检查候选节点的日志是否比当前节点的日志更新。
	// 这是为了确保候选节点的日志至少与当前节点的日志一样完整。
	logOK := args.LastLogTerm > myLogTerm || (args.LastLogTerm == myLogTerm && args.LastLogIndex >= len(rf.Logs))

	// 检查候选节点的任期是否大于当前节点的任期
// 或者如果它们相等,但当前节点没有投票给其他候选人或已经投票给该候选节点。
	termOK := args.Term > rf.currentTerm || (args.Term == rf.currentTerm && (rf.votedFor == "null" || rf.votedFor == args.NodeId))

	// 如果日志和任期条件都满足,则授予投票。
	if termOK && logOK {
		rf.currentTerm = args.Term  // 将当前节点的任期更新为候选节点的任期。
		rf.currentRole = Follower  // 将当前节点的角色更改为跟随者。
		rf.votedFor = args.NodeId  // 记录候选节点的 ID 作为当前节点投票的对象。
		fmt.Print("接受投票\n")
		reply.VoteGranted = true  // 在回复中设置投票授予标志为 true。
	} else {
		fmt.Print("拒绝投票\n")
		reply.VoteGranted = false  // 在回复中设置投票授予标志为 false。
	}

	// 将回复的 NodeId 和 Term 设置为当前节点的 ID 和任期。
	reply.NodeId = rf.id
	reply.Term = rf.currentTerm

	return nil
}

3.5 Candidate 收集投票

  这部分和 3.3 节不同,3.3 节管理的是选举的过程,本小节则关注于收到 3.4 节的回应后,Candidate 进行统计处理的过程。在 Raft 关键算法中则对应于 Raft-3,需要注意的是在下列函数中,Candidate 有可能会变为 Follower,并修改了 votedFor。这也就是 3.3 节中的 startElection() 函数中,为什么要在选举失败后添加两个 if 语句的原因。

// Raft-3
func (rf *Raft) CollectVotes(reply *RequestVoteReply) bool {
	rf.mu.Lock()  
	defer rf.mu.Unlock() 

	// 检查当前节点是否仍是候选者,并且收到的投票回复的任期与当前节点的任期一致且授予了投票。
	if rf.currentRole == Candidate && reply.Term == rf.currentTerm && reply.VoteGranted {
		// 将投票给当前节点的节点ID添加到已收到的投票列表中。
		rf.voteReceived = append(rf.voteReceived, reply.NodeId)
		
		// 如果收到的投票数量超过总节点数的一半,当前节点将成为领导者。
		if len(rf.voteReceived) > len(rf.peers)/2 {
			rf.currentRole = Leader  // 将当前节点的角色设置为领导者。
			rf.currentLeader = rf.id  // 设置当前节点为领导者。
			fmt.Println("已获得超过二分之一票数")
			return true  // 返回 true 表示当前节点已成为领导者。
		}
	} else if reply.Term > rf.currentTerm {
		// 如果收到的投票回复包含比当前节点更高的任期,则当前节点放弃竞选,变为跟随者。
		fmt.Println("存在更新的 Term")
		rf.currentTerm = reply.Term  // 更新当前节点的任期。
		rf.currentRole = Follower  // 将当前节点的角色更改为跟随者。
		rf.votedFor = "null"  // 清除当前节点的投票记录。

		return false // 这里的返回是因为选举失败导致
	}
// 这里的返回是因为选举的票数未达到
// 或是在另一个 goroutine 中变为了 Follower
	return false 
}

3.6 选举定时器

  首先说明,在本项目中,这部分难以验证,原因在于无法模拟网络拥塞等情况,即连接上一个 Raft 节点后,选举时间非常快,也无法阻塞一个节点,如果取消一个节点的运行,那么 Candidate 在无法连接上远程节点后,就会从本地 Peers 中删掉对应的 peer。因此本小节仅供参考 Raft-1 算法。

// Raft-1
func (rf *Raft) initRaft() {
	rf.mu.Lock() 
	defer rf.mu.Unlock()

	// 增加当前的任期号,表示进入新的选举周期。
	rf.currentTerm += 1
	// 将当前节点的角色设置为候选者,以便发起选举。
	rf.currentRole = Candidate
	// 将当前节点的 ID 设置为已投票的对象,表示为自己投票。
	rf.votedFor = rf.id
	// 重置收到的选票列表,仅包含当前节点自己的一票。
	rf.voteReceived = []string{rf.id}
}

// 启动选举超时计时器,并在选举超时的情况下重新初始化 Raft 状态。
func (rf *Raft) electionTimerStart() {
	// 监听选举计时器的触发信号,每当计时器触发时进入循环。
	for range rf.electionTimer.C {
		rf.mu.Lock()
		// 检查当前节点的角色是否不是候选者,如果不是,解锁互斥锁并跳过此次循环。
		if rf.currentRole != Candidate {
			rf.mu.Unlock()  
			continue  // 跳过此次循环,等待下一次计时器触发。
		}

		// 如果当前节点是候选者且计时器触发,表示选举超时。
		fmt.Println("选举超时")
		rf.mu.Unlock() 
		// 重新初始化 Raft 状态,以准备再次发起选举。
		rf.initRaft()
	}
}

// 重置选举超时计时器,使其在随机时间后再次触发。
func (rf *Raft) resetElectionTimer() {
	rf.mu.Lock()  // 锁定互斥锁,以确保对共享状态的线程安全访问。
	defer rf.mu.Unlock()  // 在函数结束时自动解锁互斥锁。
	// 生成一个随机的超时时间,范围在 5 到 10 秒之间。
	timeout := time.Duration(rand.Intn(5000)+5000) * time.Millisecond
	// 停止当前的选举计时器。
	rf.electionTimer.Stop()
	// 使用新的超时时间重置选举计时器。
	rf.electionTimer.Reset(timeout)
}

3.7 测试主函数

  完成上面的内容后,我们将 3.1 的伪代码进行填充,得到如下主函数,此时启动程序,我们可以基本实现投票选举的过程,对于心跳检测我们将在下一节中进行实现。

func main() {
	portPtr := flag.String("port", "0", "端口号")
	centralNodePtr := flag.String("central", "localhost:8040", "中心节点(默认 localhost:8040 )")
	flag.Parse()

	if *portPtr == "0" {
		fmt.Println("请确认端口号")
		os.Exit(1)
	}

	rf := NewRaft(*portPtr)
	go startRaftServer(rf, *portPtr)

	client, err := rpc.DialHTTP("tcp", *centralNodePtr)
	if err != nil {
		log.Fatalf("Error connecting to central node: %s", err)
	}
	args := &AddPeerArgs{NewPeer: "localhost:" + *portPtr}
	var reply AddPeerReply
	err = client.Call("Raft.RegisterNode", args, &reply)
	if err != nil {
		log.Fatalf("RPC error: %s", err)
	}

	rf.mu.Lock()
	rf.peers = reply.Peers
	fmt.Printf("Node %s initial peers: %v\n", rf.id, rf.peers)
	rf.mu.Unlock()

	go rf.electionTimerStart()
	rf.electionTimer.Stop()

Circle:
	for {
		if rf.becomeCandidate() {
			// 成为候选人节点后,向其他节点请求选票进行选举
			rf.mu.Lock()
			args := &RequestVoteArgs{
				Term:         rf.currentTerm,
				NodeId:       rf.id,
				LastLogIndex: len(rf.Logs),
				LastLogTerm:  0,
			}
			if len(rf.Logs) > 0 {
				args.LastLogTerm = rf.Logs[len(rf.Logs)-1].Term
			}
			rf.mu.Unlock()

			rf.startElection(args)

			rf.mu.Lock()
			if rf.currentRole == Leader {
				fmt.Printf("Node %s has become the leader\n", rf.id)
				rf.mu.Unlock()
				break
			} else {
				rf.mu.Unlock()
			}
		} else {
			break
		}
	}

	// 进行心跳检测
	for {
		// 5秒检测一次
		time.Sleep(time.Millisecond * 5000)
		rf.mu.Lock()

		if rf.currentRole != Leader && rf.lastHeartBeatTime != 0 && (time.Now().UnixMilli()-rf.lastHeartBeatTime) > int64(rf.timeout*1000) {
			fmt.Printf("心跳检测超时,已超过%d秒\n", rf.timeout)
			fmt.Println("即将重新开启选举")
			rf.currentRole = Follower
			rf.currentLeader = "null"
			rf.votedFor = "null"
			rf.voteReceived = []string{}
			rf.lastHeartBeatTime = 0
			rf.mu.Unlock()
			goto Circle
		} else {
			rf.mu.Unlock()
		}
	}
}

在这里插入图片描述

四、心跳检测

  在第三节最后的测试中我们很明显发现,即时一个节点选举成功了,其他节点由于接收不到心跳消息,所以过一段时间后也会开始重新进行选举。在本节,我们将晚上心跳信息的发送和检测,实现完整的选举成功过程。

4.1 Leader 发送心跳消息

  心跳消息的目的在于 Leader 告知 Follower 自身的存在,并更新 Term,同时监听是否出现了更新 Term 的 Leader,保证系统中只存在一个 Leader。显然,告知其他节点并收到回应是需要 Rpc 调用的,故设计函数如下。

// Leader 发送的心跳消息结构体
type HeartBeatArgs struct {
	Term     int    // 领导者的任期
	LeaderId string // 领导者的ID
}

// Follower 回应心跳消息的结构体
type HeartBeatReply struct {
	Term    int  // 跟随者的当前任期
	Success bool // 心跳回应
}

func (rf *Raft) SendHeartBeat() {
 rf.mu.Lock()

 fmt.Println(rf.id + " start heartBeat")
 // 如果当前节点不是领导者,则退出心跳发送
 // 例如收到了 A1 节点的消息,发现有更新的 Term,于是在给 A2 节点发送消息前变为了 Follower
 if rf.currentRole != Leader {
     // 设置心跳时间标志不为0,如果节点经历过主函数中的心跳监听超时,则 lastHeartBeatTime = 0
     // 而当前 Leader 变为 Follower 之后,可能在未收到心跳消息前,其他的 Leader 就挂掉了
     // 主函数中限制了 lastHeartBeatTime != 0 才能重新选举,导致本节点始终处于监听心跳状态
     rf.lastHeartBeatTime = 1 
     rf.mu.Unlock()            // 解锁
     return
 }

 // 由于 Leader 在无法连接到指定 peer 时,会将其从本地 Peers 删除
 // 因此如果当前节点是集群中唯一的节点,降级为跟随者
 if len(rf.peers) == 1 {
     rf.lastHeartBeatTime = 1 // 更新心跳时间标志
     rf.currentLeader = "null" // 清空当前领导者
     rf.votedFor = "null"      // 清空投票记录
     rf.currentRole = Follower // 降级为跟随者角色
     rf.mu.Unlock()            // 解锁
     return
 }

 rf.mu.Unlock() // 解锁,准备并行处理心跳发送

 // 构造心跳请求参数
 args := &HeartBeatArgs{
     Term:     rf.currentTerm, // 当前任期
     LeaderId: rf.id,          // 领导者ID
 }

 // 遍历所有的节点,向它们发送心跳
 for i := 0; i < len(rf.peers); i++ {
     peer := rf.peers[i]
     if peer == "localhost:"+rf.id {
         // 跳过当前节点本身
         continue
     }

     // 验证 Leader 给谁发送了心跳消息
     fmt.Printf("%s send heartbeat to %s\n", rf.id, peer)

     // 使用并行的goroutine来发送心跳,以提高效率
     go func(peer string, index int) {
         client, err := rpc.DialHTTP("tcp", peer) // 建立与目标节点的RPC连接
         if err != nil {
             log.Println("Dialing error:", err) // 连接错误处理
             rf.mu.Lock()
             // 如果连接失败,从节点列表中移除该节点
             rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)
             rf.mu.Unlock()
             return
         }

         var reply HeartBeatReply
         // 发送心跳消息,并等待回复
         err = client.Call("Raft.ReceiveHeartBeat", args, &reply)
         if err != nil {
             log.Println("RPC error:", err) 
             return
         }

         rf.mu.Lock()
         defer rf.mu.Unlock()

         // 如果收到的回复任期大于当前节点的任期,降级为跟随者
         if reply.Term > rf.currentTerm {
             rf.currentTerm = reply.Term   // 更新当前任期
             rf.currentRole = Follower     // 降级为跟随者角色
             rf.votedFor = "null"          // 清空投票记录
             rf.currentLeader = "null"     // 清空当前领导者
         }
     }(peer, i)
 }
}

4.2 Follower 回应心跳消息

Follower 的回应则相对简单,只需要更新相应信息并把相关信息回应即可。

func (rf *Raft) ReceiveHeartBeat(args *HeartBeatArgs, reply *HeartBeatReply) error {
 rf.mu.Lock()
 defer rf.mu.Unlock()

 // 如果接收到的心跳消息中的任期小于当前节点的任期,拒绝心跳
 if args.Term < rf.currentTerm {
     reply.Term = rf.currentTerm // 返回当前节点的任期
     reply.Success = false       // 标记心跳处理失败
     return nil                  // 直接返回,不进行进一步处理
 }

 // 打印接收到心跳消息的日志信息
 fmt.Printf("节点 %s 收到来自 %s 的心跳消息\n", rf.id, args.LeaderId)

 // 更新节点的状态为跟随者,并同步心跳消息中的任期和领导者信息
 rf.currentTerm = args.Term          // 更新当前节点的任期为心跳消息中的任期
 rf.currentLeader = args.LeaderId    // 更新当前领导者为心跳消息中的领导者ID
 rf.currentRole = Follower           // 将当前节点角色设置为跟随者
 rf.lastHeartBeatTime = time.Now().UnixMilli() // 记录接收到心跳的时间戳

 // 设置回复消息的内容,表示心跳消息处理成功
 reply.Term = rf.currentTerm // 返回当前节点的任期
 reply.Success = true        // 标记心跳处理成功

 return nil
}

4.3 心跳定时器

  对于 Leader 而言,需要定时发送心跳消息,为此设置一个定时器,每间隔一段时间,检测该节点是否为 Leader,如果是则开始发送心跳消息,需要注意的是此处的判断间隔需要小于之前在主函数中设置的时间,完成函数后,在主函数创建节点后,将心跳定时器启动为 goroutine 即可。

// 心跳定时器
func (rf *Raft) heartBeatTimerStart() {
 // 通过循环不断地监听心跳定时器
 for range rf.heartBeatTimer.C {
     rf.mu.Lock() 

     // 如果当前节点的角色不是领导者,则跳过本次心跳发送
     if rf.currentRole != Leader {
         rf.mu.Unlock() // 解锁
         continue        // 跳过当前循环,继续监听下一次定时器触发
     }

     rf.mu.Unlock() // 如果是领导者,解锁并继续进行心跳发送
     rf.SendHeartBeat() // 调用发送心跳消息的函数
 }
}

4.4 测试结果

在主函数中 go rf.heartBeatTimerStart() 放在 go rf.electionTimerStart() 之上即可,此外主函数无其他改动,在此给出测试结果图。

在这里插入图片描述

五、创建日志

5.1 监听 Http

  在 2.2 节中,我们提到在注册服务时,我们还注册了 Http 的监听。因此我们在 Raft 结构体中再添加一个 http.serve 元素,于是对于日志消息,我们可以通过 Http 发送一个字符串模拟日志的命令。同样需要注意的是,Http 的监听只能注册一次,因此设置一个标记符号用于标记,避免重复注册

// getRequest 处理 HTTP 请求的回调函数
func (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {
 // 解析请求的表单数据
 request.ParseForm()

 // 检查请求的 URL 是否包含 "message" 参数,并且当前有领导者节点
 // 例如,http://localhost:8080/req?message=ohmygod
 if len(request.Form["message"]) > 0 && rf.currentLeader != "null" {
     message := request.Form["message"][0] // 提取消息内容
     m := LogEntry{
         Command: message, // 创建一个日志条目
     }
     // 添加到本地日志
		rf.Logs = append(rf.Logs, m)

     fmt.Println("http监听到了消息")
     writer.Write([]byte("ok!!!")) // 向客户端返回确认消息
 }
}

// HandleFuncFlag 确保 http.HandleFunc 只被调用一次的标志变量
var HandleFuncFlag = true

// httpListen 启动 HTTP 服务器监听指定端口
func (rf *Raft) httpListen() {
 // 创建一个 http.Server 实例,监听端口 8080
 rf.server = &http.Server{
     Addr:    ":8080", // 服务器监听的地址和端口
     Handler: nil,     // 使用默认的 http.DefaultServeMux 处理请求
 }

 // 如果 HandleFuncFlag 为 true,设置处理函数
 if HandleFuncFlag {
     HandleFuncFlag = false // 设置标志,确保此代码块只执行一次
     http.HandleFunc("/req", rf.getRequest) // 注册路径 "/req" 的处理函数
 }

 // 启动 HTTP 服务器,使用 goroutine 以非阻塞方式运行
 go func() {
     fmt.Println("监听8080")
     if err := rf.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
         fmt.Println("Server error:", err) // 如果服务器出错且不是正常关闭,记录错误
     }
 }()
}

5.2 取消监听

  我们知道日志是由 Leader 进行广播复制的,因此日志只能由 Leader 产生,所以我们针对上面的 http 监听,要选在节点成为 Leader 后开始,并且当节点从 Leader 退出后,取消监听。需要注意的是,取消监听后服务依旧存在,因此上一小节中需要设置一个标志符号避免重复注册服务

// stopListening 停止 HTTP 服务器的监听
func (rf *Raft) stopListening() {
 // 创建一个带有 1 秒超时的上下文,用于控制服务器的关闭操作,避免被阻塞
 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 defer cancel() // 在函数退出时自动取消上下文

 // 尝试关闭服务器,传入上下文控制关闭的超时时间
 if err := rf.server.Shutdown(ctx); err != nil {
     // 如果关闭过程中发生错误,输出错误信息
     fmt.Println("Shutdown error:", err)
 } else {
     // 如果关闭成功,输出提示信息
     fmt.Println("监听已停止")
 }
}

在这里插入图片描述

5.3 测试结果

  我们提到 http 监听,要选在节点成为 Leader 后开始,并且当节点从 Leader 退出后,取消监听。所以我们启动监听应该设置在 starElection() 函数中选举成功后。而一个节点如果要从 Leader 状态退出,那么只能在收到心跳消息的回应时发现存在更新的 Term 才会退出,所以我们要在此设置取消监听。

  完成上述内容后,启动程序,当我们在浏览器输入“http://localhost:8080/req?message=第一条日志”时,可以看到当前 Leader 的日志多出一条 “第一条日志”,但是 Follower 并未进行监听。结束进程后,新的 Leader 将重新开始监听,结果如下。

图6

六、日志复制

  这部分内容,我们将实现完整的日志复制,包括 Leader 收到新消息后进行广播,新节点加入后如何复制日志等,这部分的难点在于 Raft-5、Raft-6、Raft-8 之间的循环调用

6.1 Leader 广播新日志

  在上一大节中,我们已经完成了 Leader 监听 http 获取新日志,但是只保存到了本地,因此我们需要使用 Rpc 调用其他 Follower 也能及时更新本地日志,这里需要将原本 getRequest() 函数中本地日志添加的代码换成以 goroutine 形式调用下列函数。

// Raft-4
func (rf *Raft) Boradcast(newLog LogEntry) {
	rf.mu.Lock()

	// 如果当前节点是领导者
	if rf.currentRole == Leader {
		// 设置日志条目的任期和索引,并将其添加到日志中
		newLog.Term = rf.currentTerm
		newLog.Index = len(rf.Logs) + 1
		rf.Logs = append(rf.Logs, newLog)
		rf.mu.Unlock()

		// 异步地将日志复制给其他所有节点
		for i := 0; i < len(rf.peers); i++ {
			peer := rf.peers[i]
			// 跳过自己节点的复制
			if peer == "localhost:"+rf.id {
				continue
			}
			// 启动协程复制日志到其他节点
			go rf.Replicating(peer)
		}
	} else {
		rf.mu.Unlock()
	}
}

6.2 Leader 广播日志复制命令

  这一小节为 Raft-5,是由 Leader 广播日志日志的命令,通过询问 Follower,逐步确定每个 Follower 需要复制的日志,这部分内容关键点在于修改 sentLength 来确定日志复制的节点。同时为了后续日志复制完成后的提交,在 Raft 结构体中再添加一个 CommitLength 元素。

// Leader 发起日志复制命令
type LogRequestArgs struct {
	LeaderId     string     // Leader 的 Id
	CommitLength int        // Leader 已经提交的日志
	Term         int        // Leader 当前 Term 号
	LogLength    int        // 日志长度
	LogTerm      int        // 日志复制点的 Term
	Entries      []LogEntry // 日志列表
}

// Follower 回应命令复制
type LogReplyArgs struct {
	NodeId      string // Follower 的 Id
	CureentTerm int    // Foller 当前的 Term
	Ack         int    // 接收复制后的日志长度
	Flag        bool   // 是否接收复制
}

// Raft-5
func (rf *Raft) Replicating(peer string) error {
	// 获取已发送日志的索引
	i := rf.sentLength[peer]
	// 获取当前日志的末尾索引
	ei := len(rf.Logs) - 1
	prevLogTerm := 0

	// 选择需要复制的日志条目
	var entries []LogEntry
	if ei >= i {
		entries = rf.Logs[i : ei+1]
	} else {
		entries = []LogEntry{}
	}

	// 如果不是第一条日志,则获取前一条日志的任期
	if i > 0 {
		prevLogTerm = rf.Logs[i-1].Term
	}

	// 与指定节点建立 RPC 连接
	client, err := rpc.DialHTTP("tcp", peer)
	if err != nil {
		log.Println("Dialing error: ", err)
		return nil
	}

	// 准备 RPC 请求的参数
	var reply LogReplyArgs
	args := &LogRequestArgs{
		LeaderId:     "localhost:" + rf.id,
		CommitLength: rf.CommitLength,
		Term:         rf.currentTerm,
		LogLength:    i,
		LogTerm:      prevLogTerm,
		Entries:      entries,
	}

	// 通过 RPC 发送日志条目给指定节点
	err = client.Call("Raft.Replying", args, &reply)
	if err != nil {
		log.Println("RPC error: ", err)
		return nil
	}
	return nil
}

6.3 Follower 回应日志复制

  在日志复制中,关键点在于如何找到日志复制的切入点,通过在 Follower 从后向前逐个检测中,找到最合适的日志切入点开始复制。如果找到了切入点,则同时开启 Follower 的日志复制,即 AppendEntries() 函数。

// Raft-6
func (rf *Raft) Replying(args *LogRequestArgs, reply *LogReplyArgs) error {
	// 锁定 Raft 实例以防止并发修改
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// 如果请求中的任期比当前节点的任期更高,更新任期并将当前角色设为 Follower
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = "null"
		rf.currentRole = Follower
		rf.currentLeader = args.LeaderId
	}

	// 如果任期相同且当前节点角色为 Candidate,更新角色为 Follower
	if args.Term == rf.currentTerm && rf.currentRole == Candidate {
		rf.currentRole = Follower
		rf.currentLeader = args.LeaderId
	}

	// 验证日志条目
	logOk := (len(rf.Logs) >= args.LogLength) && (args.LogLength == 0 || args.LogTerm == rf.Logs[args.LogLength-1].Term)

	// 准备响应
	reply.NodeId = "localhost:" + rf.id
	reply.CureentTerm = rf.currentTerm // 注意:此处 `CureentTerm` 可能有拼写错误,建议修正为 `CurrentTerm`
	reply.Ack = 0
	reply.Flag = false

	// 如果任期相同且日志条目有效,追加日志并设置确认号和标志
	if args.Term == rf.currentTerm && logOk {
		rf.AppendEntries(args.LogLength, args.CommitLength, &args.Entries)
		ack := args.LogLength + len(args.Entries)
		reply.Ack = ack
		reply.Flag = true
	}

// 远程调用 Leader 节点的 Raft-8 对回应进行处理
	client, err := rpc.DialHTTP("tcp", args.LeaderId)
	if err != nil {
		log.Println("Dialing error: ", err)
		return nil
	}
	flag := false // 并无实际作用,仅作为参数占位
	err = client.Call("Raft.ReceivingAck", &reply, &flag)
	if err != nil {
		log.Println("RPC error: ", err)
		return nil
	}
	return nil
}

6.4 Leader 更改日志复制命令

  在 Leader 收到 Follower 的回应后,要根据回应对日志复制的命令做出一定修改,逐步找到每个 Follower 日志复制的切入点。在这一步中,如果 Follower 没有开始日志复制,Leader 则将日志复制的切入点提前一位,然后再次调用 Replicating 尝试进行复制;反之,则将 Follower 节点的复制切入点记录,并开始提交日志进程。

// Raft 8
func (rf *Raft) ReceivingAck(reply *LogReplyArgs, flag *bool) error {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// 检查响应的任期是否与当前节点的任期匹配,并且节点角色是 Leader
	if reply.CureentTerm == rf.currentTerm && rf.currentRole == Leader {
		// 如果响应有效且确认号大于或等于当前确认长度,则更新发送和确认长度
		if reply.Flag && reply.Ack >= rf.ackedLength[reply.NodeId] {
			rf.sentLength[reply.NodeId] = reply.Ack
			rf.ackedLength[reply.NodeId] = reply.Ack
			fmt.Println("开始提交日志")
         // go rf.CommitEntries() // 提交日志的操作可以在这里被调用
		} else if rf.sentLength[reply.NodeId] > 0 {
			// 如果响应无效或确认号不足,减少发送长度并重新尝试复制日志
			rf.sentLength[reply.NodeId] = rf.sentLength[reply.NodeId] - 1
			go rf.Replicating(reply.NodeId)
		}
	} else if reply.CureentTerm > rf.currentTerm {
		// 如果响应中的任期大于当前任期,更新节点状态为 Follower
		rf.currentTerm = reply.CureentTerm
		rf.currentRole = Follower
		rf.votedFor = "null"
	}
	return nil
}

6.5 Follower 复制日志

  在 6.3 节中,如果节点接受了日志复制命令,则将会从切入点后,丢弃之前的无效日志,并复制当前 Leader 的日志条目,这一步也对应于 Raft-7 算法。

// Raft-7
func (rf *Raft) AppendEntries(logLength int, leaderCommit int, entries *[]LogEntry) {
	// 检查是否需要截断现有日志条目
	if len(*entries) > 0 && len(rf.Logs) > logLength {
		// 如果现有日志的条目与新条目的条目不匹配,则截断现有日志
		if rf.Logs[logLength].Term != (*entries)[0].Term {
			rf.Logs = rf.Logs[:logLength]
		}
	}

	// 将新的日志条目追加到现有日志中
	if logLength+len(*entries) > len(rf.Logs) {
		startIndex := logLength
		rf.Logs = append(rf.Logs, (*entries)[startIndex:]...)
	}

	// 更新提交日志的长度
	if leaderCommit > rf.CommitLength {
		// 打印正在提交的日志条目索引
		for i := rf.CommitLength; i < leaderCommit; i++ {
			fmt.Println("Follower Commit Log ", i)
		}
		// 更新提交长度
		rf.CommitLength = leaderCommit
	}
}

6.6 测试结果

  在这里我们已经完成了通常情况下的日志复制,但是对于新加入群组的节点我们还没有进行处理,因此新节点还无法获取到日志。为此我们需要在 2.3 节的 RegisterNode 添加判断,如果新节点选择的中心节点为 Leader,则在 RegisterNode 函数中对新节点进行日志复制;反之,则由中心节点广播到已有节点中进行查询,即在 AddPeer 函数中检测进行添加 Peer 操作的节点是否是 Leader,若是则对新节点进行日志复制。

  为此我们需要在 2.3 节的 RegisterNode 添加判断,如果新节点选择的中心节点为 Leader,则在 RegisterNode 函数中对新节点进行日志复制。反之,则由中心节点广播到已有节点中进行查询,即在 AddPeer 函数中检测进行添加 Peer 操作的节点是否是 Leader,若是,则对新节点进行日志复制。日志复制的过程则是直接调用 Replicating 函数。

  在这个过程中我们可以发现,之前 RegisterNode 中,检测到新节点存在就直接返回的操作是不可行的。举例来说,假设现在有 A、B、C 三个节点,其中 A 是 Leader,由 A 向其他两个节点发送心跳消息。此时 A 挂掉,B 和 C 监听到没有心跳消息之后开始重新选举,并且 B 成为了新的 Leader,此时 B 发现无法连接上 A 节点,于是将其从本地 peers 中删除,但是 C 节点并对 A 节点连接,因此 C 的 peers 中还保留了 A 的端口号。之后 A 节点选择 C 节点作为中心节点,C 节点发现 A 节点存在后,在 RegisterNode 函数直接返回,因此 B 节点无法得知 A 节点的信息,A 节点也无法加入群组。为了解决这个问题,在 RegisterNode 函数中,应该修改为本地 peers 中如果存在新节点的端口号,则本地不添加,但是仍然进行广播告知其他节点。

  此外,假设现有 A、B 节点正在进行选举阶段,并且在此之前两个节点中有日志消息存在。此时 C 节点尝试加入群组,并且成功了,但是由于选举过程中没有 Leader,导致 C 节点并未收到日志复制。所以为了解决此问题,需要在每个节点成功当选 Leader 之后,对 peers 中的每个节点进行日志复制,一方面可以更新日志,丢弃无效日志,另一方面可以避免选举过程中节点加入未进行日志复制的情况。

  完成上述内容后,系统已经基本完整,对于投票和日志复制功能基本完善,部分测试结果如下。

在这里插入图片描述

七、日志提交

  日志的提交,本项目通过打印参数来进行代替,仅供参考。

  在 6.4 节的 ReceivingAck 函数中,当 Leader 收到的回应是接受日志复制时,我们将开始进行日志的提交判定。当一个日志被超过半数 Follower 复制的时候,即可提交。

  值得一提的是,在我根据所学内容探究时,发现 Follower 是在 Leader 监听到新日志时,检测 Leader 的日志提交数量,但是 Leader 是在 Follower 接受日志复制后才进行提交,这意味着 Leader 的 CommitLength 会比 Follower 的大一个。但是当有新节点加入时,由于此时对新节点的日志复制不是在监听到新日志后,所以 Leader 的 CommitLength 不会发生改变,此时新节点和 Leader 的 CommitLength 就是相同的。

// Acks 计算并返回已经确认了指定日志长度 (lengths) 的节点数量。
// 如果一个节点的日志长度等于或超过指定的 lengths,或者它已经确认了指定的日志长度,则计入返回值。
func (rf *Raft) Acks(lengths int) int {
	rf.mu.Lock() // 加锁以确保并发安全
	defer rf.mu.Unlock() // 函数退出时自动解锁

	ret := 0 // 记录满足条件的节点数量
	for _, peer := range rf.peers { // 遍历所有节点
		// 如果是当前节点且日志长度大于或等于指定长度,或其他节点已经确认了该长度
		if (peer == "localhost:"+rf.id && len(rf.Logs) >= lengths) || rf.ackedLength[peer] >= lengths {
			ret += 1 // 满足条件的节点数量加1
		}
	}
	return ret // 返回满足条件的节点数量
}

// Raft-9
func (rf *Raft) CommitEntries() {
	minAcks := len(rf.peers)/2 + 1 // 多数节点数量,即需要多少节点确认日志才能提交
	ready := []int{} // 用于记录可以提交的日志索引

	// 检查从第1条到当前最后一条日志,判断哪些日志已经被大多数节点确认
	for i := 1; i <= len(rf.Logs); i++ {
		if rf.Acks(i) >= minAcks { // 如果确认的节点数量大于或等于多数节点
			ready = append(ready, i) // 将该日志索引添加到 ready 列表中
		}
	}

	// 找到最大可提交的日志索引
	maxReady := max(ready)

	// 如果有可提交的日志,并且它的索引大于当前的提交索引,并且该日志是在当前任期产生的
	if len(ready) > 0 && maxReady > rf.CommitLength && rf.Logs[maxReady-1].Term == rf.currentTerm {
		for i := rf.CommitLength; i < maxReady; i++ { // 提交所有可提交的日志项
			fmt.Println("Leader Commit Log ", i) // 打印日志提交的索引
		}
		rf.CommitLength = maxReady // 更新提交的日志索引
		fmt.Println(rf.id, rf.CommitLength) // 打印当前节点 ID 和新的提交索引
	}
}

// max 返回一个整数数组中的最大值。
// 如果数组为空,则返回 0。
func max(arr []int) int {
	if len(arr) == 0 {
		return 0 // 如果数组为空,返回 0
	}
	maxVal := arr[0] // 假定第一个元素为最大值
	for _, val := range arr { // 遍历数组
		if val > maxVal { // 如果发现更大的值
			maxVal = val // 更新最大值
		}
	}
	return maxVal // 返回数组中的最大值
}

部分测试结果如下。

在这里插入图片描述

八、小结

  ​在本次实现 Go-Raft 协议过程中,本人其实收获了很多东西。

  ​首先是不能盲目在网上找资料,资料都是参差不齐的,特别是学习到现在一些协议的实现,能真正有效借鉴的文章已经比较难找到了。个人在前期从 GitHub 上到处翻找有小半个月有余,但是最后自己从头实现起来也就一周不到的时间。个人认为,如果已经知道了想完成的代码的原理,不妨一点点尝试钻研,遇到不会的去搜索如何实现某个功能,这样反而能更快的完成项目。

  此外,对于项目的复盘和测试是非常重要的,在写这篇博客前,本人已经完成了一遍实现,当时粗略测试下来没有什么问题,但是实际写博客过程中,为了截图我重头实现了一遍,这下就发现了很多问题。特别是对于一些类似于选举期间无 Leader 的情况,或是 Leader 短时间挂掉再恢复的情况,或多或少都出现了一些 Bug,甚至有一些功能在第一、二、三、四部分都完成得不错,但是到了第五部分才忽然发现问题。但是为了避免整体逻辑的问题,并未做出修改,不过在 GitHub 和 Gitee 上对每个大节的测试代码均提交到了不同的文件夹,可以自行按照需求提取。

​   如果在拉取代码后发现了部分运行错误,也许可以尝试一下用下个部分的代码,也欢迎各位提问,如若看到必定尽快回答。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2097910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

路由器内部到底是啥结构?不懂不算网工人

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 下午好&#xff0c;我的网工朋友。 在现代互联网中&#xff0c;路由器作为连接不同网络的枢纽&#xff0c;发挥着至关重要的作用。无论是简单的家…

通用文字识别API如何通过Java进行调用?(一)

一、什么是通用文字识别&#xff1f; 通用文字识别又叫通用文字OCR识别&#xff0c;文字识别&#xff0c;文字图片识别&#xff0c;通用文字识别是一种算法识别技术&#xff0c;它能够将图像中的文字转换为可编辑的文本格式&#xff0c;可支持多种类型图片类型。 二、通用文字…

java 基于Swing的随机点名

由于教学的原因&#xff0c;编写的一个随机点名程序&#xff0c;废话不多说&#xff0c;直接上代码&#xff1a; package org.example;import java.awt.Color; import java.awt.Font; import java.awt.GridLayout; import java.awt.event.ActionEvent; import java.awt.event.…

CAN(江科大CAN学习)

1.CAN CAN简介 • CAN 总线&#xff08; Controller Area Network Bus &#xff09;控制器局域网总线 CAN总线构建的是一种局域网网络&#xff0c;每个挂载在can总线上的设备&#xff0c;都可以利用这个局域网去发送自己的信息&#xff0c;也可以接受局域网的各种消息&#x…

Unity中保存数据的方法

一、概述 Unity中可用于持久化的方式有&#xff1a; 1&#xff09;通过ScriptableObject在可编辑模式下保存数据 2&#xff09;通过excel、json等文件实现数据的可持久化 二、ScriptableObject的使用 1、使用背景 假如需要制作子弹预设体&#xff0c;每个子弹上有speed速…

windows 10安装GPU版本pytorch

一、下载Anaconda 1.由于anaconda的服务器都在国外&#xff0c;推荐大家使用镜像源进行下载&#xff0c;清华的conda镜像链接&#xff1a;​​​​​​ anaconda | 镜像站使用帮助 | 清华大学开源软件镜像站 | Tsinghua Open Source Mirrora 2.使用命令新建一个虚拟环境&#…

一个简单的 NLP 神经网络

如何搭建一个简单的 NLP 神经网络&#xff1f; 假设我们一个变量名列表&#xff0c;根据这个变量名列表&#xff0c;学习其中的特征并生成新的变量名。训练一个模型用于预测下一个字符并生成新的变量名。使用一个单层的神经网络实现&#xff0c;假设我们的变量名只能用英文字母…

Python爬虫02

xml 和html 区别 jsonpath模块 场景 多层嵌套的复杂字典直接提取数据 安装 pip install jsonpath使用 from jsonpath import jsonpathret jsonpath(dict, jaonpath语法规则字符串)语法规则 eg: lxml模块&xpath语法 谷歌浏览器 xpath helper 插件 作用对当前页面…

d3dcompiler_47.dll缺失的可能原因多种多样,那么d3dcompiler_47.dll缺失怎么修复

在数字世界的深处&#xff0c;d3dcompiler_47.dll文件扮演着至关重要的角色&#xff0c;它是Direct3D编译器的一部分&#xff0c;负责处理图形渲染和游戏运行中的关键任务。然而&#xff0c;当用户启动某个程序或游戏时&#xff0c;屏幕上突然弹出的错误提示“d3dcompiler_47.d…

DevOps学习笔记

记录以下DevOps学习笔记&#xff0c;这里是笔记的入口汇总&#xff0c;可以直观的看到所有的笔记&#xff0c;还没有入口的部分&#xff0c;在下正在努力编写中。 gitlab jenkins docker docker安装 artifactory 1.artifactory安装 2.artifactory使用 计算机网络 1.dn…

世界上最快的端口扫描器masscan,如何使用?如何进行分布式使用部署?如何集成到web系统?

世界上最快的端口扫描器masscan,如何使用?如何进行分布式使用部署?如何集成到web系统? Masscan是一个高速的端口扫描工具,其主要功能和特点包括: 高速扫描:Masscan采用异步扫描技术,能够以非常快的速度扫描大量的主机和端口。 支持大规模扫描:Masscan可以同时扫描数十…

获得SSH秘钥和SSL环境的可见性和控制权

未经管理的密钥和证书如何损害分层安全防御系统 多年来&#xff0c;由于技术的不断发展和演变&#xff0c;网络威胁和安全漏洞也在不断变化。企业大多都在精心研究各种解决方案和战略&#xff0c;以加强其安全基础设施。但是&#xff0c;安全问题是没有灵丹妙药的。 目前&…

自动化工程案例01:8工位插针装配机01

机器主要作用是对充电枪中的插头进行加工&#xff0c;主要实现对插头进行压橡胶帽和安装密封圈。主要对两种不同的工件进行装配 旋转盘工位&#xff1a; 控制旋转盘每次旋转角度是2个气动夹爪之间的角度。 1.旋转盘共有10个气动夹爪和10个安装密封圈辅助固定工位。 2.通过接…

谷器数据产品入选《沈阳市工业领域大规模设备更新供给设备清单》

近日&#xff0c;沈阳市举行了工业领域大规模设备更新产需对接暨制造业新型技术改造城市试点启动大会&#xff0c;旨在促进制造业企业扩大需求、拓展市场、抢抓订单&#xff0c;推动制造业高端化、智能化、绿色化发展。 会上&#xff0c;沈阳市工信局首次对外发布了《沈阳市工…

43款最新泛微Ecology9精品应用(一键导入,轻松上手)

泛微E9精品应用建模——高效管理从这里开始,支持二次开发 简介 在现代企业管理中&#xff0c;系统化、标准化的流程管理已成为各类企业提升竞争力的必经之路。为了帮助企业快速搭建并优化业务流程&#xff0c;我们推出了泛微E9的精品应用建模Demo。这款Demo展示了从业务需求…

甜羊浏览器:抖店多店铺管理与自动回复的最佳解决方案

随着短视频平台的蓬勃发展&#xff0c;抖音旗下的电商平台——抖店&#xff0c;已成为许多商家的重要销售渠道。然而&#xff0c;对于拥有多个抖店店铺的商家而言&#xff0c;如何高效管理这些店铺以及处理大量的客户咨询&#xff0c;成为了亟待解决的问题。此时&#xff0c;甜…

audiocraft - 免费文本转音乐、AI音乐生成、AI音乐创作工具,Facebook开源,本地一键整合包下载

AudioCraft 是一个由Facebook Research开发的PyTorch库&#xff0c;专注于深度学习在音频生成领域的研究。这个强大的工具包集成了两个最新的AI音频生成模型&#xff1a;AudioGen和MusicGen&#xff0c;能够产生高质量的声音和音乐。 今天的一键包也包含了 AudioGen 和 MusicG…

x264 编码器 AArch64汇编系列:quant 量化相关汇编函数

quant x264_quant_init函数中初始化时指向不同的具体实现: 以4x4块量化为例 c 语言版本实现 4x4 块量化:quant_4x4#define QUANT_ONE( coef, mf, f ) \

SprinBoot+Vue校园活动报名微信小程序的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue3.6 uniapp代码 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平…

c# checkbox的text文字放到右边

checkbox的text文字放到右边 实现方法如下图 特此记录 anlog 2024年9月2日