文章目录
- Nacos CP集群
- 说明
- Raft协议
- leader选举
- 重新选举leader
- 多个Candidate情况
- 更新操作,日志复制
- 网络分区
- 源码实现
- 服务注册
- leader选举
- leader心跳包
Nacos CP集群
说明
CAP原则
- C 一致性 Consistency
- A 可用性 Availability
- 分区容错性 Partition tolerance
分区容错性:在集群架构下,如果出现了网络中断,某些节点之间不能交互了,此时整个集群服务就不可用了,这是肯定不行的。分区容错性就是出现这种情况但是集群还能提供服务。
一致性和可用性就是当出现网络分区之后,集群中某个时间点数据不一致,此时我是应该先暂时停掉某些节点来保证集群各个节点一致性嘞(或先不对外提供服务,网络分区恢复数据同步完成后再提供服务),还是优先保证集群的可用性(能容忍短时间内的数据不一致性)
在NacosClient端,在配置文件中有一个spring.cloud.nacos.discovery.ephemeral = true
配置项来指定当前服务实例是否为临时实例,默认是临时实例,如果我们改为false则变为持久化实例,它就会进行CP的架构流程
在AP架构下,服务实例数据是写入在内存中的。而CP架构下的持久化实例除了会写入在内存中,它还会写入在file文件中,
文件保存的默认路径是\$nacosHome\data\naming\data\namespaceId\XXXX文件
BASE原则
- BA 基本可用 Basically Available
- S 软状态 Soft State
- E 最终一致性 Eventual Consistency
CAP原则是三选二、BASE原则是CAP的折中,C、A、P三个都要,但不用100%保证每一个原则
分布式系统肯定优先保证P,多数时候是在C和A之间做权衡选择
Raft协议
Raft协议和Zookeepr使用的ZAB协议很相似,主要包括两部分:
- leader选举,半数以上节点投票同意
- 集群写入数据同步,两阶段提交,先记录日志,待提交状态,同步其他节点,半数以上节点写入成功再变为提交状态,响应客户端
Raft协议详细情况:
leader选举
-
刚开始,集群中各个节点都是follower 追随者/属下 状态
-
如果follower节点没有收到leader的消息,那么它就可以成为leader
-
集群中各个节点都有自己的一个随机倒计时,一般是150ms~300ms之间
-
当某个倒计时到了之后,它会变为候选者Candidate状态,首先给自己投一票
-
然后发送请求给其他follower 节点
-
如果接收节点尚未在这一阶段投票,那么它将投票给候选人,follower 节点将回复投票,并且会重新进行随机倒计时
-
当接收到半数以上节点的投票响应后,它就会从Candidate状态变为leader。
-
leader开始定时向其follower 发送心跳消息,心跳包中包含了一些数据
-
follower接收到leader的心跳包之后会返回一些信息,并重置倒计时时间
与ZAB协议的区别是,ZAB的各个节点都会发起投票,它没有倒计时休眠设计,各个节点拿到选票后还要进行pk,pk完后当半数以上的节点支持某个节点成为leader它才会真正成为leader
重新选举leader
leader停止后,将会进行重新选举流程
-
follower接收不到leader的心跳包了,所以倒计时会一直进行
-
当倒计时到了之后,就会变为Candidate候选者,并先给自己投一票,在去给其他follower节点发送请求
-
其他follower节点当前还未在这一阶段投票,就会把票投给请求方
-
当Candidate节点接收到半数以上节点的票后就会变为leader,接下来它便开始发送心跳包了
-
接收follower节点的心跳响应
多个Candidate情况
各个节点都有一个随机的倒计时,可能会出现一种特殊情况,多个节点的随机倒计时一样,在同一时间了从follower状态变为了Candidate候选者状态,并先给自己投一票,在给其他follower节点发送投票请求。
每一个Candidate候选者节点都比另一个节点先到达一个follower追随者节点
现在每个候选人都有2票,不能再获得更多的选票,这种情况下就根本选不出leader
此时就会进行新的一轮选举,因为各个节点的倒计时都还在进行,在新的一轮选举中,总不会还出现了随机倒计时一样的情况吧
此时某个Candidate就会变为leader,并开始发送心跳包了
更新操作,日志复制
-
更新操作只能在leader节点进行,leader节点需要将所有的更新操作,复制给其他follower节点。
-
首先一个客户端发送一个更新操作给leader
-
进行更新操作时,会先写入进一个日志中,该日志项当前未提交状态,因此不会更新leader节点的值
-
leader节点将在下一次心跳时将更改发送给追随者follower 节点
-
然后leader等待,直到大多数节点都写了该数据,follower节点这里也是写的日志,并响应给leader
-
此时leader节点才会将该条记录变为提交状态,并真正进行更新操作
-
响应给客户端
-
leader在通知其他follower 节点该数据已经是提交状态了,其他节点也在将各自日志中的数据进行真正的更新操作
网络分区
Raft协议可以在网络分区时保持一致,比如集群的初始状态如下所示,NodeB为leader,并正在进行发送心跳包
网络分区后,A和B在一边,CDE三个节点在另一边
此时只有NodeA能接收到leader的心跳了,其他三个节点都没有接收到心跳包
其他三个节点的倒计时因为一直没有接收到leader的心跳包,所以也就不会重新开始,那么此时就会出现某一个节点当倒计时到时间后就会从follower状态变为Candidate状态,并开始进行投票机制,它先给自己投一票,然后再发送请求,收到另外的两票,再从Candidate状态变为leader
添加另一个客户端,并尝试更新两个leader。
一个客户端尝试将leader节点的值设置为3,该leader记录日志后,进行日志复制,一直接收不到超过半数节点的响应,当前的更新也就一直是未提交状态
此时其他客户端尝试将leader节点的值设置为8,这将会成功,因为它可以复制到大多数节点
现在让我们修复网络分区
此时节点B将看到更高的选举期限并退出。两个节点A和B都将回滚它们未提交的条目,并匹配新的leader的日志。
此时整个集群的数据一致了。
源码实现
服务注册
服务实例进行注册,调用NacosServer端的接口时,此时的实例会分为临时实例和持久化实例。注册实例时会从controller层进入到service层,会进入到ServiceManager.addInstance()
方法中来
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据ephemeral的值来决定生成上面key,默认情况下NacosClient传递过来的都是true,一般微服务的实例都是临时实例,不是持久化实例
// 持久化实例 key = 一些字符串常量 + namespaceId + “##” + serviceName
// 临时实例 key = 一些字符串常量 + “ephemeral” + namespaceId + “##” + serviceName
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 这里会进入到下一个逻辑中
consistencyService.put(key, instances);
}
}
上方会调用进DelegateConsistencyServiceImpl
实现类的put()
方法中,在这里就会根据key中是否为临时实例,进而决定调用不同实现类的put()方法
public void put(String key, Record value) throws NacosException {
// mapConsistencyService(key)根据是否为临时实例,进而去调用不同实现类的put()方法
mapConsistencyService(key).put(key, value);
}
// 根据是否为临时实例,返回不同的实现类
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
-
这里如果是临时实例就会进入到
DistroConsistencyServiceImpl
实现类的put()方法中 -
如果是持久化实例就会进入到
PersistentConsistencyServiceDelegateImpl
实现类的put()方法中
我们需要知道一个点,Nacos服务注册中心是对内存的操作,数据库中存储的是配置中心的数据
PersistentConsistencyServiceDelegateImpl
实现类中有一些小改动,我们可以先直接去看他原生的Raft协议RaftConsistencyServiceImpl
类
下面代码具体的实现,RaftConsistencyServiceImpl
类(弃用了,了解即可):
- 在这个类中如果新增了数据它会先进行写文件的操作
- 然后直接发布一个ValueChangeEvent事件
- 处理该事件的方法中会对内存中的数据进行更改
- 发布完事件后再去通知其他节点。
相当于这里没有使用二阶段提交,而是直接一阶段就完成了。这是有一些问题的,就比如其他节点并没有更新成功,但本机却以及更新完成了。
下面的代码可以学一下,这里就使用了CountDownLatch来实现的半数以上节点的正常响应
// RaftConsistencyServiceImpl类
public void put(String key, Record value) throws NacosException {
checkIsStopWork();
try {
// 核心方法入口
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,e);
}
}
------------------------------------------------------------------------------------------------------
// 进入到RaftCore.signalPublish()方法
// 写文件 --> 直接更新内存,未向nacos其他节点同步数据 --> 再向nacos其他节点同步数据
public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// 当前节点如果不是leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
// 直接把请求转发给leader,让leader去进行写数据
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
// 是leader节点的处理逻辑
OPERATE_LOCK.lock();
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// 核心方法,进行发布
// 该方法中会先对更新操作写入到磁盘文件中,然后发布一个ValueChangeEvent事件,处理该事件的方法中会对内存中的数据进行更改
// 相当于这里没有进行两阶段提交,直接一阶段
onPublish(datum, peers.local());
final String content = json.toString();
// 这里再去通知其他节点,
// peers就是各个Nacos集群节点,取peers.size() / 2 + 1 个数的CountDownLatch
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// 遍历除了自己之外的nacos节点
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown();
continue;
}
// 调用各个节点 发送请求
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, result.getCode());
return;
}
// 各个节点响应的回调方法,如果是正常响应则countDown()
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
}
@Override
public void onCancel() {
}
});
}
// 如果并没有半数以上的节点正常响应,那么这里就会抛异常,但此时我本机的内存数据已经更改完成了。
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
------------------------------------------------------------------------------------------------------
// 先写文件 ---> 发布事件 ----> 处理事件的类会去更新内存中注册表的数据
public void onPublish(Datum datum, RaftPeer source) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
// 写文件 一般是写入到\nacosHome\data\naming\data\namespaceId\XXXX文件
raftStore.write(datum);
}
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
// 文件写完之后会发布一个ValueChangeEvent事件,而处理该事件的是在PersistentNotifier类中onEvent()方法
// 在处理事件的方法中就会真正去更新内存中的数据,也就是服务注册表中的数据
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
leader选举
上面服务注册是调用的RaftCore.signalPublish()
方法,我们现在看看RaftCore.init()
方法
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
final long start = System.currentTimeMillis();
// 该节点刚启动,需要从磁盘文件中读取数据
// 持久化实例是会在文件中保存一份,如果当且节点宕机了再重启,肯定是需要重新读取文件进行数据恢复的
raftStore.loadDatums(notifier, datums);
...
// 执行两个定时任务,每隔500ms执行一次,一个是选举主节点,另一个是发送心跳包
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
...
}
对于Leader的选举核心就是要看MasterElection
这个定时任务:
具体实现流程总结如下:
-
检验选举时间leaderDueMs属性是<=0
-
重置选举时间和发送心跳包时间
-
重置集群各个节点投票
-
选举周期+1
-
先投票给自己、修改为candidate
-
向各个follower节点发送投票请求,请求参数就是本机节点信息
/raft/vote 接口提供方的逻辑:
- 如果收到的候选节点term小于等于本地节点term,则本地节点的voteFor更新为自己(意思是这一票投给我自己,我更适合做leader)
- 否则这个follower将重置它的election timeout;更新它的voteFor为收到的候选者节点ip(意思是这一票就投给你了)
- 更新它的term为收到的候选者节点term;
- 将本地节点作为响应数据返回
-
发送投票请求后,处理响应数据
- 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
- 记录被选举次数最多的节点和次数
- 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leader
public class MasterElection implements Runnable {
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
// 每执行一次任务就减一次任务执行的间隔时间,直到减到小于等于0就可以开始投票了
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
// 重置选举时间
local.resetLeaderDue();
// 重置心跳时间
local.resetHeartbeatDue();
// 核心方法 进行投票
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
--------------------------------------------------------------------------------------------------------------------
// 真正发送请求 进行leader选举投票
private void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
local.term);
// 重置集群各个节点投票
peers.reset();
// 选举周期+1
local.term.incrementAndGet();
// 先投票给自己
local.voteFor = local.ip;
// 把自己的状态从follower改为候选者Candidate
local.state = RaftPeer.State.CANDIDATE;
// 请求参数为local对象
Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));
// 遍历除了自己之外的集群节点
for (final String server : peers.allServersWithoutMySelf()) {
// 获取各个节点的url,发送异步post请求
final String url = buildUrl(server, API_VOTE);
try {
/**
* /raft/vote 接口提供方的逻辑:
* 如果收到的候选节点term小于等于本地节点term,则本地节点的voteFor更新为自己(意思是这一票投给我自己,我更适合做leader)
* 否则这个follower将重置它的election timeout;更新它的voteFor为收到的候选者节点ip(意思是这一票就投给你了)
* 更新它的term为收到的候选者节点term;
* 将本地节点作为响应数据返回
*/
HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
return;
}
// 解析其他节点的响应数据
RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
// 去处理其他节点的响应数据
// 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
// 记录被选举次数最多的节点和次数
// 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leader
peers.decideLeader(peer);
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}
// 遍历所有的节点,若节点的voteFor不为null,则将voteFor添加进ips中
// 记录被选举次数最多的节点和次数
// 选举最多次数 > 集群节点半数+1 就把选举次数最多的节点状态改为leader
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
// 记录被选举最多次数
int maxApproveCount = 0;
// 记录被选举次数最多的节点
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
// 当前收到的选票 > 集群节点半数+1
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
// 把maxApprovePeer的节点状态改为leader
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
leader心跳包
在RaftCore.init()
方法,执行两个定时任务,每隔5秒执行一次,一个是选举主节点,另一个是发送心跳包
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
final long start = System.currentTimeMillis();
// 该节点刚启动,需要从磁盘文件中读取数据
// 持久化实例是会在文件中保存一份,如果当且节点宕机了再重启,肯定是需要重新读取文件进行数据恢复的
raftStore.loadDatums(notifier, datums);
...
// 执行两个定时任务,每隔5秒执行一次,一个是选举主节点,另一个是发送心跳包
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
...
}
HeartBeat定时任务发送心跳包的具体实现:
具体实现流程总结如下:
- 发送心跳包时间heartbeatDueMs是否 <= 0
- 重置heartbeatDueMs 时间
- 当前的节点如果不是leader那么就不能发送心跳
- 处理请求参数,将本机节点信息、本机中所有的Datum.key 和它所对应的时间戳进行压缩,封装为请求参数作为心跳包发送个各个节点
public class HeartBeat implements Runnable{
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
// 每执行一次任务就减少一次 任务执行的间隔时间500ms ,直到减少到小于等于0就开始向其他节点发送心跳
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
// 重置heartbeatDueMs 时间
local.resetHeartbeatDue();
// 核心方法,发送心跳的逻辑
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
--------------------------------------------------------------------------------------------------------------------
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
// 当前的节点如果不是leader那么就不能发送心跳
if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
local.resetLeaderDue();
// build data
// 心跳包最终的数据
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
// 当前节点信息
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
// array存放当前节点中所有服务实例集合对应的datum.key
ArrayNode array = JacksonUtils.createEmptyArrayNode();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
}
if (!switchDomain.isSendBeatOnly()) {
// 遍历当前所有的datums
for (Datum datum : datums.values()) {
// 创建一个空的ObjectNode
ObjectNode element = JacksonUtils.createEmptyJsonNode();
// 这里只存放datum.key
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
// 存放datum对应的timestamp时间戳,也就相当于一个版本号
element.put("timestamp", datum.timestamp.get());
// 添加进array中
array.add(element);
}
}
packet.replace("datums", array);
// 接下来对请求参数packet进行封装并压缩
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JacksonUtils.toJson(packet));
String content = JacksonUtils.toJson(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
// compressedContent存放的就是压缩之后的请求参数
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
compressedContent.length());
}
// 向除了自己的其他节点发送心跳包,心跳包中包含的数据就是当前节点信息 + 当前节点中所有的datum.key
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildUrl(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
// /raft/beat 发送请求
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return;
}
peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
throwable);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
}
我们接下来再来看看/raft/beat
接口的具体实现,看看follower节点是如何处理leader节点的心跳包的,会进入到RaftController.beat()
方法
具体实现流程总结如下:
-
解析请求参数
-
接收到的心跳包如果不是leader发送的则抛异常
-
如果本机的term 大于 心跳包term,则心跳包不进行处理
-
当前节点的状态如果不是follower,那么就把自己的节点状态改为follower,voteFor改为请求参数的ip
-
重置本机的选举时间和心跳发送时间
-
更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader会通过心跳通知其他节点更新leader)
-
创建一个Map集合:
Map<String, Integer> receivedKeysMap
- 该集合中存放的是Datum.key,其中Integer的值为0表示Datum.key是本机的,如果为1则表示是远程心跳包中的
- 经过下面的处理之后,receivedKeysMap中可能还会存在一些Integer的值为0,而这些数据意味着在leader节点中这些数据已经被删除了
- 所以在最后会有一个利用该集合进行移除Datum的操作
-
将本机中的datum对象都存入receivedKeysMap集合中,value都是0
-
创建一个批处理集合batch
-
遍历心跳包中的beatDatums
- 每一次遍历都把datumKey存入到receivedKeysMap中去,value都是1
- 远程心跳包中的datumKey是否在我本机的datums集合中存在,并且本机该datumKey对应的时间戳也比远程心跳包中的时间戳更大。这就表示我不需要处理这一条datumKey
- 和上面的if判断逻辑相反,则需要添加进batch这个集合中
- 批量处理逻辑,表示我的batch集合还能继续存放数据
假如远程心跳包传递过来了100条datumKey 难道我要一条一条的去处理吗?所以肯定是需要批处理的
processedCount的数值最终是 等于 beatDatums集合中的个数,表示真正处理beatDatums中的最后一个元素
第二个条件是 假如的传递过来的总数只有10条嘞?那岂不是就直接遍历完跳出循环了,batch集合中存放的10条数据都还没有进行处理 - 当batch集合存放的数据超过50后,调用远程心跳包发送过来的leader节点ip,把我需要的datumKey集合发送给leader。 然后对leader返回的数据进行处理,写入到本机的磁盘文件和内存中
-
循环遍历结束后,处理receivedKeysMap集合value为0的Datum。移除Datum,先删除内存中datums,在删除磁盘文件
// 在RaftController.beat()方法中不会进行什么重要的处理,会直接调用到receivedBeat(JsonNode beat)方法中
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// 解析请求参数
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();
JsonNode peer = beat.get("peer");
remote.ip = peer.get("ip").asText();
remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
remote.term.set(peer.get("term").asLong());
remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
remote.leaderDueMs = peer.get("leaderDueMs").asLong();
remote.voteFor = peer.get("voteFor").asText();
// 接收到的心跳包如果不是leader发送的则抛异常
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
JacksonUtils.toJson(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
// 如果本机的term 大于 心跳包term,则心跳包不进行处理
if (local.term.get() > remote.term.get()) {
Loggers.RAFT
.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
throw new IllegalArgumentException(
"out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
}
// 当前节点的状态如果不是follower,那么就把自己的节点状态改为follower,voteFor改为请求参数的ip
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
// 心跳中的核心数据包
final JsonNode beatDatums = beat.get("datums");
// 重置选举时间和心跳发送时间
local.resetLeaderDue();
local.resetHeartbeatDue();
// 更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader会通过心跳通知其他节点更新leader)
peers.makeLeader(remote);
if (!switchDomain.isSendBeatOnly()) {
// 该集合中存放的是Datum.key,其中Integer的值为0表示Datum.key是本机的,如果为1则表示是远程心跳包中的
// 经过下面方法的处理之后,receivedKeysMap中可能还会存在一些Integer的值为0,而这些数据意味着在leader节点中这些数据已经被删除了
// 所以在该方法的最后会有一个利用 该集合 进行移除Datum的操作
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
// 将本机中的datum对象都存入集合中,value都是0
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// now check datums
// 创建一个批处理集合
List<String> batch = new ArrayList<>();
int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT
.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
// 遍历心跳包中的beatDatums
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JsonNode entry = (JsonNode) object;
String key = entry.get("key").asText();
final String datumKey;
// 得到datumKey
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
continue;
}
long timestamp = entry.get("timestamp").asLong();
// 把datumKey存入到receivedKeysMap中去
receivedKeysMap.put(datumKey, 1);
try {
// 远程心跳包中的datumKey是否在我本机的datums集合中存在,并且本机该datumKey对应的时间戳也比远程心跳包中的时间戳更大
// 这就表示我不需要处理这一条datumKey
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
&& processedCount < beatDatums.size()) {
continue;
}
// 和上面的if判断逻辑相反,则需要添加进batch这个集合中
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
// 批量处理逻辑,表示我的batch集合还能继续存放数据
// 假如远程心跳包传递过来了100条datumKey 难道我要一条一条的去处理吗?所以肯定是需要批处理的
// processedCount的数值最终是 等于 beatDatums集合中的个数,表示真正处理beatDatums中的最后一个元素
// 第二个条件是 假如的传递过来的总数只有10条嘞?那岂不是就直接遍历完跳出循环了,batch集合中存放的10条数据都还没有进行处理
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
// 当batch集合存放的数据超过50后
String keys = StringUtils.join(batch, ",");
if (batch.size() <= 0) {
continue;
}
Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
+ ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
processedCount, beatDatums.size(), datums.size());
// update datum entry
// 调用远程心跳包发送过来的leader节点ip,把我需要的datumKey集合发送给leader
// 然后对leader返回的数据进行处理,写入到本机的磁盘文件和内存中
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
return;
}
List<JsonNode> datumList = JacksonUtils
.toObj(result.getData(), new TypeReference<List<JsonNode>>() {
});
// 从leader中接收到本机需要的datum集合,进行遍历
for (JsonNode datumJson : datumList) {
Datum newDatum = null;
OPERATE_LOCK.lock();
try {
// 先获取本机中老的Datum
Datum oldDatum = getDatum(datumJson.get("key").asText());
// 如果远程发送过来Datum的时间戳比本机的时间戳还小,那么就不用处理
if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
.get()) {
Loggers.RAFT
.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.get("key").asText(),
datumJson.get("timestamp").asLong(), oldDatum.timestamp);
continue;
}
// 解析成newDatum
if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.get("key").asText();
serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
serviceDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Service.class);
newDatum = serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.get("key").asText();
instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
instancesDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Instances.class);
newDatum = instancesDatum;
}
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
// 先把newDatum写入到磁盘文件中
// 文件目录一般是 \nacosHome\data\naming\data\namespaceId\XXXX文件
raftStore.write(newDatum);
// 更新内存中的数据,先修改datums的数据
datums.put(newDatum.key, newDatum);
// 再去修改注册标中的数据
notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
// 本机重置leaderDueMs时间
local.resetLeaderDue();
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT
.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
e);
} finally {
OPERATE_LOCK.unlock();
}
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
}
return;
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
}
@Override
public void onCancel() {
}
});
batch.clear();
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
// 此时receivedKeysMap集合中Integer还为0的Datum意味着:leader节点中已经删除了这些Datum
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
// 移除Datum,先删除内存中datums,在删除磁盘文件
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
}
return local;
}