Nacos 配置中心之长轮询--服务端

news2025/4/9 23:16:08

先回顾一下客户端和服务端交互的过程
在这里插入图片描述

服务端

入口

直接看长轮询的接口
ConfigController.listener

    @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        log.info("listen config id:" + probeModify);

        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

        Map<String, String> clientMd5Map;
        try {
            // 处理数据,将拼接的字符串 转成 key-content
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        log.info("listen config id 2:" + probeModify);

        // do long-polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

ConfigServletInner.doPollingConfig()

    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                                  Map<String, String> clientMd5Map, int probeRequestSize)
        throws IOException {

        // 长轮询
        // 长轮询判断,根据请求头的 Long-Pulling-Timeout
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }

        // else 兼容短轮询逻辑
        // 如果是短轮询 走下面的请求, 下面的请求就是把客户端传过来的数据和服务端的数据逐项进行比较,保存到changeGroup中。
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

        // 兼容短轮询result
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);

        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (version == null) {
            version = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(version);

        /**
         * 2.0.4版本以前, 返回值放入header中
         */
        if (versionNum < START_LONGPOLLING_VERSION_NUM) {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        } else {
            request.setAttribute("content", newResult);
        }

        Loggers.AUTH.info("new content:" + newResult);

        // 禁用缓存
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
    }

上面的代码主要是分两种情况:
1、长轮询请求
2、短轮询请求:直接进行比较,保存到changeGroups中

短轮询比较简单,下面来分析下长轮询都做了哪些事情:

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {

		// 获取客户端的超时时间
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");

        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        // 判断,如果为true,定时任务将会在30s后开始执行,否则在 29.5s开始执行
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
            long start = System.currentTimeMillis();
            // 和服务端的数据进行 MD5 比较,如果发现变化则直接返回
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
        // 把当前请求转化为一个异步请求,意味着此时tomcat线程被释放,
        // 也就是客户端的请求,需要通过 asyncContext 来手动触发返回, 否则一直挂起
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
        asyncContext.setTimeout(0L);

        // 执行长轮询请求
        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

1、获取客户端请求的超时时间,减去500ms后赋值给timeout变量.
2、判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后执行
3、如果不是isFixedPolling,则会先和服务端的数据进行MD5对比,如果发生变化直接返回
4、如果没有变化,就会把请求转化为异步请求挂起,然后延迟执行ClientLongPolling线程

长轮询的延迟任务
因为ClientLongPolling是一个runnable线程,可以看下他的run方法

        public void run() {
            // 延迟执行任务
            asyncTimeoutFuture = scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                        /**
                         * 删除订阅关系
                         */
                        allSubs.remove(ClientLongPolling.this);

                        if (isFixedPolling()) {
                            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                                (System.currentTimeMillis() - createTime),
                                "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                                "polling",
                                clientMd5Map.size(), probeRequestSize);
                            // 通过MD5比较客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端
                            List<String> changedGroups = MD5Util.compareMd5(
                                (HttpServletRequest)asyncContext.getRequest(),
                                (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                            if (changedGroups.size() > 0) {
                                // 返回给客户端
                                sendResponse(changedGroups);
                            } else {
                                sendResponse(null);
                            }
                        } else {
                            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                                (System.currentTimeMillis() - createTime),
                                "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                                "polling",
                                clientMd5Map.size(), probeRequestSize);
                            sendResponse(null);
                        }
                    } catch (Throwable t) {
                        LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
                    }

                }

            }, timeoutTime, TimeUnit.MILLISECONDS);
            // 添加到订阅队列中
            allSubs.add(this);
        }

ClientLongPolling.run方法立有一个延迟任务,执行的时候,通过比较MD5判断客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端.

数据变更事件
上面讲到有一个30s的周期,如果这段时间内配置变了,那不就不能得到及时更新了吗?
当然不是了,注意上面的一个变量:allSubs

从上面的逻辑不难看出,在延迟任务执行前的这30s内,ClientLongPolling是一直存在于队列中,因为执行完后就被添加到了队列中,执行时才会移除,所有我们需要看看这个是干什么的

LongPollingService继承了AbstractEventListener
在这里插入图片描述

可以看下LongPollingService的构造器

    public LongPollingService() {
    	// 初始化 allSubs
        allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("com.alibaba.nacos.LongPolling");
                return t;
            }
        });
        scheduler.scheduleWithFixedDelay(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    }

就会调用AbstractEventListener的构造器

        public AbstractEventListener() {
            /**
             * automatic register
             */
             // 添加了监听器
            EventDispatcher.addEventListener(this);
        }

监听的事件

    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
            	// 监听 LocalDataChangeEvent 事件
                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
                // 创建 DataChangeTask
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }

LongPollingService生成的时候,订阅了一个LocalDataChangeEvent事件,触发这个事件的时候会执行一个DataChangeTask异步任务
下面看下DataChangeTask里的run方法

        public void run() {
            try {
                ConfigService.getContentBetaMd5(groupKey);
                // 遍历队列中的所有的ClientLongPolling
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    // 判断当前的 ClientLongPolling中,请求的key是否包含当前修改的groupKey
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // 如果beta发布且不在beta列表直接跳过
                        if (isBeta && !betaIps.contains(clientSub.ip)) {
                            continue;
                        }

                        // 如果tag发布且不在tag列表直接跳过
                        if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                            continue;
                        }

                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        iter.remove(); // 删除订阅关系
                        LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                            (System.currentTimeMillis() - changeTime),
                            "in-advance",
                            RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
                            "polling",
                            clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                        // 响应客户端消息
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
            }
        }

1.会遍历队列中所有的ClientLongPolling对象
2.判断请求过来的需要对比的key里面是否包含当前变更的配置key
3.包含则移除出队列,并直接返回响应客户端信息

总结

  1. 配置分三种形态,本地配置文件,本地缓存文件,本地缓存数据
  2. 客户端通过主动拉取和长轮询的方式来获取配置以及更新配置
  3. 主动拉取的顺序是本地配置文件 —> 服务端 —> 本地缓存文件
  4. 客户端长轮询中对比配置不同的方式是对比本地文件与本地缓存数据的MD5
  5. 长轮询是在客户端与服务端对比配置不同中发起的,存在不同配置服务端则立刻返回,没有则服务端会保持长连接延迟执行任务(30s左右),这中间服务端一旦有配置变更(LocalDataChangeEvent事件)则会提前响应返回
  6. 长轮询在获取到不同的配置后还会遍历这些配置主动拉取一次获取具体配置内容并写入本地缓存文件中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/88453.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

抓住三个关键因素,提高你的ASA广告效果!

​ 众所周知&#xff0c;App Store 作为 iOS 端的流量收口&#xff0c;旗下的 ASA 广告更是广告主在 iOS 生态投放广告的唯一渠道&#xff0c;所提供的四大广告位&#xff08;Today 标签、搜索标签、搜索结果和产品页面&#xff09;覆盖了用户访问的全路径&#xff0c;为广告主…

12月14日:跟着猫叔写代码api中的增删改查

首先在数据库中建立一个学生成绩信息表 DROP TABLE IF EXISTS bro_ceshiapi; CREATE TABLE bro_ceshiapi (id int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT id,name varchar(100) DEFAULT NULL COMMENT 姓名,class varchar(100) DEFAULT NULL COMMENT 班级,score decima…

[附源码]Python计算机毕业设计Django基于vuejs的文创产品销售平台app

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

学习Vue3 - 认识 Reactive 全家桶

reactive 用来绑定复杂的数据类型&#xff0c;例如&#xff1a;对象、数组 reactive 源码约束了我们的类型 他是不可以绑定普通的数据类型的&#xff0c;这样是不允许的&#xff0c;会报错 因此&#xff0c;如果绑定普通的数据类型&#xff0c;可以使用ref ref绑定对象或者…

计算机SCI论文,如何写吸引人的摘要? - 易智编译EaseEditing

摘要简明扼要的概括全文的主要内容&#xff0c;是整篇文章的精华&#xff0c;是编辑、审稿专家以及读者阅读文章的最先关注的部分。 一个好的摘要可以正确反映文章内容&#xff0c;引起编辑、审稿专家以及读者的关注。那如何写出一个好的论文摘要呢&#xff0c;今天小易为大家…

一种基于摩斯密码的页面加密方法(web和小程序)

1. 开始 web开发中&#xff0c;常有一些功能仅希望对开发、测试人员等一小部分人展示&#xff0c;比如测试一个小程序项目中&#xff0c;想让测试人员快速复制当前对应的h5页面&#xff0c;这时候如果页面是必须登录的&#xff0c;我们可以借助vconsole&#xff0c;然后维护一…

redis之主从切换可能有哪些问题

写在前面 本文一起看下Redis cluster 集群模式下&#xff0c;发生了主从切换时可能存在的问题以及应对方案。 1&#xff1a;主从数据不一致 主从数据不一致&#xff0c;是由于主从同步延迟造成的&#xff0c;可能的解决方案如下&#xff1a; 1&#xff1a;尽量将主从同机房…

React面试:谈谈虚拟DOM,Diff算法与Key机制

1.虚拟dom 原生的JS DOM操作非常消耗性能&#xff0c;而React把真实原生JS DOM转换成了JavaScript对象。这就是虚拟Dom&#xff08;Virtual Dom&#xff09; 每次数据更新后&#xff0c;重新计算虚拟Dom&#xff0c;并和上一次生成的虚拟dom进行对比&#xff0c;对发生变化的…

Ansys Zemax | 用于数字投影光学中均匀照明的蝇眼阵列

简介 在数字投影仪设计中&#xff0c;我们希望确保数字光源与投影图像在辐照度分布相匹配。因此&#xff0c;这一约束要求投影仪设计包含均匀照明的空间光调制器——通常以LCD面板的形式呈现。理论上听起来很容易&#xff0c;但实际上&#xff0c;此面板上的光源光束通常是高斯…

语音输入转文字怎么操作?分享几种语音转文字技巧

相信有不少小伙伴在整理语音文件的时候&#xff0c;都会有过怎样把这些语音直接转换成文字的想法吧。每次在我开完会之后&#xff0c;需要对会议语音进行整理时&#xff0c;都会产生这种想法。因为我们需要不断的去听这个会议的语音内容&#xff0c;这样做既费时又费力。但其实…

MATLAB生成2D和3D格网(GUI程序)

目录 一、写函数DataStructure_Fnc 二、控件属性 三、生成2D格网代码 三、生成3D格网代码 一、写函数DataStructure_Fnc 函数代码&#xff0c;生成三角网需要调用此函数 function DataStructureDataStructure_Fnc(Table) [row col]size(Table); Table(1:end,5:7)-1; for j1…

【配置指导】如何配置dataFEED edgeConnector Siemens以实现西门子PLC与阿里云之间的双向通信

本配置指导手册介绍了如何配置dataFEED edgeConnector Siemens&#xff0c;以通过MQTT来将西门子S7-1200 PLC数据上传到阿里云&#xff1b;以及从阿里云发布数据&#xff0c;并传输到PLC中&#xff0c;从而实现西门子S7-1200 PLC与阿里云之间的双向通信。 主要内容包括&#xf…

30-Vue之ECharts-直角坐标系的常用配置

直角坐标系的常用配置前言直角坐标系常用配置网格坐标轴区域缩放前言 本篇来学习下直角坐标系的常用配置 直角坐标系 直角坐标系的图表指的是带有x轴和y轴的图表, 常见的直角坐标系的图表有: 柱状图 折线图 散点图 常用配置 网格 grid&#xff1a;是用来控制直角坐标系的…

可落地的、不基于框架的分布式事务解决方案

两阶段提交 2PC 在MySQL InnoDB中&#xff0c;为了保证Bin Log和Redo Log的一致性&#xff0c;便采用了两阶段提交&#xff1b;ZooKeeper、ETCD集群为了保证数据一致性&#xff0c;也采用了两阶段提交&#xff0c;RocketMQ的事务消息也采用了两阶段提交&#xff0c;可见两阶段…

从VirtualBox换成KVM虚拟机管理程序?

好消息是&#xff0c;您可以轻松地将VDI格式的VirtualBox VM迁移到qcow2(即KVM的磁盘映像格式)&#xff0c;不用创建新的KVM来宾计算机。 我们在本文中将概述如何将VirtualBox VM迁移到Linux中KVM VM的逐步过程。 第一步&#xff1a;列出现有的VirtualBox映像 首先&#xff0c…

泰斯公式Thiem’s equation地下水

基本形式 泰斯公式1描述了在含水层抽水时的地下水流动。 多井作业时非承压含水层的方程形式如下 H(s)和H0(s)分别表示s点的估计地下水位和初始地下水位&#xff0c;K表示水力导率&#xff0c;ri表示预测位置与贡献井i之间的距离&#xff0c;n是贡献井的集合&#xff0c;Q表…

Win11 21H2 12月最新更新了哪些内容?

微软今天发布了12月最新的累积更新补丁&#xff0c;用户可以升级KB5021234将版本号提升至 build 22000.1335&#xff0c;并解决了远程网络问题以及可能影响数据保护应用程序编程接口 &#xff08;DPAPI&#xff09; 解密的问题。此外&#xff0c;该更新还包括之前在 11 月 15 日…

11-FreeRTOS配置函数 FreeRTOSConfig.h

1-FreeRTOSConfig.h介绍 FreeRTOS中的相关定义多数都在FreeRTOSConfig.h中&#xff0c;整个FreeRTOS的定义调用都可以在这里定义&#xff0c;当然你也可以自己命名一个文件实现自定义。 下面是这个文件的内容&#xff0c;如下&#xff1a; #ifndef FREERTOS_CONFIG_H #define…

Graph Neural Networks for Social Recommendation学习笔记

1 目标 学习user embedding和item embedding。 2 框架 3 用户建模 3.1 利用历史记录对用户建模 3.2 利用社交关系对用户建模

10.9.1-Dataway+Echarts动态图表方案

文章目录1. 技术选型2. 实现方案2.1. 方案介绍2.2. 方案实现&#xff08;demo&#xff09;2.2.1. 使用echarts绘制html静态页2.2.1.1. 选择合适的图表2.2.1.2. 下载html demo2.2.2. 使用Dataway准备数据接口2.2.2.1. 部署dataway2.2.2.2. 创建数据接口2.2.3. 调试html demo da…