【微服务】SpringCloud中Ribbon集成Eureka实现负载均衡

news2024/11/22 22:03:01

💖 Spring家族及微服务系列文章

✨【微服务】SpringCloud轮询拉取注册表及服务发现源码解析

✨【微服务】SpringCloud微服务续约源码解析

✨【微服务】SpringCloud微服务注册源码解析

✨【微服务】Nacos2.x服务发现?RPC调用?重试机制?

✨【微服务】Nacos通知客户端服务变更以及重试机制

✨【微服务】Nacos服务发现源码分析

✨【微服务】SpringBoot监听器机制以及在Nacos中的应用

✨【微服务】Nacos服务端完成微服务注册以及健康检查流程

✨【微服务】Nacos客户端微服务注册原理流程

✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理

✨【微服务】SpringBoot启动流程注册FeignClient

✨【微服务】SpringBoot启动流程初始化OpenFeign的入口

✨Spring Bean的生命周期

✨Spring事务原理

✨SpringBoot自动装配原理机制及过程

✨SpringBoot获取处理器流程

✨SpringBoot中处理器映射关系注册流程

✨Spring5.x中Bean初始化流程

✨Spring中Bean定义的注册流程

✨Spring的处理器映射器与适配器的架构设计

✨SpringMVC执行流程图解及源码

目录

​编辑

💖 Spring家族及微服务系列文章

一、前言

二、Ribbon集成Eureka

1、入口

1.1、ZoneAwareLoadBalancer构造初始化

1.2、DynamicServerListLoadBalancer构造初始化

2、获取并更新服务列表到本地

2.1、更新服务的列表

1)DomainExtractingServerList组件

2.2、从EurekaClient获取服务列表

2.3、updateAllServerList(List ls)逻辑

2.4、设置到本地

2.5、更新到本地

3、动态更新服务列表

3.1、enableAndInitLearnNewServersFeature() 启动任务

3.2、延迟执行任务

3.3、执行更新

4、Ping机制

4.1、启动任务

4.2、任务

4.3、调用ping

4.4、pingServers逻辑


一、前言

在前面的文章中已经分析了Ribbon实现负载均衡的原理,我们这篇文章接着讲解ribbon如何集成eureka的,入口是什么?如何获取eureka服务列表,服务发现,缓存到本地?如何动态获取更新服务列表,周期性任务?如何选取一个可以用的服务Ping机制,周期性任务?CAS加锁、释放锁?

二、Ribbon集成Eureka

无论是Eureka还是Ribbon都是Netflix公司的两大开源项目,不过现在都不维护了。SpringCloud只是集成了它们,即底层实现是依赖于这两个项目去实现的。不过在高版本的SpringCloud当中移除了Ribbon,自己实现一套负载均衡解决方案,至于稳定性现在不好说。所以国人依然是以稳定性考虑,依然沿用JDK8,高版本的Spring使用了JDK17。再次提醒下一定要以稳定为主,不然背锅的可是你哦!!!

比较有意思的是,启动的时候没有打印关于Ribbon的日志,找了半天也没有找到还自我怀疑配置出错了。结果调用了一下,发现它这个时候才使用DefaultListableBeanFactory创建实例的,尤其上面圈的两个。 

1、入口

在前面的文章《【微服务】SpringCloud中使用Ribbon实现负载均衡的原理》中讲解到,SpringBoot在自动装配的过程中,根据RibbonClientConfiguration中的配置默认装配ZoneAwareLoadBalancer的。ZoneAwareLoadBalancer是ribbon项目里面的,RibbonClientConfiguration是spring-cloud-netflix-ribbon里面的。

既然是接口与实现类我们不妨看看类的关系图

    好像很复杂的样子,我们先对接口与实现类关系图有个印象,留个疑惑?其实父子关系,无非微妙在构造方法、方法重用、模板方法、super关键字、继承等。那么上面的自动装配使用了构造方法,我们看下有没有什么玄机。

注意:我们重点关注LoadBalancer链条。

1.1、ZoneAwareLoadBalancer构造初始化

    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        // 通过super关键字调用父类有参构造方法
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);

        String name = Optional.ofNullable(getName()).orElse("default");

        // 通过配置初始化字段
        this.enabled = clientConfig.getGlobalProperty(ENABLED);
        this.triggeringLoad = clientConfig.getGlobalProperty(TRIGGERING_LOAD_PER_SERVER_THRESHOLD.format(name));
        this.triggeringBlackoutPercentage = clientConfig.getGlobalProperty(AVOID_ZONE_WITH_BLACKOUT_PERCENTAGE.format(name));
    }

这里通过super关键字调用父类有参构造方法初始化几个Ribbon关键组件,思考一下如果不用super的话调用该构造方法父类会不会有什么变化。通过全局配置初始化一些字段。

1.2、DynamicServerListLoadBalancer构造初始化

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        // 通过super调用父类构造方法初始化
        super(clientConfig, rule, ping);
        // 初始化为DomainExtractingServerList
        this.serverListImpl = serverList;
        // 过滤器,初始化默认为ZonePreferenceServerListFilter
        this.filter = filter;
        // 服务列表更新器,初始化时默认为PollingServerListUpdater
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        // 重点关注该初始化
        restOfInit(clientConfig);
    }

serverList究竟是什么,我们可从入参追溯,发现跟自动装配有关。但是发现了两处都有 serverList,究竟谁才是最终赢家?我们不妨debugger下。

1)debugger负载均衡请求

这里我们debugger进入到NamedContextFactory(spring-cloud-commons项目里面的),红框里面的几个重点关注的单例Bean,有没有引起你的注意?有关Ribbon的不妨都去看看,说不定会发现新大陆哦!!!

2)ServerList装配

	@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(IClientConfig config,
			Provider<EurekaClient> eurekaClientProvider) {
		if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
			return this.propertiesFactory.get(ServerList.class, config, serviceId);
		}
		DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
				config, eurekaClientProvider);
		DomainExtractingServerList serverList = new DomainExtractingServerList(
				discoveryServerList, config, this.approximateZoneFromHostname);
		return serverList;
	}

DiscoveryEnabledNIWSServerList是ribbon项目里面的,DomainExtractingServerList是spring-cloud-netflix项目里面的,从上面的debugger图可见ribbonServerList即serverList为DomainExtractingServerList。它们的构造方法可以点击进去看看,下面会2.1的1)详细分析

3)restOfInit(IClientConfig clientConfig)集成Eureka入口

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        // 关闭此选项以避免在 BaseLoadBalancer.setServerList ()中执行重复的异步启动
        this.setEnablePrimingConnections(false);
        // 感知新的服务实例的添加/移除,即动态维护服务实例列表
        enableAndInitLearnNewServersFeature();

        // 更新Eureka client 中所有服务的实例列表,即初始化服务实例列表
        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

主要逻辑:

  1. 感知新的服务实例的添加/移除,即动态维护服务实例列表。下面3分析
  2. 更新Eureka client 中所有服务的实例列表,即初始化服务实例列表。下面2分析

2、获取并更新服务列表到本地

2.1、更新服务的列表

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            // 获取服务
            servers = serverListImpl.getUpdatedListOfServers();
            // 从 Discovery 客户端获取{}的服务器列表: {}
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                // 过滤处理。这里有一部分逻辑,感兴趣的可以去看看
                servers = filter.getFilteredListOfServers(servers);
                // 从 Discovery 客户端获取的{}的筛选服务器列表: {}
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        // 更新全部服务列表
        updateAllServerList(servers);
    }

主要逻辑:

  1. 获取服务列表。serverListImpl在(1中2))初始化时默认为DomainExtractingServerList,而它是在spring-cloud-netfilx项目里面的,所以这里调用的是它的getUpdatedListOfServers()获取服务的,下面1)补充说明下。但是在ribbon项目里面你点击该方法发现直接跳转到下面获取服务列表了,这时很容易忽略了一些东西,下面1)进行补充。
  2. 过滤处理。这里有一部分逻辑,感兴趣的可以去看看
  3. 更新全部服务列表到ribbon本地

1)DomainExtractingServerList组件

	public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
			IClientConfig clientConfig, boolean approximateZoneFromHostname) {
		// list为DiscoveryEnabledNIWSServerList的引用
		this.list = list;
		this.ribbon = RibbonProperties.from(clientConfig);
		this.approximateZoneFromHostname = approximateZoneFromHostname;
	}

	@Override
	public List<DiscoveryEnabledServer> getInitialListOfServers() {
		// 委托DiscoveryEnabledNIWSServerList处理
		List<DiscoveryEnabledServer> servers = setZones(
				this.list.getInitialListOfServers());
		return servers;
	}

	@Override
	public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
		// 委托DiscoveryEnabledNIWSServerList处理
		List<DiscoveryEnabledServer> servers = setZones(
				this.list.getUpdatedListOfServers());
		return servers;
	}

可见在自动装配中初始化时list为DiscoveryEnabledNIWSServerList,而上面两个功能都是委托(ribbon项目里面的)DiscoveryEnabledNIWSServerList处理的。

2.2、从EurekaClient获取服务列表

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<>();
        }

        // 获取eurekaClient 
        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                // 如果 targetArea 为 null,它将被解释为 client 的相同区域
                // 获取服务实例
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                // 遍历服务实例列表
                for (InstanceInfo ii : listOfInstanceInfo) {
                    // 获取服务实例为UP状态的
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // 拷贝是必要的,因为 InstanceInfo 构建器只使用原始引用,而且我们不想破坏对象的全局
                            // eureka拷贝,因为它可能被我们系统中的其他客户机使用
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        // 转换为DiscoveryEnabledServer
                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        // 添加到集合
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    // 如果当前的 vipAddress 有服务器,我们不使用后续的基于 vipAddress 的服务器
                    break; 
                }
            }
        }
        return serverList;
    }

主要逻辑:

  1. 获取eurekaClient,上一篇博客是否还有印象呢
  2. vipAddresses不为空,分隔然后遍历。1)获取服务实例列表。上一篇服务发现是否记得。2)遍历服务实例列表;3)过滤获取获取服务实例为UP即正常运行状态的;4)是否使用覆盖端口,如果是则拷贝。拷贝是必要的,因为 InstanceInfo 构建器只使用原始引用,而且我们不想破坏对象的全局eureka拷贝,因为它可能被我们系统中的其他客户机使用;5)转换为DiscoveryEnabledServer,添加到服务列表serverList;6)如果当前的vipAddress有服务器,我们不使用后续的基于vipAddress的服务

2.3、updateAllServerList(List<T> ls)逻辑

    protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        // 确保在多线程情况下只有一个线程进来
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    // 设置,以便客户端可以立即开始使用这些服务器,而不必等到ping周期结束。
                    s.setAlive(true); 
                }
                // 更新服务列表
                setServersList(ls);
                // 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
                super.forceQuickPing();
            } finally {
                // 还原以便下次CAS成功
                serverListUpdateInProgress.set(false);
            }
        }
    }

主要逻辑:

  1. CAS确保在多线程情况下只有一个线程进来
  2. 设置服务为活跃状态,以便客户端可以立即开始使用这些服务器,而不必等到ping周期结束。
  3. 更新服务列表。下面分析
  4. 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
  5. 还原以便下次CAS成功

2.4、设置到本地

    @Override
    public void setServersList(List lsrv) {
        // 通过super调用父类方法更新到本地
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<>();
        for (Server server : serverList) {
            // 确保创建了ServerStats,以避免在热路径上创建它们
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        // 为区域设置服务器列表
        setServerListForZones(serversInZones);
    }

主要逻辑:

  1. 通过super调用父类方法更新到本地
  2. 遍历服务列表,确保创建了ServerStats,以避免在热路径上创建它们
  3. 为区域设置服务器列表

2.5、更新到本地

    public void setServersList(List lsrv) {
        Lock writeLock = allServerLock.writeLock();
        logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
        
        ArrayList<Server> newServers = new ArrayList<>();
        // 加锁
        writeLock.lock();
        try {
            ArrayList<Server> allServers = new ArrayList<>();
            // 遍历入参
            for (Object server : lsrv) {
                if (server == null) {
                    continue;
                }

                if (server instanceof String) {
                    server = new Server((String) server);
                }

                if (server instanceof Server) {
                    logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                    // 强制类型转换,添加到allServers列表
                    allServers.add((Server) server);
                } else {
                    throw new IllegalArgumentException(
                            "Type String or Server expected, instead found:"
                                    + server.getClass());
                }

            }
            boolean listChanged = false;
            // 如果allServerList的值与allServers不等则说明发生了更新
            if (!allServerList.equals(allServers)) {
                listChanged = true;
                if (changeListeners != null && changeListeners.size() > 0) {
                   List<Server> oldList = ImmutableList.copyOf(allServerList);
                   List<Server> newList = ImmutableList.copyOf(allServers);                   
                   for (ServerListChangeListener l: changeListeners) {
                       try {
                           // 服务列表变更
                           l.serverListChanged(oldList, newList);
                       } catch (Exception e) {
                           logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                       }
                   }
                }
            }
            if (isEnablePrimingConnections()) {
                for (Server server : allServers) {
                    if (!allServerList.contains(server)) {
                        server.setReadyToServe(false);
                        newServers.add((Server) server);
                    }
                }
                // 观察控制台没有走这里
                if (primeConnections != null) {
                    primeConnections.primeConnectionsAsync(newServers, this);
                }
            }
            // This will reset readyToServe flag to true on all servers
            // regardless whether
            // previous priming connections are success or not
            // 这将在所有服务器上将readyToServe标志重置为true,无论以前的启动连接是否成功
            // 覆盖现有的服务器列表
            allServerList = allServers;
            // 能否跳过ping,观察控制台执行下面forceQuickPing()逻辑,也就是走了listChanged = true;处理
            if (canSkipPing()) {
                for (Server s : allServerList) {
                    s.setAlive(true);
                }
                upServerList = allServerList;
            } else if (listChanged) {
                // 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
                forceQuickPing();
            }
        } finally {
            // 释放锁
            writeLock.unlock();
        }
    }

主要逻辑:

  1. 加锁成功,遍历入参即服务列表
  2. 如果是Server类型,则强制类型转换,添加到allServers列表;否则抛异常
  3. 如果allServerList的值与allServers不等则说明发生了更新,发布服务列表变更事件
  4. 覆盖现有的服务器列表。这将在所有服务器上将readyToServe标志重置为true,无论以前的启动连接是否成功
  5. 能否跳过ping,观察控制台执行下面forceQuickPing()逻辑,也就是走了listChanged = true;处理;如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
  6. 释放锁,

3、动态更新服务列表

3.1、enableAndInitLearnNewServersFeature() 启动任务

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

根据自动装配serverListUpdater为PollingServerListUpdater,所以这里委托PollingServerListUpdater的start()方法处理。

3.2、延迟执行任务

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        // CAS确保并发场景只有一个线程进入
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = () ->  {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    // 实际上交给DynamicServerListLoadBalancer的匿名内部类updateAction处理
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            };

            // 延迟1秒执行,周期性隔30秒
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

主要逻辑:

  1. CAS确保并发场景只有一个线程进入
  2. 实际上交给DynamicServerListLoadBalancer的匿名内部类updateAction处理,下面分析
  3. 延迟1秒执行,周期性30秒执行任务

3.3、执行更新

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

实际上交给DynamicServerListLoadBalancer的匿名内部类updateAction处理,所以回到了DynamicServerListLoadBalancer。而updateListOfServers()就是我们上面2中分析的流程了。

4、Ping机制

细心观察控制台打印日志,你会发现还有一个周期性任务,复制然后搜索追踪,下面分析细节: 

4.1、启动任务

在1的1.2以及LoadBalancer链条可知会进入到BaseLoadBalancer构造方法做一些初始化工作,内部调用setPingInterval(pingIntervalTime)

    public void setPingInterval(int pingIntervalSeconds) {
        // pingIntervalSeconds默认30秒
        if (pingIntervalSeconds < 1) {
            return;
        }

        // 默认30秒
        this.pingIntervalSeconds = pingIntervalSeconds;
        if (logger.isDebugEnabled()) {
            logger.debug("LoadBalancer [{}]:  pingIntervalSeconds set to {}",
        	    name, this.pingIntervalSeconds);
        }
        // 启动任务
        setupPingTask(); // since ping data changed
    }

    void setupPingTask() {
        // 是否跳过ping
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            // 终止此计时器,放弃任何当前计划的任务。不会干扰当前正在执行的任务(如果存在的话)。
            // 一旦一个计时器被终止,它的执行线程就会优雅地终止,不会再有任何任务被安排在它上面。
            // 注意,从这个计时器调用的a timer 任务的 run 方法中调用这个方法绝对可以保证正在进行的任务执行是这个计时器将要执行的最后一个任务执行。
            // 此方法可以重复调用; 第二次和随后的调用没有效果。
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        // 执行BaseLoadBalancer的内部类PingTask#run(),默认每30秒执行一次
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        // 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。
        forceQuickPing();
    }

主要逻辑:

  1. pingIntervalSeconds默认30秒,setupPingTask()启动任务
  2. 是否跳过ping,如果是return。一般不会跳过
  3. lbTimer不为空,终止此计时器,放弃任何当前计划的任务。不会干扰当前正在执行的任务(如果存在的话)。一旦一个计时器被终止,它的执行线程就会优雅地终止,不会再有任何任务被安排在它上面。 注意,从这个计时器调用的a timer 任务的 run 方法中调用这个方法绝对可以保证正在进行的任务执行是这个计时器将要执行的最后一个任务执行。此方法可以重复调用; 第二次和随后的调用没有效果。
  4. 执行BaseLoadBalancer的内部类PingTask#run(),默认每30秒执行一次
  5. 如果我们当前没有发出信号,并且没有已经安排好的快速信号,则强制立即发出信号。

4.2、任务

    class PingTask extends TimerTask {
        public void run() {
            try {
            	new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

    public void runPinger() throws Exception {
            // CAS确保并发下只有一个线程进来处理,注意并发度不是特别高场景
            if (!pingInProgress.compareAndSet(false, true)) { 
                // CAS失败,说明有线程正在执行任务,就不需要再做了
                return; 
            }
            
            // 我们进去了,我们找到Ping了

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * ReadLock 应该是空闲的,除非 addServer 操作正在进行..。
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                // ping所有的服务,返回是否存活数组
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<>();
                final List<Server> changedServers = new ArrayList<>();

                for (int i = 0; i < numCandidates; i++) {
                    // 是否活跃状态
                    boolean isAlive = results[i];
                    // 取出一个服务
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    // 观察控制台没看到
                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                    		name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    // 这里做剔除操作,即活跃的留下来
                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                // 加锁以便并发场景下只有一个线程可以做修改
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                // 释放锁
                upLock.unlock();

                // 发布服务状态变更事件
                notifyServerStatusChangeListener(changedServers);
            } finally {
                // 还原以便下次CAS成功,有点类似于释放锁
                pingInProgress.set(false);
            }
        }
    }

PingTask 是BaseLoadBalancer的内部类,主要逻辑:

  1. CAS确保并发下只有一个线程进来处理,注意并发度不是特别高场景。如果CAS失败,说明有线程正在执行任务,就不需要再做了
  2. CAS成功,加锁。ReadLock 应该是空闲的,除非 addServer 操作正在进行..。赋值,释放锁。
  3. ping所有的服务,返回是否存活数组,下面分析
  4. 遍历服务个数,从是否存活数组results[i]取出一个状态,allServers[i]取出一个服务,如果服务是存活正常的则添加到newUpList;加锁以便并发场景下只有一个线程可以做修改upServerList,释放锁;发布服务状态变更事件
  5. 还原以便下次CAS成功,有点类似于释放锁

4.3、调用ping

    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* 默认答案是服务挂了。*/
                try {
                    // 注意: IFF 假设我们有一大组服务器(比如说15个) ,我们正在做一个真正的 Ping,
                    // 下面的逻辑将串行运行它们,因此需要15倍的时间来 Ping 每个服务器一个更好的方法是把它放在一个
                    // 执行池,但是,在写这篇文章的时候,我们没有真正使用一个真正的 Ping
                    // (它主要是在内存中的 eureka 调用) ,因此我们可以负担得起简化这个设计和运行这个串行
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }

SerialPingStrategy是BaseLoadBalancer的内部类,主要逻辑:

  1. 从入参获取服务个数,初始化一个boolean类型数组results,长度为numCandidates
  2. 默认答案是服务挂了。
  3. ping不为空,开始ping。注意: IFF 假设我们有一大组服务器(比如说15个) ,我们正在做一个真正的 Ping,下面的逻辑将串行运行它们,因此需要15倍的时间来 Ping 每个服务器一个更好的方法是把它放在一个执行池,但是,在写这篇文章的时候,我们没有真正使用一个真正的 Ping(它主要是在内存中的 eureka 调用) ,因此我们可以负担得起简化这个设计和运行这个串行

在Ribbon中默认使用的是DummyPing,但是在SpringCloud中eureka集成ribbon时EurekaRibbonClientConfiguration配置了NIWSDiscoveryPing,所以覆盖掉了。

4.4、pingServers逻辑

		public boolean isAlive(Server server) {
		    boolean isAlive = true;
		    if (server!=null && server instanceof DiscoveryEnabledServer){
		    	// 强制类型转换
	            DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;	   
	            // 获取服务实例
	            InstanceInfo instanceInfo = dServer.getInstanceInfo();
	            if (instanceInfo != null){	                
	                InstanceStatus status = instanceInfo.getStatus();
	                if (status != null){
	                	// 如果服务实例的状态为UP则isAlive为true,否则false
	                    isAlive = status.equals(InstanceStatus.UP);
	                }
	            }
	        }
		    return isAlive;
		}

主要逻辑:

  1. 强制类型转换为DiscoveryEnabledServer类型
  2. 获取服务实例,实例不为空,获取实例状态
  3. 实例状态不为空,如果服务实例的状态为UP则isAlive为true,否则false

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/28600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Maven打Jar包,启动报NoClassDefFoundError错误

今天准备将游戏服务器的压测机器人打包分发给其他人来运行对服务器进行压力测试。打成的jar包发现运行报错了。找了半天才找到最终原因。下面是原因和一些分析的情况。 原因 java -jar .\robot.jar发现错误如下 看到这个错误就知道jvm找不到对应的类。但是为什么找不到对应的…

JVM的内存区域划分

文章目录 前言一、本地方法栈&#xff08;线程私有&#xff09;二、程序计数器&#xff08;线程私有&#xff09;三、Java虚拟机栈&#xff08;线程私有&#xff09;四、堆&#xff08;线程共享&#xff09;五、方法区&#xff08;元数据区&#xff09;前言 JVM 是Java 运行的基…

Android:Navigation使用safe args插件传递参数

Navigation使用safe args插件传递参数1、 使用配置2、举例说明1、MainActivity2、AvalFragment, DovomFragment2.1、AvalFragment2.2、DovomFragment参考1、 使用配置 afe args与传统传参方式相比&#xff0c;好处在于安全的参数类型&#xff0c;并且通过谷歌官方的支持&#…

GameFrameWork框架(Unity3D)使用笔记(六)游戏主流程ProcedureMain——从数据表加载出所需实体

目录 前言&#xff1a; 一、Entity配置表 1、创建数据表 2、创建数据表行类 二、Character配置表 1、创建数据表 2、写数据表行类 三、加载数据表 四、扩展一下Entity模块 五、应用Character数据表的位置信息 六、测试 总结&#xff1a; 前言&#xff1a; 上一篇中我…

第2章 Elasticsearch入门

2.1 Elasticsearch 安装 2 . 1 .1 下载软件 Elasticsearch的官方地址&#xff1a;www.elastic.co/cn/ Elasticsearch最新的版本是7.11.2&#xff08;截止2021.3.10&#xff09;&#xff0c;我们选择7.8.0版本&#xff08;最新版本半年前的版本&#xff09; 下载地址&#x…

贝叶斯网络

贝叶斯网络的独立性&#xff1a; 当一个结点G的父节点已知的时候&#xff0c;该结点G与其所有非后代结点条件独立 交叉因果推断&#xff1a;如上述图中的例子,对于P&#xff08;i | g | d&#xff09;等于说是中D到I 这条路径中&#xff0c;做半边的路径是顺着箭头走的&#x…

表白墙(前端+后端+数据库)

目录 一、创建项目 1、创建maven项目&#xff0c;引入依赖 2、创建目录结构 二、前端代码 1、页面内容和样式 2、提交按钮的点击事件 3、发送GET请求 三、数据库 四、后端代码 1、重写doPost方法 1.1 创建Message类 1.2 重写doPost方法 1.3 实现save方法 2、重写…

你需要知道的50颗卫星:地球卫星清单

开放数据卫星 1陆地卫星 地球资源卫星令人难以置信的长期遗产已经保存了地球40多年的历史。通过无数的应用程序&#xff0c;它甚至发现 island Landsat in Canada。 图片来源&#xff1a;NASA 2哨兵 作为 Copernicus Programme 哨兵的6个任务的舰队是一个游戏改变者。明确地…

2022年经典散文:滚烫的石板

滚烫的石板 ——灵遁者 此刻&#xff0c;我想表达的情愫大概有千万种&#xff0c;如何表达并不容易&#xff0c;就好像一个人的时候&#xff0c;也在面对某个我认识或者不认识的人&#xff0c;话总是说不清&#xff0c;也说不出来。 小孩总是敢于表达的&#xff0c;就像一条没…

.NET 7 的 AOT 到底能不能扛反编译?

一&#xff1a;背景 1.讲故事 在B站&#xff0c;公众号上发了一篇 AOT 的文章后&#xff0c;没想到反响还是挺大的&#xff0c;都称赞这个东西能抗反编译&#xff0c;可以让破解难度极大提高&#xff0c;可能有很多朋友对逆向不了解&#xff0c;以为用 ILSpy,Reflector,DnSpy…

群勃龙-半琥珀酸酯(TR-HS)与BSA牛血清白蛋白偶联 TR-HS-BSA

产品名称&#xff1a;群勃龙-半琥珀酸酯与牛血清白蛋白偶联 英文名称&#xff1a;TR-HS-BSA 用途&#xff1a;科研 状态&#xff1a;固体/粉末/溶液 产品规格&#xff1a;1g/5g/10g 保存&#xff1a;冷藏 储藏条件&#xff1a;-20℃ 储存时间&#xff1a;1年 牛血清中的简单蛋白…

全光谱台灯对孩子眼睛有影响吗?什么样的全光谱台灯真的有用

全光谱台灯对眼睛当然是有影响的&#xff0c;因为光谱成分丰富度与太阳光类似&#xff0c;所以无限接近于太阳光的显色能力&#xff0c;这样的灯光下物体的色差如同沐浴太阳光一般真实&#xff0c;色差不失真&#xff0c;人眼自然就越舒服。 那么什么样的全光谱台灯有用呢&…

Python|excel表格数据一键转json格式小工具|支持xlsx、xls格式转json|【源码+解析】

背景 最近在使用JavaScript编写一些浏览器RPA脚本&#xff0c;脚本使用过程中遇到一些问题&#xff0c;脚本使用的数据往往存放在excel表&#xff0c;但运行时只能读取json数据&#xff0c;导致频繁人工excel转json&#xff0c;效率低下。 遇到问题后赶紧搜索excel转json小工…

基于PHP+MySQL药品信息查询系统(含论文)

本系统阐述了医药信息查询系统的开发过程,并对该系统的需求分析及系统需要实现的设计方法作了介绍。该系统的基本功能包括用户注册登录,查看医药资讯,医药查询和在线留言等信息。 本系统技术介绍:php,mysql,apache,notepad,sublime.运行环境wamp,PHPstudy,xammp等php集成环境. …

FastAPI使用typing类型提示

typing是Python标准库&#xff0c;用来做类型提示。FastAPI使用typing做了&#xff1a; 编辑器支持&#xff1b; 类型检查&#xff1b; 定义类型&#xff0c;request path parameters, query parameters, headers, bodies, dependencies等等&#xff1b; 类型转换&#xff1…

去中心化应用的终极是应用链?

互操作性是近期在Web3兴起的概念&#xff0c;是指不同的计算机系统、网络、操作系统和应用程序一起工作并共享信息的能力。随着链上通信、语义交互逐渐复杂&#xff0c;链上用户多样的需求已然超出应用在单条链可承受的技术能力。 原本视作创新实验的Web3应用逐渐被公众接纳&am…

初识变量和数据类型

JavaScript第2天 输入输出语句 输出语句 alert(变量) > 弹出document.write(变量) > 输出在页面上面console.log(变量) > 打印在控制台上 /* JS的输出语句 */ alert("弹出") document.write("直接在写页面上面") console.log("打印在控制…

MacOS 如何选择鼠标不飘滚动平滑

MacOS 如何选择鼠标不飘滚动平滑 前言 今天不务正业的聊聊 macos 下的鼠标的事情&#xff0c;群里也有朋友和我聊&#xff0c;正好说说这事。 我在很长的时间里都在用 macbook pro 的触控板 键盘的高效模式&#xff0c;因为触控板和键盘很近所以效率很高。 但是有一个问题就是…

set和multiset容器

1、基本概念 所有元素在插入时会自动排好序&#xff1b; 属于关联式容器&#xff0c;底层结构是用二叉树实现的 2、set和multiset的区别 set中不允许有重复元素&#xff0c;multiset允许有重复元素。 3、构造和赋值 构造&#xff1a; set<T>st; //默认构造 set&l…

Linux | 进程间通信 | 匿名管道 | 命名管道 | 模拟代码实现进程通信

文章目录进程通信的意义匿名管道通信原理管道的访问控制进程控制管道的特点命名管道进程通信的意义 之前聊进程时&#xff0c;讲过一个性质&#xff0c;即进程具有独立性&#xff0c;两个进程之间的交互频率是比较少的。就连父子进程也只是共享代码&#xff0c;修改父子进程中…