文章目录
- 前言
- 一、服务监听ContextRefreshedEvent
- 1、AbstractApplicationContext.refresh
- 2、AbstractApplicationContext.finishRefresh
- 3、DubboDeployApplicationListener.onApplicationEvent
- 4、DefaultModuleDeployer .referServices
- 5、SimpleReferenceCache.get
- 二、引用服务 ReferenceConfig
- 1、时序图
- 2、ReferenceConfig.get
- 3、ReferenceConfig.init
- 4、ReferenceConfig.createProxy
- 5、ReferenceConfig.createInvokerForRemote
- 三、注册协议 RegistryProtocol
- 1、RegistryProtocol.refer
- 2、RegistryProtocol.doRefer
- 3、RegistryProtocol.interceptInvoker
前言
文章基于3.1.0版本进行分析
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>3.1.0</version>
</dependency>
一、服务监听ContextRefreshedEvent
在springboot 中,refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作是进行,refresh的最后处理了 finishRefresh 方法,改方法会广播一个ContextRefreshedEvent容器刷新完成事件,所有监听了该事件的bean都会去执行相关逻辑处理。
1、AbstractApplicationContext.refresh
public void refresh() throws BeansException, IllegalStateException {
synchronized(this.startupShutdownMonitor) {
// 省略无关代码***
// 初始化生命周期处理器,调用生命周期处理器onRefresh方法,发布ContextRefreshedEvent事件,JMX相关处理
this.finishRefresh();
// 省略无关代码***
}
}
2、AbstractApplicationContext.finishRefresh
protected void finishRefresh() {
// 清除上下文资源缓存(如扫描中的ASM元数据) scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// 广播刷新完成事件
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
if (!NativeDetector.inNativeImage()) {
LiveBeansView.registerApplicationContext(this);
}
}
3、DubboDeployApplicationListener.onApplicationEvent
dubbo很好的结合了spring的这一个拓展点,在这个拓展点开始实现服务的发布。可以看到,DubboDeployApplicationListener实现了ContextRefreshedEvent的消息监听
public class DubboDeployApplicationListener implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, Ordered {
private static final Logger logger = LoggerFactory.getLogger(DubboDeployApplicationListener.class);
private ApplicationContext applicationContext;
private ApplicationModel applicationModel;
private ModuleModel moduleModel;
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (nullSafeEquals(applicationContext, event.getSource())) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 获取配置的deployer 进行发布,默认是 DefaultModuleDeployer
ModuleDeployer deployer = moduleModel.getDeployer();
Assert.notNull(deployer, "Module deployer is null");
// start module
Future future = deployer.start();
// if the module does not start in background, await finish
if (!deployer.isBackground()) {
try {
// 等待发布结束
future.get();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());
} catch (Exception e) {
logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);
}
}
}
}
4、DefaultModuleDeployer .referServices
DefaultModuleDeployer 中,真正核心的是ReferenceConfig,ReferenceConfig才是去实际发布的动作
public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {
// DefaultApplicationDeployer
private ApplicationDeployer applicationDeployer;
@Override
public synchronized Future start() throws IllegalStateException {
...
// 不管是 DefaultApplicationDeployer 还是DefaultModuleDeployer的initialize方法,都是处理相关配置文件
// 其功能等价于监听器 DubboConfigApplicationListener
applicationDeployer.initialize();
initialize();
// 真正触发 服务注册功能
exportServices();
// prepare application instance
// exclude internal module to avoid wait itself
if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
applicationDeployer.prepareInternalModule();
}
// 服务消费
// refer services
referServices();
...
return startFuture;
}
private void referServices() {
configManager.getReferences().forEach(rc -> {
try {
ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
// 刷新配置
if (!referenceConfig.isRefreshed()) {
referenceConfig.refresh();
}
// 如果还没初始化
if (rc.shouldInit()) {
// 是否异步注入
if (referAsync || rc.shouldReferAsync()) {
ExecutorService executor = executorRepository.getServiceReferExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
referenceCache.get(rc);
} catch (Throwable t) {
logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);
}
}, executor);
asyncReferringFutures.add(future);
} else {
// 查询缓存中是否存在代理对象 对应的实现类SimpleReferenceCache
referenceCache.get(rc);
}
}
} catch (Throwable t) {
logger.error(getIdentifier() + " refer catch error.");
referenceCache.destroy(rc);
throw t;
}
});
}
}
5、SimpleReferenceCache.get
SimpleReferenceCache,一个用于缓存引用ReferenceConfigBase的util工具类。
ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。
public <T> T get(ReferenceConfigBase<T> rc) {
String key = generator.generateKey(rc);
// 服务类型 如果是泛化调用则这个类型为GenericService
Class<?> type = rc.getInterfaceClass();
boolean singleton = rc.getSingleton() == null || rc.getSingleton();
T proxy = null;
// Check existing proxy of the same 'key' and 'type' first.
// 单例
if (singleton) {
// 缓存数据找
proxy = get(key, (Class<T>) type);
} else {
logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +
"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");
}
// 不存在消费的代理对象,创建rc.get(),最后走到ReferenceConfig.get()
if (proxy == null) {
// 获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)
List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));
// 每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)
referencesOfType.add(rc);
// 与前面一样 前面是类型映射,这里是key映射
List<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));
referenceConfigList.add(rc);
// 开始引用服务
proxy = rc.get();
}
return proxy;
}
二、引用服务 ReferenceConfig
服务发现和引用就是从这里开始的
1、时序图
2、ReferenceConfig.get
获取引用的代理对象
public T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// ensure start module, compatible with old api usage
// 如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化
// 这个代码其实就是启动模块进行一些基础配置的初始化操作,比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
主要包括
- checkAndUpdateSubConfigs() – 检查并更新缺省配置
- init() – 消费者服务的初始化,核心逻辑。
3、ReferenceConfig.init
初始化代理对象
protected synchronized void init() {
// 初始化标记变量保证只初始化一次,这里又是加锁又是加标记变量的
if (initialized && ref != null) {
return;
}
try {
// 刷新配置
if (!this.isRefreshed()) {
this.refresh();
}
// init serviceMetadata
// 初始化元数据信息 如版本号,分组,服务接口名
initServiceMetadata(consumer);
// //继续初始化元数据信息 服务接口类型和key
serviceMetadata.setServiceType(getServiceInterfaceClass());
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
// 参数配置转成MAP
Map<String, String> referenceParameters = appendConfig();
// init service-application mapping
// 初始化路径,参数转化成url,dubbo主要从url上面读取参数
initServiceAppsMapping(referenceParameters);
// 获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存
ModuleServiceRepository repository = getScopeModel().getServiceRepository();
ServiceDescriptor serviceDescriptor;
if (CommonConstants.NATIVE_STUB.equals(getProxy())) {
serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);
repository.registerService(serviceDescriptor);
} else {
serviceDescriptor = repository.registerService(interfaceClass);
}
// 创建消费者模型对象
consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,
getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);
// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.
consumerModel.setConfig(this);
repository.registerConsumer(consumerModel);
serviceMetadata.getAttachments().putAll(referenceParameters);
// 创建引用的代理对象 核心代码
ref = createProxy(referenceParameters);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
consumerModel.setDestroyCaller(getDestroyRunner());
consumerModel.setProxyObject(ref);
consumerModel.initMethodModels();
checkInvokerAvailable();
} catch (Throwable t) {
// *** 省略部分代码
throw t;
}
initialized = true;
}
主要流程
- 服务引用前初始化serviceMetadata服务元数据
- 获取服务仓库ModuleServiceRepository,并注册service 和 consumer
- 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给ref,核心逻辑
4、ReferenceConfig.createProxy
创建代理对象,实际调用需要通过这个代理对象进行调用
private T createProxy(Map<String, String> referenceParameters) {
// 本地引用
if (shouldJvmRefer(referenceParameters)) {
createInvokerForLocal(referenceParameters);
} else {
urls.clear();
meshModeHandleUrl(referenceParameters);
if (StringUtils.isNotEmpty(url)) {
// user specified URL, could be peer-to-peer address, or register center's address.
// url参数不为空,且可以配置多个。可以是点对点,也可以是注册中心,然后添加到urls
parseUrl(referenceParameters);
} else {
// if protocols not in jvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
// 从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置
aggregateUrlFromRegistry(referenceParameters);
}
}
// 创建远程引用,创建远程引用调用器,承担服务调用的核心逻辑
createInvokerForRemote();
}
if (logger.isInfoEnabled()) {
logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +
(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?
" it's GenericService reference" : " it's not GenericService reference"));
}
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
consumerUrl = consumerUrl.setScopeModel(getScopeModel());
consumerUrl = consumerUrl.setServiceModel(consumerModel);
MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
// create service proxy
// 构建一个代理对象,代理客户端的请求
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
5、ReferenceConfig.createInvokerForRemote
远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。对于一个注册中心url和多个注册中心url的处理是不一样的,一个注册中心对应一个invoker,最后封装到集群路由invoker
private void createInvokerForRemote() {
// 只有一个,秩序创建一个invoker
if (urls.size() == 1) {
URL curUrl = urls.get(0);
// 自适应扩展类,这里和服务发布类似,先进入了RegistryProtocol.refer
invoker = protocolSPI.refer(interfaceClass, curUrl);
if (!UrlUtils.isRegistry(curUrl)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
} else {
List<Invoker<?>> invokers = new ArrayList<>();
URL registryUrl = null;
// 创建多个invoker
for (URL url : urls) {
// For multi-registry scenarios, it is not checked whether each referInvoker is available.
// Because this invoker may become available later.
//
invokers.add(protocolSPI.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// use last registry url
registryUrl = url;
}
}
// 创建一个集群路由invoker
if (registryUrl != null) {
// registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
// (RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
} else {
// not a registry url, must be direct invoke.
// 直连
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
URL curUrl = invokers.get(0).getUrl();
String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
服务发布的时候讲到 protocolSPI,protocolSPI.refer形成的调用链为
-Protocol$Adaptie -> ProtocolSerializationWrapper -> ProtocolFilterWrapper -> QosProtocolWrapper -> ProtocolListenerWrapper -> RegistryProtocol -> RegistryProtocol
三、注册协议 RegistryProtocol
应用级服务远程协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol
1、RegistryProtocol.refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 组装配置中心的地址
url = getRegistryUrl(url);
// 获取用于操作Zookeeper的Registry类型
Registry registry = getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
String group = qs.get(GROUP_KEY);
if (StringUtils.isNotEmpty(group)) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
}
}
// 降级容错的逻辑处理对象 类型为Cluster 实际类型为MockClusterWrapper 内部包装的是FailoverCluster
// 后续调用服务失败时候会先失效转移再降级
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
// 主要干活的地方 生成引用 invoker
return doRefer(cluster, registry, type, url, qs);
}
生成配置中心的url
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-start-consumer
&dubbo=2.0.2
&pid=8896
&qos.enable=false
&release=3.1.0
×tamp=1725379058285
2、RegistryProtocol.doRefer
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);
// 消费者url信息
URL consumerUrl = new ServiceConfigURL (
p,
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute
);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
// 带迁移性质的Invoker对象
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
// 执行迁移规则创建应用级优先的服务发现Invoker对象
return interceptInvoker(migrationInvoker, url, consumerUrl);
}
consumerUrl如下
consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer
&background=false
&dubbo=2.0.2
&interface=org.sjl.dubbo.AsyncProvider
&methods=sayHiAsync,sayHello,sayHelloAsync&pid=33100
&qos.enable=false
®ister.ip=192.168.0.101
&release=3.1.0
&side=consumer
&sticky=false
&timeout=30000
×tamp=1725379319215
3、RegistryProtocol.interceptInvoker
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
// 获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListener
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
for (RegistryProtocolListener listener : listeners) {
// MigrationRuleListener
// 迁移规则应用级引用
listener.onRefer(this, invoker, consumerUrl, url);
}
return invoker;
}
接下来就进入了迁移规则的应用级服务发现了,参考 dubbo 服务消费原理分析之应用级服务发现