微服务框架
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】
微服务面试篇
文章目录
- 微服务框架
- 微服务面试篇
- 54 微服务篇
- 54.4 Nacos如何避免并发读写冲突问题?
- 54.4.1 Nacos 服务端源码
54 微服务篇
54.4 Nacos如何避免并发读写冲突问题?
54.4.1 Nacos 服务端源码
【首先想一个 问题】为什么 会有并发读写冲突???
、
如果说 现在我们 已经拿到了本地的注册表
Service 里面套Cluster,Cluster 里套 Instance
现在假如有个实例 来了,要一层一层的进行注册,还有一些可能这些时候挂掉了,要从 这里面剔除,还有一些可能要进行更新
这样一个【并行】 的状态, 在删除或者 更新的时候,有人来读,就有可能读到 尚未修复完成的脏数据【读写之间不做 互斥】
但是一旦读写 做了互斥,就会导致 性能的丢失,比如当服务注册 的时候就不能拉取服务列表了,这就很 low 了
【所以 有没有办法可以解决这种 读写 冲突的问题?】 【当然】【这只是第一层】
第二层就是多个服务并发 进行写操作时,也有可能 会产生 写的冲突【这也相当于是一个 并发问题】
其实并发写的冲突问题,解决方案就在下面:
在ServiceManager 添加实例的时候,它会基于service 添加一个同步锁,一旦加了同步锁,对于单个 服务内的多个实例,它就只能串行 执行 了,这样就可以 避免并发的写冲突问题了【不同的服务相互之间 就不会有影响了 】
这样对服务加锁 的形式就保证了 同一个 服务的多个实例只能串行执行
但是说到底还是一种 对Service 的加锁,即map 的一部分, → 锁的是 局部资源。从而 让不同服务之间可以 并行写,这样性能影响不大,又保证了 安全
【并发写 的问题就可以这样去解决】
其实在onPut 方法中
其实是有一个放入 队列的操作,然后有一个 线程池去持续执行这个任务
OK,这是并发写
【读和写之间 如何 避免冲突问题?】
看到这个 notifier 的底层方法
这个run 方法
handle(pair);
执行修改动作
跟进去
private void handle(Pair<String, DataOperation> pair) {
try {
// 获取serviceId
String datumKey = pair.getValue0();
// 事件类型,我们是CHANGE 类型
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
// 这里的listener 就是service,当服务变更时,自然就触发了onChange 事件,处理变更
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
其实就在下面。
再跟进 onChange 方法
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
// 对权重 做初始化
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
// 更新实例列表
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
现在回到ServiceManager 类
注意这个 方法
跟进去
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
再进
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
// 从DataStore 中获取实例列表【可以理解为 Nacos 集群同步 来的实例列表】
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// 从本地注册表【有可能是还没有更新 的】中 获取实例列表
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
// 合并 并 拷贝 旧实例列表
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
// 如果集群同步列表 中有数据,则将本地注册表 与datum 中的列表做对比
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
// 遍历 新实例列表
for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
// 尝试获取与 当前实例ip、端口一致的旧 实例
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
// 如果存在,则用旧的instanceId 赋值给新的instanceId
instance.setInstanceId(oldInstance.getInstanceId());
} else {
//如果不存在,证明是一个全新实例,则重新生成id
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
// 返回实例列表
return new ArrayList<>(instanceMap.values());
}
OK,跟进 updateIPs 方法
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 创建新的map 【其实是 一个新的clusterMap】
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
// 把所有 实例都放入 新的clusterMap
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
// 遍历新的 新的clusterMap,得到cluster 中的实例列表
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 把新实例列表,更新到 注册表的Cluster 中
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
继续跟进 updateIps 这个方法
public void updateIps(List<Instance> ips, boolean ephemeral) {
// 先得到旧的 实例列表
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
// ips中包含两部分: ① 新增的实例 ② 要更新的实例
// 求交集,得到要更新的部分实例
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
// 将实例的 health 保持为oldInstance 的health
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
ip.toString());
}
}
}
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
// 用新的实例列表 直接覆盖了 Cluster中 的旧的实例列表
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
OK,可以回到我们的问题 了
Nacos如何避免并发读写冲突问题?
问题说明:考察对Nacos源码的掌握情况
难易程度:难
参考话术:
Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将旧的实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。
这样在更新的过程中,就不会对读实例列表的请求产生影响,也不会出现脏读问题了。