Background of the Distro protocol
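The Distro protocol is an AP distributed protocol developed in-house by the Nacos community and designed specifically for ephemeral instances. It keeps the ephemeral-instance subsystem working even after some Nacos nodes go down. As a protocol embedded in a stateful middleware application, Distro coordinates and stores the massive volume of registration requests uniformly across all Nacos nodes.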
Design ideas
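1. Every Nacos node is equal: any node can handle write requests, and it synchronizes the new data to the other nodes.
2. Each node is responsible for only part of the data, and it periodically sends verification data (checksums) for the data it owns to the other nodes to keep the data consistent.
3. Each node handles read requests independently and responds immediately from local data.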
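Principles
Data initialization
A newly joined Distro node pulls the full data set: it polls the other Distro nodes in turn, sending requests to pull all of their data.
Once the full pull completes, every Nacos machine holds all of the non-persistent (ephemeral) instance data currently registered.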
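Data verification
After the Distro cluster starts, the machines periodically send heartbeats to one another. The heartbeat payload is mainly metadata about all of the data on each machine (metadata rather than the data itself, so that the volume of data sent over the network stays low). Verification therefore takes the form of a heartbeat: at a fixed interval, each machine sends a verification request to every other machine.
If, during verification, a machine finds that the data on another machine is inconsistent with its local data, it initiates a full pull to fill in the missing data. (In the gRPC-based implementation analyzed later, a failed verification instead makes the owning node re-send that client's data to the peer.)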
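To make the metadata-based check concrete, here is a minimal sketch, assuming each node's metadata is a map from data key to version number. VerifySketch and keysNeedingSync are hypothetical names, not Nacos APIs. Instead of a full pull, the receiver here reports only the keys it is missing or holds an older version of, so only those need to be re-sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of metadata-based verification between two nodes (not the Nacos API).
final class VerifySketch {
    // Receiver side: compare the sender's (key -> version) metadata with local metadata and
    // return the keys that are missing locally or have an older local version.
    static List<String> keysNeedingSync(Map<String, Long> senderMeta, Map<String, Long> receiverMeta) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> e : senderMeta.entrySet()) {
            Long local = receiverMeta.get(e.getKey());
            if (local == null || local < e.getValue()) {
                stale.add(e.getKey()); // only this entry needs to be re-sent by the owner
            }
        }
        return stale;
    }
}
```

Sending only keys and versions keeps each heartbeat small, which is the same motivation the text gives for exchanging metadata instead of the data itself.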
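Write operations
In a Distro cluster that has finished starting up, when a client issues a write, that is, when a write request registering a non-persistent instance reaches some Nacos server, the Distro cluster processes it as shown in the flowchart below.
The flow consists of the following steps (top to bottom in the figure):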
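- A front Filter intercepts the request, computes the responsible Distro node from the IP and port contained in the request, and forwards the request to that node.
- The Controller on the responsible node parses the write request.
- The Distro protocol periodically runs a Sync task that synchronizes all instance information this node is responsible for to the other nodes.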
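The Filter step can be pictured as a deterministic mapping from an instance's "ip:port" to one member. A minimal sketch, assuming a hash-modulo mapping over a sorted list of healthy members; ResponsibleNodeSketch and responsibleNode are hypothetical names, not Nacos's DistroMapper API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: map an instance (ip:port) to the node responsible for it.
final class ResponsibleNodeSketch {
    static String responsibleNode(String ip, int port, List<String> healthyMembers) {
        if (healthyMembers.isEmpty()) {
            throw new IllegalStateException("no healthy members");
        }
        // Sort a copy so every node derives the same order from the same member set,
        // and therefore agrees on who owns each instance.
        List<String> sorted = new ArrayList<>(healthyMembers);
        Collections.sort(sorted);
        String tag = ip + ":" + port;
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(tag.hashCode(), sorted.size());
        return sorted.get(index);
    }
}
```

Because ownership depends only on the instance address and the member list, any node can compute the owner locally and forward the write without asking anyone.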
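Read operations
Because every machine stores the full data set, each read is served directly from the machine's local data, which gives a fast response.
This mechanism lets Distro act as an AP protocol that answers every read promptly. During a network partition, all reads still return normally; when the network recovers, each Distro node merges the data of each data shard to restore consistency.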
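Distro summary
Distro is the consistency protocol Nacos developed for ephemeral instance data. Its data is stored in an in-memory cache; a full data sync is performed at startup, and the data is verified periodically.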
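Under Distro's design, every Distro node can receive both read and write requests. All Distro request scenarios fall into three cases:
- When a node receives a write request for an instance it is responsible for, it writes the data directly.
- When a node receives a write request for an instance it is not responsible for, it routes the request within the cluster to the responsible node, which completes the write.
- When a node receives any read request, it queries its local data and returns the result directly (because every instance has been synchronized to every machine).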
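The three cases can be sketched as a single-process illustration. DistroNodeSketch, its ownerOf function, and the lastForwardedTo field are hypothetical; in Nacos the forwarding and replication happen over the network.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of Distro request dispatch; class and field names are illustrative only.
final class DistroNodeSketch {
    private final String self;                              // this node's address
    private final Function<String, String> ownerOf;         // key -> address of the responsible node
    final Map<String, String> localStore = new HashMap<>(); // every node holds a full copy
    String lastForwardedTo;                                 // where the last foreign write was routed

    DistroNodeSketch(String self, Function<String, String> ownerOf) {
        this.self = self;
        this.ownerOf = ownerOf;
    }

    // Cases 1 and 2: write locally when responsible, otherwise route to the responsible node.
    boolean write(String key, String value) {
        String owner = ownerOf.apply(key);
        if (owner.equals(self)) {
            localStore.put(key, value); // the owner would then sync this change to its peers
            return true;
        }
        lastForwardedTo = owner;        // a real node would forward the request here
        return false;
    }

    // Case 3: any read is answered from the local full copy without contacting other nodes.
    String read(String key) {
        return localStore.get(key);
    }
}
```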
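As Nacos's embedded consistency protocol for ephemeral instances, Distro ensures that, in a distributed environment, changes to the service information on each node are promptly propagated to the other nodes, and it can sustain storage and consistency for hundreds of thousands of service instances.
Distro addressing
What is Distro addressing
Addressing means promptly updating the member list when a change in the Nacos cluster's nodes is detected.
Addressing modes
Nacos supports two addressing modes: "file-based addressing" and "address-server addressing".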
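File-based addressing
File-based addressing is the default. The mode is set with the parameter "nacos.core.member.lookup.type", whose value is either "file" or "address-server".
The file used for file-based addressing defaults to "${user.home}/nacos/conf/cluster.conf".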
The content of cluster.conf for file-based addressing takes the form "ip1:port,ip2:port".
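A minimal parser sketch for this kind of member list, assuming entries may be separated by commas or newlines, that lines starting with # are comments, and that a missing port defaults to 8848 (Nacos's default server port). ClusterConfSketch is a hypothetical name, and the sketch handles IPv4 addresses only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for cluster.conf-style content; not Nacos's own implementation.
final class ClusterConfSketch {
    static final int DEFAULT_PORT = 8848; // assumption: Nacos's default server port

    static List<String> parse(String content) {
        List<String> members = new ArrayList<>();
        for (String line : content.split("\\R")) {   // \R matches any line break
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;                             // skip blank lines and comments
            }
            for (String entry : trimmed.split(",")) {
                String addr = entry.trim();
                if (addr.isEmpty()) {
                    continue;
                }
                // IPv4 only: an entry without ":" gets the default port appended.
                members.add(addr.contains(":") ? addr : addr + ":" + DEFAULT_PORT);
            }
        }
        return members;
    }
}
```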
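Address-server addressing
The default address-server endpoint is http://jmenv.tbsite.net:8080/serverlist; the domain, port, and URL path can all be customized.
When a change in the cluster members is detected, the member cache is updated and a MembersChangeEvent is published.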
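To avoid using a new node before it has finished initializing, a newly detected node is first marked DOWN and does not take part in communication.
A few seconds later, through inter-node communication, the initialized node's state is changed from DOWN to UP, and from then on it formally takes part in communication.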
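The DOWN-then-UP handling of new members can be sketched as follows. MemberListSketch is a hypothetical class; the point where update returns true is where Nacos would publish MembersChangeEvent.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of member-list updates with a DOWN state for new nodes.
final class MemberListSketch {
    enum State { UP, DOWN }

    final Map<String, State> members = new LinkedHashMap<>();

    // Apply a freshly looked-up address list; returns true if membership changed.
    boolean update(Set<String> latest) {
        boolean changed = members.keySet().retainAll(latest); // drop members that disappeared
        for (String addr : latest) {
            if (!members.containsKey(addr)) {
                members.put(addr, State.DOWN); // new nodes start DOWN until initialized
                changed = true;
            }
        }
        return changed;
    }

    // Called once inter-node communication shows the node has finished initializing.
    void markUp(String addr) {
        members.computeIfPresent(addr, (k, v) -> State.UP);
    }

    boolean canCommunicate(String addr) {
        return members.get(addr) == State.UP;
    }
}
```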
Distro startup
After their beans are constructed, DistroClientComponentRegistry and DistroHttpRegistry each run their @PostConstruct doRegister method to register their Distro components.
// DistroClientComponentRegistry
@PostConstruct
public void doRegister() {
// Create the Distro data processor
DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol,
upgradeJudgement);
// Create the Distro transport agent
DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
serverMemberManager);
// Create the failed-task handler, which mainly adds retry tasks for failed syncs
DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
// Register the data storage implementation for Nacos:Naming:v2:ClientData data
componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
// Register the DistroData processor for Nacos:Naming:v2:ClientData data
componentHolder.registerDataProcessor(dataProcessor);
// Register the transport agent implementation for Nacos:Naming:v2:ClientData data
componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
// Register the failed-task handler for Nacos:Naming:v2:ClientData data
componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
}
// DistroHttpRegistry
@PostConstruct
public void doRegister() {
// Register the Distro data storage (HTTP)
componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
new DistroDataStorageImpl(dataStore, distroMapper));
// Register the HTTP Distro transport agent
componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
// Create the failed-task handler, which mainly adds retry tasks for failed syncs
componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
new DistroHttpCombinedKeyTaskFailedHandler(taskEngineHolder));
// Register the HTTP delay-task processor for Distro
taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
// Register the data processor
componentHolder.registerDataProcessor(consistencyService);
}
DistroTaskEngineHolder
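The holder of the Distro task execution engines, which run the Distro tasks.
This class holds a delay-task execution engine and an immediate-execution engine. When the class is created, a default task processor is set on the delay engine. DistroHttpRegistry registers its own task processor, so v1 (HTTP) data does not use the default task processor.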
@Component
public class DistroTaskEngineHolder {
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
// Set the default task processor on the delay-task engine
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
return delayTaskExecuteEngine;
}
public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
return executeWorkersManager;
}
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}
}
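The interplay between the default processor and per-key processors can be sketched as a lookup with a fallback. This assumes the delay engine looks processors up by resource type, so the HTTP (v1) key registered by DistroHttpRegistry gets its own processor while other types, such as the v2 gRPC client data, fall back to the default DistroDelayTaskProcessor. ProcessorRegistrySketch and its nested TaskProcessor interface are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of processor lookup in a delay-task engine.
final class ProcessorRegistrySketch {
    interface TaskProcessor {
        boolean process(String task);
    }

    private final Map<Object, TaskProcessor> processors = new ConcurrentHashMap<>();
    private volatile TaskProcessor defaultProcessor;

    void setDefaultTaskProcessor(TaskProcessor p) {
        defaultProcessor = p;
    }

    void addProcessor(Object key, TaskProcessor p) {
        processors.put(key, p);
    }

    // A processor registered for the key wins; otherwise fall back to the default one.
    TaskProcessor getProcessor(Object key) {
        TaskProcessor p = processors.get(key);
        return p != null ? p : defaultProcessor;
    }
}
```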
DistroProtocol is annotated with @Component, so its constructor runs when the bean is instantiated:
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
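// Start the Distro tasks
startDistroTask();
}
private void startDistroTask() {
// In standalone mode there is nothing to sync, so mark initialization as complete directly
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
// Start the verification task
startVerifyTask();
// Start loading the full Distro data snapshot from other nodes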
startLoadTask();
}
Distro periodically verifies and syncs node data
DistroProtocol
private void startVerifyTask() {
// Schedule DistroVerifyTimedTask.run on a timer (every 5 seconds by default) to verify and sync data
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
distroTaskEngineHolder.getExecuteWorkersManager()),
DistroConfig.getInstance().getVerifyIntervalMillis());
}
DistroVerifyTimedTask
@Override
public void run() {
try {
// Get the other nodes in the cluster
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
// Iterate over the data types; two types, HTTP and gRPC, are registered during Nacos server startup
for (String each : distroComponentHolder.getDataStorageTypes()) {
// Verify the data held in each data storage
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
// Get the data storage for this type (gRPC is used as the example here)
DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
// Skip if the storage has not finished initializing (the full data pull at startup sets finishInitial to true when it completes)
if (!dataStorage.isFinishInitial()) {
Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
dataStorage.getClass().getSimpleName());
return;
}
// Get the Distro data to verify
List<DistroData> verifyData = dataStorage.getVerifyData();
if (null == verifyData || verifyData.isEmpty()) {
return;
}
for (Member member : targetServer) {
// Get the transport agent for this type (HTTP or RPC)
DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
if (null == agent) {
continue;
}
// Add a verify task for this member to the immediate-execution engine
executeTaskExecuteEngine.addTask(member.getAddress() + type,
new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
}
}
@Override
public List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>(); // the DistroData entries to verify
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) { // skip missing or non-ephemeral clients
continue;
}
// Whether this node owns the client is decided by ClientManagerDelegate#isResponsibleClient: the client is a ConnectionBasedClient whose isNative is true, i.e. it is connected directly to this node.
if (clientManager.isResponsibleClient(client)) {
// Build the verify data: mainly the clientId, plus a version field reserved for future use (currently always 0).
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); // serialize the verify data
data.setType(DataOperation.VERIFY);
result.add(data);
}
}
return result;
}
DistroVerifyExecuteTask
@Override
public void run() {
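// Iterate over the Distro data to verify
for (DistroData each : verifyData) {
try {
// Check whether the agent supports callback-based transport (gRPC does)
if (transportAgent.supportCallbackTransport()) {
// Send the verify data with a callback
doSyncVerifyDataWithCallback(each);
} else {
// Send the verify data synchronously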
doSyncVerifyData(each);
}
} catch (Exception e) {
Loggers.DISTRO
.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
}
}
}
private void doSyncVerifyData(DistroData data) {
transportAgent.syncVerifyData(data, targetServer);
}
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
// If the target server is no longer a cluster member, there is nothing to verify
if (isNoExistTarget(targetServer)) {
return true;
}
// replace target server as self server so that can callback.
verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
// Build the request
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
// Find the target member
Member member = memberManager.find(targetServer);
// The target must be in UP state, i.e. able to communicate
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
return false;
}
try {
// Send the request
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
}
return false;
}
Sending the RPC request
public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
if (client != null) {
return client.request(request, timeoutMills);
} else {
throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
}
}
The target node handles the verify request
DistroDataRequestHandler
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
switch (request.getDataOperation()) {
// Verify request
case VERIFY:
return handleVerify(request.getDistroData(), meta);
......
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
DistroDataResponse result = new DistroDataResponse();
// If the client exists in the cache, verification succeeds and the client's renew time is refreshed; otherwise verification fails.
if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
}
return result;
}
public boolean onVerify(DistroData distroData, String sourceAddress) {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
distroData.getDistroKey());
}
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
return false;
}
// Let the DistroData processor handle the verify request
return dataProcessor.processVerifyData(distroData, sourceAddress);
}
DistroClientDataProcessor
@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
// Deserialize the Distro client verify info
DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
// Verify the client by its clientId
if (clientManager.verifyClient(verifyData.getClientId())) {
return true;
}
Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
return false;
}
ConnectionBasedClientManager
@Override
public boolean verifyClient(String clientId) {
ConnectionBasedClient client = clients.get(clientId);
// If the client exists, update its last renew (last valid connection) time
if (null != client) {
client.setLastRenewTime();
return true;
}
// Otherwise verification fails
return false;
}
Summary of Distro periodic verification
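Nodes start sending verify data to each other only after the full sync has completed. By default, verification runs once every 5 seconds. The verify data consists of the clientId and a version, where version is a reserved field that is currently always 0. When a node receives verify data, verification succeeds if the client exists in its cache, and the client's renew time is refreshed; otherwise verification fails.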
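The receiving side of this check can be sketched as below. VerifyReceiverSketch is hypothetical, and the expiry rule (a synced client that stops being renewed eventually becomes stale) is an assumption added to show why refreshing the renew time matters; the expiry window is a caller-supplied parameter, not a Nacos default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical receiver-side sketch of Distro verification (names are illustrative).
final class VerifyReceiverSketch {
    final Map<String, Long> lastRenew = new HashMap<>(); // clientId -> last renew time (ms)
    final long expireMillis;                             // assumed expiry window

    VerifyReceiverSketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    // Succeed and renew if the client is cached; otherwise fail so the sender re-syncs it.
    boolean onVerify(String clientId, long nowMillis) {
        if (!lastRenew.containsKey(clientId)) {
            return false;
        }
        lastRenew.put(clientId, nowMillis);
        return true;
    }

    // Assumption: a client not renewed by any verify round within the window is stale.
    boolean isExpired(String clientId, long nowMillis) {
        Long t = lastRenew.get(clientId);
        return t == null || nowMillis - t > expireMillis;
    }
}
```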
A newly joined node pulls a full data snapshot
Starting the load task
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}
DistroLoadDataTask
The data initialization load task
@Override
public void run() {
try {
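// Load the full data set from the other nodes in the cluster
load();
// If loading has not completed, run again after a delay (30 seconds by default, configurable via "nacos.core.protocol.distro.data.load_retry_delay_ms")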
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
Pulling the full data set from other nodes
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
// Load a snapshot for each data type; here there are two: gRPC and HTTP
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
return false;
}
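// Try the other cluster members one by one until one returns a snapshot successfully; on success, mark the storage as initialized
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
// Fetch the snapshot from that node
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// Apply the snapshot to the local node
boolean result = dataProcessor.processSnapshot(distroData);
if (result) {
distroComponentHolder.findDataStorage(resourceType).finishInitial(); // mark initialization as complete
return true;
}
} catch (Exception e) {
// Log the failure and move on to the next member instead of silently swallowing it
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}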
}
return false;
}
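The first-success-wins loop, with the retry left to the caller, can be sketched generically. SnapshotLoadSketch and its parameters are hypothetical: fetchSnapshot stands in for transportAgent.getDatumSnapshot, and applySnapshot for dataProcessor.processSnapshot.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of loading a snapshot from the first peer that can serve one.
final class SnapshotLoadSketch {
    static <T> boolean loadFromPeers(List<String> peers,
                                     Function<String, Optional<T>> fetchSnapshot,
                                     Predicate<T> applySnapshot) {
        for (String peer : peers) {
            try {
                Optional<T> snapshot = fetchSnapshot.apply(peer);
                if (snapshot.isPresent() && applySnapshot.test(snapshot.get())) {
                    return true; // the caller would now mark the storage as initialized
                }
            } catch (RuntimeException e) {
                // One failing peer must not abort the loop; report it and try the next one.
                System.err.println("snapshot from " + peer + " failed: " + e.getMessage());
            }
        }
        return false; // the caller schedules another attempt after the retry delay
    }
}
```

Stopping at the first successful peer works because every node already holds the full data set, so any healthy peer's snapshot is sufficient.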
Issuing the snapshot request
@Override
public DistroData getDatumSnapshot(String targetServer) {
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
DistroDataRequest request = new DistroDataRequest();
// Set the request operation to SNAPSHOT
request.setDataOperation(DataOperation.SNAPSHOT);
try {
// Request the snapshot data
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(
String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
}
}
The server side handles the snapshot request
DistroDataRequestHandler
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
switch (request.getDataOperation()) {
......
case SNAPSHOT:
return handleSnapshot();
......
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
private DistroDataResponse handleSnapshot() {
DistroDataResponse result = new DistroDataResponse();
// Read the snapshot data
DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
result.setDistroData(distroData);
return result;
}
@Override
public DistroData getDatumSnapshot() {
List<ClientSyncData> datum = new LinkedList<>();
// Package the data of every client on this node
for (String each : clientManager.allClientId()) {
// Get the client
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
// Generate the sync data for this client
datum.add(client.generateSyncData());
}
// Build the snapshot payload
ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
snapshot.setClientSyncDataList(datum);
// Serialize the snapshot into a byte array
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
// Return the snapshot wrapped in a DistroData
return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}
Generating the service data
The client data includes the namespace, group name, service name, and instance information (IP, port, and so on).
public ClientSyncData generateSyncData() {
List<String> namespaces = new LinkedList<>();
List<String> groupNames = new LinkedList<>();
List<String> serviceNames = new LinkedList<>();
List<InstancePublishInfo> instances = new LinkedList<>();
for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
namespaces.add(entry.getKey().getNamespace());
groupNames.add(entry.getKey().getGroup());
serviceNames.add(entry.getKey().getName());
instances.add(entry.getValue());
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}
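ClientSyncData carries the published services as parallel lists, where index i in every list describes the same service. A hypothetical round-trip sketch: ServiceKey stands in for Service, a plain String stands in for InstancePublishInfo, and encode/decode are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the parallel-list encoding used for client sync data.
final class SyncDataSketch {
    static final class ServiceKey {
        final String namespace, group, name;

        ServiceKey(String namespace, String group, String name) {
            this.namespace = namespace;
            this.group = group;
            this.name = name;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof ServiceKey)) return false;
            ServiceKey k = (ServiceKey) o;
            return namespace.equals(k.namespace) && group.equals(k.group) && name.equals(k.name);
        }

        @Override public int hashCode() {
            return Objects.hash(namespace, group, name);
        }
    }

    // Index i of every list describes the same published service.
    static final class Payload {
        final List<String> namespaces = new ArrayList<>();
        final List<String> groupNames = new ArrayList<>();
        final List<String> serviceNames = new ArrayList<>();
        final List<String> instances = new ArrayList<>();
    }

    static Payload encode(Map<ServiceKey, String> publishers) {
        Payload p = new Payload();
        for (Map.Entry<ServiceKey, String> e : publishers.entrySet()) {
            p.namespaces.add(e.getKey().namespace);
            p.groupNames.add(e.getKey().group);
            p.serviceNames.add(e.getKey().name);
            p.instances.add(e.getValue());
        }
        return p;
    }

    // The receiver rebuilds the service -> instance map by walking the lists in lockstep.
    static Map<ServiceKey, String> decode(Payload p) {
        Map<ServiceKey, String> result = new LinkedHashMap<>();
        for (int i = 0; i < p.namespaces.size(); i++) {
            result.put(new ServiceKey(p.namespaces.get(i), p.groupNames.get(i), p.serviceNames.get(i)),
                    p.instances.get(i));
        }
        return result;
    }
}
```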
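Summary
Every node in the cluster holds the full snapshot data. When a node starts, it synchronizes the snapshot from one of the other nodes in the cluster and caches it in a Map. The cached data falls into two types, HTTP and gRPC; the concrete data is the information about client-registered instances, including namespace, group name, service name, instance information, and so on.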
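Incremental data synchronization
When verification shows that another machine's data is inconsistent with the local data, a sync is triggered.
Handling a verification failure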
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
return;
}
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
try {
// Note the DistroVerifyCallbackWrapper callback: its onResponse method is invoked when the call completes
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
verifyData.getDistroKey().getResourceKey(), callback, member);
clusterRpcClientProxy.asyncRequest(member, request, wrapper);
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
@Override
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
// Verification failed
Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
// Publish a verification-failed event
NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
// Record the failed verification in the TPS monitor
NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}
DistroClientDataProcessor subscribes to ClientVerifyFailedEvent, so it receives the event:
@Override
public void onEvent(Event event) {
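// Only in cluster mode
if (EnvUtil.getStandaloneMode()) {
return;
}
// Only when gRPC features are enabled
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
// Verification-failed event: resync the client to the server that failed verification
if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
} else {
// Other client events: sync to all servers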
syncToAllServer((ClientEvent) event);
}
}
Handling the verification-failed event
private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
// Continue only if the client exists, is ephemeral, and is owned by this node
Client client = clientManager.getClient(event.getClientId());
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
// If so, start syncing
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
// This also sends an ADD action; later in the flow it is handled by DistroDelayTaskProcessor, just like syncToAllServer
distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
}
DistroProtocol
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// Add the task to the delay-task execution engine
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
Syncing to the other nodes in the cluster
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
// On a ClientDisconnectEvent, sync a DELETE operation to the other nodes
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
// On a ClientChangedEvent, sync a CHANGE operation to the other nodes
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// Iterate over all nodes that need to be synced
for (Member each : memberManager.allMembersWithoutSelf()) {
// Perform the incremental update for this node
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
/**
* Start to sync to target server.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
* @param targetServer target server
* @param delay delay time for sync
*/
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// Add the task for the executor to run
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
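Because each delay task is keyed by resource key, resource type, and target server, repeated changes to one client can be coalesced before they are sent. A simplified sketch, assuming the latest task for a key replaces the earlier one (the actual merge logic in DistroDelayTask may differ in detail); DelayTaskQueueSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a keyed delay-task queue that coalesces tasks per key.
final class DelayTaskQueueSketch {
    enum Action { ADD, CHANGE, DELETE }

    static final class Task {
        final Action action;
        final long createdAt;
        final long delayMillis;

        Task(Action action, long createdAt, long delayMillis) {
            this.action = action;
            this.createdAt = createdAt;
            this.delayMillis = delayMillis;
        }
    }

    // key = resourceKey | resourceType | targetServer
    private final Map<String, Task> pending = new LinkedHashMap<>();

    // A newer task for the same key replaces the older one, so a burst of changes to one
    // client collapses into a single sync per target.
    void addTask(String resourceKey, String type, String target, Action action, long now, long delay) {
        pending.put(resourceKey + "|" + type + "|" + target, new Task(action, now, delay));
    }

    // Remove and return "key|ACTION" for every task whose delay has elapsed.
    List<String> drainReady(long now) {
        List<String> ready = new ArrayList<>();
        pending.entrySet().removeIf(e -> {
            Task t = e.getValue();
            boolean due = now - t.createdAt >= t.delayMillis;
            if (due) {
                ready.add(e.getKey() + "|" + t.action);
            }
            return due;
        });
        return ready;
    }
}
```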
The task queue performs the data sync
DistroDelayTaskProcessor processes the delay tasks. Because the action sent earlier was ADD, the ADD branch is taken:
@Override
public boolean process(NacosTask task) {
// Ignore tasks that are not delay tasks
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction()) {
case DELETE:
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD:
// Add the task to the immediate-execution engine
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
DistroSyncChangeTask extends AbstractDistroExecuteTask, which in turn extends AbstractExecuteTask. AbstractExecuteTask implements Runnable, so the run method of AbstractDistroExecuteTask is executed:
@Override
public void run() {
String type = getDistroKey().getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
if (null == transportAgent) {
Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
return;
}
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
// gRPC supports callback-based transport
if (transportAgent.supportCallbackTransport()) {
doExecuteWithCallback(new DistroExecuteCallback());
} else {
executeDistroTask();
}
}
@Override
protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
// Get the DistroData
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return;
}
// Sync the data to the target server
getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);
}
DistroClientTransportAgent
@Override
public boolean syncData(DistroData data, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
// Build the request and set its data operation type
DistroDataRequest request = new DistroDataRequest(data, data.getType());
// Find the target member
Member member = memberManager.find(targetServer);
// The target must be in UP state, i.e. able to communicate
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
return false;
}
try {
// Send the data to the target node
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
}
return false;
}
The receiving node handles the incremental sync request
In DistroDataRequestHandler, ADD, CHANGE, and DELETE are all handled by the handleSyncData method:
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
switch (request.getDataOperation()) {
......
case ADD:
case CHANGE:
case DELETE:
return handleSyncData(request.getDistroData());
......
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
private DistroDataResponse handleSyncData(DistroData distroData) {
DistroDataResponse result = new DistroDataResponse();
if (!distroProtocol.onReceive(distroData)) {
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("[DISTRO-FAILED] distro data handle failed");
}
return result;
}
public boolean onReceive(DistroData distroData) {
Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
distroData.getDistroKey());
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
// Apply the synced data
return dataProcessor.processData(distroData);
}
DistroClientDataProcessor
@Override
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncData.class);
// Cache the client information that was synced over
handlerClientSyncData(clientSyncData);
return true;
// For DELETE, remove the client from the clients cache.
case DELETE:
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId);
return true;
default:
return false;
}
}
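The receiver-side effect of processData can be sketched as operations on an in-memory client map. SyncReceiverSketch, Op, and ClientEntry are hypothetical names; the sketch records synced clients with isNative = false, matching the description of syncClientConnected that follows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how a receiver applies ADD / CHANGE / DELETE sync data.
final class SyncReceiverSketch {
    enum Op { ADD, CHANGE, DELETE, VERIFY, SNAPSHOT }

    static final class ClientEntry {
        final String instances;
        final boolean isNative; // false: synced from another node; true: connected directly

        ClientEntry(String instances, boolean isNative) {
            this.instances = instances;
            this.isNative = isNative;
        }
    }

    final Map<String, ClientEntry> clients = new HashMap<>();

    boolean processData(Op op, String clientId, String instances) {
        switch (op) {
            case ADD:
            case CHANGE:
                clients.put(clientId, new ClientEntry(instances, false)); // upsert synced client
                return true;
            case DELETE:
                clients.remove(clientId); // owner reported a disconnect
                return true;
            default:
                return false; // VERIFY and SNAPSHOT are handled by other code paths
        }
    }
}
```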
Removing the client
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
Syncing the client information
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
// Cache the client synced from another node. For a ConnectionBasedClient, isNative = false means the connection was synced from another node; true means the client is connected directly to this node.
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
// Get the client
Client client = clientManager.getClient(clientSyncData.getClientId());
// Update the client's services and instances
upgradeClient(client, clientSyncData);
}