7、Nacos服务注册服务端源码分析(六)

news2024/11/27 19:35:51

本文收录于专栏 Nacos 中 。

文章目录

  • 前言
  • 一、Nacos的任务设计中有哪些关键类?
    • 定义任务:NacosTask
    • 执行任务:NacosTaskProcessor
    • 执行引擎:NacosTaskExecuteEngine
  • 二、PushDelayTaskExecuteEngine、NacosExecuteTaskExecuteEngine
    • NacosDelayTaskExecuteEngine
    • PushDelayTaskExecuteEngine
  • 三、NamingExecuteTaskDispatcher
  • 四、PushExecuteTask
    • NacosTask
    • AbstractExecuteTask
    • PushExecuteTask
  • 总结


前言

上文我们结束了客户端注册中,涉及到的Event逻辑。我们发现事件流转之后,程序走到了一个任务执行引擎NacosDelayTaskExecuteEngine中。

继续查看后续源码之前,我们做一个回顾,梳理下NamingSubscriberServicecV2ImplServiceChangedEvent被订阅之后的流程:

  1. 当前事件被组装成PushDelayTask添加到PushDelayTaskExecuteEngine中的ConcurrentHashMap<Object, AbstractDelayTask> tasks中。
  2. NacosDelayTaskExecuteEngine中维护的延时线程池ScheduledExecutorService会定时扫描tasks,然后交由PushDelayTaskProcessor1处理。

PushDelayTaskProcessor处理流程如下:

@Override
public boolean process(NacosTask task) {
    PushDelayTask pushDelayTask = (PushDelayTask) task;
    Service service = pushDelayTask.getService();
    NamingExecuteTaskDispatcher.getInstance()
            .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
    return true;
}

从上文中我们可以看到几个关键类,PushDelayTaskExecuteEngineNacosDelayTaskExecuteEngineNamingExecuteTaskDispatcherPushExecuteTask。接下来我们逐个分析下这几个类,消化理解其中的设计和逻辑。

一、Nacos的任务设计中有哪些关键类?

首先,我们整体梳理下Nacos中任务处理的几个关键类,了解顶层代码设计之后,后续接触其不同分支实现时,也就可以做到在整体性上的认知是准确的🙆。

定义任务:NacosTask

/**
 * Nacos task.
 *
 * @author xiweng.yy
 */
public interface NacosTask {
    
    /**
     * Judge Whether this nacos task should do.
     *
     * @return true means the nacos task should be done, otherwise false
     */
    boolean shouldProcess();
}

顶层接口中只定义了一个方法shouldProcess(),用于判断当前任务是否需要被执行。

执行任务:NacosTaskProcessor

/**
 * Task processor.
 *
 * @author Nacos
 */
public interface NacosTaskProcessor {
    
    /**
     * Process task.
     *
     * @param task     task.
     * @return process task result.
     */
    boolean process(NacosTask task);
}

任务执行的顶层接口中也只有一个方法,入参是NacosTask,出参是一个代表是当前任务处理成功与否的布尔值。

执行引擎:NacosTaskExecuteEngine

public interface NacosTaskExecuteEngine<T extends NacosTask> extends Closeable {
    
    /**
     * Get Task size in execute engine.
     * 获取当前引擎中需要执行任务的数量,也就是NacosTask的数量
     * @return size of task
     */
    int size();
    
    /**
     * Whether the execute engine is empty.
     * 判断当前引擎中是否还有需要执行的NacosTask
     * @return true if the execute engine has no task to do, otherwise false
     */
    boolean isEmpty();
    
    /**
     * Add task processor {@link NacosTaskProcessor} for execute engine.
     * 添加一个任务执行类
     * @param key           key of task
     * @param taskProcessor task processor
     */
    void addProcessor(Object key, NacosTaskProcessor taskProcessor);
    
    /**
     * Remove task processor {@link NacosTaskProcessor} form execute engine for key.
     *
     * @param key key of task
     */
    void removeProcessor(Object key);
    
    /**
     * Try to get {@link NacosTaskProcessor} by key, if non-exist, will return default processor.
     *
     * @param key key of task
     * @return task processor for task key or default processor if task processor for task key non-exist
     */
    NacosTaskProcessor getProcessor(Object key);
    
    /**
     * Get all processor key.
     *
     * @return collection of processors
     */
    Collection<Object> getAllProcessorKey();
    
    /**
     * Set default task processor. If do not find task processor by task key, use this default processor to process
     * task.
     *
     * @param defaultTaskProcessor default task processor
     */
    void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor);
    
    /**
     * Add task into execute pool.
     *
     * @param key  key of task
     * @param task task
     */
    void addTask(Object key, T task);
    
    /**
     * Remove task.
     *
     * @param key key of task
     * @return nacos task
     */
    T removeTask(Object key);
    
    /**
     * Get all task keys.
     *
     * @return collection of task keys.
     */
    Collection<Object> getAllTaskKeys();
}

通过代码我们可以看出执行引擎的作用就是组织当前类型的任务<T extends NacosTask>,然后组织任务(NacosTask)任务执行者(NacosTaskProcessor) 的关联关系。

二、PushDelayTaskExecuteEngine、NacosExecuteTaskExecuteEngine

在这里插入图片描述
我们先看顶层接口NacosTaskExecuteEngine的抽象实现AbstractNacosTaskExecuteEngine,我们上边已经贴过接口NacosTaskExecuteEngine的代码,知道里其中的接口逻辑主要是面对NacosTaskNacosTaskProcessor的。那么这里我们主要看下这个抽象模版类实现的私有变量即可:

//组织NacosTask和NacosTaskProcessor的关联关系
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<>();

private NacosTaskProcessor defaultTaskProcessor;

它有两个私有变量,分别是ConcurrentHashMap类型的缓存,存放的是任务处理类接口NacosTaskProcessor,还有一个默认的任务处理类defaultTaskProcessor,在缓存中没有查找到对应处理类时,使用这个默认处理类去处理。

NacosDelayTaskExecuteEngine

这个类一看就是去延迟处理执行任务的引擎,我们看下代码是如何设计的:

private final ScheduledExecutorService processingExecutor;

protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    //待处理任务的缓存
    tasks = new ConcurrentHashMap<>(initCapacity);
    //一个单线程的处理任务线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    //开启线程池,间隔processInterval时间,执行ProcessRunnable
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

//ProcessRunnable的设计就是实现Runnable,重写run方法,处理task
private class ProcessRunnable implements Runnable {
     @Override
     public void run() {
         try {
             processTasks();
         } catch (Throwable e) {
             getEngineLog().error(e.toString(), e);
         }
     }
 }

/**
 * process tasks in execute engine.
 */
protected void processTasks() {
	//从缓存中获取所有待处理的task
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
    	//先从缓存中移除这个task,因为默认这个task接下来会被处理掉
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        //从父类方法中获取需要处理当前task的任务处理类
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
            	//如果任务处理失败,那么将任务重新添加到缓存中
                task.setLastProcessTime(System.currentTimeMillis());
        		addTask(key, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}

总结:通过定义一个延时执行的线程池定时去扫描task缓存,执行任务

PushDelayTaskExecuteEngine

public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
                                  ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
                                  PushExecutor pushExecutor, SwitchDomain switchDomain) {
    super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
    this.clientManager = clientManager;
    this.indexesManager = indexesManager;
    this.serviceStorage = serviceStorage;
    this.metadataManager = metadataManager;
    this.pushExecutor = pushExecutor;
    this.switchDomain = switchDomain;
    //设置默认的任务处理器
    setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
}

private static class PushDelayTaskProcessor implements NacosTaskProcessor {
     
     private final PushDelayTaskExecuteEngine executeEngine;
     
     public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
         this.executeEngine = executeEngine;
     }
     
     @Override
     public boolean process(NacosTask task) {
         PushDelayTask pushDelayTask = (PushDelayTask) task;
         Service service = pushDelayTask.getService();
         NamingExecuteTaskDispatcher.getInstance()
                 .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
         return true;
     }
 }

设置了默认的任务处理器PushDelayTaskProcessor,那么看到这里我们就回到了我们前言中开头说到的环节了。

三、NamingExecuteTaskDispatcher

NamingExecuteTaskDispatcher.getInstance()很明显就是一个单例的写法了,看下dispatchAndExecuteTask()的逻辑:

private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();
private final NacosExecuteTaskExecuteEngine executeEngine;

//executeEngine是在构造方法中实例化的
private NamingExecuteTaskDispatcher() {
    executeEngine = new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);
}

public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {
    executeEngine.addTask(dispatchTag, task);
}

我们这里又遇到一个新的执行引擎NacosExecuteTaskExecuteEngine,它有什么特点呢?

private final TaskExecuteWorker[] executeWorkers;

它只有这么一个私有变量,TaskExecuteWorker其实就是一个自定义之后的线程,内部封住了处理任务的一些逻辑,这里不做展开。

四、PushExecuteTask

我们先从PushExecuteTask这个类看起:
在这里插入图片描述

NacosTask

/**
 * Judge Whether this nacos task should do.
 *
 * @return true means the nacos task should be done, otherwise false
 */
boolean shouldProcess();

这个接口中只定义了一个方法shouldProcess(),作用是判断这个任务是否需要被处理。
那也就意味着,Nacos中不是所有的task都会被处理的,但截止到目前我们还没有遇见这种task。
在这里插入图片描述
我们可以看到NacosTask有着诸多实现,每一种都对shouldProcess()方法有着不同的实现。

AbstractExecuteTask

/**
 * Abstract task which should be executed immediately.
 */
public abstract class AbstractExecuteTask implements NacosTask, Runnable {
    protected static final long INTERVAL = 3000L;
    
    @Override
    public boolean shouldProcess() {
        return true;
    }
}

这是一个抽象类,默认这种task是需要被处理的。除此之外,这个抽象类还实现了Runnable2,那就要着重关注其子类重写的run()方法了。

PushExecuteTask

回到PushExecuteTask,其父类实现了Runnable,那我们便先看本类的run()方法入手。

@Override
public void run() {
    try {
        PushDataWrapper wrapper = generatePushData();
        ClientManager clientManager = delayTaskEngine.getClientManager();
        for (String each : getTargetClientIds()) {
            Client client = clientManager.getClient(each);
            if (null == client) {
                // means this client has disconnect
                continue;
            }
            Subscriber subscriber = client.getSubscriber(service);
            // skip if null
            if (subscriber == null) {
                continue;
            }
            delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
        }
    } catch (Exception e) {
        Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
        delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
    }
}

总结

NamingSubscriberServiceV2Impl

  1. 当前事件被组装成PushDelayTask添加到PushDelayTaskExecuteEngine中的ConcurrentHashMap<Object, AbstractDelayTask> tasks中。
  2. NacosDelayTaskExecuteEngine中维护的延时线程池 ScheduledExecutorService会定时扫描tasks,然后交由PushDelayTaskProcessor1处理。

PushDelayTaskProcessor

  1. 将当前PushDelayTask封装成PushExecuteTask
  2. 封装后的task,交由NamingExecuteTaskDispatcher做分发
  3. NamingExecuteTaskDispatcher会将任务交由执行引擎NacosExecuteTaskExecuteEngine 执行
  4. NacosExecuteTaskExecuteEngine内部封装了线程实现类TaskExecuteWorker去执行任务

我们发现,一个简单的任务执行,在分发过程中经过了很多类的处理,在梳理源码的过程中我们要学习Nacos中对事件和任务的封装,加深对低耦合的理解。

一个客户端注册事件,梳理到这里其实都是数据分发的逻辑,接下来我们马上就要看到数据处理的逻辑了。
查看上述总结,相信你也知道我们接下来摇去看哪个关键类了。


  1. NacosTaskProcessor接口是任务处理代码的顶层接口,只有一个处理NacosTask的方法。从这里可以看出Nacos中关于解耦的工作是做了良好的设计的。 ↩︎ ↩︎

  2. 抽象类实现接口有什么意义? ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1055902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python操作.xlsx文件

from openpyxl import load_workbook from openpyxl.styles import Font,colors, Alignment from openpyxl.styles import Border, Side #打开已经存在的Excel workbook load_workbook(filenameC:\\Users\\yh\\Documents\\测试.xlsx) #创建表&#xff08;sheet&#xff09;,插…

崇州街子古镇2023中秋国庆双节第四天一瞥

今天已是2023中秋国庆双节第四天&#xff0c;上午近10时许&#xff0c;笔者继昨下午又走出寄居养老成都市崇州街子古镇青城神韵小区大门&#xff0c;看看节日气氛是否仍然浓厚...... 笔者手机拍摄&#xff1a;国旗飘扬的青城神韵小区一号大门 笔者手机拍摄&#xff1a;街子古镇…

了解”变分下界“

“变分下界”&#xff1a;在变分推断中&#xff0c;我们试图找到一个近似概率分布q(x)来逼近真实的概率分布p(x)。变分下界是一种用于评估近似概率分布质量的指标&#xff0c;通常用来求解最优的近似分布。它的计算涉及到对概率分布的积分或期望的估计

Spring5应用之JDK动态代理

作者简介&#xff1a;☕️大家好&#xff0c;我是Aomsir&#xff0c;一个爱折腾的开发者&#xff01; 个人主页&#xff1a;Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客 当前专栏&#xff1a;Spring5应用专栏_Aomsir的博客-CSDN博客 文章目录 前言JDK动态代理开…

计算机竞赛 深度学习猫狗分类 - python opencv cnn

文章目录 0 前言1 课题背景2 使用CNN进行猫狗分类3 数据集处理4 神经网络的编写5 Tensorflow计算图的构建6 模型的训练和测试7 预测效果8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习猫狗分类 ** 该项目较为新颖&a…

[C++随笔录] stack queue模拟实现

stack && queue模拟实现 stack的实现stack测试用例queue的实现queue测试用例deque stack的实现 &#x1f5e8;️stack的容器适配器应该选什么比较好呢? 首先, stack的特点是 头部入, 尾部出 ⇒ 尾插 和 尾删操作比较频繁 我们前面学过的容器有 vector 和 list, vecto…

代理服务器拒绝连接

在使用电脑时&#xff0c;有些时候会出现 代服务器拒绝连接 的提示&#xff0c;在这个时候通常采用的一种解决方案是&#xff1a; 首先 点击 winR &#xff0c; 后在其中输入 inetcpl.cpl&#xff0c;点击 连接->局域网设置&#xff0c;把选择框全部清空&#xff0c;点击确…

QT:绘图

widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent> //绘图事件class Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent 0);~Widget();void paintEvent(QPaintEvent *event); //重写绘图事件void timerEve…

Claude一个比chat-gpt相同但使用门槛更低的ai生产力

本篇文章主要介绍Claude的官网、使用方法&#xff0c;以及Claude 的特性和与chat-gpt的区别。 日期&#xff1a;2023年6月17日 作者&#xff1a;任聪聪 Claude 的介绍及相关信息 Claude 也是一个与chat-gpt等同的nlp大语言模型&#xff0c;效果和gpt几乎差不多&#xff0c;能够…

程序在线报刊第一期

文章目录 程序在线报刊第一期排序算法&#xff1a;优化数据处理效率的核心技术回顾区块链技术&#xff1a;去中心化引领数字经济新时代展望AI未来&#xff1a;智能化时代的无限可能 程序在线报刊第一期 排序算法&#xff1a;优化数据处理效率的核心技术 近年来&#xff0c;随…

区块链(9):java区块链项目的Web服务实现之实现web服务

1 引入pom依赖 <dependency><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId><version>9.4.8.v20171121</version></dependency><dependency><groupId>org.eclipse.jetty</groupId…

Tomcat多实例、负载均衡、动静分离

Tomcat多实例部署 安装jdk [rootlocalhost ~]#systemctl stop firewalld.service [rootlocalhost ~]#setenforce 0 [rootlocalhost ~]#cd /opt [rootlocalhost opt]#ls apache-tomcat-8.5.16.tar.gz jdk-8u91-linux-x64.tar.gz rh [rootlocalhost opt]#tar xf jdk-8u91-linu…

Vivado与Notepad++关联步骤

填写内容 先看"关联步骤"再看此处&#xff1a; 在“editor”栏中填写 Notepad的路径&#xff0c;并加上[file name] -n[line number]&#xff0c; 这里我的 Notepad 的路径为 C:/Program Files (x86)/Notepad/notepad.exe &#xff1b; 故这里我就填上以下内容即可…

【人工智能 | 认知观与系统类别】从宏观角度看人工智能认知观与系统类别:探索人工智能无垠领域

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

Spirng Cloud Alibaba Nacos注册中心的使用 (环境隔离、服务分级存储模型、权重配置、临时实例与持久实例)

文章目录 一、环境隔离1. Namespace&#xff08;命名空间&#xff09;&#xff1a;2. Group&#xff08;分组&#xff09;&#xff1a;3. Services&#xff08;服务&#xff09;&#xff1a;4. DataId&#xff08;数据ID&#xff09;&#xff1a;5. 实战演示&#xff1a;5.1 默…

SpringBoot终极讲义第二章笔记

01.关于Import 和 ImportResource Import注解用法(类上): 一般和Configuration一起使用,用来导入里面Bean方法返回的对象 ImportResource(类上):一般和Configuration一起使用,用来导入某个.XML文件里的bean 个人觉得这两个注解有点鸡肋 SpringBoot启动类默认扫描的是启动类…

电子地图 | VINS-FUSION | 小觅相机D系列

目录 一、相关介绍 二、VINS-FUSION环境安装及使用 &#xff08;一&#xff09;Ubuntu18.04安装配置 1、Ubuntu下载安装 2、设置虚拟内存&#xff08;可选&#xff09; &#xff08;二&#xff09;VINS-FUSION环境配置 1、ros安装 2、ceres-solver安装 3、vins-fusion…

智慧公厕与传统公共厕所对比五大优势是什么?

随着科技的不断发展&#xff0c;智慧公厕成为城市建设的新亮点。与传统公共厕所相比&#xff0c;它具备许多独特优势和巨大的价值。本文将以智慧公厕领先厂家广州中期科技有限公司&#xff0c;大量精品案例项目实景实例实图&#xff0c;深入探讨智慧公厕的各个方面的特点&#…

PHP禁止单个用户多设备同时登陆,限制单个用户在多端重复登录

逻辑简单,主要是3点&#xff1a; 1.登录的时候写入一个最新的登录IP到user表其中一个last_login_ip字段 2.登录成功的时候,转入到index控制器或者index方法之前先进行查询&#xff1a; 1).当前IP 2).数据库字段当前用户存储的last_login_ip里面的IP 3.然后进行判断&#xff0…