微服务框架 SpringCloud微服务架构 微服务面试篇 54 微服务篇 54.4 Nacos如何避免并发读写冲突问题?

news2025/2/5 11:55:11

微服务框架

【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】

微服务面试篇

文章目录

      • 微服务框架
      • 微服务面试篇
      • 54 微服务篇
        • 54.4 Nacos如何避免并发读写冲突问题?
          • 54.4.1 Nacos 服务端源码

54 微服务篇

54.4 Nacos如何避免并发读写冲突问题?

54.4.1 Nacos 服务端源码

【首先想一个 问题】为什么 会有并发读写冲突???

如果说 现在我们 已经拿到了本地的注册表

在这里插入图片描述

Service 里面套Cluster,Cluster 里套 Instance

现在假如有个实例 来了,要一层一层的进行注册,还有一些可能这些时候挂掉了,要从 这里面剔除,还有一些可能要进行更新

这样一个【并行】 的状态, 在删除或者 更新的时候,有人来读,就有可能读到 尚未修复完成的脏数据【读写之间不做 互斥】

但是一旦读写 做了互斥,就会导致 性能的丢失,比如当服务注册 的时候就不能拉取服务列表了,这就很 low 了

【所以 有没有办法可以解决这种 读写 冲突的问题?】 【当然】【这只是第一层】

第二层就是多个服务并发 进行写操作时,也有可能 会产生 写的冲突【这也相当于是一个 并发问题】

其实并发写的冲突问题,解决方案就在下面:

在这里插入图片描述

在ServiceManager 添加实例的时候,它会基于service 添加一个同步锁,一旦加了同步锁,对于单个 服务内的多个实例,它就只能串行 执行 了,这样就可以 避免并发的写冲突问题了【不同的服务相互之间 就不会有影响了 】

这样对服务加锁 的形式就保证了 同一个 服务的多个实例只能串行执行

但是说到底还是一种 对Service 的加锁,即map 的一部分, → 锁的是 局部资源。从而 让不同服务之间可以 并行写,这样性能影响不大,又保证了 安全

【并发写 的问题就可以这样去解决】

在这里插入图片描述

其实在onPut 方法中

在这里插入图片描述

其实是有一个放入 队列的操作,然后有一个 线程池去持续执行这个任务

在这里插入图片描述

OK,这是并发写

【读和写之间 如何 避免冲突问题?】

看到这个 notifier 的底层方法

在这里插入图片描述

这个run 方法

handle(pair);

执行修改动作

跟进去

private void handle(Pair<String, DataOperation> pair) {
    try {
        
        // 获取serviceId
        String datumKey = pair.getValue0();
        // 事件类型,我们是CHANGE 类型
        DataOperation action = pair.getValue1();
        
        services.remove(datumKey);
        
        int count = 0;
        
        if (!listeners.containsKey(datumKey)) {
            return;
        }
        
        for (RecordListener listener : listeners.get(datumKey)) {
            
            count++;
            
            try {
                if (action == DataOperation.CHANGE) {
                    
                    // 这里的listener 就是service,当服务变更时,自然就触发了onChange 事件,处理变更
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

其实就在下面。

在这里插入图片描述

再跟进 onChange 方法

@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    // 对权重 做初始化
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    
    // 更新实例列表
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}

现在回到ServiceManager 类

在这里插入图片描述

注意这个 方法

跟进去

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

再进

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    
   	// 从DataStore 中获取实例列表【可以理解为 Nacos 集群同步 来的实例列表】
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    
    // 从本地注册表【有可能是还没有更新 的】中 获取实例列表
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();
    
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    
    // 合并 并 拷贝 旧实例列表
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        
        // 如果集群同步列表 中有数据,则将本地注册表 与datum 中的列表做对比
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }
    
    // 遍历 新实例列表
    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }
        
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            
            // 尝试获取与 当前实例ip、端口一致的旧 实例
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                
                // 如果存在,则用旧的instanceId 赋值给新的instanceId
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                
                //如果不存在,证明是一个全新实例,则重新生成id
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }
        
    }
    
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    
    // 返回实例列表
    return new ArrayList<>(instanceMap.values());
}

在这里插入图片描述

OK,跟进 updateIPs 方法

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    
    // 创建新的map 【其实是 一个新的clusterMap】
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    
    // 把所有 实例都放入 新的clusterMap
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        // 遍历新的 新的clusterMap,得到cluster 中的实例列表
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // 把新实例列表,更新到 注册表的Cluster 中
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();
    
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }
    
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
    
}

在这里插入图片描述

继续跟进 updateIps 这个方法

public void updateIps(List<Instance> ips, boolean ephemeral) {
    
    // 先得到旧的 实例列表
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    
    // ips中包含两部分: ① 新增的实例 ② 要更新的实例
    // 求交集,得到要更新的部分实例
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());
            
            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                // 将实例的 health 保持为oldInstance 的health
                ip.setHealthy(oldIP.isHealthy());
            }
            
            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }
            
            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                        ip.toString());
            }
        }
    }
    
    
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs.toString());
        
        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }
    
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
    
    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs.toString());
        
        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }
    
    toUpdateInstances = new HashSet<>(ips);
    
    // 用新的实例列表 直接覆盖了 Cluster中 的旧的实例列表
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

在这里插入图片描述

OK,可以回到我们的问题 了

Nacos如何避免并发读写冲突问题?

问题说明:考察对Nacos源码的掌握情况

难易程度:难

参考话术

Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将旧的实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。

这样在更新的过程中,就不会对读实例列表的请求产生影响,也不会出现脏读问题了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/103193.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode 324场周赛

第三题 2508. 添加边使所有节点度数都为偶数 连接一条边&#xff0c;一定会让两个点的奇偶性改变。最多连接两条边&#xff0c;最多有四个点的奇偶性改变。所以超过了四个点为奇数点&#xff0c;就不可能了。 并且&#xff0c;由于一次改变两个&#xff0c;奇数点的个数一定是…

[附源码]Nodejs计算机毕业设计酒店管理系统Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分…

Mybatis之类型转换器TypeHandler的初步了解及具体使用方法

一、TypeHandler简介 1、什么是TypeHandler&#xff1f; 简介&#xff1a;TypeHandler&#xff08;类型转换器&#xff09;在mybatis中用于实现 java类型 和 JDBC类型 的相互转换。mybatis使用 prepareStatement 来进行参数设置的时候&#xff0c;需要通过 TypeHandler 将传入的…

ABAP MESSAGE消息类的创建以及调用方法。

消息类的类型一共分为六种&#xff0c;分别如下表所示 TYPE描述使用效果是否终止事务A(Abortion)终止在PUPUP画面显示是I(Information)信息在PUPUP画面显示否E(Error)错误在状态栏显示是W(Warning)警告在状态栏显示否S(Success)成功在次画面显示否X(Exit)退出在强制终止的画面…

大数据培训Impala之优化

大数据培训Impala之优化 尽量将StateStore和Catalog单独部署到同一个节点&#xff0c;保证他们正常通行。 通过对Impala Daemon内存限制&#xff08;默认256M&#xff09;及StateStore工作线程数&#xff0c;来提高Impala的执行效率。 SQL优化&#xff0c;使用之前调用执行计…

SEO优化的策略_网站seo策略

现在SEO优化成为了每一个企业进行网络营销策划的重要命题,如何做好SEO优化更是企业经常在思考的问题,SEO优化做得好,不仅可以减少企业推广的费用,更是有效的提高了推广的效果,一个好的SEO优化策略应该如何完成呢?应该从以下5点做好网站seo策略。 SEO优化策略1.确定目的 …

MySQL 数据库 - 索引与事务

文章目录1.索引1.1 优缺点1.2 使用2.事务2.1 事务的使用2.2 四大特性2.2.1 原子性2.2.2 隔离性1.索引 索引是一种特殊的文件&#xff0c;包含着对数据表里所有记录的引用指针。 可以对表中的一列或多列创建索引&#xff0c;并指定索引的类型&#xff0c;各类索引有各自的数据结…

[附源码]计算机毕业设计Node.jsX工厂电影网站(程序+LW)

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

企业成立不到三年,能申报高企吗?

高企认定可谓是好处多多,不仅能享受税收优惠、专项补贴、贷款融资,还能提升企业核心竞争力,但在申报国家高新技术企业认定时会审核企业前三年的净资产增长率和销售收入增长率,这对于成立不满三年的企业而言,企业成长性这一项指标的得分就不可能拿到20分满分,甚至可能是0分。那么…

技术分享 | Jenkins 节点该如何管理?

Jenkins 拥有分布式构建(在 Jenkins 的配置中叫做节点)&#xff0c;分布式构建能够让同一套代码在不同的环境(如&#xff1a;Windows 和 Linux 系统)中编译、测试等 Jenkins 的任务可以分布在不同的节点上运行节点上需要配置 Java 运行时环境&#xff0c;JDK 版本大于 1.5节点…

[附源码]计算机毕业设计Python房屋租赁信息系统(程序+源码+LW文档)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

热门技术中的应用:容器技术中的网络-第29讲-容器网络:来去自由的日子,不买公寓去合租

如果说虚拟机是买公寓,容器则相当于合租,有一定的隔离,但是隔离性没有那么好。云计算解决了基础资源层的弹性伸缩,却没有解决PaaS层应用随基础资源层弹性伸缩而带来的批量、快速部署问题。于是,容器应运而生。 容器就是Container,而Container的另一个意思是集装箱。其实…

持续测试的3个关键点才能成功实施

摘要&#xff1a;除非你在过去的一两年里一直处于冬眠状态&#xff0c;否则我相信你已经充分意识到持续测试的好处&#xff1a;降低开发成本、减少浪费、提高系统可靠性、降低发布风险等等. 当然&#xff0c;您认为在现实世界中将其付诸实践并不像某些供应商那样简单。很明显&a…

Twitter整合营销的方法技巧

对于独立站卖家来说&#xff0c;在Twitter它不仅能获取有价值的客户信息&#xff0c;收集粉丝反馈&#xff0c;还能影响消费者的决策。73%的数据显示&#xff0c;Twitter用户每个月都会在网上购物&#xff0c;甚至比较Facebook还要高。所以今天就和卖家们分享Twitter营销到底该…

6 | 渗透测试工具Kali Linux安装与使用

1 Kali Linux简介 渗透测试是通过模拟恶意黑客的攻击方法&#xff0c;来评估计算机网络系统安全的一种评估方法&#xff0c;这个过程包括对系统的任何弱点、技术缺陷或漏洞的主动分析。渗透测试所需的工具可以在各种Linux操作系统中找到&#xff0c;需要手动安装这些工具。由于…

[附源码]Nodejs计算机毕业设计竞赛报名管理系统Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分…

艾美捷葡萄糖比色测定试剂盒参数说明和相关研究

艾美捷葡萄糖比色测定试剂盒仅供研究使用&#xff0c;该试剂盒是一种比色测定试剂盒,设计用于定量和检测血清、血浆、尿液、缓冲液和组织培养基中的葡萄糖而设计。 艾美捷葡萄糖比色测定试剂盒基本信息&#xff1a; 英文名字&#xff1a;Glucose Colorimetric Assay Kit 编号…

语音识别芯片LD3320介绍续

语音识别芯片LD3320寄存器介绍 语音识别芯片LD3320寄存器大部分都是有读和写的功能&#xff0c;有的是接受数据的&#xff0c;有的是设置开关和状态的。寄存器的地址空间为8位&#xff0c;可能的值为00H到FFH。但除了本文档介绍的寄存器&#xff0c;其他大部分为测试或保留功能…

[封装自己的ui组件库] upload的实现与难点

效果 1、服务文件(tmp为保存上传文件文件夹) 2、点击上传 3、图片列表 4、拖拽 5、手动上传 5、上传失败 6、服务 问题 1、如何打开文件列表 2、如何取出文件 3、对取出的文件校验&#xff1f; 4、如何发送请求(多文件上传&#xff1f;) 5、如何完成上传列表展示 6、拖拽…

基于遗传算法改进的DELM预测 - 附代码

遗传算法改进的深度极限学习机DELM的回归预测 文章目录遗传算法改进的深度极限学习机DELM的回归预测1.ELM原理2.深度极限学习机&#xff08;DELM&#xff09;原理3.遗传算法4.遗传算法改进DELM5.实验结果6.参考文献7.Matlab代码1.ELM原理 ELM基础原理请参考&#xff1a;https:…