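Service Discovery Mechanism
Service discovery is a core capability of any RPC framework. It generally comes in two flavors: interface-level service discovery and application-level service discovery.
The typical example of interface-level service discovery is Dubbo2's ZooKeeper (zk) based mechanism. A provider registers its interface information and address directly with the registry, and a consumer looks up the corresponding addresses in the registry by interface.
The typical example of application-level service discovery is Spring Cloud's Eureka-based mechanism. A provider registers its application information and address with the registry, and a consumer obtains the application's IPs from the registry. With traditional application-level discovery, the consumer has to maintain the relationship between requested interfaces and applications itself, usually through rules that match request paths to applications. To issue a request, the consumer first finds the application the request belongs to and then looks up that application's IPs.
What sets Dubbo3's application-level service discovery apart is that the provider, while registering the application, also registers the interface-to-application relationship with the registry, so the consumer no longer has to maintain that relationship separately.
Taking ZK as the registry, Dubbo3's service registration and discovery flow is roughly as follows:
- The provider registers its application information and the interface-to-application relationship
- The consumer uses the interface to get the applications from the registry, then gets the application information and IPs
- The consumer assembles the service invocation object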
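The three steps above can be sketched with a small in-memory model. This is only an illustration of the data flow: the class InMemoryRegistrySketch, its methods, and the sample interface name and addresses are all hypothetical and are not part of Dubbo or ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for a registry; not a Dubbo API.
class InMemoryRegistrySketch {
    // interface name -> application names (the "mapping" data)
    private final Map<String, Set<String>> interfaceToApps = new HashMap<>();
    // application name -> instance addresses (the "instance" data)
    private final Map<String, Set<String>> appToAddresses = new HashMap<>();

    // Provider side: register one instance and the interfaces it exposes.
    void registerProvider(String app, String address, List<String> interfaces) {
        appToAddresses.computeIfAbsent(app, k -> new TreeSet<>()).add(address);
        for (String itf : interfaces) {
            interfaceToApps.computeIfAbsent(itf, k -> new TreeSet<>()).add(app);
        }
    }

    // Consumer side: interface -> applications -> addresses.
    List<String> lookup(String itf) {
        List<String> addresses = new ArrayList<>();
        for (String app : interfaceToApps.getOrDefault(itf, Collections.emptySet())) {
            addresses.addAll(appToAddresses.getOrDefault(app, Collections.emptySet()));
        }
        return addresses;
    }

    public static void main(String[] args) {
        InMemoryRegistrySketch registry = new InMemoryRegistrySketch();
        List<String> exposed = Arrays.asList("com.example.OrderService");
        registry.registerProvider("appx", "10.0.0.1:20880", exposed);
        registry.registerProvider("appx", "10.0.0.2:20880", exposed);
        // The consumer only needs the interface name to reach both instances.
        System.out.println(registry.lookup("com.example.OrderService"));
    }
}
```

The point of the model is that the consumer never has to know the name appx up front; the interface-to-application entry written by the provider supplies it.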
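Service Registration
The provider tells the registry who it is, what its address is, and which services it offers. "Who I am" and "my address" correspond to registering the application instance; "which services I offer" corresponds to registering the interface-to-application mapping.
Application Instances
A Dubbo application instance lives under the zk node path /services/appx/ipx. appx is the application name and is a persistent node; ipx is the machine IP and is an ephemeral node. There may be several IPs under appx, which means the application is deployed on several machines.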
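To make the persistent/ephemeral distinction concrete, here is a minimal sketch that models the node layout in memory. It assumes the /services/appx/ipx layout described above; the class InstanceNodeSketch and the notion of a string session id are hypothetical simplifications, not the ZooKeeper client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of the node layout; not the ZooKeeper client API.
class InstanceNodeSketch {
    // Persistent nodes survive the session that created them.
    private final Set<String> persistentNodes = new TreeSet<>();
    // Ephemeral nodes: path -> id of the session that owns the node.
    private final Map<String, String> ephemeralNodes = new TreeMap<>();

    void registerInstance(String appName, String ip, String sessionId) {
        persistentNodes.add("/services/" + appName);
        ephemeralNodes.put("/services/" + appName + "/" + ip, sessionId);
    }

    // When a provider's session ends, only its ephemeral nodes disappear.
    void closeSession(String sessionId) {
        ephemeralNodes.values().removeIf(sessionId::equals);
    }

    // Lists the instance IPs currently registered under an application.
    List<String> instancesOf(String appName) {
        String prefix = "/services/" + appName + "/";
        List<String> ips = new ArrayList<>();
        for (String path : ephemeralNodes.keySet()) {
            if (path.startsWith(prefix)) {
                ips.add(path.substring(prefix.length()));
            }
        }
        return ips;
    }

    boolean applicationExists(String appName) {
        return persistentNodes.contains("/services/" + appName);
    }
}
```

In this model, closing a provider's session removes its IP from the application's children while the application node itself stays, which is why a crashed instance drops out of the address list without any explicit deregistration.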
Dubbo creates these nodes in zk at startup.
org.apache.dubbo.config.deploy.DefaultApplicationDeployer#prepareApplicationInstance
public void prepareApplicationInstance() {
    ...
    if (isRegisterConsumerInstance()) {
        exportMetadataService();
        if (hasPreparedApplicationInstance.compareAndSet(false, true)) {
            // Note: the application instance is registered here
            registerServiceInstance();
        }
    }
}
org.apache.dubbo.registry.client.AbstractServiceDiscovery#register
public synchronized void register() throws RuntimeException {
    ...
    this.serviceInstance = createServiceInstance(this.metadataInfo);
    ...
    // Compute the revision of the current instance and register it with the registry
    boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
    if (revisionUpdated) {
        reportMetadata(this.metadataInfo);
        // Note: register the application information with the registry
        doRegister(this.serviceInstance);
    }
}
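The register() excerpt only reports metadata and registers when the revision has changed. A minimal sketch of that guard follows. The excerpt does not show how Dubbo computes the revision, so the MD5 hash over sorted service keys used here is purely an assumption for illustration, and RevisionSketch with its members is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of "register only when the revision changes".
class RevisionSketch {
    private String lastRevision;
    int registerCalls = 0;

    // Assumption: the revision is an MD5 hash over the sorted service keys,
    // so the same set of services always yields the same revision.
    static String revisionOf(Collection<String> serviceKeys) {
        List<String> sorted = new ArrayList<>(serviceKeys);
        Collections.sort(sorted);
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(String.join(",", sorted).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the instance was (re-)registered, false if nothing changed.
    boolean register(Collection<String> serviceKeys) {
        String revision = revisionOf(serviceKeys);
        if (revision.equals(lastRevision)) {
            return false; // same revision: skip the write to the registry
        }
        lastRevision = revision;
        registerCalls++; // stands in for reportMetadata(...) and doRegister(...)
        return true;
    }
}
```

Under this assumption, calling register with the same services in a different order is a no-op, while adding a service produces a new revision and triggers a new registration.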
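Registering the Interface-to-Application Mapping
If the provider only told the registry its name and address, the consumer would have no way to find the address of an interface. The provider therefore also announces which interfaces it exposes, so that the consumer can go from interface to application and then from application to address. In Dubbo3 the interface-to-application mapping lives at the node path /dubbo/mapping/servicex, where servicex is the fully qualified class name of the interface and is a persistent node. The node's value is appx,appy, where appx and appy are the applications that provide the interface; multiple application names are separated by ",".
Dubbo3 registers this node when the interface is exported: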
org.apache.dubbo.config.ServiceConfig#exported
protected void exported() {
    ...
    // Note: exporting the service triggers registration of the interface-to-application mapping
    boolean succeeded = serviceNameMapping.map(url);
    ...
}
org.apache.dubbo.registry.client.metadata.MetadataServiceNameMapping#map
public boolean map(URL url) {
    ...
    boolean result = true;
    for (Map.Entry<String, MetadataReport> entry : metadataReportInstance.getMetadataReports(true).entrySet()) {
        MetadataReport metadataReport = entry.getValue();
        String appName = applicationModel.getApplicationName();
        try {
            ...
            boolean succeeded;
            int currentRetryTimes = 1;
            String newConfigContent = appName;
            do {
                ConfigItem configItem = metadataReport.getConfigItem(serviceInterface, DEFAULT_MAPPING_GROUP);
                String oldConfigContent = configItem.getContent();
                if (StringUtils.isNotEmpty(oldConfigContent)) {
                    boolean contains = StringUtils.isContains(oldConfigContent, appName);
                    if (contains) {
                        succeeded = true;
                        break;
                    }
                    // Note: append the application name to the existing content
                    newConfigContent = oldConfigContent + COMMA_SEPARATOR + appName;
                }
                // Note: write to the registry; the node path is /dubbo/mapping/<fully qualified interface name>, the value is appNameX,appNameY
                succeeded = metadataReport.registerServiceAppMapping(serviceInterface, DEFAULT_MAPPING_GROUP, newConfigContent, configItem.getTicket());
            } while (!succeeded && currentRetryTimes++ <= CAS_RETRY_TIMES);
            ....
    }
    return result;
}
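The do/while loop in map() is a compare-and-set retry: read the current value together with a ticket, append the application name, and write back only if nobody else has written in between. The sketch below shows the same pattern against an in-memory store. VersionedStore, Snapshot, and the integer version that plays the role of the ticket are hypothetical stand-ins, not the MetadataReport API.

```java
import java.util.Arrays;

// Hypothetical sketch of the read / append / compare-and-set retry loop.
class MappingCasSketch {
    // Immutable view of the node: its content plus the version it was read at.
    static final class Snapshot {
        final String content;
        final int version;

        Snapshot(String content, int version) {
            this.content = content;
            this.version = version;
        }
    }

    // In-memory stand-in for the mapping node; the version acts as the ticket.
    static final class VersionedStore {
        private String content;
        private int version;

        synchronized Snapshot read() {
            return new Snapshot(content, version);
        }

        // Succeeds only if nobody has written since expectedVersion was read.
        synchronized boolean compareAndSet(int expectedVersion, String newContent) {
            if (expectedVersion != version) {
                return false;
            }
            content = newContent;
            version++;
            return true;
        }
    }

    // Adds appName to the comma-separated mapping value, retrying on conflicts.
    static boolean map(VersionedStore store, String appName, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            Snapshot snapshot = store.read();
            String newContent = appName;
            if (snapshot.content != null && !snapshot.content.isEmpty()) {
                if (Arrays.asList(snapshot.content.split(",")).contains(appName)) {
                    return true; // already mapped, nothing to write
                }
                newContent = snapshot.content + "," + appName;
            }
            if (store.compareAndSet(snapshot.version, newContent)) {
                return true;
            }
            // Another writer got in first: re-read and try again.
        }
        return false;
    }

    public static void main(String[] args) {
        VersionedStore node = new VersionedStore();
        map(node, "appx", 3);
        map(node, "appy", 3);
        map(node, "appx", 3); // already present, no write
        System.out.println(node.read().content); // prints appx,appy
    }
}
```

The retry matters because several providers of the same interface may start at the same time; without the version check, one application's append could silently overwrite another's.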
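Service Discovery
The consumer knows which interface it wants to call, but it must obtain the IPs for that interface from the registry before it can send a request. First, the consumer subscribes to the "interface-to-application mapping" node to learn which applications provide the interface; then it subscribes to the "application instance" nodes to obtain the instance IPs of those applications.
Subscribing to the Interface-to-Application Mapping
Dubbo3 performs the subscription when the service is referenced. DefaultMappingListener is the core class that listens to the Dubbo mapping.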
org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#doSubscribe
public void doSubscribe(URL url, NotifyListener listener) {
    url = addRegistryClusterKey(url);
    serviceDiscovery.subscribe(url, listener);
    boolean check = url.getParameter(CHECK_KEY, false);
    String key = ServiceNameMapping.buildMappingKey(url);
    Lock mappingLock = serviceNameMapping.getMappingLock(key);
    try {
        mappingLock.lock();
        Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
        try {
            // Note: create a new MappingListener
            MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
            // Note: fetch the mapping for the interface for the first time and watch the zk node for changes
            subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
            mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
        } catch (Exception e) {
            logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
        }
        ...
    } finally {
        mappingLock.unlock();
    }
}
org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.DefaultMappingListener#onEvent
public synchronized void onEvent(MappingChangedEvent event) {
    ...
    Set<String> newApps = event.getApps();
    Set<String> tempOldApps = oldApps;
    ...
    try {
        mappingLock.lock();
        if (CollectionUtils.isEmpty(tempOldApps) && newApps.size() > 0) {
            serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
            subscribeURLs(url, listener, newApps);
            oldApps = newApps;
            return;
        }
        for (String newAppName : newApps) {
            if (!tempOldApps.contains(newAppName)) {
                serviceNameMapping.removeCachedMapping(ServiceNameMapping.buildMappingKey(url));
                serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
                // old instance listener related to old app list that needs to be destroyed after subscribe refresh.
                ServiceInstancesChangedListener oldListener = listener.getServiceListener();
                if (oldListener != null) {
                    String appKey = toStringKeys(toTreeSet(tempOldApps));
                    Lock appSubscriptionLock = getAppSubscription(appKey);
                    try {
                        appSubscriptionLock.lock();
                        // Note: remove the current interface from the old listener; destroy the old listener if no interface listens on it any more.
                        oldListener.removeListener(url.getServiceKey(), listener);
                        if (!oldListener.hasListeners()) {
                            oldListener.destroy();
                            removeAppSubscriptionLock(appKey);
                        }
                    } finally {
                        appSubscriptionLock.unlock();
                    }
                }
                subscribeURLs(url, listener, newApps);
                oldApps = newApps;
                return;
            }
        }
    } finally {
        mappingLock.unlock();
    }
}
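Stripped of caching and locking, the visible part of onEvent boils down to one decision: resubscribe only when the new mapping contains an application that was not in the old one. A minimal sketch of that decision follows; MappingChangeSketch is a hypothetical class, and the parts of onEvent elided with "..." above may contain additional checks that this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the resubscribe decision in the mapping listener.
class MappingChangeSketch {
    private Set<String> oldApps = new TreeSet<>();
    // Records every application set that was (re-)subscribed, for inspection.
    final List<Set<String>> subscriptions = new ArrayList<>();

    // Returns true if the change led to a new subscription.
    synchronized boolean onMappingChanged(Set<String> newApps) {
        if (newApps == null || newApps.isEmpty()) {
            return false; // nothing to subscribe to
        }
        // Covers both cases in the excerpt: the first non-empty mapping,
        // and a mapping that contains at least one previously unknown app.
        boolean hasNewApp = !oldApps.containsAll(newApps);
        if (!hasNewApp) {
            return false; // same set or a shrunken set: keep the current subscription
        }
        subscriptions.add(new TreeSet<>(newApps));
        oldApps = new TreeSet<>(newApps);
        return true;
    }
}
```

One consequence worth noticing in this reading is that a mapping that only loses applications does not trigger a resubscribe; instances of the removed application simply stop being reported.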
org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribeURLs
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
    serviceNames = toTreeSet(serviceNames);
    String serviceNamesKey = toStringKeys(serviceNames);
    String serviceKey = url.getServiceKey();
    logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));
    // register ServiceInstancesChangedListener
    Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
    try {
        appSubscriptionLock.lock();
        ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
        if (serviceInstancesChangedListener == null) {
            serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
            serviceInstancesChangedListener.setUrl(url);
            for (String serviceName : serviceNames) {
                // Note: fetch the application's instances for the first time and fire an instance-changed event, which assembles the interface invokers
                List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
                if (CollectionUtils.isNotEmpty(serviceInstances)) {
                    serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
                }
            }
            serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
        }
        // Note: attach the listener to the zk node
        if (!serviceInstancesChangedListener.isDestroyed()) {
            serviceInstancesChangedListener.setUrl(url);
            listener.addServiceListener(serviceInstancesChangedListener);
            serviceInstancesChangedListener.addListenerAndNotify(url, listener);
            serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
        } else {
            // Note: if this listener was destroyed earlier, remove it from the local cache.
            // This is slightly sloppy: if an interface's mapping value changes as
            // appNameX -> appNameY -> appNameX, the ServiceInstancesChangedListener for appNameX
            // is destroyed on the first change (appNameX to appNameY) but is not removed from the
            // local cache. When the mapping changes back from appNameY to appNameX, the interface
            // lands in this branch, so its service addresses are not refreshed.
            // That said, this scenario is rare.
            logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
            serviceListeners.remove(serviceNamesKey);
        }
    } finally {
        appSubscriptionLock.unlock();
    }
}
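Two details of subscribeURLs are easy to miss: listeners are cached under a key derived from the sorted application names, so interfaces served by the same set of applications share one listener, and a destroyed listener that is still cached is only evicted, not replaced, on the next subscribe. The sketch below models both. ListenerSharingSketch and its members are hypothetical, the comma-joined key format is an assumption, and the destroyed-listener behavior follows the author's reading of the excerpt rather than a verified trace of Dubbo.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of listener sharing keyed by sorted application names.
class ListenerSharingSketch {
    static class Listener {
        final Set<String> apps;
        final Set<String> serviceKeys = new TreeSet<>();
        boolean destroyed;

        Listener(Set<String> apps) {
            this.apps = apps;
        }
    }

    private final Map<String, Listener> listeners = new HashMap<>();

    // Sorting first makes the key independent of the order of the names.
    static String keyOf(Collection<String> appNames) {
        return String.join(",", new TreeSet<>(appNames));
    }

    // Returns the listener now serving serviceKey, or null when a destroyed
    // listener was found in the cache and was only evicted.
    Listener subscribe(String serviceKey, Collection<String> appNames) {
        String key = keyOf(appNames);
        Listener listener = listeners.computeIfAbsent(key, k -> new Listener(new TreeSet<>(appNames)));
        if (listener.destroyed) {
            listeners.remove(key);
            return null;
        }
        listener.serviceKeys.add(serviceKey);
        return listener;
    }

    // Mirrors the excerpt: the listener is destroyed when its last service
    // leaves, but it is not removed from the cache at that point.
    void unsubscribe(String serviceKey, Collection<String> appNames) {
        Listener listener = listeners.get(keyOf(appNames));
        if (listener == null) {
            return;
        }
        listener.serviceKeys.remove(serviceKey);
        if (listener.serviceKeys.isEmpty()) {
            listener.destroyed = true;
        }
    }
}
```

In this model, a service that unsubscribes from appx and later subscribes to appx again gets null on the first attempt and a fresh listener only on a second one, which is the gap the author's note describes.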
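Subscribing to Application Instances
Through the interface-to-application mapping the consumer now knows which applications provide the interface; it still needs the instance IPs of those applications before it can send requests.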
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener#onEvent
public void onEvent(ServiceInstancesChangedEvent event) {
    if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
        return;
    }
    doOnEvent(event);
}

/**
 * @param event
 */
private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
    if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
        return;
    }
    refreshInstance(event);
    if (logger.isDebugEnabled()) {
        logger.debug(event.getServiceInstances().toString());
    }
    // Note: the long section below fetches, for each new instance, the interfaces that instance provides, in order to assemble the invokers for the consumer's interface references
    Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
    Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();
    // grouping all instances of this app(service name) by revision
    for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
        List<ServiceInstance> instances = entry.getValue();
        for (ServiceInstance instance : instances) {
            String revision = getExportedServicesRevision(instance);
            if (revision == null || EMPTY_REVISION.equals(revision)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Find instance without valid service metadata: " + instance.getAddress());
                }
                continue;
            }
            List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
            subInstances.add(instance);
        }
    }
    // get MetadataInfo with revision
    for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) {
        String revision = entry.getKey();
        List<ServiceInstance> subInstances = entry.getValue();
        MetadataInfo metadata = subInstances.stream()
            .map(ServiceInstance::getServiceMetadata)
            .filter(Objects::nonNull)
            .filter(m -> revision.equals(m.getRevision()))
            .findFirst()
            .orElseGet(() -> serviceDiscovery.getRemoteMetadata(revision, subInstances));
        parseMetadata(revision, metadata, localServiceToRevisions);
        // update metadata into each instance, in case new instance created.
        for (ServiceInstance tmpInstance : subInstances) {
            MetadataInfo originMetadata = tmpInstance.getServiceMetadata();
            if (originMetadata == null || !Objects.equals(originMetadata.getRevision(), metadata.getRevision())) {
                tmpInstance.setServiceMetadata(metadata);
            }
        }
    }
    int emptyNum = hasEmptyMetadata(revisionToInstances);
    if (emptyNum != 0) {// retry every 10 seconds
        hasEmptyMetadata = true;
        if (retryPermission.tryAcquire()) {
            if (retryFuture != null && !retryFuture.isDone()) {
                // cancel last retryFuture because only one retryFuture will be canceled at destroy().
                retryFuture.cancel(true);
            }
            try {
                retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10_000L, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                logger.error("Error submitting async retry task.");
            }
            logger.warn("Address refresh try task submitted");
        }
        // return if all metadata is empty, this notification will not take effect.
        if (emptyNum == revisionToInstances.size()) {
            // 1-17 - Address refresh failed.
            logger.error(REGISTRY_FAILED_REFRESH_ADDRESS, "metadata Server failure", "",
                "Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");
            return;
        }
    }
    hasEmptyMetadata = false;
    Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
    Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
    for (Map.Entry<ServiceInfo, Set<String>> entry : localServiceToRevisions.entrySet()) {
        ServiceInfo serviceInfo = entry.getKey();
        Set<String> revisions = entry.getValue();
        Map<Integer, Map<Set<String>, Object>> portToRevisions = protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new HashMap<>());
        Map<Set<String>, Object> revisionsToUrls = portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>());
        Object urls = revisionsToUrls.get(revisions);
        if (urls == null) {
            urls = getServiceUrlsCache(revisionToInstances, revisions, serviceInfo.getProtocol(), serviceInfo.getPort());
            revisionsToUrls.put(revisions, urls);
        }
        List<ProtocolServiceKeyWithUrls> list = newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>());
        list.add(new ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>) urls));
    }
    this.serviceUrls = newServiceUrls;
    // Note: signal that the service addresses changed so that new service invokers are assembled
    this.notifyAddressChanged();
}
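The core idea of doOnEvent is to group instances by revision so that metadata is resolved once per revision rather than once per instance, and then to turn "revision -> services" into "service -> addresses". A minimal sketch of that idea follows. RevisionGroupingSketch, Instance, and the metadataLoader function are hypothetical, metadata is reduced to a set of service names, and the value used for EMPTY_REVISION is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch of grouping instances by revision before loading metadata.
class RevisionGroupingSketch {
    // Assumption: placeholder value marking an instance without usable metadata.
    static final String EMPTY_REVISION = "0";

    static class Instance {
        final String address;
        final String revision;

        Instance(String address, String revision) {
            this.address = address;
            this.revision = revision;
        }
    }

    // Groups instances by revision, skipping those without a valid revision.
    static Map<String, List<Instance>> groupByRevision(List<Instance> instances) {
        Map<String, List<Instance>> grouped = new LinkedHashMap<>();
        for (Instance instance : instances) {
            if (instance.revision == null || EMPTY_REVISION.equals(instance.revision)) {
                continue;
            }
            grouped.computeIfAbsent(instance.revision, r -> new ArrayList<>()).add(instance);
        }
        return grouped;
    }

    // Builds service -> addresses, calling metadataLoader once per revision.
    static Map<String, List<String>> serviceToAddresses(
            List<Instance> instances, Function<String, Set<String>> metadataLoader) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<Instance>> entry : groupByRevision(instances).entrySet()) {
            Set<String> services = metadataLoader.apply(entry.getKey());
            if (services == null) {
                continue; // metadata unavailable for this revision
            }
            for (String service : services) {
                for (Instance instance : entry.getValue()) {
                    result.computeIfAbsent(service, k -> new ArrayList<>()).add(instance.address);
                }
            }
        }
        return result;
    }
}
```

Since instances of the same application version normally share a revision, a consumer facing hundreds of instances needs only a handful of metadata lookups, which is the main reason the grouping step exists.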