实际项目集成分布式一致性协议 Raft

news2025/1/24 22:51:37

实际项目集成分布式一致性协议 Raft

文章目录

  • 实际项目集成分布式一致性协议 Raft
    • 前言
    • 1.raft 是什么?
    • 2.SOFAJRaft
      • 2.1 功能特性
    • 3.Nacos 分布式一致性设计
      • 3.1 nacos 分布式协议架构设计
      • 3.1 nacos 用 jraft 做什么
      • 3.2 Distro 协议
    • 4.实际项目-Spring 工程
      • 4.1 旧版项目的集群模式
      • 4.2 为什么要升级
      • 4.3 如何集成
        • 4.3.1 主要参考
        • 4.3.2 copy 代码
        • 4.3.3 改造成 spring
          • 4.3.3.1 Spring Bean 对象
          • 4.3.3.2 properties 模式
          • 4.3.3.3 ApplicationReadyEvent
        • 4.3.4 动态感知新增节点
        • 4.3.5 leader 节点信息更新事件通知
        • 4.3.6 如何使用
        • 4.3.7 本地测试
    • 5.思考

前言

  • 只要理解了共识算法,其实集成 raft 很简单,由于共识算法优秀文章已经很多了,本章就不做过多赘述了;

  • 这里我们重点是讨论如何在我们的实际工程中集成 raft 协议,来实现分布式一致性;

  • 前面我们只会简单的介绍分布式共识算法以及学习 Nacos 如何来集成分布式算法,重点在第4小节;

  • 看完本章,大家可以根据自己项目的实际情况来决定是否需要集成 raft 协议来实现分布式一致性。

1.raft 是什么?

https://raft.github.io/

Raft is a consensus algorithm that is designed to be easy to understand. It’s equivalent to Paxos in fault-tolerance and performance. The difference is that it’s decomposed into relatively independent subproblems, and it cleanly addresses all major pieces needed for practical systems. We hope Raft will make consensus available to a wider audience, and that this wider audience will be able to develop a variety of higher quality consensus-based systems than are available today.

分布式共识算法

请添加图片描述

2.SOFAJRaft

SOFAJRaft 是一个基于 RAFT 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。 使用 SOFAJRaft 你可以专注于自己的业务领域,由 SOFAJRaft 负责处理所有与 RAFT 相关的技术难题,并且 SOFAJRaft 非常易于使用,你可以通过几个示例在很短的时间内掌握它。

github 地址:https://github.com/sofastack/sofa-jraft

2.1 功能特性

  • Leader 选举和基于优先级的半确定性 Leader 选举
  • 日志复制和恢复
  • 只读成员(学习者角色)
  • 快照和日志压缩
  • 集群线上配置变更,增加节点、删除节点、替换节点等
  • 主动变更 Leader,用于重启维护,Leader 负载平衡等
  • 对称网络分区容忍性
  • 非对称网络分区容忍性
  • 容错性,少数派故障,不影响系统整体可用性
  • 多数派故障时手动恢复集群可用
  • 高效的线性一致读,ReadIndex/LeaseRead
  • 流水线复制
  • 内置了基于 Metrics 类库的性能指标统计,有丰富的性能统计指标
  • 通过了 Jepsen 一致性验证测试
  • SOFAJRaft 中包含了一个嵌入式的分布式 KV 存储实现

3.Nacos 分布式一致性设计

请添加图片描述

3.1 nacos 分布式协议架构设计

请添加图片描述

一致性协议顶层接口

  • com.alibaba.nacos.consistency.ConsistencyProtocol
    • com.alibaba.nacos.core.distributed.AbstractConsistencyProtocol

CP 协议接口

  • com.alibaba.nacos.consistency.cp.CPProtocol
    • com.alibaba.nacos.core.distributed.raft.JRaftProtocol

AP 协议接口

  • com.alibaba.nacos.consistency.ap.APProtocol

3.1 nacos 用 jraft 做什么

  • leader 领导选举
  • 日志复制
    • 注册中心里面的服务节点信息的复制(同步)
    • 配置中心里面的配置信息的复制(同步)

3.2 Distro 协议

Distro 其实是一种 AP 协议的实现,类似的有 Eureka,Consul Gossip 等等

Distro 协议的主要设计思想如下:

  • Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。

  • 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据⼀致性。

  • 每个节点独立处理读请求,及时从本地发出响应。

4.实际项目-Spring 工程

4.1 旧版项目的集群模式

我们的项目为 data-porter,是一个分布式的数据迁移工具,采用配置文件的方式来确定集群中谁是 leader 节点,在某一个服务实例的配置中增加以下环境变量

data.porter.flow.client.tag=leader
  • leader 主节点
  • follower 从节点

leader 节点的功能

  • 用来管理所有 follower 节点
  • 用来分配 follower 节点任务
  • follower 迁移进度上报
  • 整体任务的监控
    • 任务卡死
    • 任务异常处理

follower 节点功能

  • 任务(业务)执行

4.2 为什么要升级

主要是为了 leader 选举功能。之前的 leader 是通过配置文件来控制,有明显的缺陷

  • 配置文件可能配置错误
  • 配置的 leader 节点挂掉了,整个分布式系统,需要 leader 节点完成的功能都将无法运行

4.3 如何集成

本项目只需要 leader 选举功能,所以并未实现日志复制功能,在最后的总结有提及原因

4.3.1 主要参考

  • 官网用户指南 https://www.sofastack.tech/projects/sofa-jraft/jraft-user-guide/
  • 官网示例代码
    • github:https://github.com/sofastack/sofa-jraft
    • com.alipay.sofa.jraft.example.election.*
  • Nacos 实现

4.3.2 copy 代码

将官网示例中的 com.alipay.sofa.jraft.example.election 包下面的所有代码拷贝到我们的项目中

请添加图片描述

以 data-porter 项目为例,我们可以新增 core 模块,再将代码复制过来

请添加图片描述

4.3.3 改造成 spring

只需要改造 com.xxx.xxx.dp.core.election.DataPorterElection 这一个类即可

官网示例代码如下:

  • FollowerStateListener 为本人自己扩展,不需要关注
  • 可以看到下面的示例是一个 main 启动的,而我们的项目是 java web + spring 项目,显然不符合我们的要求

请添加图片描述

改造的思路如下

  • 将整个启动类改造成一个 Spring Bean 的对象
  • 环境变量修改为 Spring properties 模式
  • 启动方式修改为 Spring Bean 构建的时候来进行初始化启动
4.3.3.1 Spring Bean 对象

类上增加 @Component 注解即可

4.3.3.2 properties 模式

DataPorterServiceProperties 对象

@ConfigurationProperties("data.porter.service")
@Data
public class DataPorterServiceProperties {

    private String dataPath = "/tmp/data-porter";

    private String groupId = "data-porter";

    private String serverAddress = "127.0.0.1:8081";
    /**
     * 集群 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
     */
    private String serverAddressList = "127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083";
}

DataPorterElection 增加 @EnableConfigurationProperties(DataPorterServiceProperties.class) 激活配置;properties.getDataPath() 获取属性

@Component
@Slf4j
@EnableConfigurationProperties(DataPorterServiceProperties.class)
public class DataPorterElection implements ApplicationEventPublisherAware, ApplicationListener<ApplicationReadyEvent> {

    @Autowired
    private DataPorterServiceProperties properties;
    
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        final ElectionNodeOptions electionOpts = new ElectionNodeOptions();
        electionOpts.setDataPath(properties.getDataPath());
        electionOpts.setGroupId(properties.getGroupId());
        electionOpts.setServerAddress(properties.getServerAddress());
        electionOpts.setInitialServerAddressList(properties.getServerAddressList());
        ...
    }
}
4.3.3.3 ApplicationReadyEvent

在 spring 容器启动 Ready 之后,再初始化 JRaft 组件

实现 ApplicationListener<ApplicationReadyEvent> 接口

@Component
@Slf4j
@EnableConfigurationProperties(DataPorterServiceProperties.class)
public class DataPorterElection implements ApplicationEventPublisherAware, ApplicationListener<ApplicationReadyEvent> {
    ...
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        ...
        // Jraft 初始化
    }
}
    

4.3.4 动态感知新增节点

这一段的代码就是参考 nacos 中的实现

官网示例还有一个问题就是,集群配置是在启动的时候,配置好的,如下

serverAddressList = "127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083";

如果此时集群新增一个节点,其他节点不会感知到这个新的节点,会抛出 WARN 信息,提示接收到的 request 请求不是集群 conf 中的节点,如下。

Node <data-porter/127.0.0.1:19212> ignore PreVoteRequest from 127.0.0.1:19215 as it is not in conf <ConfigurationEntry [id=LogId [index=1, term=1], conf=127.0.0.1:19212,127.0.0.1:19213,127.0.0.1:19214, oldConf=]>.

那么如何让新增的节点添加到集群中呢?

在启动过程中,将本地节点注册到集群即可,核心代码如下

        localPeerId = PeerId.parsePeer(properties.getServerAddress());

        Configuration configuration = new Configuration();

        for (String address : properties.getServerAddressList().split(",")) {
            PeerId peerId = PeerId.parsePeer(address);
            configuration.addPeer(peerId);
        }

        executorService.execute(()->{
            // 动态将本地服务注册到 raft 实例集群
            registerSelfToCluster(properties.getGroupId(), localPeerId, configuration);
        });

registerSelfToCluster 方法

  void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
        for (; ; ) {
            try {
                List<PeerId> peerIds = cliService.getPeers(groupId, conf);
                if (peerIds.contains(selfIp)) {
                    return;
                }
                Status status = cliService.addPeer(groupId, conf, selfIp);
                if (status.isOk()) {
                    return;
                }
                log.warn("Failed to join the cluster, retry...");
            } catch (Exception e) {
                log.error("Failed to join the cluster, retry...", e);
            }
            try {
                Thread.sleep(1_000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

增加此段代码之后,日志会打印,表示新增的 config 被 Committed,可以看到 127.0.0.1:19215 被成功添加到集群中

c.a.sofa.jraft.core.StateMachineAdapter  : onConfigurationCommitted: 127.0.0.1:19212,127.0.0.1:19213,127.0.0.1:19214,127.0.0.1:19215.

4.3.5 leader 节点信息更新事件通知

在服务实例 leader 节点发生变动的时候,将变动信息通知到业务侧,方便业务侧做后续处理

发送 spring event 事件

@Slf4j
public class DataPorterLeaderStateListener implements LeaderStateListener {

    private ApplicationEventPublisher applicationEventPublisher;

    public DataPorterLeaderStateListener(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Override
    public void onLeaderStart(Node node, long leaderTerm) {
        if (null == node) {
            log.warn("node is null,term: " + leaderTerm);
            return;
        }
        PeerId serverId = node.getLeaderId();
        String ip = serverId.getIp();
        int port = serverId.getPort();
        log.info("[DataPorterElection] Leader's ip is: " + ip + ", port: " + port);
        log.info("[DataPorterElection] Leader start on term: " + leaderTerm);
        applicationEventPublisher.publishEvent(new LeaderStartedEvent(ip, port));
    }

    @Override
    public void onLeaderStop(Node node, long leaderTerm) {
        System.out.println("[DataPorterElection] Leader stop on term: " + leaderTerm);
    }
}

监听 LeaderStartedEvent 事件,进行业务侧处理

@Component
public class LeaderStartedEventListener implements ApplicationListener<LeaderStartedEvent> {

    @Override
    public void onApplicationEvent(LeaderStartedEvent event) {
       // 业务处理
    }
}

4.3.6 如何使用

以 capture 为例

pom 新增依赖

    <dependency>
      <groupId>com.megvii.cbg</groupId>
      <artifactId>data-porter-core</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>

配置文件增加

data.porter.service.dataPath=/tmp/data-porter-capture1
data.porter.service.serverAddress=127.0.0.1:19212
data.porter.service.serverAddressList=127.0.0.1:19212,127.0.0.1:19213,127.0.0.1:19214

需要注意这个里面的端口号是 GRPC 服务端的端口号,不能和 web 服务 server.port 一样。

4.3.7 本地测试

这里用了5个节点来进行测试。分别是 CaptureApplication1 - CaptureApplication5

请添加图片描述

环境变量如下

请添加图片描述

文件存储路径

请添加图片描述

5.思考

1.类似 Raft 这一类的分布式共识算法,在我们的平常业务开发中是很难用到的

  • 一般我们的业务每个服务实例都是平等的,不需要区分 Leader 和 Follower 节点。负载均衡等一些功能也都是通过注册中心,zuul ,riibon,loadbalance 等组件来实现
  • 日志复制(数据复制)更用不上了,业务数据一般都是存储在数据库中
    • 单个数据库的情况,完全就不需要数据复制了
    • 多个数据库,也完全可以利用数据库的能力来完成数据的一致性,而不需要利用 raft 协议中日志复制功能

2.Raft 更多使用是在一些分布式中间件的工具,这类工具使用 Raft 可以很容易的解决分布式中的各种问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/433748.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SA168 3BSE003389R1

SA168 3BSE003389R1 远程终端控制系统&#xff08;RTU&#xff09;可连接到其他设备。RTU可将设备上的电气信号转换为数字的值&#xff0c;例如一个开关或阀开/关的状态&#xff0c;或是仪器量测到的压力、流量、电压或电流。也可以借由信号转换及传送信号来控制设备&#xff0…

硬盘分区怎么分?新手该如何操作?

相信很多电脑用户都遇到过硬盘分区的情况。刚拿到手的新电脑&#xff0c;基本上都是一个或两个磁盘分区&#xff0c;这不满足我们的使用习惯&#xff0c;比如我们在不同的分区存放不同的东西&#xff0c;只有一个分区就很难做到&#xff0c;所以这时候需要进行磁盘分区。那么硬…

解读“SAP集成架构咨询方法论”

如果你是SAP ERP相关工作的&#xff0c;建议大家点开原文地址去看&#xff0c;会学习到其他很多与这个行业更多的资料。 原文地址&#xff1a;解读“SAP集成架构咨询方法论” | SAP Blogs 原文地址&#xff1a;解读“SAP集成架构咨询方法论” | SAP Blogs ——————————…

经验分享:如何有效应对Facebook广告数据波动问题?

Facebook广告作为一种重要的数字营销工具&#xff0c;可以帮助企业和品牌快速获得目标受众的关注和转化。然而&#xff0c;由于广告投放过程的不稳定性&#xff0c;Facebook广告数据波动问题也经常出现。 对于广告主而言&#xff0c;如何应对Facebook广告数据波动问题&#xf…

【JVM】JMM

一、JMM JVM 内存模型是用来屏蔽掉各种硬件和操作系统的内存访问差异&#xff0c;以实现让 Java 程序在各个平台下都能达到一致的内存访问效果。JVM 内存模型规定了所有的共享变量都是存储在主内存&#xff0c;每个线程还有自己的工作内存&#xff0c;线程的工作内存保存了该线…

【教学类-34-01】拼图(运动项目)3*4格子(中班主题《个别化拼图》健康偏艺术-美术)

背景需求&#xff1a; 一个月的Python纸类学具研究中&#xff0c;我发现个别男孩喜欢把作业中的数字、图案、单元格剪成小块&#xff08;小卡片&#xff09;进几周&#xff0c;剪条、剪块的孩子人数也慢慢递增。 幼儿需求&#xff1a;锻炼手指精细动作的需求、或者获得更多物…

Idea+maven+spring-cloud项目搭建系列--13 整合MyBatis-Plus多数据源dynamic-datasource

前言&#xff1a;对于同一个系统&#xff0c;不同的租户需要自己独立分隔的数据库&#xff08;每个数据库的表结构可以是相同的&#xff09;&#xff0c;同时也要支持跨数据源的查询&#xff1b;并且支持分布式事务&#xff0c;如果这里不使用分库分表插件&#xff0c;需要怎样…

python-day4(字符串、列表、生成式和生成器、使用元组、集合、字典)

字符串和常用数据结构 简单用法 所谓字符串&#xff0c;就是由零个或多个字符组成的有限序列&#xff0c;一半记为sa1a2a3…an(0<n<∞)。在python中&#xff0c;如果我们把单个或多个字符用单引号或者双引号包围起来&#xff0c;就可以表示一个字符串。 s1 hello, wo…

【生物信息】用隐马尔可夫模型对生物序列进行建模

文章目录 Modeling biological sequences with HMMSParsing longer sequences. 举例子Our frst HMM: Detecting GC-rich regionsRunning the model: Probability of a sequence 维特比算法 Viterbi一个摸球例子回到课堂 求解参数 来自Manolis Kellis教授的课 教了隐马尔可夫在基…

FPGA开发之HDMI Transmitter接口设计

HDMI简介&#xff1a; High Definition Multimedia 高清多媒体接口&#xff0c;一种全数字化视频和声音发送接口&#xff0c;可以发送未压缩的音频及视频信号 物理接口&#xff1a; 电气介绍&#xff1a; TMDS&#xff08;Transition Minimized Differential Signaling&#x…

自定义类型——位段

什么是位段&#xff1f; 位段又叫做位域&#xff0c;具体是一种可以把数据以位的形式紧凑的存储&#xff0c;并允许程序员对此结构位进行操作的数据结构 当结构体的成员变量定义之后浪费了较大的空间 &#xff08;比如int a 2&#xff0c;则浪费了30个比特位的空间&#xff0…

Mail 服务器

Mail 服务器 1. 概念及协议2. 工具2.1 Postfix2.2 dovecot2.3 bind 3. 搭建3.1 DNS服务设置3.2 安装配置 postfix3.3 安装配置 dovecot 4. foxmail验证 1. 概念及协议 邮件服务器也采用的是C/S工作模式&#xff0c;通过SMTP,POP,IMAP协议来是实现邮件的发送和接收的。 SMTP 的…

Java入坑之IO操作

目录 一、IO流的概念 二、字节流 2.1InputStream的方法 2.2Outputstream的方法 2.3资源对象的关闭&#xff1a; 2.4transferTo()方法 2.5readAllBytes() 方法 2.6BufferedReader 和 InputStreamReader 2.7BufferedWriter 和 OutputStreamWriter 三、路径&#xff1a;…

Qt5 编译QtXlsx并添加为模块[Windows]

00.QtXlsx是什么&#xff1f;能干什么&#xff1f; QtXlsx是一个可以读写Excel文件的库。它不需要Microsoft Excel&#xff0c;可以在Qt5支持的任何平台上使用。 可以创建、读取、编辑.xlsx文件。 01.如何编译&#xff1f; 1.1编译环境&#xff1a; Windows10平台&#xff1b;…

es6笔记-let、const、var的区别

let、const、var的区别 变量提升 var 声明的变量存在变量提升,在声明前可以调用&#xff0c;直为undefindconsole.log(a); var a 1;相当于&#xff1a;var a; console.log(a); a 1;let和const不存在变量提升&#xff0c;变量要在声明前调用&#xff0c;否则报错console.log(a…

DNS服务器配置

一&#xff0c;正向解析 1>安装软件bind 提供DNS服务的软件叫bind&#xff0c;服务名是named [rootserver ~]# yum install bind -y 2>对三个配置文件进行修改 - /etc/named.conf : 主配置文件&#xff0c;共59行&#xff0c;去除注释和空行之和有效行数仅30行左右&…

Golang程序报错:fatal error: all goroutines are asleep - deadlock

文章目录 1.原始代码2.错误原因分析3. 解决方案4. 经验总结5. 练习 完整的报错信息如下&#xff1a; fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]: main.(*WorkerManager).KeepAlive(0xc000088f60)/root/go_workspace/studygoup/05.go:66 0…

Doris(7):数据导入(Load)之Routine Load

例行导入功能为用户提供了义中自动从指定数据源进行数据导入的功能 1 适用场景 当前仅支持kafka系统进行例行导入。 2 使用限制 支持无认证的 Kafka 访问&#xff0c;以及通过 SSL 方式认证的 Kafka 集群。支持的消息格式为 csv 文本格式。每一个 message 为一行&#xff0c;…

【Cpp】手撕搜索二叉树(K模型)

文章目录 二叉搜索树概念详解二叉搜索树的概念二叉搜索树的操作(大致思路)二叉搜索树的查找二叉搜索树的插入二叉搜索树的删除(最重点) 手撕搜索二叉树代码结点定义(以key型为例,KV型将在下一篇博客中介绍)树结构定义深拷贝构造函数与构造函数赋值重载析构函数遍历(结果按从小到…

软件测试的当下分析

在没有清晰能见度的情况下驾驶汽车不仅非常危险&#xff0c;也十分鲁莽。这会让我们和我们周边的人随时面临着碰撞、受伤、甚至死亡的风险。如果不能看到前方的道路&#xff0c;我们就无法预测潜在的危险或障碍&#xff0c;从而无法做出明智的决定并采取适当的行动。 同样&…