实际项目集成分布式一致性协议 Raft
文章目录
- 实际项目集成分布式一致性协议 Raft
- 前言
- 1.raft 是什么?
- 2.SOFAJRaft
- 2.1 功能特性
- 3.Nacos 分布式一致性设计
- 3.1 nacos 分布式协议架构设计
- 3.1 nacos 用 jraft 做什么
- 3.2 Distro 协议
- 4.实际项目-Spring 工程
- 4.1 旧版项目的集群模式
- 4.2 为什么要升级
- 4.3 如何集成
- 4.3.1 主要参考
- 4.3.2 copy 代码
- 4.3.3 改造成 spring
- 4.3.3.1 Spring Bean 对象
- 4.3.3.2 properties 模式
- 4.3.3.3 ApplicationReadyEvent
- 4.3.4 动态感知新增节点
- 4.3.5 leader 节点信息更新事件通知
- 4.3.6 如何使用
- 4.3.7 本地测试
- 5.思考
前言
-
只要理解了共识算法,其实集成 raft 很简单,由于共识算法优秀文章已经很多了,本章就不做过多赘述了;
-
这里我们重点是讨论如何在我们的实际工程中集成 raft 协议,来实现分布式一致性;
-
前面我们只会简单的介绍分布式共识算法以及学习 Nacos 如何来集成分布式算法,重点在第4小节;
-
看完本章,大家可以根据自己项目的实际情况来决定是否需要集成 raft 协议来实现分布式一致性。
1.raft 是什么?
https://raft.github.io/
Raft is a consensus algorithm that is designed to be easy to understand. It’s equivalent to Paxos in fault-tolerance and performance. The difference is that it’s decomposed into relatively independent subproblems, and it cleanly addresses all major pieces needed for practical systems. We hope Raft will make consensus available to a wider audience, and that this wider audience will be able to develop a variety of higher quality consensus-based systems than are available today.
分布式共识算法
2.SOFAJRaft
SOFAJRaft 是一个基于 RAFT 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。 使用 SOFAJRaft 你可以专注于自己的业务领域,由 SOFAJRaft 负责处理所有与 RAFT 相关的技术难题,并且 SOFAJRaft 非常易于使用,你可以通过几个示例在很短的时间内掌握它。
github 地址:https://github.com/sofastack/sofa-jraft
2.1 功能特性
- Leader 选举和基于优先级的半确定性 Leader 选举
- 日志复制和恢复
- 只读成员(学习者角色)
- 快照和日志压缩
- 集群线上配置变更,增加节点、删除节点、替换节点等
- 主动变更 Leader,用于重启维护,Leader 负载平衡等
- 对称网络分区容忍性
- 非对称网络分区容忍性
- 容错性,少数派故障,不影响系统整体可用性
- 多数派故障时手动恢复集群可用
- 高效的线性一致读,ReadIndex/LeaseRead
- 流水线复制
- 内置了基于 Metrics 类库的性能指标统计,有丰富的性能统计指标
- 通过了 Jepsen 一致性验证测试
- SOFAJRaft 中包含了一个嵌入式的分布式 KV 存储实现
3.Nacos 分布式一致性设计
3.1 nacos 分布式协议架构设计
一致性协议顶层接口
- com.alibaba.nacos.consistency.ConsistencyProtocol
- com.alibaba.nacos.core.distributed.AbstractConsistencyProtocol
CP 协议接口
- com.alibaba.nacos.consistency.cp.CPProtocol
- com.alibaba.nacos.core.distributed.raft.JRaftProtocol
AP 协议接口
- com.alibaba.nacos.consistency.ap.APProtocol
3.1 nacos 用 jraft 做什么
- leader 领导选举
- 日志复制
- 注册中心里面的服务节点信息的复制(同步)
- 配置中心里面的配置信息的复制(同步)
3.2 Distro 协议
Distro 其实是一种 AP 协议的实现,类似的有 Eureka,Consul Gossip 等等
Distro 协议的主要设计思想如下:
-
Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
-
每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据⼀致性。
-
每个节点独立处理读请求,及时从本地发出响应。
4.实际项目-Spring 工程
4.1 旧版项目的集群模式
我们的项目为 data-porter,是一个分布式的数据迁移工具,采用配置文件的方式来确定集群中谁是 leader 节点,在某一个服务实例的配置中增加以下环境变量
data.porter.flow.client.tag=leader
- leader 主节点
- follower 从节点
leader 节点的功能
- 用来管理所有 follower 节点
- 用来分配 follower 节点任务
- follower 迁移进度上报
- 整体任务的监控
- 任务卡死
- 任务异常处理
follower 节点功能
- 任务(业务)执行
4.2 为什么要升级
主要是为了 leader 选举功能。之前的 leader 是通过配置文件来控制,有明显的缺陷
- 配置文件可能配置错误
- 配置的 leader 节点挂掉了,整个分布式系统,需要 leader 节点完成的功能都将无法运行
4.3 如何集成
本项目只需要 leader 选举功能,所以并未实现日志复制功能,在最后的总结有提及原因
4.3.1 主要参考
- 官网用户指南 https://www.sofastack.tech/projects/sofa-jraft/jraft-user-guide/
- 官网示例代码
- github:https://github.com/sofastack/sofa-jraft
- com.alipay.sofa.jraft.example.election.*
- Nacos 实现
4.3.2 copy 代码
将官网示例中的 com.alipay.sofa.jraft.example.election 包下面的所有代码拷贝到我们的项目中
以 data-porter 项目为例,我们可以新增 core 模块,再将代码复制过来
4.3.3 改造成 spring
只需要改造 com.xxx.xxx.dp.core.election.DataPorterElection 这一个类即可
官网示例代码如下:
- FollowerStateListener 为本人自己扩展,不需要关注
- 可以看到下面的示例是一个 main 启动的,而我们的项目是 java web + spring 项目,显然不符合我们的要求
改造的思路如下
- 将整个启动类改造成一个 Spring Bean 的对象
- 环境变量修改为 Spring properties 模式
- 启动方式修改为 Spring Bean 构建的时候来进行初始化启动
4.3.3.1 Spring Bean 对象
类上增加 @Component 注解即可
4.3.3.2 properties 模式
DataPorterServiceProperties 对象
@ConfigurationProperties("data.porter.service")
@Data
public class DataPorterServiceProperties {
private String dataPath = "/tmp/data-porter";
private String groupId = "data-porter";
private String serverAddress = "127.0.0.1:8081";
/**
* 集群 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
*/
private String serverAddressList = "127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083";
}
DataPorterElection 增加 @EnableConfigurationProperties(DataPorterServiceProperties.class) 激活配置;properties.getDataPath() 获取属性
@Component
@Slf4j
@EnableConfigurationProperties(DataPorterServiceProperties.class)
public class DataPorterElection implements ApplicationEventPublisherAware, ApplicationListener<ApplicationReadyEvent> {
@Autowired
private DataPorterServiceProperties properties;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
final ElectionNodeOptions electionOpts = new ElectionNodeOptions();
electionOpts.setDataPath(properties.getDataPath());
electionOpts.setGroupId(properties.getGroupId());
electionOpts.setServerAddress(properties.getServerAddress());
electionOpts.setInitialServerAddressList(properties.getServerAddressList());
...
}
}
4.3.3.3 ApplicationReadyEvent
在 spring 容器启动 Ready 之后,再初始化 JRaft 组件
实现 ApplicationListener<ApplicationReadyEvent> 接口
@Component
@Slf4j
@EnableConfigurationProperties(DataPorterServiceProperties.class)
public class DataPorterElection implements ApplicationEventPublisherAware, ApplicationListener<ApplicationReadyEvent> {
...
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
...
// Jraft 初始化
}
}
4.3.4 动态感知新增节点
这一段的代码就是参考 nacos 中的实现
官网示例还有一个问题就是,集群配置是在启动的时候,配置好的,如下
serverAddressList = "127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083";
如果此时集群新增一个节点,其他节点不会感知到这个新的节点,会抛出 WARN 信息,提示接收到的 request 请求不是集群 conf 中的节点,如下。
Node <data-porter/127.0.0.1:19212> ignore PreVoteRequest from 127.0.0.1:19215 as it is not in conf <ConfigurationEntry [id=LogId [index=1, term=1], conf=127.0.0.1:19212,127.0.0.1:19213,127.0.0.1:19214, oldConf=]>.
那么如何让新增的节点添加到集群中呢?
在启动过程中,将本地节点注册到集群即可,核心代码如下
localPeerId = PeerId.parsePeer(properties.getServerAddress());
Configuration configuration = new Configuration();
for (String address : properties.getServerAddressList().split(",")) {
PeerId peerId = PeerId.parsePeer(address);
configuration.addPeer(peerId);
}
executorService.execute(()->{
// 动态将本地服务注册到 raft 实例集群
registerSelfToCluster(properties.getGroupId(), localPeerId, configuration);
});
registerSelfToCluster 方法
void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
for (; ; ) {
try {
List<PeerId> peerIds = cliService.getPeers(groupId, conf);
if (peerIds.contains(selfIp)) {
return;
}
Status status = cliService.addPeer(groupId, conf, selfIp);
if (status.isOk()) {
return;
}
log.warn("Failed to join the cluster, retry...");
} catch (Exception e) {
log.error("Failed to join the cluster, retry...", e);
}
try {
Thread.sleep(1_000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
增加此段代码之后,日志会打印,表示新增的 config 被 Committed,可以看到 127.0.0.1:19215 被成功添加到集群中
c.a.sofa.jraft.core.StateMachineAdapter : onConfigurationCommitted: 127.0.0.1:19212,127.0.0.1:19213,127.0.0.1:19214,127.0.0.1:19215.
4.3.5 leader 节点信息更新事件通知
在服务实例 leader 节点发生变动的时候,将变动信息通知到业务侧,方便业务侧做后续处理
发送 spring event 事件
@Slf4j
public class DataPorterLeaderStateListener implements LeaderStateListener {
private ApplicationEventPublisher applicationEventPublisher;
public DataPorterLeaderStateListener(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void onLeaderStart(Node node, long leaderTerm) {
if (null == node) {
log.warn("node is null,term: " + leaderTerm);
return;
}
PeerId serverId = node.getLeaderId();
String ip = serverId.getIp();
int port = serverId.getPort();
log.info("[DataPorterElection] Leader's ip is: " + ip + ", port: " + port);
log.info("[DataPorterElection] Leader start on term: " + leaderTerm);
applicationEventPublisher.publishEvent(new LeaderStartedEvent(ip, port));
}
@Override
public void onLeaderStop(Node node, long leaderTerm) {
System.out.println("[DataPorterElection] Leader stop on term: " + leaderTerm);
}
}
监听 LeaderStartedEvent 事件,进行业务侧处理
@Component
public class LeaderStartedEventListener implements ApplicationListener<LeaderStartedEvent> {
@Override
public void onApplicationEvent(LeaderStartedEvent event) {
// 业务处理
}
}
4.3.6 如何使用
以 capture 为例
pom 新增依赖
<dependency>
<groupId>com.megvii.cbg</groupId>
<artifactId>data-porter-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
配置文件增加
data.porter.service.dataPath=/tmp/data-porter-capture1
data.porter.service.serverAddress=127.0.0.1:19212
data.porter.service.serverAddressList=127.0.0.1:19212,127.0.0.1:19213,127.0.0.1:19214
需要注意这个里面的端口号是 GRPC 服务端的端口号,不能和 web 服务 server.port 一样。
4.3.7 本地测试
这里用了5个节点来进行测试。分别是 CaptureApplication1 - CaptureApplication5
环境变量如下
文件存储路径
5.思考
1.类似 Raft 这一类的分布式共识算法,在我们的平常业务开发中是很难用到的
- 一般我们的业务每个服务实例都是平等的,不需要区分 Leader 和 Follower 节点。负载均衡等一些功能也都是通过注册中心,zuul ,riibon,loadbalance 等组件来实现
- 日志复制(数据复制)更用不上了,业务数据一般都是存储在数据库中
- 单个数据库的情况,完全就不需要数据复制了
- 多个数据库,也完全可以利用数据库的能力来完成数据的一致性,而不需要利用 raft 协议中日志复制功能
2.Raft 更多使用是在一些分布式中间件的工具,这类工具使用 Raft 可以很容易的解决分布式中的各种问题。