前言
一直都想写SpringCloudAlibaba的源码分析,终于开始动手第一篇了,如果想要看懂Nacos源码至少要把《SpringBoot自动》配置看了,不然就是看天书。本篇文章呢带大家一起来看一下Nacos-Client 客户端服务注册这一部分的源码。
基础环境
首先需要下载一个Nacos-server,也就是注册中心了,顺便把源码也下载了, 后面我们在分析服务端的时候会用到, 下载地址 ,https://github.com/alibaba/nacos/releases/tag/1.4.3
下周好之后,把nacos-server解压启动,进入 bin目录,cmd执行 startup.cmd -m standalone ,代表单实例启动。如果嫌麻烦可以把 startup.cmd 中的 set MODE=“cluster” 改成 standalone ,以后就直接启动startup不再加参数了。
接下来是客户端了,我的项目结构如下:
我这里使用的SpringBoot版本是2.2.5 ; alibaba的版本是 2.2.1 ;父工程pom.xml管理如下
<parent>
<groupId> org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<!--SpringClou-alibaba依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SpringCloud依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
nacos-client 依赖如下
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
启动类如下
//Ncaos客户端应用程序启动
@SpringBootApplication
public class NacosClientStarter {
public static void main(String[] args) {
SpringApplication.run(NacosClientStarter.class);
}
}
配置文件中配置了一个服务名和 nacos注册中心地址
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
application:
name: nacos-client
然后启动项目,nacos-client就会被注册到Nacos注册中心,如下:
OK到这里基础环境准备好了,相信对于你来说也是比较容易的事情。
服务注册源码分析
Nacos和Eureka都拥有服务注册,服务发现,心跳续约等功能,本篇文章主要讨论服务注册,也就是客户端启动时会主动向服务端注册自己,那么在服务端会形成一个服务注册表(其实就是一个Map)下面我们就来研究一下服务注册的流程是如何的。
对SpringBoot有一定研究的同学都清楚SpringBoot的自动配置,SpringCloud或者说SpringCloudAlibaba的全是基于SpringBoot的自动配置来完成相关组件的装载。在External libraries 中找到我们导入的依赖 spring-cloud-starter-alibaba-nacos-discovery
然后展开找到META-INF/spring.factories 文件,内容如下:
NacosServerRegistryAutoConfiguration 是针对于服务注册的自动配置类。程序启动该类中的配置自动生效。除了该配置类,文件中还包括针对于服务发现的配置类和 config 的自动配置类。
NacosServerRegistryAutoConfiguration 服务注册自动配置
下面是NacosServerRegistryAutoConfiguration 的源码
@Configuration(
proxyBeanMethods = false
)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
//条件开关
@ConditionalOnProperty(
value = {"spring.cloud.service-registry.auto-registration.enabled"},
matchIfMissing = true
)
@AutoConfigureAfter({AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class})
public class NacosServiceRegistryAutoConfiguration {
public NacosServiceRegistryAutoConfiguration() {
}
@Bean
public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosRegistration nacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
//开启自动注册
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
}
-
spring.cloud.service-registry.auto-registration.enabled:自动注册条件开关,默认是true
-
NacosServiceRegistry : Nacos服务注册器,客户端往服务端发送注册请求的整个过程是通过它来完成的,它实现于 ServiceRegistry 接口(EurekaClient也是的哦) 。复写了register服务注册方法。
-
NacosRegistration :可以看做是对要注册是服务的等级,其中包括服务的实例ID,注册地址等等
-
NacosAutoServiceRegistration :自动注册器,服务注册由它来触发的
-
NacosDiscoveryProperties :针对于Nacos的配置属性对象
-
NacosRegistration :根据nacos配置创建的注册的服务的登记对象,其中包含了注册的服务的serviceId,host,port等等
NacosAutoServiceRegistration 自动注册
NacosAutoServiceRegistration 继承了 AbstractAutoServiceRegistration,通过监听 WebServerInitializedEvent web服务初始化事件来调用 NacosAutoServiceRegistration#register 触发自动注册。该方法中会判断 register-enabled: true 开关来决定是否注册。如果开启自动注册,就会最终来调用 NacosServiceRegistry#register 方法触发服务注册。
start()方法内部会调用 NacosAutoServiceRegistration #register方法
而NacosAutoServiceRegistration #register 做了是否开启注册配置(spring.cloud.nacos.discovery.register-enabled=true)开关后,就调用super的注册方法注册,下面是 AbstractAutoServiceRegistration#register源码
protected void register() {
this.serviceRegistry.register(getRegistration());
}
这里的 serviceRegistry 就是最前面说到的 NacosServiceRegistry ,核心服务注册流程就在他里面。
NacosServiceRegistry 服务注册流程
下面是 NacosServiceRegistry#register 的源码
@Override
public void register(Registration registration) {
//判断是否有服务ID,也就是spring.application.name配置不能为空
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
//拿到服务ID
String serviceId = registration.getServiceId();
//拿到服务的组名
String group = nacosDiscoveryProperties.getGroup();
//封装好的服务实例
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//注册实例
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
该方法会先判断服务 serviceId也就是服务名不能为空,然后拿到 group和Instance(服务实例对象),调用 NamingService#registerInstance来注册服务。下面是 NacosNamingService#registerInstance的源码
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
这里就看的比较清楚了,这里会把服务的ip,端口,服务名等信息封装到 BeatInfo 对象中,beatReactor.addBeatInfo是把当前服务实例加入心跳机制(心跳续约),然后通过serverProxy.registerService注册。下面是NamingProxy#registerService的源码
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
Map<String, String> params = new HashMap(9);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, "POST");
}
...省略...
public String reqAPI(String api, Map<String, String> params, String body, String method) throws NacosException {
return this.reqAPI(api, params, body, this.getServerList(), method);
}
最终把相关配置使用一个Map进行封装,然后交给reqAPI方法去发送POST请求给服务端,
- /nacos/v1/ns/instance : 是nacos注册的接口,拼接上yaml中的nacos地址就是完成的注册地址了
- this.getServerList() : 拿到所有的nacos服务端的地址,考虑到nacos-server集群的情况
代码最终来到reqAPI方法
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
throw new NacosException(400, "no server available");
} else {
NacosException exception = new NacosException();
if (servers != null && !servers.isEmpty()) {
//使用随机数随机取一个nacos-server
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
int i = 0;
while(i < servers.size()) {
String server = (String)servers.get(index);
try {
//发送请求了
return this.callServer(api, params, body, server, method);
} catch (NacosException var13) {
exception = var13;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);
}
index = (index + 1) % servers.size();
++i;
}
}
}
if (StringUtils.isNotBlank(this.nacosDomain)) {
int i = 0;
while(i < 3) {
try {
return this.callServer(api, params, body, this.nacosDomain, method);
} catch (NacosException var12) {
exception = var12;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
}
++i;
}
}
}
LogUtils.NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", new Object[]{api, servers, exception.getErrCode(), exception.getErrMsg()});
throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
}
代码不难理解,reqAPI方法中会随机从多个Nacos服务器地址中取一个,然后发起注册请求,通过callServer方法完成。
public String callServer(String api, Map<String, String> params, String body, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0L;
this.injectSecurityInfo(params);
List<String> headers = this.builderHeaders();
String url;
//拼接完整的URL: http://127.0.0.1:8848/nacos/v1/ns/instance
if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
if (!curServer.contains(":")) {
curServer = curServer + ":" + this.serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
} else {
url = curServer + api;
}
//调用nacos的http客户端执行请求
HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, "UTF-8", method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
if (200 == result.code) {
return result.content;
} else if (304 == result.code) {
return "";
} else {
throw new NacosException(result.code, result.content);
}
}
方法中会对注册的地址进行拼接,如: http://127.0.0.1:8848/nacos/v1/ns/instance,然后低着参数,调用HttpClient.request执行请求;下面是:com.alibaba.nacos.client.naming.net.HttpClient#request 的源码,看得出来底层还是通过 JDK自带的HttpURLConnection来发送请求的
public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
HttpURLConnection conn = null;
HttpResult var8;
try {
String encodedContent = encodingParams(paramValues, encoding);
url = url + (StringUtils.isEmpty(encodedContent) ? "" : "?" + encodedContent);
conn = (HttpURLConnection)(new URL(url)).openConnection();
setHeaders(conn, headers, encoding);
conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
conn.setReadTimeout(TIME_OUT_MILLIS);
conn.setRequestMethod(method);
conn.setDoOutput(true);
if (StringUtils.isNotBlank(body)) {
byte[] b = body.getBytes();
conn.setRequestProperty("Content-Length", String.valueOf(b.length));
conn.getOutputStream().write(b, 0, b.length);
conn.getOutputStream().flush();
conn.getOutputStream().close();
}
conn.connect();
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("Request from server: " + url);
}
var8 = getResult(conn);
return var8;
} catch (Exception var14) {
try {
if (conn != null) {
LogUtils.NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from " + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
}
} catch (Exception var13) {
LogUtils.NAMING_LOGGER.error("[NA] failed to request ", var13);
}
LogUtils.NAMING_LOGGER.error("[NA] failed to request ", var14);
var8 = new HttpResult(500, var14.toString(), Collections.emptyMap());
} finally {
IoUtils.closeQuietly(conn);
}
return var8;
}
BeatReactor#addBeatInfo心跳续约
在 NacosNamingService#registerInstance方法中把服务信息封装为一个 BeatInfo ,然后加入this.beatReactor.addBeatInfo 心跳机制。我们来看一下心跳是如何做的,下面是beatReactor.addBeatInfo的源码
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
this.dom2Beat.put(key, beatInfo);
//线程池,定时任务,5000毫秒发送一次心跳。beatInfo.getPeriod()是定时任务执行的频率
this.executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
//心跳任务
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
public void run() {
if (!this.beatInfo.isStopped()) {
long nextTime = this.beatInfo.getPeriod();
try {
JSONObject result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = (long)result.getIntValue("clientBeatInterval");
boolean lightBeatEnabled = false;
if (result.containsKey("lightBeatEnabled")) {
lightBeatEnabled = result.getBooleanValue("lightBeatEnabled");
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0L) {
nextTime = interval;
}
int code = 10200;
if (result.containsKey("code")) {
code = result.getIntValue("code");
}
if (code == 20404) {
Instance instance = new Instance();
instance.setPort(this.beatInfo.getPort());
instance.setIp(this.beatInfo.getIp());
instance.setWeight(this.beatInfo.getWeight());
instance.setMetadata(this.beatInfo.getMetadata());
instance.setClusterName(this.beatInfo.getCluster());
instance.setServiceName(this.beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
} catch (Exception var10) {
}
}
} catch (NacosException var11) {
LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JSON.toJSONString(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
}
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
和Eureka一样,心跳也是通过线程池 ScheduledExecutorService 来实现的,时间频率默认是5秒一次。
- BeatInfo : 心跳续约的对象,其中包括服务的IP,端口,服务名,权重等
- executorService.schedule :定时任务,beatInfo.getPeriod()是定时任务执行频率,默认是5000 毫秒发送一次心跳续约请求到NacosServer
- BeatTask :是一个Runnable线程,run方法中会调用 BeatReactor.this.serverProxy.sendBeat 发送心跳请求。
BeatTask作为心跳续约的线程对象,他的run方法中 通过 BeatReactor.this.serverProxy.sendBeat发送心跳,如果发现服务未注册会通过 BeatReactor.this.serverProxy.registerService 注册服务。
下面是 com.alibaba.nacos.client.naming.net.NamingProxy#sendBeat 发送心跳的方法
public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap(8);
String body = "";
if (!lightBeatEnabled) {
try {
body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8");
} catch (UnsupportedEncodingException var6) {
throw new NacosException(500, "encode beatInfo error", var6);
}
}
params.put("namespaceId", this.namespaceId);
params.put("serviceName", beatInfo.getServiceName());
params.put("clusterName", beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
String result = this.reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, "PUT");
return JSON.parseObject(result);
}
这里也是会拼接好心跳的地址 :127.0.0.1:8848/nacos/v1/ns/instance/beat ,参数包括namespaceId命名空间ID;serviceName 服务ing;clusterName 集群名;ip 服务的IP;port 端口。然后发送一个PUT请求。底层依然是从多个NacosServer随机选择一个发起心跳请求。
客户端就先分析到这里,为了梳理流程我这里画了一个图
总结
- NACOS就是使用SpringBoot自动装配完成相关组件的配置,在Nacos的自动配置类(NacosServerRegistryAutoConfiguration )中注册了服务注册器(NacosServiceRegistry )和自动注册器(NacosAutoServiceRegistration ),注册的服务登记对象(NacosRegistration)等。
- Nacos自动注册器通过监听应用启动时间来触发自动注册,他会判断一个自动注册开关(spring.cloud.service-registry.auto-registration.enabled)。满足自动注册就会调用NacosServiceRegistry 的register方法进行服务注册
- NacosServiceRegistry 把注册工作交给NacosNamingService去完成,在NacosNamingService中做了两件时间:1是把服务封装为BeatInfo加入心跳机制,2是完成服务的注册
- 对于心跳机制,通过BeatReactor 来完成,底层使用的是带定时任务的线程池5S一次心跳发送到服务端。
- 对于服务注册则是通过NamingProxy代理来完成,和心跳一样,都是使用随机选择一个NacosServer,然后拼接注册地址,交给HttpClient发送http请求。
好吧,到这里就真的结束了,如果文章对你有帮助建议点赞收藏加好评。你的鼓励是我最大的动力