一、引言
前面几个章节把Nacos服务注册从客户端到服务端,整个流程源码都分析了一遍。
本章节我们来分析,order-service、stock-service 完成Nacos注册后,可以通过Feign的方式,来完成服务之间的调用。那它的底层是如何实现的?做了哪些操作 ?本章就来进行解答!
二、目录
目录
一、引言
二、目录
三、客户端服务发现源码
四、服务端实例查询源码
五、总结
再分析源码前,我们先来了解下Nacos服务之间的调用流程:
- 首先每个客户端服务都有一个本地缓存列表,这个缓存列表会定时从服务端进行更新
- 当 order-service 去调用 stock-service 服务时,会根据服务名去本地缓存获取微服务实例,但通过服务名称会获取多个实例,所以需要再根据负载均衡选择其中一个
- 最终 order-service 服务拿到 ip+port 实例信息,发起HTTP调用,拿到返回结果。
注意:之前很多同学都误以为是 Nacos 服务端去请求 stock-serivce,然后把结果返回给 order-service,这样做 Nacos 服务端的压力就太大了,千万不要搞混淆了。
本章重点:
- 首先Nacos客户端是怎么调用实例查询接口的,是如何维护好本地缓存的 ?
- 其次Nacos服务端实例查询接口是如何实现的 ?
三、客户端服务发现源码
主线任务:Nacos客户端是怎么调用实例查询接口的,是如何维护好本地缓存的 ?
在微服务组件当中有个ribbon依赖,它主要是在我们微服务架构当中发挥 负载均衡 的作用。因为我们在线上生产部署的实例往往都是集群机构的,Ribbon会从集群实例中,根据负载均衡的算法选举出最终被调用的一台机器实例。
在我们Nacos当中,也是整合了Ribbon,来实现负载均衡的,从而可以调用Nacos服务端实例列表接口。
可以看到,在Nacos 注册依赖中,也是整合了Ribbon的依赖。
在Ribbon下面,有个 ServerList 接口,这是个扩展接口,这个接口的作用就是获取 server 列表。Nacos 有对这个接口做实现,从而整合Ribbon
/**
* Interface that defines the methods sed to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
*
*/
public List<T> getUpdatedListOfServers();
}
在 NacosServerList 当中,继承了抽象类 AbstractServerList,在 AbstractServerLis当中实现了 ServerList 的两个接口。
public class NacosServerList extends AbstractServerList<NacosServer> {
private NacosDiscoveryProperties discoveryProperties;
private String serviceId;
public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
this.discoveryProperties = discoveryProperties;
}
@Override
public List<NacosServer> getInitialListOfServers() {
return getServers();
}
@Override
public List<NacosServer> getUpdatedListOfServers() {
return getServers();
}
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
private List<NacosServer> instancesToServerList(List<Instance> instances) {
List<NacosServer> result = new ArrayList<>();
if (CollectionUtils.isEmpty(instances)) {
return result;
}
for (Instance instance : instances) {
result.add(new NacosServer(instance));
}
return result;
}
public String getServiceId() {
return serviceId;
}
@Override
public void initWithNiwsConfig(IClientConfig iClientConfig) {
this.serviceId = iClientConfig.getClientName();
}
}
可以看到,order-service 去调用 stock-service 时,最终会走到 getUpdatedListOfServers 方法。那我们就具体来分析这个方法~
我们就来看 getServers 方法,这里重点看 selectInstances() 方法。
private List<NacosServer> getServers() {
try {
// 读取分组
String group = discoveryProperties.getGroup();
// 通过服务名称、分组、true表示只需要健康实例,查询列表
// 我们重点看 seelctInstances方法
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
// 把 Instance 转换成 NacosServer 类型
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
selectInstances 前两个都是重载方法,重点看 hostReactor.getServiceInfo() 方法。
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return selectInstances(serviceName, groupName, healthy, true);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe)
throws NacosException {
return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}
// 上面都是方法重载,最终调用到这个方法!!
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
// 默认为 true
ServiceInfo serviceInfo;
if (subscribe) {
// 重点看这个方法
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
重点看 getServiceInfo() 方法。在这个方法中,如果本地缓存为空,就会去查询Nacos实例列表接口,然后写入到本地缓存当中。
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 这里去查询本地缓存
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 本地缓存为空,去调用 Nacos实例列表接口,查询Nacos内存注册表数据
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// 这里的锁就有必要说一下了,这个锁的作用说白了就是为了防止避免 "HTTP重复调用的"!
// 假设当一条线程进来 serviceObj 为空,这时就会走上面 调用 Nacos实例列表接口的步骤方法。当调用Nacos实例列表接口的方法
// 还没执行完的时候,又进来了一条线程发现 因为还在等待 Nacos实例列表接口数据的返回,所以还没来得及往本地缓存列表写入数据,
// 这时本地缓存数据还是空的,所以这里会让该线程等待 5s。等查询实例列表的线程执行完之后,在 finnaly 最后:
// 如果oldService 不等于 null,它需要去唤醒其他正在等待的线程。
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 定时任务重复执行,维护本地缓存
scheduleUpdateIfAbsent(serviceName, clusters);
// 最终是从本地缓存中 获取实例列表数据
return serviceInfoMap.get(serviceObj.getKey());
}
如果本地缓存查不到对应的服务数据,那么就会向 Nacos 发起实例列表查询接口。
private void updateServiceNow(String serviceName, String clusters) {
try {
// 本地缓存没有数据,最终会调用到这个方法
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 发起Http请求调用 Nacos实例接口,获取数据
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
// 获取到数据,写入到本地缓存当中
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
// 组装请求参数
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
// 对 /nacos/v1/ns/instance/list 接口,发起Http请求调用
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
/instance/list 请求地址对应 Nacos 服务端查询实例列表接口,如下图:
在 getServiceInfo 方法当中,获取完实例数据,就会去执行 UpdateTask 定时任务,在定时任务当中,如果本地缓存为空,就会再去调用Nacos实例接口,更新本地缓存。并且这个定时任务是会重复执行的。
最终是从本地缓存中 serviceInfoMap 直接获取实例数据,从这里也可以看出本地缓存其实是一个 Map 结构。
// 定时任务重复执行,维护本地缓存
scheduleUpdateIfAbsent(serviceName, clusters);
// 最终是从本地缓存中 获取实例列表数据
return serviceInfoMap.get(serviceObj.getKey());
四、服务端实例查询源码
主线任务:Nacos服务端实例查询接口是如何实现的 ?
刚才通过客户端源码的分析,我们知道最终请求到 Nacos 服务端 /nacos/v1/ns/instance/list 接口,我们来看下这个接口怎么实现的。在list方法当中,首先去组装请求参数,然后去调用 doSrvIpxt 方法。
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
// 组装请求参数
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// 查询实例列表
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
在 doSrvIpxt 方法当中很多都是分支代码逻辑,我们主要看下面这个几个方法就行。
最终还是从Nacos注册表当中获取了实例数据进行返回的。
// 获取实例列表
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
public List<Instance> srvIPs(List<String> clusters) {
if (CollectionUtils.isEmpty(clusters)) {
clusters = new ArrayList<>();
clusters.addAll(clusterMap.keySet());
}
// 拿到需要查询的 集群对象
return allIPs(clusters);
}
public List<Instance> allIPs(List<String> clusters) {
List<Instance> result = new ArrayList<>();
// 遍历集群对象
for (String cluster : clusters) {
Cluster clusterObj = clusterMap.get(cluster);
if (clusterObj == null) {
continue;
}
// 获取 cluster 对象中所有的 Instance 实例
result.addAll(clusterObj.allIPs());
}
return result;
}
public List<Instance> allIPs() {
// 返回持久化实例、临时实例
List<Instance> allInstances = new ArrayList<>();
allInstances.addAll(persistentInstances);
allInstances.addAll(ephemeralInstances);
return allInstances;
}
五、总结
每一个客户端服务都有一个本地缓存列表,先根据服务名去本地缓存列表当中找,如果没有就去调用Nacos实例查询接口查询注册表当中数据,查到的话进行返回,同步更新本地缓存列表,同时也会开启定时任务来维护本地缓存列表。
如果根据服务名在本地缓存列表查到多个服务实例,最终会根据负载均衡的策略选择其中一个, 进行 HTTP 调用。
最后我们还看了Nacos实例查询接口,实例数据是从Nacos注册表当中获取,进行返回的。
最后的最后,别忘了把源码分析图补充完整: