[Nacos] Nacos Server与Nacos Client间的UDP通信 (十)

news2024/11/25 0:44:10

文章目录

      • 1.Nacos Server与Nacos Client间的UDP通信
        • 1.1 Nacos Server向Nacos Client进行UDP推送
        • 1.2 Nacos Client接收Nacos Server的UDP推送

1.Nacos Server与Nacos Client间的UDP通信

  • Nacos Server向Nacos Client进行UDP推送
  • Nacos Client接收Nacos Server的UDP推送

1.1 Nacos Server向Nacos Client进行UDP推送

在这里插入图片描述

之前的文章中Nacos Server处理心跳请求的时候, 最后有一个向其他服务发布服务变更事件。

InstanceController#beat() -> Service#processClientBeat()

在这里插入图片描述

在这里插入图片描述

创建一个处理器,其是一个任务 (ClientBeatProcessor), 开启一个立即执行的任务, 执行clientBeatProcessor任务的run()

在这里插入图片描述

在这里插入图片描述

PushService#onApplicationEvent(): 发布服务变更事件给Server

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        // 启动一个定时操作,异步执行相关内容
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                // 从缓存map中获取当前服务的内层map,内层map中存放着当前服务的所有Nacos Client的
                // UDP客户端PushClient
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                // 更新最后引用时间
                long lastRefTime = System.nanoTime();
                // 遍历所有PushClient,向所有该服务的订阅者Nacos Client进行UDP推送
                for (PushClient client : clients.values()) {
                    // 若当前PushClient为僵尸客户端
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        // 将该PushClient干掉
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));

                    // UDP通信
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

启动一个定时操作,异步执行相关内容:

  1. 从缓存map中获取当前服务的内层map,内层map中存放着当前服务的所有Nacos Client的UDP客户端PushClient
  2. 遍历所有PushClient,向所有该服务的订阅者Nacos Client进行UDP推送 udpPush(ackEntry);

PushService#udpPush(): udp通信, Nacos Server向Nacos Client进行UDP推送

    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        // 若UDP通信重试次数超出了最大阈值,则将该UDP通信从两个缓存map中干掉
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;  // 失败计数器加一
            return ackEntry;
        }

        try {
            // 计数器加一
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            // 发送UDP
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            // 开启定时任务,进行UPD通信失败后的重新推送
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                    TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                    ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

若UDP通信重试次数超出了最大阈值,则将该UDP通信从两个缓存map中干掉, ackMap和udpSendTimeMap。
最后通过DatagramSocket.send()方法发送数据并开启定时任务,进行UPD通信失败后的重新推送。

在这里插入图片描述

1.2 Nacos Client接收Nacos Server的UDP推送

在这里插入图片描述
在这里插入图片描述

在之前的分析中, 会在项目启动后注入AbstractAutoServiceRegistration

AbstractAutoServiceRegistration#bind()

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

PushReceiver#run()

    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

在这里插入图片描述

最后也是通过DatagramSocket.send()方法发送的数据, 表示Nacos Client接收Nacos Server的UDP推送成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/584987.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

黑客常用工具合集

首先恭喜你发现了宝藏。 本文章集成了全网优秀的开源攻防武器项目&#xff0c;包含&#xff1a; 信息收集工具&#xff08;自动化利用工具、资产发现工具、目录扫描工具、子域名收集工具、指纹识别工具、端口扫描工具、各种插件....etc...&#xff09;漏洞利用工具&#xff0…

枚举_源码_分析

枚举源码分析 前言 这是所有Java语言枚举类型的公共基类。关于枚举的更多信息&#xff0c;包括编译器合成的隐式声明方法的描述&#xff0c;可以在Java的第8.9节中找到™ 语言规范。 请注意&#xff0c;当使用枚举类型作为集合的类型或映射中键的类型时&#xff0c;可以使用专…

[NOIP2004 普及组] FBI 树 队列解法

[NOIP2004 普及组] FBI 树 题目描述: 我们可以把由 0 和 1 组成的字符串分为三类&#xff1a;全 0 串称为 B 串&#xff0c;全 1 串称为 I 串&#xff0c;既含 0 又含 1 的串则称为 F 串。 FBI 树是一种二叉树&#xff0c;它的结点类型也包括 F 结点&#xff0c;B 结点和 I …

RocketMQ实现一个简单的秒杀接口

预设场景&#xff1a; “秒杀”这一词多半出现在购物方面&#xff0c;但是又不单单只是购物&#xff0c;比如12306购票和学校抢课&#xff08;大学生的痛苦&#xff09;也可以看成一个秒杀。秒杀应该是一个“三高”&#xff0c;这个三高不是指高血脂&#xff0c;高血压和高血糖…

数据中间件 - MyCat2 配置文件说明

数据中间件 - MyCat2 配置文件说明 本章内容基于 MyCat2 版本. 会对 Mycat 中的配置文件作用,以及结合 Mycat 的一些概念进行介绍,比起一上来就盲目的开始操作,然后遇到各种问题,先从全局进行了解对提高效率是有帮助的. MyCat 的配置文件都存放在 conf 路径下. server.jso…

学习Node.js的9大理由以及日常开发中的14个高级特性和代码示例分享

目录 为什么要学Nodejs 1. 高级事件处理&#xff1a;事件驱动机制 2. 非阻塞I/O 3. 异步编程 4. 模块系统 5. 流式数据处理 6. 跨平台支持 7. 高性能网络编程 8. 调试工具 9. 第三方模块 10. 升级 V8 引擎至 10.7 11. 函数式编程 12. 高级路由 13. 试验 Node wat…

国内免费可用的ChatGPT网页版

ChatGPT 一、ChatGPT是个啥&#xff1f;二、16个国内免费的ChatGPT网站。三、ChatGPT使用方式 一、ChatGPT是个啥&#xff1f; chat&#xff1a;表示“聊天”。 GPT&#xff1a;则是Generative、Pre-trained、Transformer的缩写&#xff0c;表示“预训练语言模型”&#xff0…

【Unity100个实用小技巧】同一个Canvas下的UI顺序通过代码如何修改

☀️博客主页&#xff1a;CSDN博客主页&#x1f4a8;本文由 萌萌的小木屋 原创&#xff0c;首发于 CSDN&#x1f4a2;&#x1f525;学习专栏推荐&#xff1a;面试汇总❗️游戏框架专栏推荐&#xff1a;游戏实用框架专栏⛅️点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd;&#…

三维场景重建经典论文详解

来源&#xff1a;投稿 作者&#xff1a;小灰灰 编辑&#xff1a;学姐 论文标题&#xff1a; 《REAL-TIME INDOOR SCENE RECONSTRUCTION WITH RGBD AND INERTIA INPUT》 论文链接: https://arxiv.org/pdf/2008.00490.pdf https://github.com/CWanli/RecoNet 数据集&#xff1a;P…

停车系统多位多车算法

1、算法代码 下面代码是伪Java代码&#xff0c;看得懂就行。 //查询当前车牌号对应的车主的其他的车牌号的入场纪录&#xff0c;根据时间倒叙排列。 List<Record> comeRecords mapper.selectFromDB; //车主所有的在场车辆数量-车主拥有车位数量 等于需要计费的车辆数量…

虚拟机NAT模式下修改linux静态ip

首先修改VMware的虚拟网络编辑器&#xff0c;NAT设置-> 设置网关&#xff0c;一般ip的第四部分网关为2&#xff0c;第三部分自己设置。 然后设置自己windows电脑 右键属性里面的ipv4&#xff0c;设置为静态的&#xff0c;dns除了对应网关还加一个通用dns 8.8.8.8。 这些外…

基于Q-Table的强化学习笔记

基于Q-Table的强化学习笔记 1 几个概念1.1 状态空间 S S S和动作空间 A A A1.2 奖励 R R R1.3 价值函数与Q-Table1.4 马尔可夫性 2 基于Q-Table的强化学习算法2.1 SARSA算法2.2 Q-learning算法 1 几个概念 最近也从小白入手看了些强化学习(Reinforcement Learning&#xff0c;…

中国人民大学与加拿大女王大学金融硕士——每天都要优于过去的自己,加油!

职场中拉开人与人之间差距的&#xff0c;往往是日复一日微小的积累。满足已取得的成就会让人停滞不前&#xff0c;一旦停止学习&#xff0c;人就会止步不前。懂得持续学习、终生成长的人&#xff0c;能保持积极进取的状态。金融行业的你有计划来人民大学与加拿大女王大学金融硕…

累积运行时间功能块(SCL语言)

设备累积运行时间功能块梯形图源代码请参看下面的文章博客: SMART PLC设备累计运行时间功能块_RXXW_Dor的博客-CSDN博客功能块非常简单,没有什么特别需要说明的,方法不唯一仅供参考。https://blog.csdn.net/m0_46143730/article/details/129170452方法始终不唯一,受限于当…

【c】vscode c/c++环境配置

文章目录 1 mingw下载及配置1.1 mingw下载1.2 环境变量配置1.3 gdb安装 2 vscode c插件安装3 vscode文件配置 1 mingw下载及配置 1.1 mingw下载 https://sourceforge.net/projects/mingw-w64/files/ 1.2 环境变量配置 1.3 gdb安装 我下载的mingw未安装gdb调试 cmd执行: …

vue3 项目实践总结

一、挂载全局变量 1.1 main.js 中挂载 // 引入全局变量 import api from /api;const app createApp(App);// 挂载全局变量 app.config.globalProperties.$API api;app.use(store).use(router).use(ElementPlus).use(Vant).mount(#app);1.2 组件中获取 方法一 推荐使用proxy…

研发工程师玩转Kubernetes——非定时任务

在《研发工程师玩转Kubernetes——自动扩缩容》一文中&#xff0c;我们使用在本地使用wrk进行了压力测试。如果我们希望在容器中运行&#xff0c;该怎么做呢&#xff1f; 构建/推送wrk镜像 Dockerfile如下。主要就是在Ubuntu22中安装wrk。 From ubuntu:22.04 RUN apt-get up…

【Nodejs】Node-js笔记

Node.js 文章目录 Node.js一、Node.js概述1.1、介绍1.2、官网1.3、Node.js应用场景1.4、安装Node.js1.5、npm包管理器1.5.1、介绍1.5.2、切换npm源1.5.3、生成JSON配置文件1.5.4、查看当前安装的树形模块1.5.5、安装模块1.5.6、自定义脚本命令1.5.7 、自动重启应用 1.6、模块化…

VehicleHal.java - fwk层对应VehicleService

VehicleHal.java - fwk层对应VehicleService 1、VehicleHal.java初始化1.1 hal服务区分1.2 简要时序图 2、PowerHalService为例2.1 PowerHalService初始化和订阅2.2 简要时序图 android12-release 【IVI】VehicleService启动 【IVI】车载设备硬件抽象层VHAL 【IVI】CarService启…

【人脸识别】insightface 使用记录和搭建服务注意点和坑 从0到1

文章目录 前言1.开始1.1 前置1.2 再次运行&#xff0c;人脸检测跑通1.3 人脸特征抽取1.3.1 模型下载1.3.2 重新跑一下检测和识别1.3.3 人脸监测返回值分析1.3.4 计算相似度 1.4 全流程的相似度 2. 业务化人脸识别 前言 人脸识别项目&#xff0c;再走一遍。之前是公司老人留下的…