服务暴漏
如果配置需要刷新则根据配置优先级刷新服务配置
如果服务已经导出,则直接返回
是否异步导出(全局或者服务级别配置了异步,则需要异步导出服务)
服务暴漏入口DefaultModuleDeployer#exportServices
private void exportServices() {
//从服务配置缓存查询缓存的所有服务配置,然后挨个服务发布
for (ServiceConfigBase sc : configManager.getServices()) {
exportServiceInternal(sc);
}
}
private void exportServiceInternal(ServiceConfigBase sc) {
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
//如果配置需要刷新则根据配置优先级刷新服务配置
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
//服务已经暴漏过就返回
if (sc.isExported()) {
return;
}
//是否异步导出(全局或者服务级别配置了异步,则需要异步导出服务)
if (exportAsync || sc.shouldExportAsync()) {
// 使用线程池导出服务
ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
} catch (Throwable t) {
logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
}
}, executor);
asyncExportingFutures.add(future);
} else {
//同步导出
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
}
}
ServiceConfig#export()
ServiceConfigBase 的模板方法
@Override
public void export() {
if (this.exported) {
return;
}
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
synchronized (this) {
//DCL
if (this.exported) {
return;
}
if (!this.isRefreshed()) {
this.refresh();
}
//服务导出配置配置为false则不导出
if (this.shouldExport()) {
this.init();
if (shouldDelay()) {
//延迟暴漏
doDelayExport();
} else {
//导出服务
doExport();
}
}
}
}
ServiceConfig#init
public void init() {
if (this.initialized.compareAndSet(false, true)) {
// load ServiceListeners from extension
// 加载服务监听器扩展
ExtensionLoader<ServiceListener> extensionLoader = this.getExtensionLoader(ServiceListener.class);
this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());
}
//provider的配置传递给元数据配置对象
initServiceMetadata(provider);
// 元数据设置接口和引用
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setTarget(getRef());
//元数据key格式 group/服务接口:版本号
serviceMetadata.generateServiceKey();
}
ServiceConfig#doExport
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
//服务路径,为空则设计接口名
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 导出URL
doExportUrls();
exported();
}
ServiceConfig#doExportUrls
导出URL了逻辑
private void doExportUrls() {
ModuleServiceRepository repository = getScopeModel().getServiceRepository();
ServiceDescriptor serviceDescriptor;
//ref 是否是 ServerService
final boolean serverService = ref instanceof ServerService;
if (serverService) {
serviceDescriptor = ((ServerService) ref).getServiceDescriptor();
repository.registerService(serviceDescriptor);
} else {
//注册服务,解析服务接口将服务方法等描述信息存放在了服务存储
//ModuleServiceRepository类型对象的成员变量services中
serviceDescriptor = repository.registerService(getInterfaceClass());
}
//provider领域模型 ,封装了一些提供者需要的基本属性,同时内部解析分装方法信息
providerModel = new ProviderModel(serviceMetadata.getServiceKey(),
// 服务实现类
ref,
// 服务描述,包含了服务接口的方法信息
serviceDescriptor,
//当前的模型
getScopeModel(),
// 元数据对象,接口加载器
serviceMetadata, interfaceClassLoader);
// Compatible with dependencies on ServiceModel#getServiceConfig(), and will be removed in a future version
providerModel.setConfig(this);
providerModel.setDestroyRunner(getDestroyRunner());
repository.registerProvider(providerModel);
// 获取配置的注册中心列表,同时转换成URL
// 核心 根据是否双注册配置返回URL信息
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// stub service will use generated service name
if (!serverService) {
// In case user specified path, register service one more time to map it to path.
// 存储服务接口信息
repository.registerService(pathKey, interfaceClass);
}
// 核心 根据协议导出到配置到注册中心
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
providerModel.setServiceUrls(urls);
}
服务注册
双注册配置
dubbo.application.register-mode=all
默认值为all代表应用级注册和接口级注册,配置值解释:
- all 双注册
- instance 应用级注册
service-discovery-registry:……
- interface 接口级注册
registry:……
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
loadRegistries
public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<>();
ApplicationConfig application = interfaceConfig.getApplication();
// 获取到接口配置注册的地址
List<RegistryConfig> registries = interfaceConfig.getRegistries();
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
// try to refresh registry in case it is set directly by user using config.setRegistries()
if (!config.isRefreshed()) {
config.refresh();
}
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
address = ANYHOST_VALUE;
}
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
AbstractConfig.appendParameters(map, application);
AbstractConfig.appendParameters(map, config);
map.put(PATH_KEY, RegistryService.class.getName());
AbstractInterfaceConfig.appendRuntimeParameters(map);
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
.setProtocol(extractRegistryType(url))
.setScopeModel(interfaceConfig.getScopeModel())
.build();
// provider delay register state will be checked in RegistryProtocol#export
if (provider || url.getParameter(SUBSCRIBE_KEY, true)) {
registryList.add(url);
}
}
}
}
}
return genCompatibleRegistries(interfaceConfig.getScopeModel(), registryList, provider);
}
genCompatibleRegistries
private static List<URL> genCompatibleRegistries(ScopeModel scopeModel, List<URL> registryList, boolean provider) {
List<URL> result = new ArrayList<>(registryList.size());
// 每个注册中心增加服务注册中心地址的配置
registryList.forEach(registryURL -> {
if (provider) {
// for registries enabled service discovery, automatically register interface compatible addresses.
String registerMode;
//注册协议配置了service-discovery-registry 走这个逻辑
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INSTANCE));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INSTANCE;
}
// 添加应用级别配置
result.add(registryURL);
// 判断是否添加接口级别注册
if (DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)
&& registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
URL interfaceCompatibleRegistryURL = URLBuilder.from(registryURL)
.setProtocol(REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(interfaceCompatibleRegistryURL);
}
} else {
//双注册模式配置查询 对应参数为dubbo.application.register-mode 默认值为all
registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_ALL));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
}
//如果满足应用级别注册,添加应用级别注册
if ((DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode))
&& registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)) {
URL serviceDiscoveryRegistryURL = URLBuilder.from(registryURL)
.setProtocol(SERVICE_REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(serviceDiscoveryRegistryURL);
}
// 如果满足接口注册配置,添加接口级别注册
if (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)) {
result.add(registryURL);
}
}
FrameworkStatusReportService reportService = ScopeModelUtil.getApplicationModel(scopeModel).getBeanFactory().getBean(FrameworkStatusReportService.class);
reportService.reportRegistrationStatus(reportService.createRegistrationReport(registerMode));
} else {
result.add(registryURL);
}
});
return result;
}
ServiceConfig#doExportUrlsFor1Protocol(protocolConfig, registryURLs);
导出服务配置到本地和注册中心
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//生成协议配置
Map<String, String> map = buildAttributes(protocolConfig);
// remove null key and null value
map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));
// init serviceMetadata attachments
// 协议配置放到元数据 attachments 中
serviceMetadata.getAttachments().putAll(map);
// 协议配置和默认的协议配置转成URL
URL url = buildUrl(protocolConfig, map);
// 导出URL
exportUrl(url, registryURLs);
}
导出URL ServiceConfig#exportUrl(url, registryURLs)
private void exportUrl(URL url, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// 没有明确指定远程导出,开启本地导出
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 没右明确指定本地导出,开启远程导出
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// export to extra protocol is used in remote export
String extProtocol = url.getParameter("ext.protocol", "");
List<String> protocols = new ArrayList<>();
// export original url
url = URLBuilder.from(url).
addParameter(IS_PU_SERVER_KEY, Boolean.TRUE.toString()).
removeParameter("ext.protocol").
build();
url = exportRemote(url, registryURLs);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
}
if (!extProtocol.equals("")) {
String[] extProtocols = extProtocol.split(",", -1);
protocols.addAll(Arrays.asList(extProtocols));
}
// export extra protocols
for(String protocol : protocols) {
if(!protocol.equals("")){
URL localUrl = URLBuilder.from(url).
setProtocol(protocol).
build();
localUrl = exportRemote(localUrl, registryURLs);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(localUrl, providerModel.getServiceModel(), getApplicationModel());
}
this.urls.add(localUrl);
}
}
}
}
this.urls.add(url);
}
导出服务到本地
核心方法 exportLocal(url); 本地调用使用了 injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,但执行了 Dubbo 的 Filter 链。
exportLocal 中主要更新了URL (协议转换成了injvm,host改为127.0.0.1 ,port 0)然后调用doExportUrl
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
local = local.setScopeModel(getScopeModel())
.setServiceModel(providerModel);
doExportUrl(local, false);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
导出服务到注册中心
private URL exportRemote(URL url, List<URL> registryURLs) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 遍历所有注册中心地址,挨个注册
for (URL registryURL : registryURLs) {
//如果是应用级注册service-discovery-registry 为协议URL 参数service-name-mapping为true
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// url 添加动态配置 dynamic
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.putAttribute(MONITOR_KEY, monitorUrl);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// log info
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL.getAddress());
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// 核心导出URL
doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
doExportUrl(url, true);
}
return url;
}
ServiceConfig#doExportUrl
导出到本地 和 导出到注册中心公用了 doExportUrl
方法
private void doExportUrl(URL url, boolean withMetaData) {
//adaptor 扩展类型处理,javassist 获取 代理invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
//远程导出,元数据也是用invoker包装一下
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.add(exporter);
}
JavassistProxyFactory类型的getInvoker方法
ProxyFactory 使用了dubbo的SPI机制,默认加载 javassist 的 spi对应类使用
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
try {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
//创建实际服务提供者的代理类型,代理类型后缀为DubboWrap
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//返回一个 继承 AbstractProxyInvoker 并重写 doInvoke 的类,调用wrapper.invokeMethod
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
} catch (Throwable fromJavassist) {
// log info and exception
}
}
使用协议导出 export
Exporter<?> exporter = protocolSPI.export(invoker);
protocolSPI使用的DubboSPI机制,通过spi的wapper机制,可以知道这里面的调用会有一个调用链,每一个export都会经过如下调用链
- 协议序列化机制 ProtocolSerializationWrapper
- 协议过滤器Wrapper ProtocolFilterWrapper
- 协议监听器Wrapper ProtocolListenerWrapper
- QOS的协议Wrapper QosProtocolWrapper (运维相关使用)
- 具体协议的Protocol 例如 InjvmProtocol ,DubboProtocol
调用链中通过**UrlUtils.isRegistry(invoker.getUrl())**
来判断是否是应用级别注册 (service-discovery-registry)
ProtocolSerializationWrapper
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//这里主要逻辑是将服务提供者url添加到服务存储仓库中
getFrameworkModel(invoker.getUrl().getScopeModel()).getServiceRepository().registerProviderUrl(invoker.getUrl());
return protocol.export(invoker);
}
ProtocolFilterWrapper
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心的协议导出直接执行
if (UrlUtils.isRegistry(invoker.getUrl())) {
//服务发现service-discovery-registry的导出会走这个逻辑
return protocol.export(invoker);
}
//过滤器调用链FilterChainBuilder的扩展对象查询
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
//这里分为2步 生成过滤器调用链 然后使用链表中的节点调用 这里值查询provider类型的过滤器
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
ProtocolListenerWrapper
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心地址则直接导出
if (UrlUtils.isRegistry(invoker.getUrl())) {
//服务发现service-discovery-registry的导出会走这个逻辑
return protocol.export(invoker);
}
// 先导出对象 再创建过滤器包装对象 执行监听器逻辑
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ScopeModelUtil.getExtensionLoader(ExporterListener.class, invoker.getUrl().getScopeModel())
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
QosProtocolWrapper
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注册中心导出的时候开启QOS 默认端口22222
if (UrlUtils.isRegistry(invoker.getUrl())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
startQosServer(invoker.getUrl());
启动Qos服务,使用的Netty模型 NioEventLoopGroup
导出本地和注册中心的Protocol区别
InjvmProtocol 的导出方法
本地使用协议 injvm
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
RegistryProtocol
RegistryProtocol 的 export 包含了一下四部分
- 根据最新配置覆盖配置
- 导出协议端口开启TCP服务
- 注册到注册中心
- 通知服务启动了
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();
overrideListeners.put(registryUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
//本地导出协议开启端口
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 通过URL获取注册中心的Registry操作对象
final Registry registry = getRegistry(registryUrl);
//转换成向注册中心注册的URL
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish (provider itself and registry should both need to register)
boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
//是否向注册中心注册
if (register) {
//这里有两种情况
//接口级注册会将接口级服务提供者数据直接注册到Zookeper上面,
//服务发现(应用级注册)这里仅仅会将注册数据转换为服务元数据等后面来发布元数据
register(registry, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
if (!registry.isServiceDiscovery()) {
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
// 内置监听器通知
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
doLocalExport本地导出协议开启端口
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//protoco对象是dubbo自动生成的适配器对象protocol$Adaptive 适配器对象会根据当前协议的参数来查询具体的协议扩展对象
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
protocol.export()也会经过调用链,最后一个Protocol是DubboProtocol
DubboProtocol
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
checkDestroyed();
// 从invoker 获取URL
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
// 创建导出需要的Expoter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//export a stub service for dispatching event
boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackService) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(PROTOCOL_UNSUPPORTED, "", "", "consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stub proxy support event ,but no stub methods founded.");
}
}
}
//创建服务,开启服务端口
openServer(url);
optimizeSerialization(url);
return exporter;
}
openServer开启服务端口
RPC协议的TCP通信
private void openServer(URL url) {
checkDestroyed();
// find server.
String key = url.getAddress();
// client can export a service which only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
return;
}
}
}
// server supports reset, use together with override
server.reset(url);
}
}
createServer 创建协议服务
为地址创建协议服务
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 服务端使用的网络 默认是netty
String transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);
}
//交换器
ExchangeServer server;
try {
// 创建交换器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
transporter = url.getParameter(CLIENT_KEY);
if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
throw new RpcException("Unsupported client type: " + transporter);
}
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
loadServerProperties(protocolServer);
return protocolServer;
}