版本 awsVersion = ‘1.11.277’
EurekaTransport 用于客户端和服务端之间进行通信,封装了以下接口的实现:
- ClosableResolver 接口实现
- TransportClientFactory 接口实现
- EurekaHttpClient 接口实现及其对应的 EurekaHttpClientFactory 接口实现
private static final class EurekaTransport {
    // 服务端集群解析器,用于获取服务端列表
    private ClosableResolver bootstrapResolver;
    // 底层工厂,用于创建具体通信协议的实现
    private TransportClientFactory transportClientFactory;
    // EurekaHttpClient实现,用于注册实例、取消注册、续约
    private EurekaHttpClient registrationClient;
    // 上层工厂,用于创建registrationClient
    private EurekaHttpClientFactory registrationClientFactory;
    // EurekaHttpClient实现,用于全量拉取、增量拉取服务列表
    private EurekaHttpClient queryClient;
    // 上层工厂,用于创建queryClient
    private EurekaHttpClientFactory queryClientFactory;
}
EurekaHttpClientFactory 是上层工厂接口(A top level factory interface),用于创建 EurekaHttpClientDecorator 对象(to create http clients for application/eurekaClient use)。
public interface EurekaHttpClientFactory {
    EurekaHttpClient newClient();
    void shutdown();
}
例如 DiscoveryClient # scheduleServerEndpointTask 方法:
newRegistrationClientFactory = EurekaHttpClients
    .registrationClientFactory(
        eurekaTransport.bootstrapResolver,
        eurekaTransport.transportClientFactory,
        transportConfig
    );
newRegistrationClient = newRegistrationClientFactory.newClient();
TransportClientFactory 是底层工厂接口(A low level client factory interface),用于创建 JerseyApplicationClient(Eureka 原生实现)、Jersey2ApplicationClient(Eureka 原生实现)、RestTemplateEurekaHttpClient(SpringCloud 实现)等具体协议相关的实现。底层工厂创建的对象不建议直接使用(Not advised to be used by top level consumers),需要经过上层工厂加工。
public interface TransportClientFactory {
    EurekaHttpClient newClient(EurekaEndpoint serviceUrl);
    void shutdown();
}
例如 DiscoveryClient # scheduleServerEndpointTask 方法:
TransportClientFactories transportClientFactories = 
    argsTransportClientFactories == null
        ? new Jersey1TransportClientFactories()
        : argsTransportClientFactories;
        
EurekaHttpClient 接口,用于客户端和服务端之间进行通信,封装了注册、取消注册、续约、拉取服务列表等一系列操作。
public interface EurekaHttpClient {
    // 注册实例(服务注册)
    EurekaHttpResponse<Void> register(InstanceInfo info);
    // 取消注册(服务下线)
    EurekaHttpResponse<Void> cancel(String appName, String id);
    // 发送心跳(服务续约)
    EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);
    // 更新实例的服务状态InstanceStatus(服务状态更新)
    EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info);
    // 删除实例的覆盖状态,overriddenStatus将变成UNKNOWN
    EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);
    // 获取指定区域regions的注册表(全量获取)
    EurekaHttpResponse<Applications> getApplications(String... regions);
    // 获取指定区域regions的注册列表(增量获取)
    EurekaHttpResponse<Applications> getDelta(String... regions);
    EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);
    EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions);
    // 获取指定应用的实例列表
    EurekaHttpResponse<Application> getApplication(String appName);
    // 获取指定实例
    EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);
    // 获取指定实例
    EurekaHttpResponse<InstanceInfo> getInstance(String id);
    void shutdown();
}

 HttpReplicationClient 用于服务端和服务端之间进行通信,在 EurekaHttpClient 的基础上封装了更新服务端实例状态、批量同步数据等操作。
public interface HttpReplicationClient extends EurekaHttpClient {
    // 更新服务端实例的状态
    EurekaHttpResponse<Void> statusUpdate(String asgName, ASGStatus newStatus);
    // 向其他服务端批量同步数据
    EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList);
}
Eureka 服务端集群节点是对等节点(peerNode),对等节点之间进行数据同步会产生循环问题和数据冲突问题。
- 对于循环问题,Eureka 使用 Http 请求头 x-netflix-discovery-replication表示是否是服务端同步数据操作,如果是服务端同步数据,就不会再继续向其他服务端进行同步数据。
// com.netflix.eureka.resources.InstanceResource
@PUT
public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    // ...
}
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
private void replicateToPeers(Action action,
                              String appName,
                              String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */,
                              boolean isReplication) {
    // ...
    // If it is a replication already,
    // do not replicate again as this will create a poison replication
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
        return;
    }
    // ...
}
- 对于数据冲突问题,Eureka 通过比较 ReplicationInstance 类的 lastDirtyTimestamp属性解决。lastDirtyTimestamp 是服务实例 InstanceInfo 的属性,表示服务实例最近一次变更时间。
例如 Server A 向 Server B 同步数据:
- 如果 Server A 的数据比 Server B 的新,Server B 返回 Status.NOT_FOUND,Server A 重新将该服务实例注册到 Server B
- 如果 Server A 的数据比 Server B 的旧,Server B 返回 Status.CONFLICT,要求 Server A 同步 Server B 的数据
// com.netflix.eureka.resources.InstanceResource # renewLease
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                            boolean isReplication) {
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        if ((lastDirtyTimestamp != null) 
            && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
            Object[] args = {
                                id,
                                appInfo.getLastDirtyTimestamp(),
                                lastDirtyTimestamp,
                                isReplication
                            };
            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                logger.debug(
                    "Time to sync, " + 
                    "since the last dirty timestamp differs - " + 
                    "ReplicationInstance id : {}," + 
                    "Registry: {} Incoming: {} Replication: {}",
                    args);
                return Response.status(Status.NOT_FOUND).build();
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // In the case of replication, 
                // send the current instance info in the registry 
                // for the replicating node to sync itself with this one.
                if (isReplication) {
                    logger.debug(
                        "Time to sync, " + 
                        "since the last dirty timestamp differs - " + 
                        "ReplicationInstance id : {}," + 
                        "Registry: {} Incoming: {} Replication: {}",
                        args);
                    return Response.status(Status.CONFLICT)
                                   .entity(appInfo)
                                   .build();
                } else {
                    return Response.ok().build();
                }
            }
        } // end if ((lastDirtyTimestamp != null)
    }// end if (appInfo != null)
    return Response.ok().build();
}
抽象类 EurekaHttpClientDecorator 使用装饰者模式为 EurekaHttpClient 添加新的功能。

-  SessionedEurekaHttpClient为 EurekaHttpClient 设置随机的会话时间,超过会话时间则调用 EurekaHttpClientFactory 的 newClient 方法创建一个新的 EurekaHttpClient 执行请求。随机的会话时间在配置sessionedClientReconnectIntervalSeconds的0.5-1.5之间。
-  RetryableEurekaHttpClient为 EurekaHttpClient 提供重试功能,默认重试3次。当服务端响应异常时,将服务端添加到quarantineSet中,并从ClusterResolver解析得到的服务端列表中移除 quarantineSet ,选择其他的服务端创建一个新的 EurekaHttpClient 执行请求,超过重试次数则抛出 TransportException 异常。
-  RedirectingEurekaHttpClient为 EurekaHttpClient 提供重定向功能,默认可重定向次数10次。当服务端响应 302时,获取响应中的targetUrl,创建一个新的 EurekaHttpClient 执行请求,超过重定向次数则抛出 TransportException 异常。
-  MetricsCollectingEurekaHttpClient为 EurekaHttpClient 提供指标收集功能,用于集成 Netflix Servo 监控组件,统计各类型请求的耗时以及不同响应码和异常的计数。


















