微服务框架
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】
微服务面试篇
文章目录
- 微服务框架
- 微服务面试篇
- 54 微服务篇
- 54.5 Nacos与Eureka的区别有哪些?【接口方式、实例类型、健康检测】
- 54.5.1 Nacos 客户端源码
54 微服务篇
54.5 Nacos与Eureka的区别有哪些?【接口方式、实例类型、健康检测】
54.5.1 Nacos 客户端源码
先来直接看 看 客户端是 如何完成服务注册的
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
首先这个依赖要引入 进来
看看这个依赖
首先它是 一个starter,实现了 自动装配,
以前学过 自动装配 会去读取
这个文件,看看
里面我们 现在要研究的就是 NacosServiceRegistryAutoConfiguration
服务 注册 的自动装配
点进去看看
可以看到, 它是一个 配置类嘛
里面注册了 很多很多的bean
其中
@Bean
public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
这个类 就是用来完成 服务的注册 的
然后下面 还有一个
自动服务注册,平时我们一 启动服务就完成注册 ,就是依靠 这个类 来完成的
OK,进到 NacosServiceRegistry 这个类
进到这个 registerInstance 这个方法
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 检查instance 是否合法
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 临时实例才做 心跳,永久实例 不做心跳
if (instance.isEphemeral()) {
// 心跳
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 服务注册
serverProxy.registerService(groupedServiceName, groupName, instance);
}
看看 心跳的发送
官方文档:https://nacos.io/zh-cn/docs/open-api.html
对应代码中
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
就是这样 心跳就发出去 了
客户端 心跳发送完成后 ,也是由 InstanceController 进行接收
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
// 从request 中解析心跳信息
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// 通过namespaceId、serviceName、clusterName、ip、port 从注册表中获取Instance 对象
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
if (instance == null) {
// 心跳的实例不存在,需要重新注册
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// 获取对应的注册表 中的Service 信息
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
//处理服务中 的实例心跳
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
//处理服务中 的实例心跳
service.processClientBeat(clientBeat);
注意这个
我们 跟进 这个方法
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
OK,再跟进 scheduleNow 这个方法
public static ScheduledFuture<?> scheduleNow(Runnable task) {
return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
可以看到 又是线程池,又是异步执行
跟进ClientBeatProcessor 这个类
里面有个 run 方法
@Override
public void run() {
// 得到服务
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
// 获取注册表中 的对应集群
Cluster cluster = service.getClusterMap().get(clusterName);
// 得到 集群中的实例列表
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
//判断实例的ip、port 是否与心跳实例的ip、port 一致,如果一致,则证明是当前实例的心跳
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// 更新最后一次 心跳的时间
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
// 发布服务状态 变更的事件
getPushService().serviceChanged(service);
}
}
}
}
}
这就完成了 心跳实例的修改,当然这都是 服务端的处理,
现在但是,如果现在有一个服务 的心跳不正常,那要何时去把这个实例 给干掉呢?
回到我们当时的服务注册 逻辑
跟进这个方法
进入第一个 创建空 服务的方法
注意这个初始化
跟进这个方法
跟进这个 init 【这个方法 就是在初始化 对某个 服务的健康检测】
public void init() {
// 初始化服务,开启心跳的健康检测
HealthCheckReactor.scheduleCheck(clientBeatCheckTask); // 就是这行
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
// 集群 初始化
entry.getValue().init();
}
}
scheduleCheck 跟进这个方法
没啥问题,心跳检查,周期也在
跟进 ClientBeatCheckTask 这个 任务的run 方法
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
// 获取所有实例
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
// 遍历实例【目的是 标记不健康实例】
for (Instance instance : instances) {
// 判断 实例的最后一次心跳时间 距离现在的 时间间隔,如果大于 15,超时
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
// 标记实例为 不健康
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
// 发布服务 状态变更的通知
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// // 判断 实例的最后一次心跳时间 距离现在的 时间间隔,大于30 s,则代表需要被删除
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
// 从服务列表中剔除
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
接着 进到 HealthCheckTask 的run 方法
跟进 process 方法
@Override
public void process(HealthCheckTask task) {
//得到 集群中的所有实例的IP 地址
List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {
return;
}
for (Instance ip : ips) {
if (ip.isMarked()) {
if (SRV_LOG.isDebugEnabled()) {
SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
}
continue;
}
if (!ip.markChecking()) {
SRV_LOG.warn("tcp check started before last one finished, service: " + task.getCluster().getService()
.getName() + ":" + task.getCluster().getName() + ":" + ip.getIp() + ":" + ip.getPort());
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
continue;
}
Beat beat = new Beat(ip, task);
taskQueue.add(beat);
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
}
OK,可以回到 我们的问题了
Nacos与Eureka的区别有哪些?
问题说明:考察对Nacos、Eureka的底层实现的掌握情况
难易程度:难
参考话术:
Nacos与Eureka有相同点,也有不同之处,可以从以下几点来描述:
- 接口方式:Nacos与Eureka都对外暴露了Rest风格的API接口,用来实现服务注册、发现等功能
- 实例类型:Nacos的实例有永久和临时实例之分;而Eureka只支持临时实例
- 健康检测:Nacos对临时实例采用心跳模式检测,对永久实例采用主动请求来检测;Eureka只支持心跳模式
- 服务发现:Nacos支持定时拉取和订阅推送两种模式;Eureka只支持定时拉取模式