TiKV Raft 快照全流程丨TiKV 源码解读(二十二)

news2025/1/20 18:38:55

导读

TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会的顶级项目。它通过 Raft 协议实现数据的高可用性和强一致性,是 TiDB 分布式数据库系统的重要组成部分。本文作为 TiKV 源码解读系列的增补,详细介绍了 TiKV 8.2.0 版本中 Raft 快照的生成、发送、接收和应用的具体实现。

在 Snapshot 的发送和接收中,我们详细介绍了 Raft 快照的发送和接收机制。本篇文章作为前文的补充,将概述 TiKV 中 Raft 快照的整体流程,并详细介绍涉及的代码路径。本文所讨论的代码基于最近的 8.2.0 版本。

图片

在 TiKV 中,数据空间被切分成各个连续的范围,称为 Region。每个 Region 由一个单独的 Raft 组管理,基于 Raft 协议保证容错性。每个 Raft 组包含多个 Peer,每个 Peer 在不同的 TiKV 节点上运行。

图片

图 1. TiKV 中的 Region 和 Raft 组

在 Raft 协议中,Leader 节点负责将最新的日志条目通过 Append RPC 发送给 Followers,以确保所有节点的数据一致性。为防止磁盘空间的无限制增长,Leader 节点会定期清理过时的本地 Raft 日志。然而,如果某个 Follower 落后太多,以至于其请求的日志条目已经被 Leader 清理,传统的 Append RPC 将无法继续同步数据。在这种情况下,Leader 节点将采取替代措施,向落后的 Follower 发送一个 Raft 快照。这个快照包含了 Region 在某一特定时间点的完整状态快照,不仅包括存储于 RocksDB 中的数据,还有 Raft 协议的状态信息,例如任期号(term)和所对应的日志索引(index)。在 TiKV 的实际应用中,快照机制通常在 Region 初始化、发生分裂或进行扩展等关键操作时被触发,以确保数据的一致性和系统的稳定性。

图片

TiKV 中的 Raft 快照过程大致分为四个阶段:

1. 生成: Raft Leader 生成一份快照,记录下 Raft 和 RocksDB 在当前时间点的状态。

**2. 发送:**Raft Leader 通过网络把快照发送给 Follower。

3. 接收: Raft Follower 接收快照并暂时存储。

4. 应用: Raft Follower 将快照应用到 Raft 状态机和 RocksDB 数据中。

以下图表更详细地描述了快照过程:

图片

图 2. TiKV 中的 Raft 快照过程

图片

本文将详细介绍 Raft 快照实现的原理和流程,这个部分将介绍核心环节的设计思路,在代码路径详解中我们会逐步介绍细节。

快照元数据和数据的分离

Raft 消息 (protobuf definition:https://github.com/pingcap/kvproto/blob/df42997c2c57536219c67253966ede4d61d25757/include/eraftpb.proto#L77) 有一个 snapshot 字段,可以存储快照数据和元数据。一个简单的做法是将快照数据嵌入到 Raft 消息中,通过 Raft Peer 之间的标准通信通道传输(如图顶部的消息队列所示)。但问题在于 Raft 快照比其它消息大得多,将快照数据放入 Raft 消息中可能会阻塞正常的 Raft 消息处理逻辑。

因此,TiKV 中的一个设计选择是,Raft 快照消息(表示为 MsgSnapshot)仅包含快照的元数据。实际的快照数据作为文件保存在磁盘上。快照文件以及快照消息通过专用的 gRPC 流连接由 Snap Worker 发送(如步骤 7 和 8 所示)。使用 gRPC 流可以将数据分成更小的块以进行高效传输。

从 ApplyFsm 调度 RegionTask::Gen

Region Worker 负责生成和应用快照数据。例如,在快照的应用过程中,PeerStorage 调度了一个 RegionTask::Apply 任务给 Region Worker(步骤 11)。但是快照的生成过程略有不同,尽管 PeerStorage 依然是整个过程的发起点,但是 RegionTask::Gen 任务是通过 ApplyFsm 来调度的(步骤 2 和 3)。为什么 PeerStorage 不直接进行 RegionTask::Gen 的调度呢?

这是为了控制快照生成的时间点,让快照尽可能地包含最新的数据。在 Raft 批处理系统中,Raft 信息是分批处理的。同一批信息中可能同时包含快照请求和新的写入请求,而我们希望快照在同批次的写入都完成之后生成。因此,Raft 批处理系统会先把同批次的所有写入任务发给 Apply 批处理系统处理,然后再分派快照任务(ApplyTask::Snapshot)。这样,因为 ApplyFsm 是对任务依次进行处理的,当它处理到快照任务的时候,同批次的写入已经完成。

图片

我们将逐步介绍不同阶段的代码路径,各步骤与上图一一对应。不过这里包含的代码片段很简化,省略了许多细节,仅用于展示大致的流程。代码基于 TiKV 8.2.0 版本。

快照生成

步骤 1: GenSnapTask

TiKV 中实现 Raft 共识协议的是 raft-rs 模块 ( https://github.com/tikv/raft-rs ),快照过程在该模块中发起。在 raft-rs 中,Leader 对每一个 Follower 维护一个 Progress 对象,其中记录了该 Follower 所需要的下一个日志索引(pr.next_idx)。Raft leader 在 maybe_send_append 中处理某个 Follower 的 Append RPC 的发送,如果它无法获取前置日志(pr.next_idx - 1)的任期(用于 Append 过程的匹配校验),则需要发送快照。此时会调用 prepare_send_snapshot 函数,触发快照过程。

raft-rs: src/raft.rs
impl RaftCore {
  fn maybe_send_append(..., pr: &mut Progress, ) {
      ...
      let term = self.raft_log.term(pr.next_idx - 1);
      match (term, ents) {
          ...
          _ => {
               // send snapshot if we failed to get term or entries.
               if !self.prepare_send_snapshot(&mut m, pr, to) { 
                   return false;
                }
          }
      }
  }
   
  fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
      let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
      // ...
   }

}

raft-rs: src/raft_log.rs
impl RaftLog {
  pub fn snapshot(&self, request_index: u64, to: u64) -> Result<Snapshot> {
      // ...
      self.store.snapshot(request_index, to)
  }

}

如上所示,快照过程经过若干调用后来到 Storage trait 的 snapshot 方法(定义:https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/storage.rs#L159)。在 TiKV 中,Storage trait 的实现是 PeerStorage,其 snapshot 实现如下:

components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
  pub fn snapshot(&self, request_index: u64, to: u64) -> raft::Result<Snapshot> {
      // ...
      let task = GenSnapTask::new(...);
      let mut gen_snap_task = self.gen_snap_task.borrow_mut();
      *gen_snap_task = Some(task);
      Err(raft::Error::Store(
          raft::StorageError::SnapshotTemporarilyUnavailable,
      ))
  }
}

PeerStorage::snapshot构建了一个 GenSnapTask 并将其设置在 gen_snap_task 字段,然后它会返回一个 SnapshotTemporarilyUnavailable 错误,这个错误意味着快照正在生成过程中。在之后的过程中,snapshot() 函数会在 Raft 协议的每次心跳时被重新调用。如果快照生成未完成,它会继续返回 SnapshotTemporarilyUnavailable。当快照生成完毕后(步骤 5),snapshot() 的调用就会返回 Ok。

步骤 2: ApplyTask::Snapshot

Peer::handle_raft_ready_append 函数检查PeerStorage 的gen_snap_task 字段,并将任务发送给 ApplyFsm。如前所述,PeerFsm 会先给 ApplyFsm 发送该批次中所有的写入任务(见 handle_raft_committed_entries 函数),再发送快照任务。

components/raftstore/src/store/peer.rs
impl Peer {
  pub fn handle_raft_ready_append(...){
      // ...
      if !ready.committed_entries().is_empty() {
          self.handle_raft_committed_entries(ctx, ready.take_committed_entries());
      }
      
      if let Some(mut gen_task) = self.mut_store().take_gen_snap_task() {
          ctx.apply_router.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
      }
  }
  
  fn handle_raft_committed_entries<T>() {
      let mut apply = Apply::new(...)
      ctx.apply_router.schedule_task(self.region_id, ApplyTask::apply(apply));
  }
}

步骤 3: RegionTask::Gen

ApplyFsm 在 ApplyFsm::handle_snapshot 函数中处理快照任务。它将快照任务转换为 RegionTask::Gen ,并发送给 Region Worker。

components/raftstore/src/store/fsm/apply.rs
impl ApplyFsm {
  fn handle_tasks() {
      loop {
          match msg {
              Msg::Snapshot(snap_task) => self.handle_snapshot(..., snap_task),
          }
      }
  }

  fn handle_snapshot(..., snap_task: GenSnapTask) {
      snap_task.generate_and_schedule_snapshot()
  }
}

impl GenSnapTask {
  pub fn generate_and_schedule_snapshot(){
      let snapshot = RegionTask::Gen {...}
      region_sched.schedule(snapshot)
  }
}

步骤 4 和 5: do_snapshot() and notify

Region Runner 定义了 Region 任务的处理逻辑。一系列函数调用到达 do_snapshot,由它完成实际的快照生成工作,包括从 RocksDB 扫描 Region 的数据并写入 SST 文件。注意,快照生成工作(ctx.handle_gen)是在一个单独的线程池中,主要的考虑是因为它耗时较长,避免阻塞其它任务。

components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task<EK::Snapshot>) {
      match task {
          Task::Gen {...} => { 
              let ctx = SnapGenContext {...}
              self.pool.spawn(async move {
                  ctx.handle_gen(...)
              }
          }
      }
  }
}

impl SnapGenContext {
  fn generate_snap( ... ) { 
      let snap = box_try!(store::do_snapshot::<EK>(...));
      notifier.try_send(snap)
  }
  fn handle_gen( ... ) {
      self.generate_snap(...)
  }
}

生成完成后,notifier.try_send(snap) 将快照生成结果发送到一个通道,结果将使 PeerStorage::snapshot() 在下一次调用中返回 Ok。

快照发送

步骤 6 和 7: MsgSnapshot and send_snap()

PeerStorage::snapshot() 成功后返回一个 Snapshot 结果。不过这个快照只包含元数据,快照的数据依然以 SST 文件的形式存储在磁盘上。

src/server/raft_client.rs
impl AsyncRaftSender {
  fn fill_msg(&mut self, ctx: &Context<'_>) {
      // ...
      if msg.get_message().has_snapshot() {
          self.send_snapshot_sock(msg);
          continue;
      }
      // ...
  }
  
  fn send_snapshot_sock(&self, msg: RaftMessage) {
      if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send {...}) {
          // ...
      }
  }    
}

AsyncRaftSender 拦截快照消息并将其转换为快照发送任务。任务被发送到 Snap Scheduler,由 Snap Worker 来处理。

src/server/snap.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task) {
      match task {
          Task::Send { addr, msg, cb } => {
              let send_task = send_snap(...);
              let task = async move {
                  let res = match send_task {
                      Err(e) => Err(e),
                      Ok(f) => f.await,
                  };
                  // ...
              }
              self.pool.spawn(task);
          }
      }
  }
}

pub fn send_snap(...) {
  // ... 
}

Snap Runner 中定义了 Snap Worker 在处理不同任务时的处理逻辑,对于快照发送任务(Task::Send),Snap Runner 生成一个新的异步任务来运行 send_snap 函数。send_snap 通过打开一个新的 gRPC 流连接来传输快照消息及快照数据。

快照接收

步骤 8 和 9: recv_snap() and MsgSnapshot

在接收端,TiKV 实例看到传入的 gRPC 请求,通过调度一个 recv snap 任务来将请求转发给 Snap Worker。

src/server/service/kv.rs
impl Tikv for Service {
  fn snapshot(...) {
      let task = SnapTask::Recv { stream, sink };
      if let Err(e) = self.snap_scheduler.schedule(task) {...}
  }
}

Snap Worker 在 recv_snap 函数中接收快照元数据和内容。

src/server/snap.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task) {
      match task {
          Task::Recv { ... } => {
              let task = async move {
                  let result = recv_snap(...).await;
              }
              self.pool.spawn(task);
          }
      }
  }
}

fn recv_snap(...) {
  let mut context = RecvSnapContext::new(head, &snap_mgr)?;
  while let Some(item) = stream.next().await {
      // ...
  }
  context.finish(raft_router)
}

快照接收后,context.finish(raft_router) 将快照消息发送到 Raftstore 以触发快照的应用。

快照应用

步骤 10 到 12: apply_snapshot()和 apply_snap()

快照在不同层级和不同地方被应用:

  1. 快照信息被 Raftstore 收到之后,会调用 raft_rs 的 step 函数。经过一系列的调用,到达 Raft::handle_snapshot ,在该函数中恢复 Raft 状态机的日志和配置。
raft-rs: src/raft.rs
impl Raft {
  fn handle_snapshot(&mut self, mut m: Message) {
      if self.restore(m.take_snapshot()) {...}
  }
  
  pub fn restore(&mut self, snap: Snapshot) -> bool {
      ...
      self.raft_log.restore(snap);
  }
}
  1. Raftstore 在对 ready 处理时 Peer::handle_raft_ready_append 会调用 PeerStorage::handle_raft_ready函数,该函数会调用PeerStorage::apply_snapshot 来更新 Peer 的状态。在 Snapshot 被持久化之后,PeerStorage::on_persist_snapshot 函数会被调用,它会进一步调用PeerStorage::persist_snapshot 将快照应用任务发送给 Region Worker。
components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
  pub fn apply_snapshot() {...}
  
  pub fn persist_snapshot(&mut self, res: &PersistSnapshotResult) {
      self.schedule_applying_snapshot();
  }
  
  pub fn schedule_applying_snapshot(&mut self) {
      let task = RegionTask::Apply {}
      if let Err(e) = self.region_scheduler.schedule(task) {...}
  }    
}
  1. Region Worker 对快照应用任务进行处理,经过一系列调用会来到 Runner::apply_snap,它会将 Region 中的数据更新为快照中的数据。
components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task<EK::Snapshot>) {
      match task {
          Task::Apply { .. } => {
              self.pending_applies.push_back(task);
              self.handle_pending_applies(false);    
          }
      }
  }
  
  fn handle_pending_applies(&mut self, is_timeout: bool) {
      while !self.pending_applies.is_empty() {
          if let Some(Task::Apply { region_id, .. }) = self.pending_applies.front() {
              if let Some(Task::Apply {}) = self.pending_applies.pop_front() {
                  self.handle_apply(region_id, peer_id, status);
              }
          }
      }
  }
  
  fn handle_apply(&mut self, region_id: u64, peer_id: u64, status: Arc<AtomicUsize>) {
      match self.apply_snap(region_id, peer_id, Arc::clone(&status)) {...}
  }
  
  fn apply_snap(&mut self, region_id: u64, peer_id: u64, abort: Arc<AtomicUsize>) -> Result<()> {
      // ...    
  }
}

图片

以上便是 TiKV 中与快照相关的代码路径概览。希望通过本文的介绍,能帮助读者更深入地理解 TiKV 中的 Raft 快照机制及其实现细节,从而更有效地进行源码阅读和学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1978696.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024华数杯C题保姆级分析完整思路+代码+数据教学

2024华数杯C题保姆级分析完整思路代码数据教学 C题题目&#xff1a;老外游中国 接下来我们将按照题目总体分析-背景分析-各小问分析的形式来 1 总体分析&#xff1a; 题目要求本题目基于中国境内旅游景点数据&#xff0c;旨在通过数学建模解决外国游客在中国旅游时可能遇到的…

安装pytorch GPU方法

参考全网最详细的安装pytorch GPU方法&#xff0c;一次安装成功&#xff01;&#xff01;包括安装失败后的处理方法&#xff01;-CSDN博客 整体来看&#xff0c;一共下面三个安装步骤&#xff1a; 显卡驱动&#xff08;nvidia-smi&#xff09;-》显卡深度学习驱动&#xff08…

三十种未授权访问漏洞复现 合集( 四 )

未授权访问漏洞介绍 未授权访问可以理解为需要安全配置或权限认证的地址、授权页面存在缺陷&#xff0c;导致其他用户可以直接访问&#xff0c;从而引发重要权限可被操作、数据库、网站目录等敏感信息泄露。---->目录遍历 目前主要存在未授权访问漏洞的有:NFS服务&a…

日志系统——整体框架

日志等级模块&#xff1a; 该模块描述了日志消息的各种等级debug,info,warn,error,fatal,off&#xff08;off为最高等级&#xff0c;屏蔽一切日志消息&#xff09;,并提供描述日志等级的方法 日志消息模块: 该模块负责构建日志消息对象&#xff0c;此对象管理着一条日志中的各项…

【Python网络爬虫案例】python爬虫之爬取豆瓣电影信息

&#x1f517; 运行环境&#xff1a;PYTHON &#x1f6a9; 撰写作者&#xff1a;左手の明天 &#x1f947; 精选专栏&#xff1a;《python》 &#x1f525; 推荐专栏&#xff1a;《算法研究》 #### 防伪水印——左手の明天 #### &#x1f497; 大家好&#x1f917;&#x1f91…

【C语言】计算四则运算,中缀表达式转换为后缀表达式

C语言编程—中缀表达式转换为后缀表达式 思路&#xff1a; 中缀转后缀保存结果栈&#xff1a;stack&#xff0c;保存数据和-*/ 操作符栈&#xff1a;op_stack&#xff0c;保存-*/() 场景一&#xff1a;遇到数据&#xff0c;直接入栈stack 场景二&#xff1a;遇到"(&qu…

海康笔试题

1. 2. 块设备&#xff1a;磁盘设备驱动、SD设备驱动 字符设备&#xff1a;终端设备驱动 网络设备&#xff1a;网络设备驱动 &#xff08;1&#xff09;linux操作系统驱动程序分为三大类&#xff1a;字符设备驱动、快设备驱动和网络设备驱动 &#xff08;2&#xff09;字符设…

2024 年华数杯全国大学生数学建模竞赛C 题 老外游中国 完整思路 源代码 模型结果(仅供学习)

最近&#xff0c;“city 不 city”这一网络流行语在外国网红的推动下备受关注。随着我国过境免签政策的落实&#xff0c;越来越多外国游客来到中国&#xff0c;通过网络平台展示他们在华旅行的见闻&#xff0c;这不仅推动了中国旅游业的发展&#xff0c;更是在国际舞台上展现了…

基于X86+FPGA助力实现电力系统的智能监测与高效管理

电力监控 信迈提供基于Intel平台、Xilinx平台、Rockchip平台、NXP平台、飞腾平台的Mini-ITX主板、Micro-ATX主板、ATX主板、嵌入式准系统/工业整机等计算机硬件。产品算力强大&#xff0c;支持高速存储&#xff0c;提供丰富串口、USB、LAN、PCIe扩展接口、显示接口等I/O接口&am…

【python】数据类型之列表类型(上)

本篇文章将讲解列表类型。 列表&#xff08;list&#xff09;&#xff0c;是一个有序且可变的容器&#xff0c;在里面可以存放多个不同类型的元素。 列表中的元素之间用逗号&#xff08;英文中的逗号&#xff09;相隔。 1、定义&#xff1a; 例如&#xff1a; user_list[]…

stl容器 vector的基本操作

目录 1.vector构造 1.1默认构造函数 1.2 fill 填充构造函数 ​编辑 1.3 范围构造函数&#xff08;Range Constructor&#xff09; 1.4拷贝构造函数 2.initializer_list初始化vector 3.迭代器 4.常用的几个成员 4.1 size()统计当前有效字符个数 4.2 capacity ve…

青甘环线游记|day(2)|西宁、青海湖

坐动车 早上7:30醒来&#xff0c;在8:00左右起床&#xff0c;下楼吃兰州拉面。面煮的很好吃&#xff0c;就是还是不是很适应。看到8元的牛肉面感觉很震惊&#xff0c;没想到是面8元&#xff0c;牛肉另加&#xff0c;10元。 坐上动车前往西宁&#xff0c;12点左右到了。虽然在…

PXE实现自动安装部署操作系统

PXE&#xff08;Preboot eXecution Environment&#xff09;是一种在计算机启动时使用网络接口从远程服务器获取操作系统安装和启动信息的技术。通过PXE&#xff0c;计算机可以从局域网中的PXE服务器上下载操作系统安装文件&#xff0c;并进行自动化的操作系统部署或故障排除。…

25考研数据结构复习·7.4B树和B+树7.5散列(Hash)表

目录 B树和B树 B树 m阶B树的核心特性 B树的插入 B树的删除 非终端结点关键字 终端结点关键字 低于下限 B树 散列&#xff08;Hash&#xff09;表 基本概念 散列函数的构造 &#x1f469;‍&#x1f4bb; 除留余数法 直接定址法 数字分析法 平方取中法 处理冲突…

动手学深度学习V2每日笔记(池化层)

本文主要参考沐神的视频教程 https://www.bilibili.com/video/BV1EV411j7nX/spm_id_from333.999.0.0&vd_sourcec7bfc6ce0ea0cbe43aa288ba2713e56d 文档教程 https://zh-v2.d2l.ai/ 本文的主要内容对沐神提供的代码中个人不太理解的内容进行笔记记录&#xff0c;内容不会特别…

Linux基础环境开发工具(二)

目录 一、前言二、make和makefile工具1.快速认识一下2.依赖关系和依赖方法3.执行原理 三、Git工具1.快速认识一下2.git的使用 四、gdb工具1.快速认识一下2、类比Windows使用 一、前言 在开发工具第一篇中我们介绍了yum&#xff0c;vim&#xff0c;gcc/g编译器这几种工具&#…

C++ 继承 派生类的拷贝构造

继承 派生类的拷贝构造构造顺序拷贝构造 引例1: 当子类,不自实现拷贝构造时,默认调用父类的拷贝构造引例2: 子类自实现拷贝构造,不做特殊处理时,只会调用父类的构造器.引例3: 显示的调用父类的拷贝构造器。案例: 内嵌函数的拷贝构造 引例1 :当内嵌子对象,子类不自实现拷贝构造时…

Netty二

Netty 问题分析 bootstrap serverBootstrap pipeline和channelPipeline EventLoopGroup和实现类NioEventLoopGroup

U2net论文复现-简单解读-以及奇奇怪怪的改进-测试roc以及pr

论文地址&#xff1a;U2net论文地址 显著性目标检测&#xff1a; Salient ObjectDetetion(SOD)显著性目标检测&#xff0c;就是要把图片中最显著的物体分割出来&#xff0c;所以是二分类任务&#xff0c;只需要背景和前景。 1、Introduce 1.1、目前存在的2个挑战&#xff1…

Day-11 员工管理案例 增删改查、配置文件

SpringBootWeb案例 前面我们已经实现了员工信息的条件分页查询以及删除操作。 关于员工管理的功能&#xff0c;还有两个需要实现&#xff1a; 新增员工修改员工 首先我们先完成"新增员工"的功能开发&#xff0c;再完成"修改员工"的功能开发。而在"新…