Apache DolphinScheduler介绍
Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。
Dag背景知识
摘录了一下Dag的offical定义
A graph is formed by vertices and by edges connecting pairs of vertices, where the vertices can be any kind of object that is connected in pairs by edges. In the case of a directed graph, each edge has an orientation, from one vertex to another vertex. A path in a directed graph is a sequence of edges having the property that the ending vertex of each edge in the sequence is the same as the starting vertex of the next edge in the sequence; a path forms a cycle if the starting vertex of its first edge equals the ending vertex of its last edge.
A directed acyclic graph is a directed graph that has no cycles.[1][2][3]
A vertex v of a directed graph is said to be reachable from another vertex u when there exists a path that starts at u and ends at v. As a special case, every vertex is considered to be reachable from itself (by a path with zero edges). If a vertex can reach itself via a nontrivial path (a path with one or more edges), then that path is a cycle, so another way to define directed acyclic graphs is that they are the graphs in which no vertex can reach itself via a nontrivial path.
在offical的定义中,有两对象的集合,集合中的元素是
- vertex 一个实体或者元素,可以是任何抽象的object
- edge 一条有方向直线,包含两个vertex,分别扮演起点和终点
- Dag约束
- 在Dag中,一个edge(a,b)的终点可以作为另一个edge(b,c)的起点,这个链路中所有的vertex都是可到达的, c是从a可达的。
- 在Dag中允许vertex不存在于任何一个edge中,这个节点可以从自己到达自己(一个孤岛,不和其他vertex有任何联系)
- 如果一个vertex可以从自己到达自己,但是中间经过了其他的vertex,那么这就存在一个环circle
- 在Dag中没有环
在DolphinScheduler中表示Dag的数据结构为
public class DAG<Node, NodeInfo, EdgeInfo> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* node map, key is node, value is node information
*/
private final Map<Node, NodeInfo> nodesMap;
/**
* edge map. key is node of origin;value is Map with key for destination node and value for edge
*/
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;
/**
* reversed edge set,key is node of destination, value is Map with key for origin node and value for edge
*/
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
}
其中
- Node表示任务的id
- NodeInfo表示任务的详细信息
- EdgeInfo包含任务id和依赖任务id
数仓建设任务和任务依赖
在企业数仓建设中,普遍的做法是进行数据分层(引用https://juejin.cn/post/6969874734355841031)
在生产环境,由于分层的需要,业务逻辑分布广泛,数据存储类型多样,这就造成了数仓建设的任务多,任务之间依赖复杂,dag就成了最佳的任务依赖和调度的存储结构。在Dag结构中每个节点表示一个具体的调度任务,任务之间的连线表示依赖关系,针对Dag结构化数据的遍历过程,就是对数仓任务的执行过程。
一个简单的数仓依赖任务关系(数仓建设中会有很多任务依赖关系和更复杂的任务依赖关系)
DolphinScheduler系统角色拆分
Apache DolphinScheduler核心角色包括MasterServer和WorkerServer,这遵循模块化设计,master和worker专注于自己本身的角色和任务,模块遵循高内聚低耦合的设计,大大提高了系统的稳定性和可扩展性,同时也有利于并行开发,缩短系统的研发时间,提高系统的健壮性。
MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。
WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
DolphinScheduler任务调度流程
参考官网,DolphinScheduler核心任务任务执行流程如下
鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解。
Command分发流程
处理方式
异步,分布式master server节点。
生产者
api-server将用户的运行工作流http请求封装成command数据,insert到t_ds_command表中 一个启动工作流实例的command样例
{
"commandType": "START_PROCESS",
"processDefinitionCode": 14285512555584,
"executorId": 1,
"commandParam": "{}",
"taskDependType": "TASK_POST",
"failureStrategy": "CONTINUE",
"warningType": "NONE",
"startTime": 1723444881372,
"processInstancePriority": "MEDIUM",
"updateTime": 1723444881372,
"workerGroup": "default",
"tenantCode": "default",
"environmentCode": -1,
"dryRun": 0,
"processInstanceId": 0,
"processDefinitionVersion": 1,
"testFlag": 0
}
消费者
master server中的MasterSchedulerBootstrap loop程序, MasterSchedulerBootstrap使用zk分配到自己的slot,从t_ds_command表中select属于slot的command列表处理 查询语句
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select *
from t_ds_command
where id % #{masterCount} = #{thisMasterSlot}
order by process_instance_priority, id asc
limit #{limit}
</select>
MasterSchedulerBootstrap loop
轮训查到待处理的command任务,将command任务和master host生成ProcessInstance
,将ProcessInstance
对象插入到t_ds_process_instance
表中, 同时生成包含运行所需要的上下文信息的可执行任务workflowExecuteRunnable
。 将workflowExecuteRunnable
cache到本地cache processInstanceExecCacheManager
,同时生产将ProcessInstance
的WorkflowEventType.START_WORKFLOW
生产到workflowEventQueue
队列中。
Dag遍历执行任务
Master本地cache缓冲
cache实现ProcessInstanceExecCacheManagerImpl
,提供如下核心功能
public interface ProcessInstanceExecCacheManager {
/**
* get WorkflowExecuteThread by process instance id
*
* @param processInstanceId processInstanceId
* @return WorkflowExecuteThread
*/
WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);
/**
* judge the process instance does it exist
*
* @param processInstanceId processInstanceId
* @return true - if process instance id exists in cache
*/
boolean contains(int processInstanceId);
/**
* remove cache by process instance id
*
* @param processInstanceId processInstanceId
*/
void removeByProcessInstanceId(int processInstanceId);
/**
* cache
*
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);
/**
* get all WorkflowExecuteThread from cache
*
* @return all WorkflowExecuteThread in cache
*/
Collection<WorkflowExecuteRunnable> getAll();
void clearCache();
}
生产者
MasterSchedulerBootstrap loop
将command transform to可以运行的任务,任务对象中包含了要处理的所有上下文信息
消费者
EventExecuteService
根据dag信息,拿到第一批没有任何依赖的TaskInstance
添加到待执行任务队列standByTaskInstancePriorityQueue
中, standByTaskInstancePriorityQueue
按照优先级先后顺序执行,处理任务状态,将待执行任务提交到globalTaskDispatchWaitingQueue
队列中。
可执行任务Dispatch
Master进城内优先级队列
到了globalTaskDispatchWaitingQueue
中,已经是可执行任务的最小单元了
生产者
EventExecuteService
根据parent node
,对Dag进行广度优先遍历,提交任务到globalTaskDispatchWaitingQueue
队列中。
消费者
消费者为GlobalTaskDispatchWaitingQueueLooper
,GlobalTaskDispatchWaitingQueueLooper
消费待dispatch的任务,根据任务类型执行任务调度,对任务的调度是走的rpc接口,目前来看根据任务类型分为两种:
- MasterTaskDispatcher
- WorkerTaskDispatcher
对于WorkerTaskDispatcher
来说,rpc server
收到rpc request
之后提交任务到了workerTaskExecutorThreadPool
执行。所以这是一个异步处理任务的过程,不至于让master server
hang在这个地方。对于任务的执行进度,会在关键节点进行回调通知。
任务执行状态回调通知
Worker被dispatch任务,异步提交到线程池中之行,在任务异步执行的节点,调用rpc接口通知master任务的状态。
生产者
Worker异步执行节点,对于任务执行状态回调包括四个
- TaskExecutionStatus.FAILURE 执行抛出异常,运行失败
- TaskExecutionStatus.RUNNING_EXECUTION 开始执行
- TaskExecutionStatus.KILL 被杀死
- TaskExecutionStatus.SUCCESS 执行成功
备注:在官方的事件流程中Ack的方向搞错了,Ack不是worker通知给master,而是master通知workerer,我的这个事件状态的处理结束了。
经过校正一下,比较概括性的总结,整体的流程大致如下图
消费者
master节点ITaskInstanceExecutionEventListener
服务,服务接受rpc请求,并将任务添加到TaskEventService eventQueue
队列中。
任务状态处理
缓冲队列
master节点TaskEventService eventQueue
队列。
生产者
这个生产者可能会很多
- api-server用户行为
- master节点任务调度
- work节点任务执行
- master任务执行
消费者
为master节点的TaskInstanceListenerImpl
服务,TaskInstanceListenerImpl
将TaskEvent
transform toTaskExecuteRunnable
,并且提交到线程池执行taskExecuteThreadMap
待执行,在线程池中修改任务的执行状态。本文由 白鲸开源科技 提供发布支持!