【Eureka】【源码+图解】【八】Eureka客户端的服务获取

news2025/1/9 16:37:41

【Eureka】【源码+图解】【七】Eureka的下线功能

目录

  • 7. 获取服务
    • 7.1 初始化HeartBeat的task
    • 7.2 将task进一步包装成定时timerTask
    • 7.3 定时时间到,执行timeTask
    • 7.4 task获得线程资源,执行refreshRegistry()
    • 7.5 服务端接受请求
    • 7.6 获取Applications

7. 获取服务

整体流程如下:
在这里插入图片描述

7.1 初始化HeartBeat的task

public class DiscoveryClient implements EurekaClient {
    private final ThreadPoolExecutor cacheRefreshExecutor;
    
    DiscoveryClient(...) {
        ......
        // 1. 初始化获取服务的线程池
        cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, 
            // eureka.client.cacheRefreshExecutorThreadPoolSize,注册信息更新最大线程数,默认2
            clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );
        ......
    }
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // eureka.client.registryFetchIntervalSeconds,更新间隔,默认30秒
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            // eureka.client.cacheRefreshExecutorExponentialBackOffBound,更新超时最大倍数,默认10
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // 3. 初始化获取服务的task
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            // 4. 开启定时任务
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        ......
    }
    
    // 2. 定义获取服务的线程
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
}

7.2 将task进一步包装成定时timerTask

public class TimedSupervisorTask extends TimerTask {
    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.name = name;
        this.scheduler = scheduler; // 定时调度器
        this.executor = executor; // 任务执行线程池
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task; // 具体的task,即CacheRefreshThread
        this.delay = new AtomicLong(timeoutMillis); // 定时时间
        this.maxDelay = timeoutMillis * expBackOffBound; // 最大定时时间
        ......
    }
}

7.3 定时时间到,执行timeTask

public class TimedSupervisorTask extends TimerTask {
    @Override
    public void run() {
        Future<?> future = null;
        try {
            // 1. 将CacheRefreshThread提交到线程池,并用future接收结果
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 2. 阻塞等待结果,等待时间eureka.client.registryFetchIntervalSeconds
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // 3. 设置下一次执行任务的时间
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            timeoutCounter.increment();
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            // 等待结果超时,下一次执行任务时间为2*currentDelay
            // 最大延时为eureka.client.registryFetchIntervalSeconds * eureka.client.cacheRefreshExecutorExponentialBackOffBound
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            rejectedCounter.increment();
        } catch (Throwable e) {
            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            if (!scheduler.isShutdown()) {
                // 4. 定时下一次任务
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

7.4 task获得线程资源,执行refreshRegistry()

public class DiscoveryClient implements EurekaClient {
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
    @VisibleForTesting
    void refreshRegistry() {
        try {
            ......
            // 1. 动态获取最新的RemoteRegions
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            ......
            // 2. 获取最新的服务实例
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        ......
            // 3. 获取最新的服务实例并更新到本地
                getAndStoreFullRegistry();
            ......
            applications.setAppsHashCode(applications.getReconcileHashCode());
        ......
        return true;
    }
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications apps = null;
        // 4. 发送http请求到服务端
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }

        if (apps == null) {
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // 5. 过滤掉状态非UP的实例并更新到localRegionApps,缓存
            localRegionApps.set(this.filterAndShuffle(apps));
        } else {
        }
    }
}

7.5 服务端接受请求

public class ApplicationsResource {
    @Inject
    ApplicationsResource(EurekaServerContext eurekaServer) {
        this.serverConfig = eurekaServer.getServerConfig();
        this.registry = eurekaServer.getRegistry();
        // 缓存为AbstractInstanceRegistry.responseCache
        this.responseCache = registry.getResponseCache();
    }
    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
        
        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            // 1. 设置regions
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        // 2. 设置Version,默认V2
        CurrentRequestVersion.set(Version.toEnum(version));
        // 3. 设置keyType
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        // 4. Key的唯一性确定请参考前文(3.2.2.1节)
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        // 5. 从responseCache获取值,客户端默认getGZIP
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        CurrentRequestVersion.remove();
        return response;
    }
}

7.6 获取Applications

public class ResponseCacheImpl implements ResponseCache {
    public byte[] getGZIP(Key key) {
        // shouldUseReadOnlyResponseCache = eureka.server.useReadOnlyResponseCache, 默认true
        Value payload = getValue(key, shouldUseReadOnlyResponseCache);
        if (payload == null) {
            return null;
        }
        return payload.getGzipped();
    }
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            // 默认true,不管是true或false,第一次获取都会走readWriteCacheMap.get(key)
            // 第一次需要load,因此会走到generatePayload,见(3.2.2.1节),不再赘述
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }
}

关于Eureka的分析到此告一段落,接下来开始学习LoadBalancer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/55448.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spark - RDD 的分区和Shuffle

一、RDD 的分区 前面在学习 MapReduces 的时候就提到分区&#xff0c;在RDD中同样也存在分区的概念&#xff0c;本质上都是为了提高并行度&#xff0c;从而提高执行的效率&#xff0c;那在 Spark 中的分区该怎么设置呢&#xff1f; 首先分区不是越多越好&#xff0c;太多意味…

[Android]Logcat调试

Android采用Log(android.util.log)工具打印日志&#xff0c;它将各类日志划分为五个等级。 Log.e 打印错误信息 Log.w 打印警告信息 Log.i 打印一般信息 Log.d 打印调试信息 Log.v 打印冗余信息 不同等级的日志信息&#xff0c;在日志栏中会以不同颜色和等级(E、W、…

(附源码)ssm医院挂号系统 毕业设计 250858

医院挂号系统的设计与实现 摘 要 信息化社会内需要与之针对性的信息获取途径&#xff0c;但是途径的扩展基本上为人们所努力的方向&#xff0c;由于站在的角度存在偏差&#xff0c;人们经常能够获得不同类型信息&#xff0c;这也是技术最为难以攻克的课题。针对医院排队挂号等问…

深入理解 Python 描述符

学习 Python 这么久了&#xff0c;说起 Python 的优雅之处&#xff0c;能让我脱口而出的&#xff0c; Descriptor&#xff08;描述符&#xff09;特性可以排得上号。 描述符 是Python 语言独有的特性&#xff0c;它不仅在应用层使用&#xff0c;在语言语法糖的实现上也有使用到…

【java基础系列】16- Java中怎么处理异常?

Java的异常处理 1、异常的概念 概念&#xff1a;程序在运行过程中出现的不正常现象。出现异常不处理将终止程序运行。异常处理的必要性&#xff1a;任何程序都可以存在大量的未知问题、错误&#xff1b;如果不对这些问题进行正确处理&#xff0c;则可能导致程序的中断&#x…

[附源码]Python计算机毕业设计SSM开放性实验室网上预约管理(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

分布式和可再生系统建模(simulink)

目录 1 可再生/微电网概述 1.1 分布式和可再生系统建模和仿真 1.2 解决工作流程 1.3 能源管理系统监控设计 1.4 验证控件的测试网格规范和IEEE标准 1.5 部署算法和代码生成 1.6 网格集成研究控制器的实时测试 2 微电网案例 2.1 简介 2.2 在 Simulink 中实现微电网…

【JavaEE】初识操作系统,吃透进程

✨哈喽&#xff0c;进来的小伙伴们&#xff0c;你们好耶&#xff01; &#x1f6f0;️&#x1f6f0;️系列专栏:【JavaEE】 ✈️✈️本篇内容:初始操作系统&#xff0c;进程的概念 &#x1f680;&#x1f680;代码存放仓库gitee&#xff1a;JavaEE初阶代码存放&#xff01; ⛵⛵…

关于AM437x Linux+qt开发笔记(2)

第一部分,触摸屏 命令 lsinput (有些系统不移植)查看系统input实践 命令ox -d /dev/input/event1 或hexdump -d /dev/input/event1 (从上图看,event0没有接的触屏) ,点击屏幕如下 命令 ls /dev/input -al , 查看input设备的触摸屏软连接。 命令cat /etc/udev/ru…

编码与测试

文章目录一、编码1、概念2、如何选择程序设计语言3、程序设计风格&#xff08;1&#xff09;源程序文档化(2) 数据说明&#xff08;3&#xff09;语句构造&#xff08;4&#xff09;输入输出&#xff08;5&#xff09;程序效率编码时提高程序运行效率的主要规则二、软件测试基础…

【爬坑之路一】windows系统下更新升级node版本【亲测有效】

前言 一定要看到最后&#xff01;&#xff01;&#xff01; 项目开发中&#xff0c;需要升级 node 版本。本着不想卸载 node 再重新安装的原则&#xff0c;因为node 的环境配置以及各种相关配置有些繁琐&#xff0c;所以就想着使用 命令的方式进行升级。 在网上找了一些升级 n…

C#详解:程序域、程序集、模块、Type、反射

总结&#xff1a; ">>>":代表包含 进程>>>应用程序域AppDomain>>>程序集Assembly>>>模块Module>>>类型Type>>>成员&#xff08;方法、属性等&#xff09; 1、程序集Assembly 如图&#xff0c;假设一个解决方…

ARM 自己动手安装交叉编译工具链

一、Windows中装软件的特点 Windows中装软件使用安装包&#xff0c;安装包解压后有2种情况&#xff1a; 一种是一个安装文件&#xff08;.exe .msi&#xff09;&#xff0c;双击进行安装&#xff0c;下一步直到安装完毕。安装完毕后会在桌面上生成快捷方式&#xff0c;我们平…

(附源码)ssm招聘网站 毕业设计 250858

SSM招聘网站 摘 要 招聘网站采用B/S结构、java开发语言、以及Mysql数据库等技术。系统主要分为管理员、用户、企业三部分&#xff0c;管理员管理主要功能包括&#xff1a;首页、站点管理&#xff08;轮播图、公告栏&#xff09;用户管理&#xff08;管理员、应聘用户、企业用户…

重点| 系统集成项目管理工程师考前50个知识点(2)

本文章总结了系统集成项目管理工程师考试背记50个知识点&#xff01;&#xff01;&#xff01; 帮助大家更好的复习&#xff0c;希望能对大家有所帮助 比较长&#xff0c;放了部分&#xff0c;需要可私信&#xff01;&#xff01; 11、项目目标包括成果性目标和约束性目标。项…

直播倒计时,PyTorch Conference 2022 今晚开启

内容一览&#xff1a;PyTorch Conference 2022 即将在美国南部城市新奥尔良举办。本次会议将带来技术讲座、项目演示及最佳案例分享。 本文首发自微信公众号&#xff1a;PyTorch 开发者社区 关键词&#xff1a;PyTorch 深度学习 机器学习 PyTorch Conference 2022 今晚开启 自…

360crawlergo结合xray被动扫描

360crawlergo结合xray被动扫描 360crawlergo结合xray被动扫描安装配置 360crawlergo结合xray被动扫描 安装 Xray https://github.com/chaitin/xraycrawlergo_x_XRAY https://github.com/timwhitez/crawlergo_x_XRAYcrawlergo https://github.com/0Kee-Team/crawlergo 更多的…

高等数值计算方法学习笔记第7章【非线性方程组求根】

高等数值计算方法学习笔记第7章【非线性方程组求根】一、方程求根与二分法&#xff08;第五次课&#xff09;二、不动点迭代法及其收敛性1.不动点迭代与不动点迭代法&#xff08;一个例题&#xff09;2.不动点的存在性与迭代法的收敛性&#xff08;两个定理&#xff0c;两例题&…

计算机网络——常考的面试题

什么是TCP/IP&#xff1f; TCP建立连接为什么要三次握手&#xff1f;断开连接为什么要四次挥手&#xff1f; SSL/TSL握手过程&#xff1f; 1、网络分层模型 OSI&#xff1a;全称叫Open System Interconnection (开放式系统互联)&#xff0c;是国际标准化组织ISO制定的理论模…

【软件测试】面试老约不到?软件测试简历项目经验怎么写?论项目经验的重要性......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 随着就业竞争越来越…