微服务框架
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】
微服务面试篇
文章目录
- 微服务框架
- 微服务面试篇
- 54 微服务篇
- 54.3 Nacos 如何支撑数十万服务注册压力?
- 54.3.1 Nacos 服务端源码
54 微服务篇
54.3 Nacos 如何支撑数十万服务注册压力?
54.3.1 Nacos 服务端源码
上次我们在 源码里面 看了Nacos 服务注册表的结构
下面来看看 它到底是怎么形成 这样一种结构,并且 把数据存进去 的?
回到请求入口
详细解读 一下
微服务 启动的时候就会向这个接口发起注册 请求POST
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 【从request 中获取namespaceId】
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 【获取服务名称】
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 把request 中的参数封装为Instance【实例】 对象
final Instance instance = parseInstance(request);
// 【注册实例】
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
OK, 这个方法大致就算完了 ,跟进 registerInstance 方法
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 如果是 第一次,则创建空的服务
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 从注册表里,拿到Service
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加实例到 Service 中
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
OK,简单看看初始化 的动作createEmptyService 这个方法
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
OK, 又调了一个方法
其实就在下面
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// 尝试从注册表中 获取服务
Service service = getService(namespaceId, serviceName);
if (service == null) {
// 如果为空,则说明该服务是 第一次注册,则创建新的Service
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
// 记录服务最后一次 更新的时间【其实就是当前时间】
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
//把服务放入 注册表
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
再仔细 看看putServiceAndInit 这个方法
private void putServiceAndInit(Service service) throws NacosException {
//把Service 放入注册表
putService(service);
service = getService(service.getNamespaceId(), service.getName());
//初始化【健康检测】
service.init();
// 服务状态变更的 监听
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
到这儿 ,其实一个 空的服务,就完全初始化完毕 了【即 createEmptyService 方法就算完成了 】
接下看下去 ,就是热乎的服务从注册表 中拿出来
判空
最后添加实例 到Service 中
跟进最后一个addInstance 方法
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 给当前服务生成 一个唯一标识【可以理解为 serviceId】
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 从注册表 中拿到Service
Service service = getService(namespaceId, serviceName);
// service为锁对象,同一个服务的多个实例,只能串行 来完成注册【避免了并发的 修改】
synchronized (service) {
// 拷贝注册表中 旧的实例列表,然后结合新注册 的实例,得到最终的实例列表【COPY】
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 封装 新实例列表到 instances 对象中
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//consistency 一致性。【更新注册表 (① 更新本地注册表 ② 同步给Nacos 集群中的其他节点)】【最耗时的步骤】
consistencyService.put(key, instances);
}
}
再跟进最后这个 put 方法
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
OK,再进这个 put 方法
@Override
public void put(String key, Record value) throws NacosException {
// 更新本地注册表 【异步的】【后面的“死循环”】
onPut(key, value);
// “同步(bushi)”给Nacos 中的其他节点【同时 异步的将数据同步给Nacos 集群中的其他节点】
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
继续跟进 onPut 方法
public void onPut(String key, Record value) {
// 判断是否是 临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// 封装实例列表 封装到Datum
Datum<Instances> datum = new Datum<>();
// value 是服务中的实例列表 Instances
datum.value = (Instances) value;
// key 是serviceId
datum.key = key;
datum.timestamp.incrementAndGet();
// 以serviceId 为key,以 Datum为值,缓存起来
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 把serviceId 和 当前的 操作类型存入task 列表notifier
notifier.addTask(key, DataOperation.CHANGE);
}
再进到 addTask 这个方法
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 把serviceId 和事件【放入】 阻塞队列【“活儿接了,不忙做”】
tasks.offer(Pair.with(datumKey, action));
}
回到上一个onPut 方法,
notifier 这个东西到底是个啥
看看当前这个 类 init()方法,往上滑
@PostConstruct
public void init() {
// 利用线程池,执行 notifier
GlobalExecutor.submitDistroNotifyTask(notifier);
}
这下直接 跟入 notifier 的run 方法中
厉害的是,居然是个 死循环,妙啊
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
// 死循环
for (; ; ) {
try {
// 从阻塞队列 找那个获取任务, 6666666
Pair<String, DataOperation> pair = tasks.take();
// 执行任务,更新服务列表
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
OK,到这里onPut方法 算完事儿了
继续下面 这个 同步方法,跟进去
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// 获取Nacos 中的所有成员,除了我自己
for (Member each : memberManager.allMembersWithoutSelf()) {
//【遍历】
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
OK,该回答问题了
问题说明:考察对Nacos源码的掌握情况
难易程度:难
参考话术:
Nacos内部接收到注册的请求时,不会立即写数据,而是将服务注册的任务放入一个阻塞队列就立即响应给客户端。然后利用线程池读取阻塞队列中的任务,异步【异步是设计最精妙的】来完成实例更新,从而提高并发写能力。