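Nacos service registration
When a consumer starts, it reads the instance list for the specified service name from the Nacos server and caches it in local memory.
It also starts a scheduled task that pulls the service list from the Nacos server every 10 seconds.
Nacos push mechanism:
When heartbeat checks detect that a service provider's heartbeat has timed out, the server pushes a message to the consumer, which then updates its local cache.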
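The pull-plus-push behaviour described above can be sketched roughly as follows. This is a minimal illustration, not Nacos client code: the class InstanceCacheSketch, its method names and the remoteLookup function are hypothetical, and instances are represented as plain "ip:port" strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical consumer-side cache: initial pull, periodic pull, and push updates.
class InstanceCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the remote call to the registry (assumption, not a Nacos API).
    private final Function<String, List<String>> remoteLookup;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "instance-cache-refresh");
                t.setDaemon(true);
                return t;
            });

    InstanceCacheSketch(Function<String, List<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Pull once right away, then keep pulling at a fixed interval.
    void subscribe(String serviceName, long periodSeconds) {
        refresh(serviceName);
        scheduler.scheduleWithFixedDelay(
                () -> refresh(serviceName), periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void refresh(String serviceName) {
        store(serviceName, remoteLookup.apply(serviceName));
    }

    // Called when the server pushes a changed instance list.
    void onPush(String serviceName, List<String> instances) {
        store(serviceName, instances);
    }

    List<String> getInstances(String serviceName) {
        return cache.getOrDefault(serviceName, Collections.emptyList());
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    private void store(String serviceName, List<String> instances) {
        cache.put(serviceName, Collections.unmodifiableList(new ArrayList<>(instances)));
    }
}
```

A consumer would call subscribe("order-service", 10) once at startup and read getInstances("order-service") on every request, so lookups never wait on the network.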
Nacos registration source code analysis
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.8.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
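The analysis below is based on the source code of these versions.
Entry point
Once a project uses Nacos as its registry, it needs at least the following property: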
spring.cloud.nacos.discovery.server-addr=10.130.16.110:8848
# Judging from the code, this decides whether registration goes over gRPC or HTTP: false = HTTP, true = gRPC
spring.cloud.nacos.discovery.ephemeral=false
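The server-addr property tells the application where to find the Nacos server to register with. Without it, the application keeps logging errors.
Spring Boot's @EnableAutoConfiguration is not covered here. Since we are already reading Nacos source code, familiarity with Spring Boot is assumed.
Next, open the NacosServiceRegistryAutoConfiguration class.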
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
The third bean, NacosAutoServiceRegistration, extends the abstract class AbstractAutoServiceRegistration.
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware,
ApplicationListener<WebServerInitializedEvent> {
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
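AbstractAutoServiceRegistration implements ApplicationListener<WebServerInitializedEvent>, so it is a Spring event listener (observer pattern), and the event fires once the web server has been initialized. onApplicationEvent() calls bind(), and bind() in turn calls start().
start() calls register(), and that call ends up in register() of NacosServiceRegistry.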
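The call chain above (event, bind, start, register) can be illustrated without Spring. The sketch below is a simplified stand-in, not the Spring Cloud class: AutoRegistrationSketch and its members are hypothetical names, and the event is reduced to a namespace string and a port. It keeps the two details that matter in the original: the "management" namespace is skipped, and the running flag makes start() register only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Spring-free model of the listener -> bind -> start -> register chain.
class AutoRegistrationSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger port = new AtomicInteger(0);
    // Records what would have been sent to the registry.
    final List<String> registered = new ArrayList<>();

    // Plays the role of onApplicationEvent(...) plus bind(...).
    void onWebServerInitialized(String serverNamespace, int webServerPort) {
        if ("management".equals(serverNamespace)) {
            return; // the management server must not trigger registration
        }
        port.compareAndSet(0, webServerPort); // only the first port wins
        start();
    }

    void start() {
        if (!running.get()) {
            register();
            running.compareAndSet(false, true);
        }
    }

    void register() {
        registered.add("instance:" + port.get());
    }
}
```

Firing the event a second time leaves the registered list unchanged, which mirrors why a context refresh does not register the instance twice.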
register
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
}
- getNacosInstanceFromRegistration builds the instance information to register.
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
- namingService.registerInstance(serviceId, group, instance);
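Here namingService is a NacosNamingService, and its registerInstance() hands the call to a clientProxy field. That proxy type has three implementations: NamingClientProxyDelegate, NamingGrpcClientProxy and NamingHttpClientProxy.
The NacosNamingService constructors call init(properties), which assigns a NamingClientProxyDelegate to clientProxy. As a rule, a class with "delegate" in its name follows the delegate pattern.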
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
init(properties);
}
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
}
- NamingClientProxyDelegate.registerService
The delegate decides here which proxy actually executes the call.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
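- NamingClientProxyDelegate.getExecuteClientProxy
The choice depends on the instance: with ephemeral=false registration goes over HTTP, otherwise over gRPC. Note that this fails if the Nacos server is still on 1.x.x, because gRPC support was only added in 2.x.x. It needs one additional port, so by default the server exposes ports 8848 and 9848.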
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
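As a rough illustration of the delegate pattern used here, the sketch below routes a call to one of two proxies based on a flag. All names (ProxySelectionSketch, RegisterProxy and so on) are hypothetical, and the proxies only return a string instead of talking to a server.

```java
// Hypothetical model of a delegate that picks the executing proxy per call.
class ProxySelectionSketch {
    interface RegisterProxy {
        String register(String serviceName, boolean ephemeral);
    }

    static class GrpcProxy implements RegisterProxy {
        public String register(String serviceName, boolean ephemeral) {
            return "grpc:" + serviceName;
        }
    }

    static class HttpProxy implements RegisterProxy {
        public String register(String serviceName, boolean ephemeral) {
            return "http:" + serviceName;
        }
    }

    // Exposes the same interface as the real proxies and forwards to one of them.
    static class DelegateProxy implements RegisterProxy {
        private final RegisterProxy grpc = new GrpcProxy();
        private final RegisterProxy http = new HttpProxy();

        public String register(String serviceName, boolean ephemeral) {
            return select(ephemeral).register(serviceName, ephemeral);
        }

        private RegisterProxy select(boolean ephemeral) {
            return ephemeral ? grpc : http;
        }
    }
}
```

Callers only ever see the RegisterProxy interface, so the transport decision stays in one place.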
- NamingHttpClientProxy.registerService
With ephemeral=false, the selected proxy is NamingHttpClientProxy.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
final Map<String, String> params = new HashMap<String, String>(32);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
- NamingHttpClientProxy.reqApi
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
return reqApi(api, params, body, serverListManager.getServerList(), method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (serverListManager.isDomain()) {
String nacosDomain = serverListManager.getNacosDomain();
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
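serverListManager.isDomain() reflects how many Nacos server addresses are configured: with a single address the if branch runs and retries that address up to maxRetry times; with more than one, the else branch runs.
In the else branch, servers is the list of Nacos server nodes. Random picks a starting index, and callServer() is tried against that node. If the call fails, the index moves on by one and callServer() is tried against the next node. If every node fails, an exception is thrown.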
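The failover loop in the else branch can be reduced to the following sketch. It is an illustration of the technique (random start, then round-robin over the remaining nodes), not the Nacos method itself: FailoverSketch, callAny and the call function are hypothetical, and errors are plain RuntimeExceptions.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical helper: try each server once, starting at a random position.
class FailoverSketch {
    static String callAny(List<String> servers, Function<String, String> call, Random random) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no server available");
        }
        RuntimeException last = null;
        int index = random.nextInt(servers.size()); // random start spreads the load
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server); // first success wins
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on
            }
            index = (index + 1) % servers.size(); // wrap around to the next node
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }
}
```

Because the loop runs exactly servers.size() times and wraps around, every node is tried once no matter where the random start lands.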
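- NamingHttpClientProxy.callServer
Skipping the preliminary branches: the method assembles the URL and then, inside the try block, sends the request through nacosRestTemplate, a wrapper class. The call returns a restResult, whose status code is handed to a class called MetricsMonitor together with the elapsed time. The name suggests monitoring, and the class does turn out to be Prometheus-related. We will not go into it here and return to the main line.
If the result is 200 (restResult.ok()), restResult.getData() is returned. A 304 yields an empty string, and anything else raises a NacosException.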
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
String namespace = params.get(CommonParams.NAMESPACE_ID);
String group = params.get(CommonParams.GROUP_NAME);
String serviceName = params.get(CommonParams.SERVICE_NAME);
params.putAll(getSecurityHeaders(namespace, group, serviceName));
Header header = NamingHttpUtil.builderHeader();
String url;
if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
url = curServer + api;
} else {
if (!InternetAddressUtil.containsPort(curServer)) {
curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
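The URL assembly at the top of callServer can be tried out in isolation with a sketch like the one below. ServerUrlSketch and buildUrl are hypothetical names, the port check is a simplified stand-in for InternetAddressUtil.containsPort (it assumes a hostname or IPv4 address), the prefix is fixed to "http://", and the API path in the usage note is only an example value.

```java
// Hypothetical, simplified version of the URL assembly step.
class ServerUrlSketch {
    static final String HTTP_PREFIX = "http://";
    static final String HTTPS_PREFIX = "https://";

    static String buildUrl(String server, String api, int defaultPort) {
        // A server that already carries a scheme is used as-is.
        if (server.startsWith(HTTPS_PREFIX) || server.startsWith(HTTP_PREFIX)) {
            return server + api;
        }
        // Simplified port check: assumes hostname or IPv4, not a bare IPv6 literal.
        String withPort = server.contains(":") ? server : server + ":" + defaultPort;
        return HTTP_PREFIX + withPort + api;
    }
}
```

For example, buildUrl("10.130.16.110", "/nacos/v1/ns/instance", 8848) yields "http://10.130.16.110:8848/nacos/v1/ns/instance".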
- NacosRestTemplate.exchangeForm
Key call: this.requestClient().execute()
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
String httpMethod, Type responseType) throws Exception {
RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
return execute(url, httpMethod, requestHttpEntity, responseType);
}
private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
Type responseType) throws Exception {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
if (logger.isDebugEnabled()) {
logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
}
ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
HttpClientResponse response = null;
try {
response = this.requestClient().execute(uri, httpMethod, requestEntity);
return responseHandler.handle(response);
} finally {
if (response != null) {
response.close();
}
}
}
private final HttpClientRequest requestClient;
private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>();
public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) {
super(logger);
this.requestClient = requestClient;
}
private HttpClientRequest requestClient() {
if (CollectionUtils.isNotEmpty(interceptors)) {
if (logger.isDebugEnabled()) {
logger.debug("Execute via interceptors :{}", interceptors);
}
return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
}
return requestClient;
}
The requestClient here is passed in through the constructor, so we need to find where the NacosRestTemplate is created.
public class NamingHttpClientProxy extends AbstractNamingClientProxy {
private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
}
public NacosRestTemplate getNacosRestTemplate() {
return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}
HttpClientBeanHolder.getNacosRestTemplate
A typical double-checked lock.
public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) {
if (httpClientFactory == null) {
throw new NullPointerException("httpClientFactory is null");
}
String factoryName = httpClientFactory.getClass().getName();
NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName);
if (nacosRestTemplate == null) {
synchronized (SINGLETON_REST) {
nacosRestTemplate = SINGLETON_REST.get(factoryName);
if (nacosRestTemplate != null) {
return nacosRestTemplate;
}
nacosRestTemplate = httpClientFactory.createNacosRestTemplate();
SINGLETON_REST.put(factoryName, nacosRestTemplate);
}
}
return nacosRestTemplate;
}
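For readers unfamiliar with the idiom, here is a minimal, generic sketch of double-checked locking over a keyed cache. SingletonHolderSketch and getOrCreate are hypothetical names. The sketch assumes a ConcurrentHashMap so that the unlocked first read is safe.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical keyed singleton holder using double-checked locking.
class SingletonHolderSketch {
    private static final Map<String, Object> SINGLETONS = new ConcurrentHashMap<>();

    static Object getOrCreate(String key, Supplier<Object> factory) {
        Object instance = SINGLETONS.get(key); // first check, no lock
        if (instance == null) {
            synchronized (SINGLETONS) {
                instance = SINGLETONS.get(key); // second check, under the lock
                if (instance == null) {
                    instance = factory.get(); // runs at most once per key
                    SINGLETONS.put(key, instance);
                }
            }
        }
        return instance;
    }
}
```

The first check keeps the common path lock-free, and the second check stops two threads that both saw null from each creating an instance. With a ConcurrentHashMap, computeIfAbsent would be a shorter alternative.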
So what is the httpClientFactory here?
It is a NamingHttpClientFactory, which extends AbstractHttpClientFactory. Because NamingHttpClientFactory does not override createNacosRestTemplate, the method that actually runs is AbstractHttpClientFactory.createNacosRestTemplate.
private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory();
public NacosRestTemplate getNacosRestTemplate() {
return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}
private static class NamingHttpClientFactory extends AbstractHttpClientFactory {
@Override
protected HttpClientConfig buildHttpClientConfig() {
return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS)
.setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build();
}
@Override
protected Logger assignLogger() {
return NAMING_LOGGER;
}
}
AbstractHttpClientFactory.createNacosRestTemplate
@Override
public NacosRestTemplate createNacosRestTemplate() {
HttpClientConfig httpClientConfig = buildHttpClientConfig();
final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
// enable ssl
initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
@Override
public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
clientRequest.setSSLContext(loadSSLContext());
clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
}
}, new TlsFileWatcher.FileChangeListener() {
@Override
public void onChanged(String filePath) {
clientRequest.setSSLContext(loadSSLContext());
}
});
return new NacosRestTemplate(assignLogger(), clientRequest);
}
JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
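As you can see, a JdkHttpClientRequest is created here.
Following it further leads to java.net.HttpURLConnection, which sends the request to the nacos-server address. We stop the analysis there, since that is the HTTP transport layer.
In the end a result comes back: if it is 200, registration succeeded; on failure an exception is thrown.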