深入解析 DolphinScheduler 任务调度、拆分与执行全流程

news2025/1/9 0:59:41

Apache DolphinScheduler介绍

Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

Dag背景知识

摘录了一下Dag的offical定义

A graph is formed by vertices and by edges connecting pairs of vertices, where the vertices can be any kind of object that is connected in pairs by edges. In the case of a directed graph, each edge has an orientation, from one vertex to another vertex. A path in a directed graph is a sequence of edges having the property that the ending vertex of each edge in the sequence is the same as the starting vertex of the next edge in the sequence; a path forms a cycle if the starting vertex of its first edge equals the ending vertex of its last edge.

A directed acyclic graph is a directed graph that has no cycles.[1][2][3]

A vertex v of a directed graph is said to be reachable from another vertex u when there exists a path that starts at u and ends at v. As a special case, every vertex is considered to be reachable from itself (by a path with zero edges). If a vertex can reach itself via a nontrivial path (a path with one or more edges), then that path is a cycle, so another way to define directed acyclic graphs is that they are the graphs in which no vertex can reach itself via a nontrivial path.

在offical的定义中,有两对象的集合,集合中的元素是

  1. vertex 一个实体或者元素,可以是任何抽象的object
  2. edge 一条有方向直线,包含两个vertex,分别扮演起点和终点
  • Dag约束
  1. 在Dag中,一个edge(a,b)的终点可以作为另一个edge(b,c)的起点,这个链路中所有的vertex都是可到达的, c是从a可达的。
  2. 在Dag中允许vertex不存在于任何一个edge中,这个节点可以从自己到达自己(一个孤岛,不和其他vertex有任何联系)
  3. 如果一个vertex可以从自己到达自己,但是中间经过了其他的vertex,那么这就存在一个环circle
  4. 在Dag中没有环

在DolphinScheduler中表示Dag的数据结构为

public class DAG<Node, NodeInfo, EdgeInfo> {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * node map, key is node, value is node information
     */
    private final Map<Node, NodeInfo> nodesMap;

    /**
     * edge map. key is node of origin;value is Map with key for destination node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

    /**
     * reversed edge set,key is node of destination, value is Map with key for origin node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
}

其中

  • Node表示任务的id
  • NodeInfo表示任务的详细信息
  • EdgeInfo包含任务id和依赖任务id

数仓建设任务和任务依赖

在企业数仓建设中,普遍的做法是进行数据分层(引用https://juejin.cn/post/6969874734355841031)

file

在生产环境,由于分层的需要,业务逻辑分布广泛,数据存储类型多样,这就造成了数仓建设的任务多,任务之间依赖复杂,dag就成了最佳的任务依赖和调度的存储结构。在Dag结构中每个节点表示一个具体的调度任务,任务之间的连线表示依赖关系,针对Dag结构化数据的遍历过程,就是对数仓任务的执行过程。

一个简单的数仓依赖任务关系(数仓建设中会有很多任务依赖关系和更复杂的任务依赖关系)

file

DolphinScheduler系统角色拆分

Apache DolphinScheduler核心角色包括MasterServer和WorkerServer,这遵循模块化设计,master和worker专注于自己本身的角色和任务,模块遵循高内聚低耦合的设计,大大提高了系统的稳定性和可扩展性,同时也有利于并行开发,缩短系统的研发时间,提高系统的健壮性。

MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。

WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

DolphinScheduler任务调度流程

参考官网,DolphinScheduler核心任务任务执行流程如下 file

鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解。

file

Command分发流程

处理方式

异步,分布式master server节点。

生产者

api-server将用户的运行工作流http请求封装成command数据,insert到t_ds_command表中 一个启动工作流实例的command样例

{
    "commandType": "START_PROCESS",
    "processDefinitionCode": 14285512555584,
    "executorId": 1,
    "commandParam": "{}",
    "taskDependType": "TASK_POST",
    "failureStrategy": "CONTINUE",
    "warningType": "NONE",
    "startTime": 1723444881372,
    "processInstancePriority": "MEDIUM",
    "updateTime": 1723444881372,
    "workerGroup": "default",
    "tenantCode": "default",
    "environmentCode": -1,
    "dryRun": 0,
    "processInstanceId": 0,
    "processDefinitionVersion": 1,
    "testFlag": 0
}
消费者

master server中的MasterSchedulerBootstrap loop程序, MasterSchedulerBootstrap使用zk分配到自己的slot,从t_ds_command表中select属于slot的command列表处理 查询语句

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit}
</select>

MasterSchedulerBootstrap loop轮训查到待处理的command任务,将command任务和master host生成ProcessInstance,将ProcessInstance对象插入到t_ds_process_instance表中, 同时生成包含运行所需要的上下文信息的可执行任务workflowExecuteRunnable。 将workflowExecuteRunnablecache到本地cache processInstanceExecCacheManager,同时生产将ProcessInstanceWorkflowEventType.START_WORKFLOW生产到workflowEventQueue队列中。

Dag遍历执行任务

Master本地cache缓冲

cache实现ProcessInstanceExecCacheManagerImpl,提供如下核心功能

public interface ProcessInstanceExecCacheManager {

    /**
     * get WorkflowExecuteThread by process instance id
     *
     * @param processInstanceId processInstanceId
     * @return WorkflowExecuteThread
     */
    WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);

    /**
     * judge the process instance does it exist
     *
     * @param processInstanceId processInstanceId
     * @return true - if process instance id exists in cache
     */
    boolean contains(int processInstanceId);

    /**
     * remove cache by process instance id
     *
     * @param processInstanceId processInstanceId
     */
    void removeByProcessInstanceId(int processInstanceId);

    /**
     * cache
     *
     * @param processInstanceId     processInstanceId
     * @param workflowExecuteThread if it is null, will not be cached
     */
    void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);

    /**
     * get all WorkflowExecuteThread from cache
     *
     * @return all WorkflowExecuteThread in cache
     */
    Collection<WorkflowExecuteRunnable> getAll();

    void clearCache();
}
生产者

MasterSchedulerBootstrap loop将command transform to可以运行的任务,任务对象中包含了要处理的所有上下文信息

消费者

EventExecuteService根据dag信息,拿到第一批没有任何依赖的TaskInstance添加到待执行任务队列standByTaskInstancePriorityQueue中, standByTaskInstancePriorityQueue按照优先级先后顺序执行,处理任务状态,将待执行任务提交到globalTaskDispatchWaitingQueue队列中。

可执行任务Dispatch

Master进城内优先级队列

到了globalTaskDispatchWaitingQueue中,已经是可执行任务的最小单元了

生产者

EventExecuteService根据parent node,对Dag进行广度优先遍历,提交任务到globalTaskDispatchWaitingQueue队列中。

消费者

消费者为GlobalTaskDispatchWaitingQueueLooperGlobalTaskDispatchWaitingQueueLooper消费待dispatch的任务,根据任务类型执行任务调度,对任务的调度是走的rpc接口,目前来看根据任务类型分为两种:

  1. MasterTaskDispatcher
  2. WorkerTaskDispatcher

对于WorkerTaskDispatcher来说,rpc server收到rpc request之后提交任务到了workerTaskExecutorThreadPool执行。所以这是一个异步处理任务的过程,不至于让master server hang在这个地方。对于任务的执行进度,会在关键节点进行回调通知。

任务执行状态回调通知

Worker被dispatch任务,异步提交到线程池中之行,在任务异步执行的节点,调用rpc接口通知master任务的状态。

生产者

Worker异步执行节点,对于任务执行状态回调包括四个

  1. TaskExecutionStatus.FAILURE 执行抛出异常,运行失败
  2. TaskExecutionStatus.RUNNING_EXECUTION 开始执行
  3. TaskExecutionStatus.KILL 被杀死
  4. TaskExecutionStatus.SUCCESS 执行成功

备注:在官方的事件流程中Ack的方向搞错了,Ack不是worker通知给master,而是master通知workerer,我的这个事件状态的处理结束了。

经过校正一下,比较概括性的总结,整体的流程大致如下图

file

消费者

master节点ITaskInstanceExecutionEventListener服务,服务接受rpc请求,并将任务添加到TaskEventService eventQueue队列中。

任务状态处理

缓冲队列

master节点TaskEventService eventQueue队列。

生产者

这个生产者可能会很多

  1. api-server用户行为
  2. master节点任务调度
  3. work节点任务执行
  4. master任务执行
    消费者
    为master节点的TaskInstanceListenerImpl服务,TaskInstanceListenerImplTaskEvent transform to TaskExecuteRunnable,并且提交到线程池执行taskExecuteThreadMap待执行,在线程池中修改任务的执行状态。

    本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2202410.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

宠物空气净化器该怎么选?希喂,小米、安德迈这三款好用吗?

不得不说&#xff0c;虽然现在购物网站的活动不少&#xff0c;可力度都好弱啊&#xff01;我想买宠物空气净化器很久了&#xff0c;觉得有点贵&#xff0c;一直没舍得入手。价格一直没变化&#xff0c;平台小活动根本没什么优惠&#xff0c;只能寄希望于双十一了&#xff0c;准…

开源项目|“智慧光伏”开源项目技术文档

【基于ELF 1开发板完成的“智慧光伏”项目】 “智慧光伏”项目能够智能追踪阳光的移动轨迹。通过内置的光敏传感器和智能控制系统&#xff0c;实时感知周围光源的变化&#xff0c;并驱动太阳能板精准调整角度&#xff0c;确保每一缕阳光都能被最大化地捕捉和利用。下面就和各位…

爬虫常用正则表达式用法

在网页爬虫中&#xff0c;正则表达式&#xff08;regex&#xff09;是一种非常有用的工具&#xff0c;用于从 HTML、JSON 或其他文本格式中提取特定的数据。下面是一些常见的正则表达式及其在爬虫中的应用场景&#xff1a;

如何利用phpstudy创建mysql数据库

phpStudy诞生于2007年&#xff0c;是一款老牌知名的PHP开发集成环境工具&#xff0c;产品历经多次迭代升级&#xff0c;目前有phpStudy经典版、phpStudy V8&#xff08;2019版&#xff09;等等&#xff0c;利用phpstudy可以快速搭建一个mysql环境&#xff0c;接下来我们就开始吧…

Html 标题加图标

每个网页选项卡都有一个图标&#xff1a; <meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>主页</title><link rel"icon" href"images/记事本.png&…

linux系统不同用户登录vnc

说明&#xff1a;安装oracle时需要用oracle用户登录&#xff0c;配置vnc使用oracle用户登录 1.修改/etc/sysconfig/vncservers&#xff0c;添加oracle用户 VNCSERVERS“1:root 2:oracle” VNCSERVERARGS[1]“-geometry 1024x768” VNCSERVERARGS[2]“-geometry 1024x768” VN…

Scrapy:简单使用、xpath语法

简单使用 简介 Scrapy 是一个为了爬取网站信息&#xff0c;提取结构性数据而编写的应用框架&#xff0c;可以用于数据挖掘、检测和自动化测试。 架构 组件 引擎&#xff0c;指挥其他组件协同工作调度器 &#xff0c;接收引擎发过来的请求&#xff0c;按照先后顺序&#xff0…

APP的命令和monkey压力测试

一、命令的使用&#xff1a; 1.dos下链接&#xff1a;adb connect 127.0.0.1:62001 2.所附设备清单&#xff1a;adb devices device:已识别的设备&#xff0c;表示连接成功 unauthorized:没有授权需要手机授权才能连接 unkown:未识别设备 offline:离线设备 3.版本&#xff1a;…

基于LORA的一主多从监测系统_BMP280气压传感器

关联&#xff1a;LORA 、HAL、气压传感器 这个传感器也是比较常见的&#xff0c;但灵敏度什么的都没啥问题&#xff0c;不过气压传感器不是很好去观察这个变化&#xff0c;毕竟一个地方的大气压基本不会有太大波动&#xff0c;我们可以在百度搜索所在地的平均大气压&#xff0…

bladex漏洞思路总结

Springblade框架介绍&#xff1a; SpringBlade是一个基于Spring Boot和Spring Cloud的微服务架构框架&#xff0c;它是由商业级项目升级优化而来的综合型项目。 0x1 前言 最近跟一些大佬学习了blade的漏洞&#xff0c;所以自己总结了一下&#xff0c;在渗透测试过程中&#x…

frp+windows+宝塔+域名使用

先讲一下frp的相关概念 1. 什么是 FRP&#xff1f; FRP (Fast Reverse Proxy) 是一款开源的反向代理工具&#xff0c;支持多种协议&#xff08;如 TCP、UDP、HTTP 和 HTTPS&#xff09;的内网穿透。它能够帮助你将位于内网或防火墙后面的服务映射到外网&#xff0c;方便外网用…

产品经理想转行做大模型AI产品经理,建议看看这篇文章!(文末福利)

如果你想转行做大模型&#xff0c;作为一名AI产品经理&#xff0c;你可以怎么做呢&#xff1f;或许&#xff0c;你可以先进行自我检测&#xff0c;看看自己是否真的适合转行做大模型。这篇文章里&#xff0c;作者便给想转行做大模型的AI产品经理们提出了一些建议&#xff0c;不…

【进阶版】如何实现一个基于 HTML+CSS+JS 的任务进度条

创建一个动态任务进度条&#xff1a;进阶版实现 在现代网页开发中&#xff0c;任务进度条是用户交互中非常重要的组件&#xff0c;它能够直观地展示任务的进展情况。本文将向你展示如何使用 HTML、CSS 和 JavaScript 创建一个动态的任务进度条。在这个进阶版本中&#xff0c;用…

三网话费折扣api怎么对接合作?如何变现?

三网优惠话费充值接口对接的操作步骤通常包括以下几个阶段&#xff1a; 注册账号&#xff1a;在API服务提供商的控制台注册账号&#xff0c;例如微客云控制台 。添加店铺&#xff1a;在控制台中添加店铺&#xff0c;并保存店铺ID或只勾选CMS 。申请API密钥&#xff1a;联系客服…

临时提高电压的常用方法电荷泵、自举升压电路

一、临时提高电压 爽死*子了&#xff0c;*子也硬玩一把临时重组器……&#xff08;雷总音&#xff09;。临时提高星级或者电压真的是一件很爽的事情&#xff0c;小钱办大事轻松拿捏。常用的临时提高电压的方式主要有两种&#xff0c;电荷泵和自举升压电路&#xff0c;你的某米手…

Nvidia在AI峰会上发布了七项重大技术公告

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

文档大师:打造一站式 Word 报告解决方案

最新技术资源&#xff08;建议收藏&#xff09; https://www.grapecity.com.cn/resources/ 前言 在政府、医院、银行、财务以及销售等领域&#xff0c;常常需要创建各种报告文件来展开工作汇报&#xff0c;譬如季度销售报告、年度总结报告、体检报告和保险合同等。在没有报表工…

一个月学会Java 第7天 字符串与键盘输入

Day7 字符串与键盘输入 字符串作为所有程序都很重要的东西&#xff0c;这个东西必须好好的学习&#xff0c;在Java中String会比较好学习&#xff0c;比起C和C里面会更加的简单&#xff0c; 在Java之中也是很好理解的&#xff0c;因为底层已经封装好了char数组&#xff0c;也就是…

《深入浅出LLM基础篇》(五):Propmt工程优化

&#x1f389;AI学习星球推荐&#xff1a; GoAI的学习社区 知识星球是一个致力于提供《机器学习 | 深度学习 | CV | NLP | 大模型 | 多模态 | AIGC 》各个最新AI方向综述、论文等成体系的学习资料&#xff0c;配有全面而有深度的专栏内容&#xff0c;包括不限于 前沿论文解读、…

【C++】AVL树(AVLTree)

目录 一、AVL树概念&#xff1a; 二、定义&#xff1a; 三、AVL树的插入&#xff1a; 四、AVL树的旋转&#xff1a; 1、左单旋&#xff1a; 2、右单旋&#xff1a; 3、右左双旋&#xff1a; 4、左右双旋&#xff1a; 五、AVL树的检验&#xff1a; 一、AVL树概念&#x…