Dubbo的服务暴漏与服务发现源码详解

news2025/1/23 15:04:34

服务暴漏

如果配置需要刷新则根据配置优先级刷新服务配置

如果服务已经导出,则直接返回

是否异步导出(全局或者服务级别配置了异步,则需要异步导出服务)

服务暴漏入口DefaultModuleDeployer#exportServices

private void exportServices() {
    //从服务配置缓存查询缓存的所有服务配置,然后挨个服务发布
    for (ServiceConfigBase sc : configManager.getServices()) {
        exportServiceInternal(sc);
    }
}
private void exportServiceInternal(ServiceConfigBase sc) {
    ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
	//如果配置需要刷新则根据配置优先级刷新服务配置
    if (!serviceConfig.isRefreshed()) {
        serviceConfig.refresh();
    }
	//服务已经暴漏过就返回
    if (sc.isExported()) {
        return;
    }
	//是否异步导出(全局或者服务级别配置了异步,则需要异步导出服务)
    if (exportAsync || sc.shouldExportAsync()) {
        // 使用线程池导出服务
        ExecutorService executor = executorRepository.getServiceExportExecutor();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                if (!sc.isExported()) {
                    sc.export();
                    exportedServices.add(sc);
                }
            } catch (Throwable t) {
                logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
            }
        }, executor);

        asyncExportingFutures.add(future);
    } else {
        //同步导出
        if (!sc.isExported()) {
            sc.export();
            exportedServices.add(sc);
        }
    }
}

ServiceConfig#export()

ServiceConfigBase 的模板方法

@Override
public void export() {
    if (this.exported) {
        return;
    }

    // ensure start module, compatible with old api usage
    getScopeModel().getDeployer().start();
	
    synchronized (this) {
        //DCL
        if (this.exported) {
            return;
        }

        if (!this.isRefreshed()) {
            this.refresh();
        }
        //服务导出配置配置为false则不导出
        if (this.shouldExport()) {
            this.init();

            if (shouldDelay()) {
                //延迟暴漏
                doDelayExport();
            } else {
                //导出服务
                doExport();
            }
        }
    }
}

ServiceConfig#init

public void init() {
    if (this.initialized.compareAndSet(false, true)) {
        // load ServiceListeners from extension
        // 加载服务监听器扩展
        ExtensionLoader<ServiceListener> extensionLoader = this.getExtensionLoader(ServiceListener.class);
        this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());
    }
    //provider的配置传递给元数据配置对象
    initServiceMetadata(provider);
    // 元数据设置接口和引用
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setTarget(getRef());
    //元数据key格式 group/服务接口:版本号
    serviceMetadata.generateServiceKey();
}

ServiceConfig#doExport

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {
        return;
    }
	//服务路径,为空则设计接口名
    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    // 导出URL
    doExportUrls();
    exported();
}

ServiceConfig#doExportUrls

导出URL了逻辑

private void doExportUrls() {
    ModuleServiceRepository repository = getScopeModel().getServiceRepository();
    ServiceDescriptor serviceDescriptor;
    //ref 是否是 ServerService
    final boolean serverService = ref instanceof ServerService;
    if (serverService) {
        serviceDescriptor = ((ServerService) ref).getServiceDescriptor();
        repository.registerService(serviceDescriptor);
    } else {
        //注册服务,解析服务接口将服务方法等描述信息存放在了服务存储
        //ModuleServiceRepository类型对象的成员变量services中
        serviceDescriptor = repository.registerService(getInterfaceClass());
    }
    //provider领域模型 ,封装了一些提供者需要的基本属性,同时内部解析分装方法信息
    providerModel = new ProviderModel(serviceMetadata.getServiceKey(),
        // 服务实现类
        ref,
        // 服务描述,包含了服务接口的方法信息
        serviceDescriptor,
        //当前的模型
        getScopeModel(),
        // 元数据对象,接口加载器
        serviceMetadata, interfaceClassLoader);

    // Compatible with dependencies on ServiceModel#getServiceConfig(), and will be removed in a future version
    providerModel.setConfig(this);

    providerModel.setDestroyRunner(getDestroyRunner());
    repository.registerProvider(providerModel);
	// 获取配置的注册中心列表,同时转换成URL
    // 核心 根据是否双注册配置返回URL信息
    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
	
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
            .map(p -> p + "/" + path)
            .orElse(path), group, version);
        // stub service will use generated service name
        if (!serverService) {
            // In case user specified path, register service one more time to map it to path.
            // 存储服务接口信息
            repository.registerService(pathKey, interfaceClass);
        }
        // 核心  根据协议导出到配置到注册中心
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }

    providerModel.setServiceUrls(urls);
}

服务注册

双注册配置

dubbo.application.register-mode=all

默认值为all代表应用级注册和接口级注册,配置值解释:

  • all 双注册
  • instance 应用级注册 service-discovery-registry:……
  • interface 接口级注册 registry:……
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

loadRegistries

public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
    // check && override if necessary
    List<URL> registryList = new ArrayList<>();
    ApplicationConfig application = interfaceConfig.getApplication();
    // 获取到接口配置注册的地址
    List<RegistryConfig> registries = interfaceConfig.getRegistries();
    if (CollectionUtils.isNotEmpty(registries)) {
        for (RegistryConfig config : registries) {
            // try to refresh registry in case it is set directly by user using config.setRegistries()
            if (!config.isRefreshed()) {
                config.refresh();
            }
            String address = config.getAddress();
            if (StringUtils.isEmpty(address)) {
                address = ANYHOST_VALUE;
            }
            if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                AbstractConfig.appendParameters(map, application);
                AbstractConfig.appendParameters(map, config);
                map.put(PATH_KEY, RegistryService.class.getName());
                AbstractInterfaceConfig.appendRuntimeParameters(map);
                if (!map.containsKey(PROTOCOL_KEY)) {
                    map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
                }
                List<URL> urls = UrlUtils.parseURLs(address, map);

                for (URL url : urls) {
                    url = URLBuilder.from(url)
                        .addParameter(REGISTRY_KEY, url.getProtocol())
                        .setProtocol(extractRegistryType(url))
                        .setScopeModel(interfaceConfig.getScopeModel())
                        .build();
                    // provider delay register state will be checked in RegistryProtocol#export
                    if (provider || url.getParameter(SUBSCRIBE_KEY, true)) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return genCompatibleRegistries(interfaceConfig.getScopeModel(), registryList, provider);
}

genCompatibleRegistries

private static List<URL> genCompatibleRegistries(ScopeModel scopeModel, List<URL> registryList, boolean provider) {
    List<URL> result = new ArrayList<>(registryList.size());
    // 每个注册中心增加服务注册中心地址的配置
    registryList.forEach(registryURL -> {
        if (provider) {
            // for registries enabled service discovery, automatically register interface compatible addresses.
            String registerMode;
            //注册协议配置了service-discovery-registry 走这个逻辑
            if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
                registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INSTANCE));
                if (!isValidRegisterMode(registerMode)) {
                    registerMode = DEFAULT_REGISTER_MODE_INSTANCE;
                }
                // 添加应用级别配置
                result.add(registryURL);
                // 判断是否添加接口级别注册
                if (DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)
                    && registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
                    URL interfaceCompatibleRegistryURL = URLBuilder.from(registryURL)
                        .setProtocol(REGISTRY_PROTOCOL)
                        .removeParameter(REGISTRY_TYPE_KEY)
                        .build();
                    result.add(interfaceCompatibleRegistryURL);
                }
            } else {
                //双注册模式配置查询 对应参数为dubbo.application.register-mode 默认值为all
                registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getCachedDynamicProperty(scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_ALL));
                if (!isValidRegisterMode(registerMode)) {
                    registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
                }
                //如果满足应用级别注册,添加应用级别注册
                if ((DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode))
                    && registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)) {
                    URL serviceDiscoveryRegistryURL = URLBuilder.from(registryURL)
                        .setProtocol(SERVICE_REGISTRY_PROTOCOL)
                        .removeParameter(REGISTRY_TYPE_KEY)
                        .build();
                    result.add(serviceDiscoveryRegistryURL);
                }
            	// 如果满足接口注册配置,添加接口级别注册
                if (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)) {
                    result.add(registryURL);
                }
            }

            FrameworkStatusReportService reportService = ScopeModelUtil.getApplicationModel(scopeModel).getBeanFactory().getBean(FrameworkStatusReportService.class);
            reportService.reportRegistrationStatus(reportService.createRegistrationReport(registerMode));
        } else {
            result.add(registryURL);
        }
    });

    return result;
}

ServiceConfig#doExportUrlsFor1Protocol(protocolConfig, registryURLs);

导出服务配置到本地和注册中心

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //生成协议配置
    Map<String, String> map = buildAttributes(protocolConfig);

    // remove null key and null value
    map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));
    // init serviceMetadata attachments
    // 协议配置放到元数据 attachments 中
    serviceMetadata.getAttachments().putAll(map);
	// 协议配置和默认的协议配置转成URL
    URL url = buildUrl(protocolConfig, map);
	// 导出URL
    exportUrl(url, registryURLs);
}

导出URL ServiceConfig#exportUrl(url, registryURLs)

private void exportUrl(URL url, List<URL> registryURLs) {
    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        // 没有明确指定远程导出,开启本地导出
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
    	// 没右明确指定本地导出,开启远程导出
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            // export to extra protocol is used in remote export
            String extProtocol = url.getParameter("ext.protocol", "");
            List<String> protocols = new ArrayList<>();
            // export original url
            url = URLBuilder.from(url).
                addParameter(IS_PU_SERVER_KEY, Boolean.TRUE.toString()).
                removeParameter("ext.protocol").
                build();
            url = exportRemote(url, registryURLs);
            if (!isGeneric(generic) && !getScopeModel().isInternal()) {
                MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
            }

            if (!extProtocol.equals("")) {
                String[] extProtocols = extProtocol.split(",", -1);
                protocols.addAll(Arrays.asList(extProtocols));
            }
            // export extra protocols
            for(String protocol : protocols) {
                if(!protocol.equals("")){
                    URL localUrl = URLBuilder.from(url).
                        setProtocol(protocol).
                        build();
                    localUrl = exportRemote(localUrl, registryURLs);
                    if (!isGeneric(generic) && !getScopeModel().isInternal()) {
                        MetadataUtils.publishServiceDefinition(localUrl, providerModel.getServiceModel(), getApplicationModel());
                    }
                    this.urls.add(localUrl);
                }
            }
        }
    }
    this.urls.add(url);
}

导出服务到本地

核心方法 exportLocal(url); 本地调用使用了 injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,但执行了 Dubbo 的 Filter 链。

exportLocal 中主要更新了URL (协议转换成了injvm,host改为127.0.0.1 ,port 0)然后调用doExportUrl

private void exportLocal(URL url) {
    URL local = URLBuilder.from(url)
        .setProtocol(LOCAL_PROTOCOL)
        .setHost(LOCALHOST_VALUE)
        .setPort(0)
        .build();
    local = local.setScopeModel(getScopeModel())
        .setServiceModel(providerModel);
    doExportUrl(local, false);
    logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}

导出服务到注册中心

private URL exportRemote(URL url, List<URL> registryURLs) {
    if (CollectionUtils.isNotEmpty(registryURLs)) {
        // 遍历所有注册中心地址,挨个注册
        for (URL registryURL : registryURLs) {
            //如果是应用级注册service-discovery-registry 为协议URL 参数service-name-mapping为true
            if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
                url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
            }
            //if protocol is only injvm ,not register
            if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                continue;
            }
        	// url 添加动态配置 dynamic
            url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
            if (monitorUrl != null) {
                url = url.putAttribute(MONITOR_KEY, monitorUrl);
            }

            // For providers, this is used to enable custom proxy to generate invoker
            String proxy = url.getParameter(PROXY_KEY);
            if (StringUtils.isNotEmpty(proxy)) {
                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
            }
        	// log info
            if (logger.isInfoEnabled()) {
                if (url.getParameter(REGISTER_KEY, true)) {
                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL.getAddress());
                } else {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
            }
        	// 核心导出URL
            doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
        }
    } else {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        doExportUrl(url, true);
    }
    return url;
}

ServiceConfig#doExportUrl

导出到本地 和 导出到注册中心公用了 doExportUrl 方法

private void doExportUrl(URL url, boolean withMetaData) {
    //adaptor 扩展类型处理,javassist 获取 代理invoker
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    if (withMetaData) {
        //远程导出,元数据也是用invoker包装一下
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    Exporter<?> exporter = protocolSPI.export(invoker);
    exporters.add(exporter);
}

JavassistProxyFactory类型的getInvoker方法

ProxyFactory 使用了dubbo的SPI机制,默认加载 javassist 的 spi对应类使用

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    try {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        //创建实际服务提供者的代理类型,代理类型后缀为DubboWrap
		final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //返回一个 继承 AbstractProxyInvoker 并重写 doInvoke 的类,调用wrapper.invokeMethod
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    } catch (Throwable fromJavassist) {
        // log info and exception
    }
}

使用协议导出 export

Exporter<?> exporter = protocolSPI.export(invoker);

protocolSPI使用的DubboSPI机制,通过spi的wapper机制,可以知道这里面的调用会有一个调用链,每一个export都会经过如下调用链

  1. 协议序列化机制 ProtocolSerializationWrapper
  2. 协议过滤器Wrapper ProtocolFilterWrapper
  3. 协议监听器Wrapper ProtocolListenerWrapper
  4. QOS的协议Wrapper QosProtocolWrapper (运维相关使用)
  5. 具体协议的Protocol 例如 InjvmProtocol ,DubboProtocol

调用链中通过**UrlUtils.isRegistry(invoker.getUrl())**来判断是否是应用级别注册 (service-discovery-registry)

img

ProtocolSerializationWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //这里主要逻辑是将服务提供者url添加到服务存储仓库中
    getFrameworkModel(invoker.getUrl().getScopeModel()).getServiceRepository().registerProviderUrl(invoker.getUrl());
    return protocol.export(invoker);
}

ProtocolFilterWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //注册中心的协议导出直接执行
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        //服务发现service-discovery-registry的导出会走这个逻辑
        return protocol.export(invoker);
    }
    //过滤器调用链FilterChainBuilder的扩展对象查询
    FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
    //这里分为2步 生成过滤器调用链 然后使用链表中的节点调用  这里值查询provider类型的过滤器
    return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}

ProtocolListenerWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //注册中心地址则直接导出
    if (UrlUtils.isRegistry(invoker.getUrl())) {
         //服务发现service-discovery-registry的导出会走这个逻辑
        return protocol.export(invoker);
    }
    // 先导出对象 再创建过滤器包装对象 执行监听器逻辑
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ScopeModelUtil.getExtensionLoader(ExporterListener.class, invoker.getUrl().getScopeModel())
                    .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}

QosProtocolWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
     //注册中心导出的时候开启QOS 默认端口22222
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}
startQosServer(invoker.getUrl());

启动Qos服务,使用的Netty模型 NioEventLoopGroup

导出本地和注册中心的Protocol区别

InjvmProtocol 的导出方法

本地使用协议 injvm

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

RegistryProtocol

RegistryProtocol 的 export 包含了一下四部分

  • 根据最新配置覆盖配置
  • 导出协议端口开启TCP服务
  • 注册到注册中心
  • 通知服务启动了
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();
    overrideListeners.put(registryUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    //本地导出协议开启端口
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    // 通过URL获取注册中心的Registry操作对象
    final Registry registry = getRegistry(registryUrl);
    //转换成向注册中心注册的URL
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish (provider itself and registry should both need to register)
    boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
    //是否向注册中心注册
    if (register) {
        //这里有两种情况 
        //接口级注册会将接口级服务提供者数据直接注册到Zookeper上面,
        //服务发现(应用级注册)这里仅仅会将注册数据转换为服务元数据等后面来发布元数据
        register(registry, registeredProviderUrl);
    }

    // register stated url on provider model
    registerStatedUrl(registryUrl, registeredProviderUrl, register);


    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);

    if (!registry.isServiceDiscovery()) {
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    }
	// 内置监听器通知
    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

doLocalExport本地导出协议开启端口

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        //protoco对象是dubbo自动生成的适配器对象protocol$Adaptive 适配器对象会根据当前协议的参数来查询具体的协议扩展对象
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

protocol.export()也会经过调用链,最后一个Protocol是DubboProtocol

DubboProtocol

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    checkDestroyed();
    // 从invoker 获取URL
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    // 创建导出需要的Expoter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);

    //export a stub service for dispatching event
    boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackService) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(PROTOCOL_UNSUPPORTED, "", "", "consumer [" + url.getParameter(INTERFACE_KEY) +
                    "], has set stub proxy support event ,but no stub methods founded.");
            }

        }
    }
    //创建服务,开启服务端口
    openServer(url);
    optimizeSerialization(url);

    return exporter;
}
openServer开启服务端口

RPC协议的TCP通信

private void openServer(URL url) {
    checkDestroyed();
    // find server.
    String key = url.getAddress();
    // client can export a service which only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);

    if (isServer) {
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                    return;
                }
            }
        }

        // server supports reset, use together with override
        server.reset(url);
    }
}
createServer 创建协议服务

为地址创建协议服务

private ProtocolServer createServer(URL url) {
    url = URLBuilder.from(url)
        // send readonly event when server closes, it's enabled by default
        .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
        // enable heartbeat by default
        .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
        .addParameter(CODEC_KEY, DubboCodec.NAME)
        .build();
    // 服务端使用的网络 默认是netty
    String transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
    if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
        throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);
    }
	//交换器
    ExchangeServer server;
    try {
        // 创建交换器
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    transporter = url.getParameter(CLIENT_KEY);
    if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
        throw new RpcException("Unsupported client type: " + transporter);
    }

    DubboProtocolServer protocolServer = new DubboProtocolServer(server);
    loadServerProperties(protocolServer);
    return protocolServer;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/397052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis缓存穿透

缓存穿透&#xff1a; 缓存穿透说简单点就是⼤量请求的 key 根本不存在于缓存中&#xff0c;导致请求直接到了数据库上&#xff0c; 根本没有经过缓存这⼀层。举个例⼦&#xff1a;某个⿊客故意制造我们缓存中不存在的 key 发起⼤量 请求&#xff0c;导致⼤量请求落到数据库。…

http笔记

文章目录1、什么是http&#xff1f;2、http报文格式3、请求报文1、认识URL2、认识http方法3、认识header4、响应报文5、https加密机制1、什么是http&#xff1f; http是应用层最广泛使用的协议之一&#xff1b;其中浏览器获取到网页就是基于http实现的&#xff1b;http就是浏览…

Caddy2学习笔记——Caddy2反向代理docker版本的DERP中继服务器

一、个人环境概述 本人拥有一个国内云服务商的云主机和一个备案好的域名&#xff0c;通过caddy2来作为web服务器。我的云主机系统是Ubuntu。 我的云主机是公网ip&#xff0c;地址为&#xff1a;43.126.100.78&#xff1b;我备案好的域名是&#xff1a;hotgirl.com。后面的文章…

【量化交易笔记】3.实现数据库保存数据

上一节&#xff0c;我们通过下载相关的 pandas 数据保存为 本地csv文件&#xff0c;这一节将上节的数据以数据库方式保存。 数据库保存 采集数据部分前一节已做说明&#xff0c;这里就直接用采用前面的内容。这里着重说明的事数据库连接。对与 python 相连接的数据库有很多&a…

玩转Python的交互(命令行)模式

我喜欢使用Python的交互界面&#xff08;命令行模式&#xff09;来运行和调试Python代码。为什么不用PyCharm、VSCode&#xff1f;因为先入为主&#xff0c;加上我的DOS命令行的情结&#xff0c;我第一次安装使用Python就是用这种黑白界面的&#xff0c;平时写代码惯用EmEditor…

MySQL慢查询

2 慢查询 2.1 慢查询介绍 MySQL的慢查询日志是MySQL提供的一种日志记录&#xff0c;它用来记录在MySQL中响应时间超过阀值的语句&#xff0c;具体指运行时间超过long_query_time值的SQL&#xff0c;则会被记录到慢查询日志中。具体指运行时间超过long_query_time值的SQL&…

软件测试之快速熟悉项目

快速熟悉项目 1、了解项目架构 C/S架构 C/S 代表的是客户端/服务器&#xff08;client/server&#xff09;&#xff0c;这类软件的使用者需要在本地电脑安装客户端程序&#xff0c;例如&#xff1a;QQ。 优点:安全性高。 缺点:一旦软件有更新&#xff0c;用户需要手动下载&am…

Rust 开发系列PyO3:Rust与Python的联动编程(中)

第三节&#xff1a;对比C语言的Python原生扩展开发模式 C/c编写Python扩展的方法&#xff0c;与Rust大致是相同的&#xff0c;如果不论语言本身的语法带来的繁琐的话&#xff0c;就单纯以开发步骤和模式来看&#xff0c;原生语言写扩展的步骤更为标准和简单。 大致来说&#…

QT入门Item Views之QTreeView

目录 一、QTreeView界面相关 1、布局介绍 二、基本属性功能 1、设置单元格不能编辑 2、一次选中一个item 3、去掉鼠标移动到单元格上的虚线框 4、最后一列自适应 三、代码展示 1、创建模型&#xff0c;导入模型 2、 右键菜单栏 3、双…

深度学习模型训练工作汇报(3.8)

进行数据的初始整理的准备 主要是进行伪序列字典的设置&#xff0c;以及训练数据集的准备。 期间需要的一些问题包括在读取文件信息的时候&#xff0c;需要跳过文件的第一行或者前两行&#xff0c;如果使用循环判断的话&#xff0c;会多进行n次的运算&#xff0c;这是不划算的…

003+limou+HTML——(3)HTML列表

000、前言 列表是网页常见的一种数据排列方式&#xff0c;在HTMl中列表一共有三种&#xff1a;有序列表、无序列表、定义列表&#xff08;另外“目录列表dir”和“菜单列表menu”已经在HTML5中被废除了&#xff0c;现在都是使用无序列表ul来替代&#xff09; 001、有序列表&a…

C/C++指针与数组(一)

预备知识 1、数据的存储 2、基本内建类型 1&#xff09;类型的大小 C offers a flexible standard with some guaranteed minimum sizes, which it takes from C: A short integer is at least 16 bits wide.An int integer is at least as big as short.A long integer is a…

Spring Cloud学习笔记:基础知识

这是本人学习的总结&#xff0c;主要学习资料如下 马士兵教育 目录1、Spring Cloud 简介2、Eureka3、建立Spring Cloud项目3.1、启动Server3.1.1、dependency3.1.2、配置文件3.1.3、Server端启动代码3.2、启动Client3.2.1、dependency3.2.2、配置文件3.3.3、Client端启动代码3…

Go之入门(特性、变量、常量、数据类型)

一、Go语言特性 语法简单并发性。Go语言引入了协程goroutine&#xff0c;实现了并发编程内存分配。Go语言为了解决高并发下内存的分配和管理&#xff0c;选择了tcmalloc进行内存分配&#xff08;为了并发设计的高性能内存分配组件&#xff0c;使用cache为当前线程提供无锁分配…

电脑自动重启是什么原因?详细解说

案例&#xff1a;电脑自动重启是什么原因&#xff1f; “一台用了一年的电脑&#xff0c;最近使用&#xff0c;每天都会一两次莫名其妙自动重启&#xff0c;看了电脑错误日志&#xff0c;看不懂什么意思&#xff0c;一直找不到答案。有没有高手知道怎么解决这个问题的。” 当…

仿写简单IOC

目录 TestController类: UserService类: 核心代码SpringIOC&#xff1a; Autowired和Component注解 SpringIOCTest 类 ​编辑 总结&#xff1a; TestController类: Component public class TestController {Autowiredprivate UserService userService;public void test…

RocketMQ如何测试

RocketMQ如何测试MQ简介RocketMQRocketMQ测试点MQ简介 MQ&#xff1a;Message Queue&#xff0c;即消息队列&#xff0c;是一种应用程序之间的消息通信&#xff0c;简单理解就是A服务不断的往队列里发布信息&#xff0c;另一服务B从队列中读取消息并执行处理&#xff0c;消息发…

同步、异步ETL架构的比较

背景介绍&#xff1a; 数据的抽取&#xff0c;转换和加载 (ETL, Extract, Transform, Load) 是构建数据仓库过程中最复杂也是至 关重要的一个步骤&#xff0c;我们通常用两种办法来处理 ETL 流程: 一种是异步(Asynchronous) ETL 方式, 也称为文本文件(Flat file)方式。 另外…

华为云平台架构名词解释

名词解释 网络设备 ISW&#xff08;外网接入交换机&#xff09;&#xff1a;出口交换机&#xff0c;常用于和外网建立静态/BGP路由互联 CSW &#xff08;内网接入交换机&#xff09;&#xff1a;专线接入&#xff08;用户内网骨干&#xff09;交换机&#xff0c;用户自有网络…

一场以数字技术深度影响和改造传统实业的新风口,正在开启

当数字经济的浪潮开始上演&#xff0c;一场以数字技术深度影响和改造传统实业的新风口&#xff0c;正在开启。对于诸多在互联网时代看似业已走入死胡同的物种来讲&#xff0c;可以说是打开了新的天窗。对于金融科技来讲&#xff0c;同样如此。以往&#xff0c;谈及金融科技&…