5、Nacos服务注册服务端源码分析(四)之NotifyCenter

news2025/1/11 14:05:58

上篇我们讲server端处理服务注册源码时,遇到了一个关键类NotifyCenter,本篇就主要来分析下这个类。

NotifyCenter

这个类所在包:nacos-common

/**
 * Unified Event Notify Center.
 */

通过类注释可以看出来这个类是一个统一的事件通知中心,那也就足可见这个类的重要性了。

public class NotifyCenter

通过类定义发现这个类就是一个普通类,不会被注册为spring bean且类中的方法都是静态方法,也就是说这个类是整个进程共享的了,所以其中的内部变量必然要是线程安全的,结合源码看看是如何处理的。

private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);
//这两个变量是在静态块中初始化的,所以不需要特殊处理
public static int ringBufferSize;
public static int shareBufferSize;

private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
//初始化EventPublisher的工厂
private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;

private static final NotifyCenter INSTANCE = new NotifyCenter();

private DefaultSharePublisher sharePublisher;
//SPI机制获取到EventPublisher实现之后,获取class,赋值到此
private static Class<? extends EventPublisher> clazz;

/**
 * Publisher management container.
 */
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);

static {
    // Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
    // this value needs to be increased appropriately. default value is 16384
    String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
    ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
    
    // The size of the public publisher's message staging queue buffer
    String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
    shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
    //采用SPI拓展机制,NacosServiceLoader内部封装了JDK的ServiceLoader,在此基础上加了一层缓存
    final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
    Iterator<EventPublisher> iterator = publishers.iterator();
    if (iterator.hasNext()) {
        clazz = iterator.next().getClass();
    } else {
    	//默认使用DefaultPublisher
        clazz = DefaultPublisher.class;
    }
    
    DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
        try {
        	//实例化EventPublisher
            EventPublisher publisher = clazz.newInstance();
            //调用EventPublisher初始化方法
            publisher.init(cls, buffer);
            return publisher;
        } catch (Throwable ex) {
            LOGGER.error("Service class newInstance has error : ", ex);
            throw new NacosRuntimeException(SERVER_ERROR, ex);
        }
    };
    
    try {
        
        // Create and init DefaultSharePublisher instance.
        INSTANCE.sharePublisher = new DefaultSharePublisher();
        INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
        
    } catch (Throwable ex) {
        LOGGER.error("Service class newInstance has error : ", ex);
    }
    //JVM销毁前执行的钩子函数 NotifyCenter::shutdown
    ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}

查看代码可以知道关键的处理逻辑是:创建EventPublisher


EventPublisher

以上涉及到两个EventPublisher一个是默认的DefaultPublisher,另一个是DefaultSharePublisher
在这里插入图片描述
我们来一个个梳理下UML中的类:

interface EventPublisher

事件通知的接口,定义了事件通知的一系列方法

/**
  * Initializes the event publisher.
  * 初始化
  * @param type       {@link Event >}
  * @param bufferSize 消息队列大小
  */
 void init(Class<? extends Event> type, int bufferSize);
 
 /**
  * The number of currently staged events.
  * 当前暂存的事件数量
  * @return event size
  */
 long currentEventSize();
 
 /**
  * Add listener.
  *	添加订阅者
  * @param subscriber {@link Subscriber}
  */
 void addSubscriber(Subscriber subscriber);
 
 /**
  * Remove listener.
  * 移除订阅者
  * @param subscriber {@link Subscriber}
  */
 void removeSubscriber(Subscriber subscriber);
 
 /**
  * publish event.
  *	发布事件
  * @param event {@link Event}
  * @return publish event is success
  */
 boolean publish(Event event);
 
 /**
  * Notify listener.
  * 通知订阅者
  * @param subscriber {@link Subscriber}
  * @param event      {@link Event}
  */
 void notifySubscriber(Subscriber subscriber, Event event);
class DefaultPublisher
public class DefaultPublisher extends Thread implements EventPublisher

DefaultPublisher 是事件发布接口EventPublisher的默认实现,除此之外,还继承了Thread,那么也就意味着DefaultPublisher除了有事件处理能力,本身还是一个线程。

先看接口中定义的初始化方法init是如何实现的,从这里看起会很好的帮助我们理解这个类:

@Override
public void init(Class<? extends Event> type, int bufferSize) {
	//设置为守护线程
    setDaemon(true);
    //设置线程名称,一定程度上这可以帮助我们排查问题
    setName("nacos.publisher-" + type.getName());
    //Event类型
    this.eventType = type;
    //队列长度
    this.queueMaxSize = bufferSize;
    //队列
    this.queue = new ArrayBlockingQueue<>(bufferSize);
    //Thread的start()方法
    start();
}
//全局变量,用来标识当前Publisher是否初始化完成
private volatile boolean initialized = false;
@Override
public synchronized void start() {
    if (!initialized) {
        // start just called once
        super.start();
        if (queueMaxSize == -1) {
            queueMaxSize = ringBufferSize;
        }
        initialized = true;
    }
}

即然继承了Thread那么就肯定回去重写关键的run()方法,我们看下是如何重写的:

@Override
public void run() {
    openEventHandler();
}

void openEventHandler() {
    try {
        
        // This variable is defined to resolve the problem which message overstock in the queue.
        // 这个变量用于解决队列中消息积压的问题
        int waitTimes = 60;
        // To ensure that messages are not lost, enable EventHandler when
        // waiting for the first Subscriber to register
        // 没有关闭 && 没有订阅者 && waitTimes>0 --> 休眠1s,最多可以sleep 60s
        while (!shutdown && !hasSubscriber() && waitTimes > 0) {
            ThreadUtils.sleep(1000L);
            waitTimes--;
        }

        while (!shutdown) {
        	//如果没有获取到事件,这里会阻塞
            final Event event = queue.take();
            //处理事件
            receiveEvent(event);
            UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
        }
    } catch (Throwable ex) {
        LOGGER.error("Event listener exception : ", ex);
    }
}

可以看到,这里有一个死循环,只要没有收到shutdown事件,那么就会一直尝试从队列中获取事件去处理。这里获取事件的逻辑是从阻塞队列中取数据,所以当没有时间要处理时,当前线程会阻塞在获取事件的逻辑中,不会对cpu有过多的压力。

/**
 * Receive and notifySubscriber to process the event.
 *
 * @param event {@link Event}.
 */
void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();
    
    if (!hasSubscriber()) {
        LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
        return;
    }
    
    // Notification single event listener
    for (Subscriber subscriber : subscribers) {
        if (!subscriber.scopeMatches(event)) {
            continue;
        }
        
        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }
        
        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        notifySubscriber(subscriber, event);
    }
}

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
    //将订阅者对事件的处理构建成一个Runnable
    final Runnable job = () -> subscriber.onEvent(event);
    //获取订阅者处理事件的Executer,这里的设计逻辑是:将异步或者同步的决定权交给订阅者本身,
    //是否异步处理决定于订阅者对executor()方法的实现方式
    final Executor executor = subscriber.executor();
    
    if (executor != null) {
        executor.execute(job);
    } else {
        try {
        	//如果没有executor,那就直接执行Runnable
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception: ", e);
        }
    }
}

看了这么多代码,简单对DefaultPublisher做个总结:

  • 是什么?
    • 事件通知的默认实现,用于处理事件通知逻辑
    • 继承了线程
  • 有什么?
    • 订阅者集合
    • 事件集合
  • 做什么?
    • 处理事件通知,同时处理订阅者的相关逻辑(添加/删除)
  • 怎么做?
    • 初始化方法中会启动线程
    • run()处理事件,从自己的事件集合中获取事件,然后交给订阅者去处理
class DefaultSharePublisher

这个类继承了DefaultPublisher,设计的目的应该是处理一些比较耗时的事件。本身有一个map做缓存,没有什么特别的逻辑。

private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();

这个类去处理事件的逻辑还是其父类DefaultPublishernotifySubscriber(subscriber, event)方法。我们上文已经看过这个方法了。


回过头再看NotifyCenter

看完EventPublisher之后,别忘记我们为什么出发🫣。
回过头再看NotifyCenter:我们从接收一个事件开始,从头梳理代码。

接收事件
/**
 * Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is
 * actually published.
 *
 * @param event class Instances of the event.
 */
public static boolean publishEvent(final Event event) {
    try {
        return publishEvent(event.getClass(), event);
    } catch (Throwable ex) {
        LOGGER.error("There was an exception to the message publishing : ", ex);
        return false;
    }
}
发布事件
/**
 * Request publisher publish event Publishers load lazily, calling publisher.
 *
 * @param eventType class Instances type of the event type.
 * @param event     event instance.
 */
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
    	//当前事件如果是SlowEvent,那么交与sharePublisher处理
        return INSTANCE.sharePublisher.publish(event);
    }
    //获取Event的类名
    final String topic = ClassUtils.getCanonicalName(eventType);
    
    //根据事件的类名,从map中获取EventPublisher去处理当前事件
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        return publisher.publish(event);
    }
    if (event.isPluginEvent()) {
        return true;
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}

ps:Class.getCanonicalName()的用法

EventPublisher是如何添加到publisherMap中的?
public static void registerSubscriber(final Subscriber consumer) {
    registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);
}
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
    // If you want to listen to multiple events, you do it separately,
    // based on subclass's subscribeTypes method return list, it can register to publisher.
    //SmartSubscriber是为了处理一个订阅者订阅多个事件而设计的
    if (consumer instanceof SmartSubscriber) {
        for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
            // For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
            if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
                INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
            } else {
                // For case, producer: defaultPublisher -> consumer: subscriber.
                addSubscriber(consumer, subscribeType, factory);
            }
        }
        return;
    }
    
    final Class<? extends Event> subscribeType = consumer.subscribeType();
    if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
        INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
        return;
    }
    //添加订阅者
    addSubscriber(consumer, subscribeType, factory);
}

private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
        EventPublisherFactory factory) {
    
    final String topic = ClassUtils.getCanonicalName(subscribeType);
    synchronized (NotifyCenter.class) {
        // MapUtils.computeIfAbsent is a unsafe method.
        MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
    }
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher instanceof ShardedEventPublisher) {
        ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
    } else {
        publisher.addSubscriber(consumer);
    }
}

最后的一个方法中我们看到了Map<String, EventPublisher> publisherMap的数据添加逻辑:根据Event的类名做为key,EventPublisher作为value。
同时我们也看到了事件订阅者是如何和事件发布者建立关系的:publisher.addSubscriber(consumer);,publisher中有一个集合,用于存储事件的订阅者。

总结

好了,总结下NotifyCenter:

  • 是什么?
    • 统一的事件处理类,所有的事件通知都借助此类去处理
    • 组织事件-发布者的关联关系
  • 有什么?
    • 事件处理类EventPublisher
    • 共享事件处理类DefaultSharePublisher,用于处理耗时的事件
    • Map<String, EventPublisher> publisherMap
  • 做什么?
    • 注册发布者到publisherMap
    • 接收事件之后交与对应的订阅者处理
  • 怎么做?
    • 根据Event的类名去publisherMap 中获取对应的EventPublisher,然后发布事件,交与对应的订阅者处理事件

本篇主要讲了NotifyCenter相关的事件代码逻辑,特别对 EventPublisher做了详细的分析。我们从中看到了一个事件发布与订阅解耦的设计方式,很多框架中都会使用事件驱动去设计发布-订阅模型,日常工作中如果遇到类似的情景,也可以尝试使用这种方式去实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1039491.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP8中的构造方法和析构方法-PHP8知识详解

今日分享的内容是php8中的构造方法和析构方法&#xff0c;我们把构造方法和析构方法这两个方法分开来讲&#xff1a; 1、构造方法 构造方法存在于每个声明的类中&#xff0c;主要作用是执行一些初始化任务。如果类中没有直接声明构造方法&#xff0c;那么类会默认地生成一个没…

仿网吧游戏菜单-超好用

GG软件菜单(X64)仿网吧菜单: https://url75.ctfile.com/f/1834175-943877085-2d844a?p6775 (访问密码: 6775)

如何将前后端分离的项目部署在服务器上

宝塔Linux部署&#xff1a; 因为要部署前端我们先下个nigx Tomcat,下载这个只是为了java&#xff0c;它里面包含java的 前端 在去添加站点&#xff0c;域名暂时是自己的公网 然后打开新建的站点&#xff0c;把里面的文件全删掉&#xff0c;再把自己的前端dist里的文件全选拖…

高速USB转8路RS422串口

基于480Mbps 高速USB转8路串口芯片CH348&#xff0c;可以为各类主机扩展出8个独立的串口。使用厂商提供的VCP串口驱动程序&#xff0c;可支持Windows、Linux、Android、macOS等操作系统。使用单个CH348芯片即可实现USB一拖八串口转接产品&#xff0c;高速USB收发器和控制器、高…

SpringCloud 简单的了解

什么是SpringCloud ? 基于 Spring Boot 的 Spring 集成应用程序&#xff0c;它利用 Spring Boot 的开发便利性简化了分布式系统的开发&#xff0c;提供与外部系统的集成。 如服务注册与发现、配置中心、负载均衡、断路器、消息总线、数据监控等&#xff1b;换句话说&#xff…

拼多多商品详情数据接口

拼多多商品详情接口的具体内容。获取拼多多商品详情&#xff0c;可以参考如下方式&#xff1a; item_get_app-根据ID取商品详情原数据接口包括&#xff1a;标题&#xff0c;价格&#xff0c;促销价&#xff0c;优惠券&#xff0c;库存&#xff0c;销量&#xff0c;详情图片&am…

Android开发之状态栏的设置

Android页面开发通常是根据UI设计进行&#xff0c;真机会遇到顶部状态栏和页面背景色或背景图片不协调的情况&#xff0c;这时候需要对状态栏进行设置。默认状态栏是有固定高度和背景色的&#xff0c;基本上我们需要将状态栏背景色设置透明并且图标能够在页面显示&#xff0c;下…

【项目实战】Linux系统下jar包自启动

什么是jar包自启动 在Linux系统中&#xff0c;"jar包自启动"是指通过配置将Java程序打包成可执行的Jar文件&#xff0c;并设置其在系统启动时自动运行。以下是与jar包自启动相关的一些概念&#xff1a; Jar文件&#xff1a;Jar&#xff08;Java Archive&#xff09…

Postgresql事务测试

参考一个事务中 可以查询自己未提交的数据吗_最详细MySQL事务隔离级别及原理讲解&#xff01;&#xff08;二&#xff09;-CSDN博客 一个事务中 可以查询自己未提交的数据吗_趣说数据库事务隔离级别与原理_weixin_39747293的博客-CSDN博客 【MySql&#xff1a;当前读与快照读…

虚拟机与物理机之寄生贴贴

虚拟机 虚拟机指通过【软件模拟的】、【具有完整硬件系统功能的】、【运行在一个完全隔离环境中的】完整计算机【系统】。它是一个系统&#xff0c;一个挂在物理机上的系统&#xff0c;也就是“寄生”在别人身上的东西。可以称为“寄生兽” 物理机 物理机&#xff0c;是真实…

uni-app使用HBuilder X编辑器本地打包apk步骤说明

1.下载安装Android Studio 下载地址官方地址&#xff1a;Android Studio 下载文件归档 | Android 开发者 | Android Developers 安装Android SDK和Google USB Driver即可&#xff0c;后者主要是为了后期使用USB设置的&#xff0c;如果不需要可以不点。 2.下载uni-app提供…

适合新手自学的网络安全基础技能“蓝宝书”:《CTF那些事儿》

CTF比赛是快速提升网络安全实战技能的重要途径&#xff0c;已成为各个行业选拔网络安全人才的通用方法。但是&#xff0c;本书作者在从事CTF培训的过程中&#xff0c;发现存在几个突出的问题&#xff1a; 1&#xff09;线下CTF比赛培训中存在严重的“最后一公里”问题&#xf…

工具学习--easyexcel-3.x 使用--写入基本使用,自定义转换--动态表头以及宽设置-

写在前面&#xff1a; easyexcel是alibaba开发简单导出未excel的工具。使用的情况还是比较多的。 文章目录 依赖导入写Excel快速入门对象设置ExcelProperty设置列属性ExcelIgnore 忽视列宽、行高格式转换时间格式化数字格式化自定义格式化 合并单元格其他更加个性化需求动态表…

简单理解旁路电容和去耦电容

1、本文内容如有错误&#xff0c;欢迎交流指正。 2、本文仅作为本人学习笔记&#xff0c;部分内容来源于网络、书籍&#xff0c;如涉及侵权&#xff0c;请联系删除。 什么是旁路电容&#xff1f; 旁路电容的英文原文是Bypass capacitor&#xff0c;bypass就是绕过&#xff0c;避…

《DATASET CONDENSATION WITH GRADIENT MATCHING》

本文提出了一种用于数据效率学习的训练集合成技术&#xff0c;称为“数据集凝聚”(Dataset)&#xff0c;它学习将大数据集压缩成一个小的信息合成样本集&#xff0c;用于从头开始训练深度神经网络。我们将这个目标表述为在原始数据和合成数据上训练的深度神经网络权值的梯度之间…

sizeof与strlen区别

一、sizeof与strlen区别 sizeof是关键字&#xff0c;参数可以是各种数据&#xff08;包括函数&#xff0c;类型&#xff0c;对象&#xff0c;数组&#xff0c;指针……&#xff09;用于计算数据所占字节大小 strlen是函数&#xff0c;参数类型必须是字符型指针&#xff08;char…

检验过程管理

声明 本文是学习GB-T 42893-2023 电子商务交易产品质量监测实施指南. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件提供了开展电子商务交易的有形产品质量监测的总则&#xff0c;监测准备、监测实施、监测效果评价 与反馈等过程指导…

vue在el-tab中使用echart(出现canvas高宽一直是100px问题+echart随外层div变化而自适应)

问题1:canvas高宽一直是100px问题 解决方法&#xff1a;使用v-if&#xff0c;参考文献https://blog.csdn.net/qq_42527726/article/details/106147539?utm_mediumdistribute.pc_relevant.none-task-blog-2~default~baidujs_utm_term~default-0-106147539-blog-132323416.235^…

全流量安全分析的重要性

网络安全的重要性体现在以下几个方面&#xff1a; 保护数据安全&#xff1a;随着数字化时代的到来&#xff0c;企业和组织的数据变得越来越重要和敏感。网络安全能够有效保护敏感数据不被未授权的访问、篡改或泄露。通过加密、访问控制、身份认证等手段&#xff0c;网络安全确…

Redis怎么测?这篇文章写的太全了

Redis是一个高性能、内存数据库和缓存系统&#xff0c;在开发和生产环境中被广泛应用。本文将介绍如何进行有效的Redis软件测试&#xff0c;以确保其稳定性、高性能和可靠性。 Redis作为一种非关系型数据库和缓存系统&#xff0c;被广泛用于支持高流量、低延迟的应用。为了保证…