文章目录
- 1.Nacos Client的自动注册原理和实现
- 2.Nacos Client向Server发送注册请求
- 3.Nacos Client向Server发送心跳请求
Nacos Client的任务: 向Server发送注册请求, 向Server发送心跳请求, Client获取所有的服务, Client定时更新本地服务, Client获取要调用服务的提供者列表
1.Nacos Client的自动注册原理和实现
<!-- Maven dependency on the Spring Cloud Alibaba Nacos discovery starter -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
/**
 * Auto-configuration that checks, after injection, that an
 * {@link AutoServiceRegistration} bean exists when fail-fast is requested.
 *
 * <p>Conditional on {@code spring.cloud.service-registry.auto-registration.enabled};
 * the condition also matches when the property is missing.
 */
@Configuration(proxyBeanMethods = false)
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {

    /** Optional registration bean; stays {@code null} when none is defined. */
    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;

    /** Properties consulted for the fail-fast flag. */
    @Autowired
    private AutoServiceRegistrationProperties properties;

    /**
     * Validates the injected state once the bean has been constructed.
     *
     * @throws IllegalStateException if no registration bean was injected and fail-fast is on
     */
    @PostConstruct
    protected void init() {
        if (this.autoServiceRegistration != null) {
            // A registration bean is present, nothing to validate.
            return;
        }
        if (!this.properties.isFailFast()) {
            // Missing bean is tolerated unless fail-fast is enabled.
            return;
        }
        throw new IllegalStateException("Auto Service Registration has "
                + "been requested, but there is no AutoServiceRegistration bean");
    }
}
注册方法: register()
- NacosServiceRegistry
- NacosAutoServiceRegistration
AbstractAutoServiceRegistration.onApplicationEvent()
调用的是NacosServiceRegistry.register()
- 为什么一启动的时候就注册:
AbstractAutoServiceRegistration.onApplicationEvent(), 一启动的时候会调用NacosServiceRegistry.register(), 而NacosServiceRegistry是NacosDiscoveryAutoConfiguration类注入的。
2.Nacos Client向Server发送注册请求
根据之前的register()
NacosNamingService.registerInstance()
/**
 * Registers {@code instance} for the given service and group.
 *
 * <p>For an ephemeral instance a heartbeat entry is handed to the beat reactor
 * before the registration request is sent.
 *
 * @param serviceName name of the service
 * @param groupName   name of the group
 * @param instance    instance to register
 * @throws NacosException if the registration request fails
 */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // Combined group/service name; per the original note it has the form group@@serviceName.
    final String groupedName = NamingUtils.getGroupedName(serviceName, groupName);

    if (instance.isEphemeral()) {
        // Ephemeral instance: build its beat info and register it with the beat
        // reactor, which schedules the heartbeat task.
        beatReactor.addBeatInfo(groupedName, beatReactor.buildBeatInfo(groupedName, instance));
    }

    // Send the registration request to the server.
    serverProxy.registerService(groupedName, groupName, instance);
}
BeatInfo为心跳类, BeatReactor为生成心跳类的类
NamingProxy.registerService()
/**
 * Sends a registration request for {@code instance}.
 *
 * <p>The instance is flattened into string request parameters and submitted
 * with {@code HttpMethod.POST} to {@code UtilAndComs.nacosUrlInstance}.
 *
 * @param serviceName service name placed in the request
 * @param groupName   group name placed in the request
 * @param instance    instance whose fields are sent as parameters
 * @throws NacosException if the request fails
 */
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);

    final Map<String, String> requestParams = new HashMap<String, String>(16);

    // Addressing: namespace, service, group and cluster.
    requestParams.put(CommonParams.NAMESPACE_ID, namespaceId);
    requestParams.put(CommonParams.SERVICE_NAME, serviceName);
    requestParams.put(CommonParams.GROUP_NAME, groupName);
    requestParams.put(CommonParams.CLUSTER_NAME, instance.getClusterName());

    // Instance attributes, each rendered as a string.
    requestParams.put("ip", instance.getIp());
    requestParams.put("port", String.valueOf(instance.getPort()));
    requestParams.put("weight", String.valueOf(instance.getWeight()));
    requestParams.put("enable", String.valueOf(instance.isEnabled()));
    requestParams.put("healthy", String.valueOf(instance.isHealthy()));
    requestParams.put("ephemeral", String.valueOf(instance.isEphemeral()));
    requestParams.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

    // Submit as a POST request; the response body is not used here.
    reqApi(UtilAndComs.nacosUrlInstance, requestParams, HttpMethod.POST);
}
/**
 * Calls {@code api} with the given query parameters and an empty body.
 *
 * @param api    API path to request
 * @param params request parameters
 * @param method HTTP method name
 * @return whatever the delegated {@code reqApi} overload returns
 * @throws NacosException if the request fails
 */
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    // Typed emptyMap() instead of the raw Collections.EMPTY_MAP: same shared
    // immutable instance, but without the unchecked conversion.
    return reqApi(api, params, Collections.<String, String>emptyMap(), method);
}
/**
 * Calls {@code api} against the server list returned by {@code getServerList()}.
 *
 * @param api    API path to request
 * @param params request parameters
 * @param body   request body values
 * @param method HTTP method name
 * @return whatever the delegated {@code reqApi} overload returns
 * @throws NacosException if the request fails
 */
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
    // Per the original note, these are the Nacos server addresses from the configuration.
    final List<String> serverList = getServerList();
    return reqApi(api, params, body, serverList, method);
}
将instance拆散后写入到params中,并以请求参数的形式出现在请求中, 提交一个post请求
/**
 * Calls {@code api}, trying the given servers and then the configured domain.
 *
 * <p>A random server is tried first and the remaining ones in round-robin
 * order, each at most once. If none succeeds (or no servers were given) and
 * {@code nacosDomain} is not blank, the domain is tried up to
 * {@code UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT} times. The first successful
 * response is returned.
 *
 * <p>Note that the namespace id is written into the caller's {@code params} map.
 *
 * @param api     API path to request
 * @param params  request parameters; modified by this method
 * @param body    request body values
 * @param servers candidate server addresses, may be {@code null} or empty
 * @param method  HTTP method name
 * @return the response of the first successful call
 * @throws NacosException if neither servers nor a domain are available, or every attempt failed
 */
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    // Holds the most recent failure so it can be reported once all attempts are used up.
    NacosException lastFailure = new NacosException();

    if (servers != null && !servers.isEmpty()) {
        final int serverCount = servers.size();
        // Random starting position, then walk the list round-robin, one attempt per server.
        final int start = new Random(System.currentTimeMillis()).nextInt(serverCount);
        for (int attempt = 0; attempt < serverCount; attempt++) {
            final String candidate = servers.get((start + attempt) % serverCount);
            try {
                return callServer(api, params, body, candidate, method);
            } catch (NacosException e) {
                lastFailure = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", candidate, e);
                }
            }
        }
    }

    if (StringUtils.isNotBlank(nacosDomain)) {
        // Retry against the domain a fixed number of times.
        int remaining = UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT;
        while (remaining-- > 0) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                lastFailure = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, lastFailure.getErrCode(),
            lastFailure.getErrMsg());

    throw new NacosException(lastFailure.getErrCode(),
            "failed to req API:" + api + " after all servers(" + servers + ") tried: " + lastFailure.getMessage());
}
/**
 * Sends one request to a single server and returns its response body.
 *
 * <p>The URL is {@code curServer + api} when {@code curServer} already starts
 * with an HTTP/HTTPS scheme. Otherwise the configured prefix is prepended and
 * {@code serverPort} is appended if the address has no port separator.
 *
 * @param api       API path to request
 * @param params    request parameters; security info is injected into this map
 * @param body      request body values
 * @param curServer address of the server to call
 * @param method    HTTP method name
 * @return the response data on success, or an empty string for a "not modified" response
 * @throws NacosException with the server's status code and message when the server
 *                        answers with a failure, or with {@code SERVER_ERROR}
 *                        wrapping the cause for any other error
 */
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
        String method) throws NacosException {
    long start = System.currentTimeMillis();
    injectSecurityInfo(params);
    Header header = builderHeader();

    // Build the request URL.
    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
            curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        // Submit the request and record how long it took.
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        long end = System.currentTimeMillis();
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);

        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (NacosException e) {
        // Rethrow as-is so the server's status code is not replaced by SERVER_ERROR.
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw e;
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}
NacosRestTemplate.java
// NacosRestTemplate.java
/**
 * Executes a form-encoded request.
 *
 * <p>Sets the header's content type to {@code MediaType.APPLICATION_FORM_URLENCODED},
 * bundles header, query and body values into a request entity, and delegates
 * to {@code execute}.
 *
 * @param url          target URL
 * @param header       request header; its content type is set by this method
 * @param query        query parameters
 * @param bodyValues   form body values
 * @param httpMethod   HTTP method name
 * @param responseType type the response should be converted to
 * @param <T>          response data type
 * @return the result produced by {@code execute}
 * @throws Exception if executing the request fails
 */
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
        String httpMethod, Type responseType) throws Exception {
    final RequestHttpEntity formEntity = new RequestHttpEntity(
            header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
    // Submit the request.
    return execute(url, httpMethod, formEntity, responseType);
}
/**
 * Builds the final URI, sends the request and converts the response.
 *
 * @param url           target URL without query string
 * @param httpMethod    HTTP method name
 * @param requestEntity entity supplying the query and body
 * @param responseType  type used to select the response handler
 * @param <T>           response data type
 * @return the handled response
 * @throws Exception if building the URI, sending the request or handling the response fails
 */
private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
        Type responseType) throws Exception {
    final URI target = HttpUtils.buildUri(url, requestEntity.getQuery());
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, target, requestEntity.getBody());
    }

    final ResponseHandler<T> handler = super.selectResponseHandler(responseType);
    HttpClientResponse clientResponse = null;
    try {
        // Per the original note, requestClient() returns Nacos' own HTTP client,
        // a wrapper around the JDK's HttpURLConnection.
        clientResponse = this.requestClient().execute(target, httpMethod, requestEntity);
        return handler.handle(clientResponse);
    } finally {
        // Always release the response, including when handling throws.
        if (clientResponse != null) {
            clientResponse.close();
        }
    }
}
获取到Nacos自定义的HttpClient,其就是对JDK中的HttpURLConnection的封装,发送对应的post请求。
JdkHttpClientRequest.execute()
3.Nacos Client向Server发送心跳请求
NacosNamingService.registerInstance() 中调用的 BeatReactor.addBeatInfo()
/**
 * Registers a heartbeat for the given service instance and schedules its first beat.
 *
 * <p>Any beat already stored under the same key is marked as stopped and replaced.
 *
 * @param serviceName service name used to build the map key
 * @param beatInfo    heartbeat data; its ip, port and period are read here
 */
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);

    // Key identifies one host; per the original note it has the form
    // group@@serviceName#ip#port.
    final String beatKey = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());

    // fix #1733
    // dom2Beat caches the beat info per host key; stop the beat being replaced.
    final BeatInfo previousBeat = dom2Beat.remove(beatKey);
    if (previousBeat != null) {
        previousBeat.setStopped(true);
    }
    dom2Beat.put(beatKey, beatInfo);

    // Schedule the beat task to run once after the beat period.
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
开启一个定时任务来发送心跳。
BeatTask.run()
/**
 * Sends one heartbeat and schedules the next one.
 *
 * <p>Does nothing if the beat has been stopped. Otherwise it sends the beat,
 * adopts the interval and light-beat flag returned by the server, and sends a
 * registration request when the server answers with
 * {@code NamingResponseCode.RESOURCE_NOT_FOUND}.
 *
 * <p>The next beat is scheduled in a {@code finally} block, so a failure while
 * sending or processing this beat cannot end the heartbeat chain.
 */
@Override
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    long nextTime = beatInfo.getPeriod();
    try {
        // Send the heartbeat.
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            // The server-provided interval overrides the configured period.
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        // Per the original note, the server answers 20404 when it does not know
        // this client; the client then registers itself again.
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            // NOTE(review): assigns the new instance's own id back to itself; kept
            // as in the original because the intended source of the id is not
            // visible here. Confirm whether another value was meant.
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                // Send the registration request to the server.
                serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception registerEx) {
                // Best effort: the next beat retries, but the failure is logged
                // instead of being silently dropped.
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to re-register instance for beat: {}", beatInfo,
                        registerEx);
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    } catch (Exception unknownEx) {
        // Unexpected failure, e.g. a malformed response: log it and keep beating.
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unexpected error", beatInfo, unknownEx);
    } finally {
        // Schedule the next beat no matter how this one ended.
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
发送心跳: sendBeat(), 发送put请求。
若在server端没有发现该client,则server返回的状态码为20404, 此时client会发起注册请求
registerService(): 发送post请求
在run()方法的最后又会启动一次定时任务。