目录
1. 配置引起变更的两种方式
1.1 后台管理直接操作
1.2 NacosClient 调用 RPC 接口
2. 变更事件处理 AsyncNotifyService
2.1 HTTP 任务
2.2 RPC任务
2.3 NacosServer 其他节点接收到消息后如何处理
3. 客户端推送实现:DumpService.dump
接着上一篇 Nacos 配置中心源码讲解 继续
当配置发生变更时,NacosServer 会对配置变更的客户端主动推送消息。
那么 Nacos 是如何实现的呢?
1. 配置引起变更的两种方式
配置变更?配置如何才会产生变化呢?有两种方式
1.1 后台管理直接操作
在后台管理修改配置并发布时就发生了变更,而在发布配置接口上可见以下源码:
com.alibaba.nacos.config.server.service.ConfigOperationService#publishConfig
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), time.getTime()));
这段代码代表发布了一个事件:ConfigDataChangeEvent 配置数据改变事件。
1.2 NacosClient 调用 RPC 接口
NacosClient 操作发布配置
NacosServer端源码
com.alibaba.nacos.config.server.remote.ConfigPublishRequestHandler#handle
可见 RPC Server 端,最终同样会发布 ConfigDataChangeEvent 配置数据改变事件。
2. 变更事件处理 AsyncNotifyService
事件发出了,那就得有人处理
在 Nacos 中,负责处理的类就是 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService
可见在 AsyncNotifyService 类构造器中,注册了专门处理 ConfigDataChangeEvent 的消费者,onEvent 方法将回调所有发送过来的事件。
看看 onEvent 如何处理 ConfigDataChangeEvent 事件的:
@Override
public void onEvent(Event event) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
// 获取到 event 上携带的数据
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
// 统计数据累加
MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
// 获取到整个集群中的所有成员节点
Collection<Member> ipList = memberManager.allMembers();
// 任务队列,一个 HTTP 请求任务、一个 RPC 请求任务
Queue<NotifySingleTask> httpQueue = new LinkedList<>();
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
// 循环所有成员,决定好每个成员使用什么类型任务
for (Member member : ipList) {
if (!MemberUtil.isSupportedLongCon(member)) {
// 当前集群成员节点是不是支持长连接,不支持放入 HTTP 队列里
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
} else {
// 支持长连接放入 RPC 队列里
rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
}
// 也就是说,上面根据每个成员的特点创建了他们使用的任务类型的 task
// 现在开始使用小程序执行两个队列里的任务
if (!httpQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
}
if (!rpcQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
}
任务分为两种类型, HTTP 任务、 RPC 任务
2.1 HTTP 任务
首先先看简单一点的 HTTP 任务, 见上方 onEvent 源码,HTTP 任务传入了新建的 AsyncTask 对象里
看看 AsyncTask 内部做了什么:
class AsyncTask implements Runnable {
private Queue<NotifySingleTask> queue;
private NacosAsyncRestTemplate restTemplate;
public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
this.restTemplate = restTemplate;
this.queue = queue;
}
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
// 从队列中取出一个任务
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (memberManager.hasMember(targetIp)) {
// 进来了代表节点还在线
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
if (unHealthNeedDelay) {
// 节点不健康了,任务延迟执行
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);
// 延迟执行任务
asyncTaskExecute(task);
} else {
// 构建请求头
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
// 添加 认证 请求头
AuthHeaderUtil.addIdentityToHeader(header);
// 发送 HTTP 请求
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
}
2.2 RPC任务
class AsyncRpcTask implements Runnable {
private Queue<NotifySingleRpcTask> queue;
public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!queue.isEmpty()) {
// 取出一个任务
NotifySingleRpcTask task = queue.poll();
// 构建一个请求对象
ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
syncRequest.setDataId(task.getDataId());
syncRequest.setGroup(task.getGroup());
syncRequest.setBeta(task.isBeta);
syncRequest.setLastModified(task.getLastModified());
syncRequest.setTag(task.tag);
syncRequest.setTenant(task.getTenant());
Member member = task.member;
if (memberManager.getSelf().equals(member)) {
// 节点如果是 自己
if (syncRequest.isBeta()) {
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP(), true);
} else {
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
}
continue;
}
if (memberManager.hasMember(member.getAddress())) {
// 节点还没下线
boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
if (unHealthNeedDelay) {
// 节点不健康了,任务延迟执行
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, member.getAddress());
// 延迟执行任务
asyncTaskExecute(task);
} else {
if (!MemberUtil.isSupportedLongCon(member)) {
// 如果当前节点不支持长连接,就使用 HTTP 方式
// 上面判断过长连接,这里再判断是因为不止上面一种方式创建这个任务
asyncTaskExecute(
new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
task.getLastModified(), member.getAddress(), task.isBeta));
} else {
try {
// 发送 RPC 请求
configClusterRpcClientProxy
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
} catch (Exception e) {
// 统计数据维护
MetricsMonitor.getConfigNotifyException().increment();
asyncTaskExecute(task);
}
}
}
} else {
}
}
}
}
以上两种方式本质都是发送了一个请求,HTTP 方式发送了HTTP 请求,RPC 长连接方式发送了一个 RPC 请求,并且专门处理了当前节点的逻辑。
2.3 NacosServer 其他节点接收到消息后如何处理
看看 HTTP 方式调用的 接口:
/communication/dataChange
最后调用了 dumpService.dump 方法
RPC 方式发送的请求后,以下是接收端的处理方法:
发送请求时判断了如果是当前节点则进行的处理:
最后调用了 dumpService.dump 方法
画个图总结下现在的情况:
为什么配置变更了要通知集群中的全部节点呢?
这里并不是同步什么数据,而是因为客户端可能注册到不同的集群节点上,而如果只推送当前节点上注册的客户端,那会导致其他客户端明明也监听了某个配置,但是配置变化了却无法推送过来。
因为集群中的每个节点可能都接受不同客户端的连接
3. 客户端推送实现:DumpService.dump
DumpService.dump
这个方法就是关键之处了, 因为它完成了向客户端推送功能,看看如何实现:
// com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, boolean)
private TaskManager dumpTaskMgr;
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
最终调用了以下代码:
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
// 将任务放入 ConcurrentHashMap 中
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
}
该方法流程结束,最终就是将任务放入 map 中。
放入了任务,那就得有取出来处理的流程。
还是同样的类,看看它的构造初始化方法
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
private final ScheduledExecutorService processingExecutor;
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
// map 初始化
tasks = new ConcurrentHashMap<>(initCapacity);
// 创建定时线程池
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
// 定时线程池开始执行
processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
}
重点在线程池的执行这里
参数 processInterval 为 100L,也就代表延迟 100毫秒后每隔100毫秒执行一次
接着 看看 ProcessRunnable ,这个定时任务的内容:
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
// 处理任务
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
// 处理任务
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
// 获取到任务的处理器
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// 开始处理任务
if (!processor.process(task)) {
// 如果任务返回 false 失败,就重试
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error ", e);
retryFailedTask(taskKey, task);
}
}
}
}
上述代码找出当前任务的 processor 后,就继续 process 处理
看看它怎么处理的:
public class DumpChangeProcessor implements NacosTaskProcessor {
@Override
public boolean process(NacosTask task) {
long startUpdateMd5 = System.currentTimeMillis();
// 从数据库查询全部配置的 MD5 值
List<ConfigInfoWrapper> updateMd5List = configInfoPersistService.listAllGroupKeyMd5();
for (ConfigInfoWrapper config : updateMd5List) {
final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());
// 更新不一样的 MD5 值,并且发布 LocalDataChangeEvent 本地数据改变事件
ConfigCacheService
.updateMd5(groupKey, config.getMd5(), config.getLastModified(), config.getEncryptedDataKey());
}
// 代码省略
return true;
}
}
重点看 updateMd5 方法
// com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey){
CacheItem cache = makeSure(groupKey, encryptedDataKey, false);
if (cache.md5 == null || !cache.md5.equals(md5)) {
// 更新本地缓存信息
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
// 发布事件:本地数据改变
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
如果比对出 md5 不一致,代表数据发生了改变,就继续发布 LocalDataChangeEvent 事件。
看看 LocalDataChangeEvent 处理方法:
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
@Override
public void onEvent(LocalDataChangeEvent event) {
String groupKey = event.groupKey;
boolean isBeta = event.isBeta;
List<String> betaIps = event.betaIps;
String[] strings = GroupKey.parseKey(groupKey);
String dataId = strings[0];
String group = strings[1];
String tenant = strings.length > 2 ? strings[2] : "";
String tag = event.tag;
// 配置数据改变
configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
}
// 配置数据改变
public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,List<String> betaIps, String tag) {
// 获取到当前配置的所有监听器
Set<String> listeners = configChangeListenContext.getListeners(groupKey);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
int notifyClientCount = 0;
// 循环推送
for (final String client : listeners) {
Connection connection = connectionManager.getConnection(client);
if (connection == null) {
continue;
}
ConnectionMeta metaInfo = connection.getMetaInfo();
// beta ips check.
String clientIp = metaInfo.getClientIp();
String clientTag = metaInfo.getTag();
if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
continue;
}
// tag check
if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
continue;
}
// 构建一个请求对象
ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
// 创建一个 RPC 推送任务
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());
// 开始推送
push(rpcPushRetryTask);
// 推送客户端次数累计
notifyClientCount++;
}
Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
}
}
重点当前是最后 push 方法了 :
// com.alibaba.nacos.config.server.remote.RpcConfigChangeNotifier#push
/**
* 推送一个任务
*/
private void push(RpcPushTask retryTask) {
// 获取到请求
ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
if (retryTask.isOverTimes()) {
// 已经超过重试次数
Loggers.REMOTE_PUSH.warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.", notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(), retryTask.connectionId);
connectionManager.unregister(retryTask.connectionId);
} else if (connectionManager.getConnection(retryTask.connectionId) != null) {
// 连接还存在,客户端还在线
// 开始使用线程池执行任务
ConfigExecutor.getClientConfigNotifierServiceExecutor()
.schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
} else {
// client is already offline, ignore task.
}
}
看到 RpcPushTask 任务放到了线程池中执行,那就看看 RpcPushTask 任务内容:
class RpcPushTask implements Runnable {
/**
* 请求对象
*/
ConfigChangeNotifyRequest notifyRequest;
/**
* 最大重试次数
*/
int maxRetryTimes = -1;
/**
* 已经尝试次数
*/
int tryTimes = 0;
String connectionId;
String clientIp;
String appName;
public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId,
String clientIp, String appName) {
this.notifyRequest = notifyRequest;
this.maxRetryTimes = maxRetryTimes;
this.connectionId = connectionId;
this.clientIp = clientIp;
this.appName = appName;
}
/**
* 是否超过重试次数 true 超过
*/
public boolean isOverTimes() {
return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes;
}
@Override
public void run() {
// 已尝试次数 +1
tryTimes++;
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH);
if (!tpsControlManager.check(tpsCheckRequest).isSuccess()) {
// 检查失败了
push(this);
} else {
// ======================================
// pushWithCallback 推送给客户端 !!
// ======================================
rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
@Override
public void onSuccess() {
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_SUCCESS);
tpsControlManager.check(tpsCheckRequest);
}
@Override
public void onFail(Throwable e) {
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();
tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_FAIL);
tpsControlManager.check(tpsCheckRequest);
Loggers.REMOTE_PUSH.warn("Push fail", e);
push(RpcPushTask.this);
}
}, ConfigExecutor.getClientConfigNotifierServiceExecutor());
}
}
}
}
可见,这里完成了 服务端 向客户端推送数据
查看请求字段,可见,并没有推送配置变更的内容,从上一章就可以看出,客户端收到推送的消息后,会主动对变更的数据进行拉取操作
public class ConfigChangeNotifyRequest extends ServerRequest {
String dataId;
String group;
String tenant;
}
下一篇讲解 SpringCloud 和 Nacos 配置中心整合源码