死磕Nacos系列:Nacos事件发布订阅模型

news2025/1/23 12:10:35

前言

在Nacos源码中,你是否也经常看到NotifyCenter.publishEvent这样的代码块?

这个事件发布出去后,有哪些类接收到通知并进行了逻辑处理呢?

这里面的实现逻辑是什么呢?

如果你不太清楚,那我们一起来梳理下。Let’s go!

NotifyCenter

如果我们不靠其它资料,从代码层面去解读的话,还是得先从这个类入手,因为都是调用其静态方法完成通知的。

这个类的静态代码块里面有这样的逻辑:

static {
    // 从系统环境中取"nacos.core.notify.ring-buffer-size",如果没有设置,则取默认值16384
    String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
    ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
    
    // 从系统环境中取"nacos.core.notify.share-buffer-size",如果没有设置,则取默认值1024
    String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
    shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
    
    // 通过SPI的方式加载EventPublisher的实现类
    final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
    Iterator<EventPublisher> iterator = publishers.iterator();
    
    if (iterator.hasNext()) {
        clazz = iterator.next().getClass();
    } else {
        clazz = DefaultPublisher.class;
    }
    
    // 创建一个默认的EventPublisherFactory,用来产生EventPublisher
    DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
        try {
            EventPublisher publisher = clazz.newInstance();
            publisher.init(cls, buffer);
            return publisher;
        } catch (Throwable ex) {
            LOGGER.error("Service class newInstance has error : ", ex);
            throw new NacosRuntimeException(SERVER_ERROR, ex);
        }
    };
    
    try {
        
        // 创建一个默认共享的EventPublisher
        INSTANCE.sharePublisher = new DefaultSharePublisher();
        INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
        
    } catch (Throwable ex) {
        LOGGER.error("Service class newInstance has error : ", ex);
    }
    
    // 添加一个ShutdownHook
    ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}

从这里我们可以知道,有EventPublisherFactoryEventPublisher的概念,在EventPublisher中,还专门创建了一个DefaultSharePublisher,还知道了一个事件叫做SlowEvent

那我们就看一下这几个模型:

模型

EventPublisherFactory

public interface EventPublisherFactory extends BiFunction<Class<? extends Event>, Integer, EventPublisher> {
    
    /**
     * Build an new {@link EventPublisher}.
     *
     * @param eventType    eventType for {@link EventPublisher}
     * @param maxQueueSize max queue size for {@link EventPublisher}
     * @return new {@link EventPublisher}
     */
    @Override
    EventPublisher apply(Class<? extends Event> eventType, Integer maxQueueSize);
}

可以看到,接口中就一个方法,就是通过事件类型和最大队列大小来得到一个EventPublisher

image-20231125165128331

其中有NamingEventPublisherFactoryTraceEventPublisherFactory两个实现类。

EventPublisher

public interface EventPublisher extends Closeable {
    
    // 初始化EventPublisher
    void init(Class<? extends Event> type, int bufferSize);
    
    // 获取当前事件的数量
    long currentEventSize();
    
    // 添加一个订阅者
    void addSubscriber(Subscriber subscriber);
    
    // 删除一个订阅者
    void removeSubscriber(Subscriber subscriber);
    
    // 发布事件
    boolean publish(Event event);
    
    // 通知订阅者
    void notifySubscriber(Subscriber subscriber, Event event);
    
}

其类图如下:

image-20231125165544405

这里SharedEventPublisher看起来是做了更多的扩展,来看看呢:

public interface ShardedEventPublisher extends EventPublisher {
    
    // 给指定的一个事件添加订阅者
    void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
    
    // 给指定的一个事件删除订阅者
    void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}

这里新加入了个Subscriber,那我们来看看:

Subscriber

public abstract class Subscriber<T extends Event> {
    
    // 接收事件
    public abstract void onEvent(T event);
    
    // 获取当前订阅的事件类型
    public abstract Class<? extends Event> subscribeType();
    
    // 获取异步处理器
    public Executor executor() {
        return null;
    }
    
    // 忽略失效的事件
    public boolean ignoreExpireEvent() {
        return false;
    }
    
    // 获取事件是否匹配作用域
    public boolean scopeMatches(T event) {
        return true;
    }
}

有个特殊的SmartSubscriber,它是Subscriber的子类,是一个抽象类:

public abstract class SmartSubscriber extends Subscriber {
    
    // 返回订阅的事件列表
    public abstract List<Class<? extends Event>> subscribeTypes();
    
    @Override
    public final Class<? extends Event> subscribeType() {
        return null;
    }
    
    // 默认不忽略过期的事件
    @Override
    public final boolean ignoreExpireEvent() {
        return false;
    }
}

Event

public abstract class Event implements Serializable {
    
    private static final long serialVersionUID = -3731383194964997493L;
    
    private static final AtomicLong SEQUENCE = new AtomicLong(0);
    
    private final long sequence = SEQUENCE.getAndIncrement();
    
    // 获取事件序号
    public long sequence() {
        return sequence;
    }
    
    // 获取事件作用域
    public String scope() {
        return null;
    }
    
    // 判断是否是一个插件的事件
    public boolean isPluginEvent() {
        return false;
    }
}

SlowEvent中,仅仅是将sequence统一默认为0。

public abstract class SlowEvent extends Event {
    
    @Override
    public long sequence() {
        return 0;
    }
}

综上,我们通过接口关系就可以大概得出一个结论:

EventPublisher可通过EventPublisherFactory创建得来,EventPublisher可以给Event添加Subscriber,并发布事件让对应的Subscriber执行。这不妥妥的观察者模式嘛!

我们来画张图:

image-20231125171510677

了解了NotifyCenter大概的模型之后,我们来看看内部的实现。

实现方法

registerToPublisher

给时间注册一个EventPublisher。

public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
        final EventPublisherFactory factory, final int queueMaxSize) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher;
    }
    
    final String topic = ClassUtils.getCanonicalName(eventType);
    synchronized (NotifyCenter.class) {
        // MapUtils.computeIfAbsent is a unsafe method.
        MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
    }
    return INSTANCE.publisherMap.get(topic);
}

诶,如果事件是SlowEvent,就会统一返回一个sharePublisher,这个就是静态代码块中初始化的DefaultSharePublisher

好,(敲黑板),所有SlowEvent的事件共用一个DefaultSharePublisher

接着给NotifyCenter上锁,如果publisherMap中没有当前事件的EventPublisher,那就用EventPublisherFactory创建一个EventPublisher,并存入publisherMap中,再返回。

registerSubscriber

给事件注册一个订阅者。

public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
    // 如果订阅者是一个SmartSubscriber,可以订阅多个事件
    if (consumer instanceof SmartSubscriber) {
        for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
            // 如果事件是SlowEvent,则添加到sharePublisher中
            if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
                INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
            } else {
                // 其他事件则添加到对应的EventPublisher中
                addSubscriber(consumer, subscribeType, factory);
            }
        }
        return;
    }
    
    // 其他类型的Subscriber
    final Class<? extends Event> subscribeType = consumer.subscribeType();
    if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
        // 如果事件是SlowEvent,则添加到sharePublisher中
        INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
        return;
    }
    // 其他事件则添加到对应的EventPublisher中
    addSubscriber(consumer, subscribeType, factory);
}

publishEvent

发送事件。

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {\
    
    // 如果事件是SlowEvent,则用sharePublisher发送
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    
    final String topic = ClassUtils.getCanonicalName(eventType);
    
    // 根据事件找到对应的EventPublisher进行发送                                                  
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        return publisher.publish(event);
    }
    if (event.isPluginEvent()) {
        return true;
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}

总结

Nacos的事件发布是基于观察者模式进行设计的。

如果一个处理器需要接收到某个特定事件的通知,那么就要先通过NotifyCenter来订阅自己感兴趣的Event,当显示调用NotifyCenter.publishEvent时,会通过EventPublisher进行通知到订阅者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1251638.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

企业计算机中了locked勒索病毒怎么解锁,locked勒索病毒解密,数据恢复

科技的进步为企业的生产生活提供了极大便利&#xff0c;但随之而来的网络安全威胁也不断增加&#xff0c;近期云天数据恢复中心陆续接到很多企业的求助&#xff0c;企业的计算机服务器遭到了locked勒索病毒攻击&#xff0c;导致企业的所有业务无法正常开展&#xff0c;所有计算…

Python武器库开发-前端篇之CSS盒模型(三十一)

前端篇之CSS盒模型(三十一) CSS盒模型是指网页中的每个元素可以看做是一个矩形盒子&#xff0c;该盒子有四个主要部分组成&#xff1a;content、padding、border和margin。其中&#xff1a; content&#xff1a;指盒子中的内容区域&#xff0c;可以包含文本、图像、视频、其他…

安装最新版WebStorm来开发JavaScript应用程序

安装最新版WebStorm来开发JavaScript应用程序 Install the Latest Version of JetBrains WebStorm to Develop JavaScript Applications By JacksonML 2023-11-25 1. 系统要求 WebStorm是个跨平台集成开发环境&#xff08;IDE&#xff09;。按照JetBrains官网对WebStorm软件…

Elasticsearch集群部署,配置head监控插件

Elasticsearch是一个开源搜索引擎&#xff0c;基于Lucene搜索库构建&#xff0c;被广泛应用于全文搜索、地理位置搜索、日志处理、商业分析等领域。它采用分布式架构&#xff0c;可以处理大规模数据集和支持高并发访问。Elasticsearch提供了一个简单而强大的API&#xff0c;可以…

【JavaEE初阶】线程安全问题及解决方法

目录 一、多线程带来的风险-线程安全 1、观察线程不安全 2、线程安全的概念 3、线程不安全的原因 4、解决之前的线程不安全问题 5、synchronized 关键字 - 监视器锁 monitor lock 5.1 synchronized 的特性 5.2 synchronized 使用示例 5.3 Java 标准库中的线程安全类…

修改YOLOv5的模型结构第三弹

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制&#x1f680; 文章来源&#xff1a;K同学的学习圈子 文章目录 任务任务拆解 开始修改C2模块修改yolo.py修改模型配置文件 模型训练 上次已…

rfc4301- IP 安全架构

1. 引言 1.1. 文档内容摘要 本文档规定了符合IPsec标准的系统的基本架构。它描述了如何为IP层的流量提供一组安全服务&#xff0c;同时适用于IPv4 [Pos81a] 和 IPv6 [DH98] 环境。本文档描述了实现IPsec的系统的要求&#xff0c;这些系统的基本元素以及如何将这些元素结合起来…

第十三章 深度解读预训练与微调迁移,模型冻结与解冻(工具)

一个完整的代码 pythonCopy codeimport torch import torchvision import torchvision.transforms as transforms import torch.nn as nn import torch.optim as optim # 设置设备&#xff08;CPU或GPU&#xff09; device torch.device("cuda" if torch.cuda.is_a…

canvas扩展001:利用fabric绘制图形,可以平移,旋转,放缩

canvas实例应用100 专栏提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。 canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重要的帮助。 文章目录 示例…

stm32实现0.96oled图片显示,菜单功能

stm32实现0.96oled图片显示&#xff0c;菜单功能 功能展示简介代码介绍oled.coled.holedfont.h&#xff08;字库文件&#xff09;main函数 代码思路讲解 本期内容&#xff0c;我们将学习0.96寸oled的进阶使用&#xff0c;展示图片&#xff0c;实现菜单切换等功能&#xff0c;关…

MySQL日期函数sysdate()与now()的区别,获取当前时间,日期相关函数

select sleep(2) as datetime union all select sysdate() -- sysdate() 返回的时间是当前的系统时间&#xff0c;而 now() 返回的是当前的会话时间。 union all select now() -- 等价于 localtime,localtime(),localtimestamp,localtimestamp(),current_timestamp,curre…

基于鱼鹰算法优化概率神经网络PNN的分类预测 - 附代码

基于鱼鹰算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于鱼鹰算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于鱼鹰优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神经网络的光滑…

程序的编译与链接(详解)

程序的编译与链接 本章内容如下&#xff1a; 1:程序的翻译环境与执行环境的介绍 2:详解程序的翻译环境(编译链接) 2.1预处理阶段干了啥2.2编译阶段干了啥2.3汇编阶段干了啥2.4链接阶段干了啥 3:预处理详解 预定义符号的介绍#define 的介绍(宏与标识符号)#与##的介绍宏与函数…

对象的内部结构

在HotSpot 虚拟机里&#xff0c;对象在堆内存中的存储布局可以划分为三个部分&#xff1a;对象头&#xff08; Header &#xff09;、实例数据&#xff08;Instance Data &#xff09;和对齐填充&#xff08; Padding &#xff09;。 对象头 Mark Word&#xff08;标记字段&a…

RK3568驱动指南|第八篇 设备树插件-第72章 设备树插件语法和编译实验

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

[element-ui] el-dialog 中的内容没有预先加载,因此无法获得内部元素的ref 的解决方案

问题描述 在没有进行任何操作的时候&#xff0c;使用 this.$refs.xxxx 无法获取el-dialog中的内部元素&#xff0c;这个问题会导致很多bug. 官方解释&#xff0c;在open事件回调中进行&#xff0c;但是open()是弹窗打开时候的会调&#xff0c;有可能在此处获取的时候&#xff…

教师授课技巧

一名教师&#xff0c;授课技巧是提高教学效率和质量的关键。以下是几个实用的授课技巧&#xff0c;可以帮你更好的传授知识&#xff0c;激发学习兴趣。 一、做好课前准备 课前准备是授课技巧的重要环节。认真备课&#xff0c;熟悉教材内容&#xff0c;制定教学计划&#xff0c…

redis运维(二十一)redis 的扩展应用 lua(三)

一 redis 的扩展应用 lua redis加载lua脚本文件 ① 调试lua脚本 redis-cli 通过管道 --pipe 快速导入数据到redis中 ② 预加载方式 1、错误方式 2、正确方式 "案例讲解" ③ 一次性加载 执行命令&#xff1a; redis-cli -a 密码 --eval Lua脚本路径 key …

【Docker】Docker与Kubernetes:区别与优势对比

前言 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。   kubernetes&#xff0c;简称K8s&a…

MybatisPlus集成baomidou-dynamic,多数据源配置使用、MybatisPlus分页分组等操作示例

文章目录 pom配置示例代码 pom <dependencies><!--mybatisPlus集成SpringBoot起步依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version>&l…