目录
一、前言
1、简述
2、SpringCloudCommons 项目
二、客户端服务注册
1、流程图
2、入口
2.1、客户端注册引入依赖
3、EurekaServiceRegistry服务注册机
3.1、EurekaServiceRegistry注册逻辑
4、ApplicationInfoManager
4.1、setInstanceStatus(InstanceStatus status)逻辑
5、DiscoveryClient核心组件
5.1、状态变更者
6、InstanceInfoReplicator组件
2)启动复制任务
6.1、onDemandUpdate()按需更新逻辑
6.2、run()任务逻辑注册微服务
7、DiscoveryClient的注册逻辑
8、发送HTTP请求注册中心完成微服务注册
三、服务端服务注册
1、InstanceRegistry
2、缓存到注册中心本地
一、前言
1、简述
本篇文章要探讨的问题如下:服务注册发现的抽象(这个提供了方向)?Eureka微服务注册的入口?Future模式的使用、调度线程池使用、周期性任务、令牌桶限流、加锁、取消了如何恢复?当然,提出了这些问题,都是下面涉及到的。
2、SpringCloudCommons 项目
SpringCloudCommons 提供了两个库的特性: SpringCloudContext 和 SpringCloudCommons。Spring Cloud Context 为 Spring Cloud 应用程序的 ApplicationContext 提供实用工具和特殊服务(引导上下文、加密、刷新范围和环境端点)。SpringCloudCommons 是在不同的 SpringCloud 实现中使用的一组抽象和公共类(Nacos、Spring Cloud Netflix、Spring Cloud Consul都是基于它实现微服务注册和发现的)。
二、客户端服务注册
1、流程图(自行补充)
2、入口
2.1、客户端注册引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
如果想了解SpringCloud项目搭建请看SpringCloud微服务第1章之项目搭建
是不是很熟悉,同样的配方,跟Nacos客户端注册核心相关类前缀不一样,还有SpringBoot自动装配相关的META-INF下的spring.factories文件。关于集成这方面前面的文章都详细分析了,这里就不会过多赘述,感兴趣的读者可以回头巩固一下,这里我们就直奔主题了。
3、EurekaServiceRegistry服务注册机
Registration和ServiceRegistry是spring-cloud-commons项目下的,Registration用来维护需要注册的服务信息,ServiceRegistry抽象了微服务注册、发现等,EurekaServiceRegistry是spring-cloud-netflix-eureka-client下的服务注册机。
3.1、EurekaServiceRegistry注册逻辑
EurekaServiceRegistry的初始化是SpringBoot的自动装配完成的,触发注册逻辑也是这个时候开始的。
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// 设置实例状态,开始注册
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 注册健康检查
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
initialStatus为向远程 Eureka 服务器注册的初始状态,默认InstanceStatus.UP,在EurekaInstanceConfigBean中维护。setInstanceStatus()设置实例的状态,里面会触发注册逻辑。点击跟进到下面的处理逻辑:
4、ApplicationInfoManager
下面的一些组件都是在原生的Eureka项目。
4.1、setInstanceStatus(InstanceStatus status)逻辑
public synchronized void setInstanceStatus(InstanceStatus status) {
// UP
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
// STARTING
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
// 自动装配流程已经初始化
for (StatusChangeListener listener : listeners.values()) {
try {
// 通知状态变更
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
启动注册中心再启动一个微服务,
会发现StatusChangeEvent的两个状态值,说明执行了这里的逻辑;而这个日志是下面的方法打印的,即这里调用了下面的方法,这就意味着:next为UP,prev为STARTING。调用监听器的通知方法,发布状态变更事件。
1)setStatus(InstanceStatus status)
public synchronized InstanceStatus setStatus(InstanceStatus status) {
if (this.status != status) {
InstanceStatus prev = this.status;
this.status = status;
setIsDirty();
return prev;
}
return null;
}
该方法在InstanceInfo维护,方法逻辑:如果设置了与当前状态不同的状态,则返回 prev 状态,否则返回 null。
2)InstanceStatus服务实例状态枚举类
public enum InstanceStatus {
UP, // Ready to receive traffic准备接收通讯
DOWN, // Do not send traffic- healthcheck callback failed不要发送交通健康检查回调失败
STARTING, // Just about starting- initializations to be done - do not send traffic
// 刚刚开始-初始化要做-不要发送流量
OUT_OF_SERVICE, // Intentionally shutdown for traffic故意关闭交通
UNKNOWN;
public static InstanceStatus toEnum(String s) {
if (s != null) {
try {
return InstanceStatus.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
// ignore and fall through to unknown
logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
}
}
return UNKNOWN;
}
}
该类同样是在InstanceInfo维护:
- UP, // 运行中状态,准备接收通讯
- DOWN, // 下线状态,不要发送交通健康检查回调失败
- STARTING, // 初始化状态,刚刚开始-初始化不要发送流量
- OUT_OF_SERVICE, // 故意关闭交通
- UNKNOWN; // 未知状态
5、DiscoveryClient核心组件
5.1、状态变更者
它是在DiscoveryClient初始化时初始化的。
// 状态变更监听者
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
// Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING]
logger.info("Saw local status change event {}", statusChangeEvent);
instanceInfoReplicator.onDemandUpdate();
}
};
// 初始化状态变更监听者
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
这个是Java匿名内部类的,ApplicationInfoManager维护了监听器的缓存注册等,故在4.1中调用监听器的方法来到这里的处理逻辑。这里打印日志,如果你开始了日志打印可以在控制台看到。我们点击继续跟踪到下面逻辑:
6、InstanceInfoReplicator组件
InstanceInfoReplicator实现了Runnable接口,在DiscoveryClient初始化时初始化,职责:更新本地instanceInfo并将其复制到远程服务器的任务:
- 配置为单个更新线程,以保证对远程服务器的顺序更新
- 更新任务可以通过onDemandUpdate()按需调度
- 任务处理的速率受到BurstSize(默认2)的限制
- 新的更新任务总是在早期更新任务之后自动安排。但是,如果启动了 按需任务,计划的自动更新任务将被丢弃(并且将在 新的按需 更新之后安排新的任务)。
1)初始化
// InstanceInfo replicator实例信息复制任务
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 状态变更监听者
.....省略.......
// 定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
构建实例信息复制任务,定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册。
2)启动复制任务
public void start(int initialDelayMs) {
// 上面默认false,可见CAS成功
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
// 调度执行任务(自己的run()逻辑)
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
新的更新任务总是在早期更新任务之后自动安排。但是,如果启动了 按需任务,计划的自动更新任务将被丢弃(并且将在 新的按需 更新之后安排新的任务)。如下面:
6.1、onDemandUpdate()按需更新逻辑
public boolean onDemandUpdate() {
// 令牌桶算法进行限流
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
// 执行器未关闭
if (!scheduler.isShutdown()) {
// 提交任务
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
// isDone()如果此任务完成,则返回 true。观察控制台初始化时可见是false
if (latestPeriodic != null && !latestPeriodic.isDone()) {
// 取消最新的预定更新,将在onDemandUpdate()结束时重新调度
// 由于scheduler只有一个核心线程,而start(int initialDelayMs)中使用了,故这里取消
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
// 取消但是不中断
latestPeriodic.cancel(false);
}
// 执行自己的run逻辑
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
主要逻辑:
- RateLimiter基于令牌桶算法进行限流(令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务)。
- 执行器未关闭,提交任务
- 取消最新的预定更新,将在onDemandUpdate()结束时重新调度。由于scheduler只有一个核心线程,而start(int initialDelayMs)中使用了,故这里取消
- 执行自己的run逻辑,下面分析
6.2、run()任务逻辑注册微服务
public void run() {
try {
// 刷新本地服务实例信息
discoveryClient.refreshInstanceInfo();
// start中初始化时设置了isInstanceInfoDirty为true,所以dirtyTimestamp不为空
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 注册
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
// 实例信息复制器出了点问题
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// 恢复复制,默认实例信息复制器随需应变允许的每分钟更新率为4
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
主要逻辑:
- 刷新本地服务实例信息
- 请求注册中心注册
- 恢复复制,默认实例信息复制器随需应变允许的每分钟更新率为4。如果onDemandUpdate()又被执行,还是可能被丢弃的。
7、DiscoveryClient的注册逻辑
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
// 请求注册
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
// 返回注册是否成功
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
兜个圈又回来了!!!调用与注册中心通信类的注册方法,返回是否注册成功。
8、发送HTTP请求注册中心完成微服务注册
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
// 拼接URL
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
// 获取resourceBuilder
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
// 发送post请求注册中心
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
主要逻辑:
- 拼接请求注册中心URL
- 获取resourceBuilder,这个是sun公司提供的
- 发送post请求注册中心,返回响应结果
三、服务端服务注册
1、InstanceRegistry
@Override
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
handleRegistration(info, leaseDuration, isReplication);
super.register(info, leaseDuration, isReplication);
}
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
这里的逻辑不多,两个注册方法,通过super委托PeerAwareInstanceRegistryImpl父类AbstractInstanceRegistry处理。
AbstractInstanceRegistry抽象服务实例注册机,我们下面看看它的注册逻辑:
2、缓存到注册中心本地
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
// 加锁
read.lock();
try {
// 尝试从本地获取
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 本地没有则缓存起来
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
// 如果已经有租约,则保留最后一个脏时间戳而不覆盖它
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// 租约不存在,因此是一个新的注册
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
// 因为客户端想要注册它,所以增加发送更新的客户端的数量
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
// 修改数据保存到最近队列 用于客户端增量更新
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
// 修改数据保存到最近队列 用于客户端增量更新
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
// 释放锁
read.unlock();
}
}
主要逻辑:
- 加锁,尝试从本地获取看下有没有注册了
- 本地registry(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>)没有则重新缓存起来
- 如果已经有租约,则保留最后一个脏时间戳而不覆盖它;否则租约不存在,因此是一个新的注册,因为客户端想要注册它,所以增加发送更新的客户端的数量。
- 修改数据保存到最近队列 用于客户端增量更新