Nacos 消息通知系统 源码讲解

news2025/1/10 3:27:06

目录

1. 介绍

2. Nacos 中的生产者

3. Nacos 中的消费者

4. Nacos 中的事件

5. 统一事件通知中心 NotifyCenter

6. 总结


1. 介绍

Nacos 节点内部使用了大量的事件机制进行通信,这样各种操作进行了解耦,提高了性能。

接下来就介绍。

所谓事件机制也就是 生产者消费者模式。

生产者向队列发送生产一条消息,消费者从队列拿出一个消息进行消费。

结构如下:

事件机制也是 Nacos 高性能的原因之一,事件生产后放到队列后请求就直接返回了。客户端不用等待完成,全都异步处理了,所以Nacos 支持高并发访问。

Nacos 中的事件机制非常庞大,接下来以不同的模块的角度来介绍。

2. Nacos 中的生产者

首先,我们需要有生产者来生产一条消息。

生产者结构如下

EventPublisher

生产者顶层接口,定义规范

  1. 初始化

  2. 发布事件

  3. 获取当前事件数量

  4. 添加订阅者(消费者)

  5. 移除订阅者(消费者)

  6. 通知消费者

DefaultPublisher

默认生产者实现,消息队列对应在了其内部,在 init 方法中完成了队列初始化。

public class DefaultPublisher extends Thread implements EventPublisher {
    
    // 省略非关键代码
    
    private BlockingQueue<Event> queue;
    
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
    }
    
}

订阅者的加入、移除

public class DefaultPublisher { 
    
    // 省略非关键代码
    
    protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();
    
    /**
     * 添加订阅者
     */
    @Override
    public void addSubscriber(Subscriber subscriber) {
        subscribers.add(subscriber);
    }
    
    /**
     * 移除订阅者
     */
    @Override
    public void removeSubscriber(Subscriber subscriber) {
        subscribers.remove(subscriber);
    }
    
}

可见,就是对消费者数据结构简单的 add 和 remove。

生产消息:当发布消息(消息就是事件 Event 类)时,就往队列加入一个消息.

@Override
public boolean publish(Event event) {
      
    boolean success = this.queue.offer(event);
​
    return true;
    
    // 省略非关键代码
}  

消费消息

由于 DefaultPublisher 继承了 Thread 并且在 init 方法中执行了start 方法。

所以它也是一个线程,在线程的 run 方法内最终调用了如下方法:

void openEventHandler() {
    
    // 省略非关键代码
    
    for (; ; ) {
        // 从队列中获取消息(事件):如果没有消息该方法则会阻塞,直到有消息了才返回该消息
        final Event event = queue.take();
        // 处理事件
        receiveEvent(event);
    }       
}
​
​
// 处理事件
void receiveEvent(Event event) {
    // 省略非关键代码
 
    // 获取注册到当前 生产者 的全部消费者
    for (Subscriber subscriber : subscribers) {
​
        // 循环通知消费者
        notifySubscriber(subscriber, event);
    }
}

总结:DefaultPublisher 是一个线程,在它的 run 方法中,无限循环的从队列中获取数据并通知注册到自身的全部消费者。

ShardedEventPublisher

DefaultPublisher 看起来已经很完备了,那么这个类与上面的 DefaultPublisher 有何不同呢。

ShardedEventPublisher 继承了 DefaultPublisher ,也就是说,是在 DefaultPublisher 基础上增加了一些功能。

增加了哪些功能呢?就是支持多事件类型功能。

而 DefaultPublisher 只能支持一种事件类型。

在其初始化方法中,对事件类型指定。

public class DefaultPublisher {
​
    // 省略非关键代码
 
    private Class<? extends Event> eventType; //(不是集合,支持一种事件类型)
 
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        this.eventType = type;
    }
 
}

在运行的时候,判断订阅者是否支持该事件类型。

// 该方法由 Thread 的 run 方法调用
void receiveEvent(Event event) {
​
    // 省略非关键代码
   
   
    // 循环全部消费者
    for (Subscriber subscriber : subscribers) {
        if (!subscriber.scopeMatches(event)) {
            // 如果订阅者不是该事件类型的 则不处理此次事件。
            continue;
        }
        
        // 通知订阅者处理事件
        notifySubscriber(subscriber, event);
    }
}

而在 ShardedEventPublisher 中,消费者管理代码如下

public class DefaultSharePublisher extends DefaultPublisher {
​
    // 省略非关键代码
   
    
    /**
     * 以 Map<事件类型,Set<订阅者>> 结构存放订阅者
     */
    private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings 
        = new ConcurrentHashMap<>();
​
    /**
     * 添加消费者
     */
    @Override
    public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
        // 获取事件类型
        Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
       
        // 调用了 DefaultPublisher 的 add
        subscribers.add(subscriber);
        
        // 根据事件类型 获取该事件下的 订阅者 Set
        Set<Subscriber> sets = subMappings.get(subSlowEventType);
        // 加入此次订阅者
        sets.add(subscriber); 
    }
    
    @Override
    public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
        // 获取事件类型
        Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
        // 调用了 DefaultPublisher 的 remove
        subscribers.remove(subscriber);
       
        // 根据事件类型 获取该事件下的 订阅者 Set
        Set<Subscriber> sets = subMappings.get(subSlowEventType);
        sets.remove(subscriber);  
    }
}

ShardedEventPublisher 中另外维护了一个 Map,Map<事件类型,Set<订阅者>>, 支持多种事件类型。

因为在 ShardedEventPublisher 中 以订阅者订阅的事件类型 做了一个分类。

再看 DefaultSharePublisher 的接收事件处理,根据事件类型只通知该事件类型下的订阅者。

// 该方法由 Thread 的 run 方法调用
public void receiveEvent(Event event) {
        
    // 省略非关键代码
    
    // 获取当前事件类型
    Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();
    
    // 获取该事件类型下的全部订阅者
    Set<Subscriber> subscribers = subMappings.get(slowEventType);
 
    // 循环通知订阅者
    for (Subscriber subscriber : subscribers) {  
        notifySubscriber(subscriber, event);
    }
}

3. Nacos 中的消费者

类图如下:

Subscriber

抽象类 ,最顶层接口类。定义规范

public abstract class Subscriber<T extends Event> {
​
    // 接收到事件 回调
    public abstract void onEvent(T event);
    
    // 返回当前订阅者监听的事件类型
    public abstract Class<? extends Event> subscribeType();
    
    // 订阅者线程池,如果返回 null (默认 null ),则代表直接调用 onEvent 回调。
    // 如果返回了线程池对象,则代表通过线程池调用 onEvent 回调。
    public Executor executor() {
        return null;
    }
    
    // 省略非关键代码
}

如何使用呢 使用 NotifyCenter 完成注册

// 示例
​
NotifyCenter.registerSubscriber(new Subscriber<ClientEvent>() {
    @Override
    public void onEvent(ClientEvent event) {
        // 业务处理     
    }
​
    @Override
    public Class<? extends Event> subscribeType() {
        // 返回该订阅者监听的事件类型
        return ClientEvent.class;
    }
});

这是一个支持单事件类型的订阅者,其生产者便是由同样为支持单事件类型的 DefaultPublisher 来生产消息

SmartSubscriber

SmartSubscriber 是一个抽象类,继承自 Subscriber。

public abstract class SmartSubscriber extends Subscriber {
    
    // 新增抽象方法,用于返回多个事件类型
    public abstract List<Class<? extends Event>> subscribeTypes();
    
    // 重写 将父类的获取单个的订阅事件类型 返回 null
    @Override
    public final Class<? extends Event> subscribeType() {
        return null;
    }
​
     // 省略非关键代码
}

使用方法

// 示例
public class NamingMetadataManager extends SmartSubscriber {
    
    // 返回多个订阅的事件类型,表示当前订阅者需要订阅以下多个事件
    @Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        result.add(MetadataEvent.InstanceMetadataEvent.class);
        result.add(MetadataEvent.ServiceMetadataEvent.class);
        result.add(ClientEvent.ClientDisconnectEvent.class);
        return result;
    }
    
    // 事件回调
    @Override
    public void onEvent(Event event) {
        
        // 根据不同事件类型处理。
        
        if (event instanceof MetadataEvent.InstanceMetadataEvent) {
            // 处理事件
            
        } else if (event instanceof MetadataEvent.ServiceMetadataEvent) {
            // 处理事件
            
        } else {
            // 处理事件
            
        }
    }
}

这是一个支持多事件类型的订阅者,其生产者便是由同样为支持多事件类型的 ShardedEventPublisher来生产消息

4. Nacos 中的事件

Event

Event 就是整个消息通知系统中的消息,消息就是 Event。

Event 是一个抽象类,所以消息事件都是继承该类。

SlowEvent

SlowEvent 标识着多事件版本。

全部事件类图

看看 Nacos 中都有哪些事件吧

 

5. 统一事件通知中心 NotifyCenter

整个消息通知系统都由 NotifyCenter 类来操作完成。

com.alibaba.nacos.common.notify.NotifyCenter

NotifyCenter 是整个消息通知系统中的非常关键的类,门面模式,负责管理整个消息通知系统。

提供快捷方法完成 注册或卸载生产者、注册或卸载消费者等

public class NotifyCenter {
    
    // 注册发布者
    public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
        ...
    }
    
    // 注册订阅者
    public static void registerSubscriber(final Subscriber consumer) {
        ...
    }
    
    // 发布事件
    public static boolean publishEvent(final Event event) {
        ....
    }
    
    // 省略部分代码
}

注册发布者

// 第一个参数指定事件类型,表示该发布者处理的事件类型
// 第二个参数指定队列大小
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
// 最终注册方法重载到如下方法
​
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
            final EventPublisherFactory factory, final int queueMaxSize) {
    
    // 如果是 SlowEvent 就返回 对应的多事件的 发布者
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher;    
    }
    
    // 获取当前事件的类名作为 topic 例如:com.alibaba.nacos.naming.core.v2.event.client.ClientEvent
    final String topic = ClassUtils.getCanonicalName(eventType);
    
    synchronized (NotifyCenter.class) {
        // 如果发布者map中还没有当前注册者,就加入注册者
        MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);   
    }
    return INSTANCE.publisherMap.get(topic);
}
​
    
/**
 * 存放发布者 Map<Topic,事件> 一一对应
 */
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);

可以发现,注册发布者的过程就是讲事件类型的类名 和 发布者实例 放入 map 中。

注册订阅者

注册订阅者可以直接传一个订阅者。

NotifyCenter.registerSubscriber(new Subscriber() {...});
// 最终注册方法重载到如下方法   
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
            EventPublisherFactory factory) {
        
    final String topic = ClassUtils.getCanonicalName(subscribeType);
        synchronized (NotifyCenter.class) {        
            MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType,       ringBufferSize);
        }
    
        // 获取到处理当前事件的发布者
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher instanceof ShardedEventPublisher) {
            
            // 调用发布者的 添加订阅者方法
            ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
        } else {
            
            // 调用发布者的 添加订阅者方法
            publisher.addSubscriber(consumer);
        }
    
}

可见,注册订阅者最终是调用发布者的添加订阅者方法。

为了找到发布者,发布者必须先注册到 NotifyCenter 中。才能注册订阅者。

发布事件

NotifyCenter.publishEvent(new RegisterServiceTraceEvent(System.currentTimeMillis(), namespaceId,
                NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName)));
// 最终发布重载到如下方法   
​
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);
        }
        
        final String topic = ClassUtils.getCanonicalName(eventType);
        
        // 根据事件找到 发布者
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            // 调用发布者的 发布
            return publisher.publish(event);
        }
        if (event.isPluginEvent()) {
            return true;
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
}

可见,发布事件实现原理是 找到发布者,然后调用发布者的发布方法。

6. 总结

消息通知系统可分为两种系列

单事件:由 DefaultPublisher、Subscriber、Event 组成

多事件:由 ShardedEventPublisher、SmartSubscriber、SlowEvent 组成

整个消息通知系统由 NotifyCenter 类来操作完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/95078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

virtualenv系列 (5) · 打通virtualenvwrapper和PyCharm/IntelliJ IDEA的虚拟环境

PyCharm/IntelliJ IDEA已经内置了对virtualenv的支持&#xff0c;当我们为一个项目创建一个新的Python SDK时&#xff0c;就会进入到Add Python Interpreter对话框&#xff0c;选择Virtualenv Environment -> New environment就将创建一套新的virtualenv环境。这里&#xff…

关于长三角某高校能效管理平台应用成效探讨

更多请关注&#xff1a;安科瑞电气网 摘要&#xff1a; 随着现代科学技术的发展&#xff0c;在高校中开始广泛应用智能化技术&#xff0c;改善学生宿舍的用电管理模式&#xff0c;提高宿舍的管理水平&#xff0c;有利于实现高校宿舍用电管理的科学化。本文主要阐述传统高校宿…

D2. Seating Arrangements (hard version)(贪心+排序)

Problem - 1566D2 - Codeforces 这是该问题的困难版本。唯一的区别是&#xff0c;在这个版本中&#xff0c;1≤n≤300。 电影院的座位可以表示为有n行和m列的表格。每行的座位都用1到n的整数编号&#xff0c;从左到右连续编号&#xff1a;在第k行&#xff0c;从m(k-1)1到mk&am…

前端的视角聊聊如何快速入门Python

对于 NodeJs 开发的小伙伴&#xff0c;使用 node-gyp 将 C 模块转换成 NodeJs 的 addon 插件时会依赖 Python 的安装环境&#xff08;针对使用了 nvm 等版本管理工具的情况&#xff09;。对于前端小伙伴来说&#xff0c;Python 本身确实不是一个必须学习的语言&#xff0c;但也…

提前做好网络安全分析,运维真轻松(一)

背景 某汽车总部已部署NetInside流量分析系统&#xff0c;使用流量分析系统提供实时和历史原始流量。汽车配件电子图册系统是某汽车集团的重要业务系统。本次分析重点针对汽车配件电子图册系统进行预见性分析&#xff0c;以供安全取证、性能分析、网络质量监测以及深层网络分析…

SQL基础——聚合与排序

聚合与排序前言思维导图聚合函数示例表3-1![在这里插入图片描述](https://img-blog.csdnimg.cn/9c286053f87a4ae882bece8128bdcab5.png)计算表中数据的行数 COUNT函数示例代码3.1 计算全部数据的行数执行结果计算NULL之外的数据的行数代码示例3.2 计算NULL之外的数据行数执行结…

【DevOps实战系列】第五章:基于Gitlab/Maven/Jenkins/Docker实战案例详解

个人亲自录制全套DevOps系列实战教程 &#xff1a;手把手教你玩转DevOps全栈技术 从创建Jenkins的job开始 1.gitlab设置&#xff1a; 我们从新建一个jenkins任务开始&#xff0c;建一个自由风格项目&#xff0c;我们暂时只让他能拉取git的代码。 路径&#xff1a;从gitlab上新…

【ML笔记】5、支持向量机(SVM)

支持向量机&#xff08;SVM&#xff09;是一个强大的和通用的ML模型&#xff0c;能够执行分类&#xff0c;回归&#xff0c;甚至异常值检测&#xff0c;特别适合于复杂的中小型数据集。 1、线性SVM分类 我们可以看到&#xff0c;这两个类可以很容易地用一条直线(线性可分)分开…

GAN入门知识

GAN入门知识 结构 正式说 GAN 之前我们先说一下判别式模型和生成式模型。 判别器 判别式模型 判别式模型&#xff0c;即 Discriminative Model&#xff0c;又被称为条件概率模型&#xff0c;它估计的是条件概率分布(conditional distribution)&#xff0c; p(class|context) 。…

图书网上商店

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 1

docker-compose安装部署kafka

文章目录前言一、环境信息二、准备部署1.准备路径2.安装docker-compse&#xff0c;下载镜像3.生成yml文件2.执行部署三、登陆页面前言 记录一下使用docker-compose部署kafka平台的过程 参考&#xff1a;https://blog.csdn.net/QQ83512272/article/details/126368978 一、环境信…

Socket Websocket 客户端和服务端实现

最近在写一个上位机&#xff0c;用到了Websocket&#xff0c;这里就整理一下&#xff0c;顺便把Socket的东西也整理的了&#xff0c;方便以后查阅。 Socket Websocket 客户端和服务端实现Socket客户端和服务端实现Socket客户端Socket服务端实现效果Websocket 客户端和服务端实现…

QT Linux环境搭建——VM虚拟机和Ubuntu的安装

1、从ubuntu官网上下载iso镜像文件&#xff0c;该镜像文件用于安装linux操作系统&#xff0c;以下微官网链接 Enterprise Open Source and Linux | Ubuntu 选择一个最新的稳定版本下载即可&#xff08;好处是&#xff0c;不需要像一些别的网站&#xff0c;必须要注册&#xff…

基于java+springmvc+mybatis+vue+mysql的远程家庭健康监测管理系统小程序

项目介绍 本系统采用java语言开发&#xff0c;后端采用ssm框架&#xff0c;前端采用vue技术&#xff0c;数据库采用mysql进行数据存储。 前台&#xff1a; 登录注册、查看个人信息、留言反馈、查看健康信息、查看百科、社区交流 后台&#xff1a; 首页、个人中心、用户管理、…

陈天老师的Rust培训(2)学习笔记

所有权&#xff1a; Rust中的每一个值都有一个被称为其 所有者&#xff08;owner&#xff09;的变量值在任一时刻有且只有一个所有者。当所有者(变量)离开作用域的时候&#xff0c;这个值将被丢弃。 Copy的类型&#xff1a; 所有整数类型&#xff0c;比如u32。布尔类型所有浮…

本地springboot jar 部署到云服务器linux [安装jdk 安装msyql]

A). 安装jdk 参考博客CentOS 8 安装 JAVA 三种方式(yum / rpm / tar.gz) [rootiZt4ned91xzjstx1s6ftjvZ local]# rpm -qa |grep java [rootiZt4ned91xzjstx1s6ftjvZ local]# rpm -qa |grep jdk [rootiZt4ned91xzjstx1s6ftjvZ local]# rpm -qa |grep jre [rootiZt4ned91xzjstx…

bug:解决java.security.InvalidKeyException: Illegal key size or default parameters

bug:解决java.security.InvalidKeyException: Illegal key size or default parameters 1 复现 今天对接外链需要使用AES加密的时候&#xff0c;对方使用的是AES&#xff1a;AES/ECB/PKCS7Padding&#xff0c;但是在加密过程中&#xff0c;发现报错 报错信息&#xff1a;java.…

Spark Shell 的使用

Spark Shell 的使用 Spark shell 作为一个强大的交互式数据分析工具&#xff0c;提供了一个简单的方式学习 API。它可以使用 Scala&#xff08;在Java 虚拟机上运行现有的Java库的一个很好方式&#xff09;或 Python。 Spark Shell 命令 启动 Spark Shell 的时候我们可以指定…

JavaIO

CPU指令与内核态、用户态 在操作系统中&#xff0c;CPU负责执行指令,这些指令有些来自应用程序&#xff0c;有些是来自底层系统。 有些指令是非常危险的&#xff0c;如清除内存&#xff0c;网络连接等等&#xff0c;如果错误调用的话有可能导致系统崩溃。 因而CPU将指令分为特…

Excel表格的打开密码如何设置和取消?

给Excel表格设置“打开密码”是保护表格的其中一种方法&#xff0c;这样只有输入正确的密码才能打开表格。 那Excel表格的打开密码如何设置和取消呢&#xff1f; 首先&#xff0c;打开Excel表格&#xff0c;点击菜单栏的【文件】选项&#xff0c;然后依次选择【信息】-【保护…