Introduction
With the growth of big data, task scheduling systems have become a critical part of data processing and management. Apache DolphinScheduler is an excellent open-source distributed workflow scheduling platform that is widely used in big-data scenarios.
In this article we take a deep dive into the source code of Apache DolphinScheduler 1.3.9 and walk through the Master startup and scheduling flow.
With this analysis, developers can better understand how DolphinScheduler works and carry out secondary development or optimization more efficiently in practice.
Master Server Startup
Startup flow chart
Master workflow scheduling flow chart
MasterServer startup method
public void run() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
// self tolerant
this.zkMasterClient.start();
this.zkMasterClient.setStoppable(this);
// scheduler start
this.masterSchedulerService.start();
// start QuartzExecutors
// what system should do if exception
try {
logger.info("start Quartz server...");
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
QuartzExecutors.getInstance().shutdown();
} catch (SchedulerException e1) {
logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
}
logger.error("start Quartz failed", e);
}
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
- The nettyServer registers three kinds of Commands (a minimal processor sketch follows this list):
- TASK_EXECUTE_ACK: after the Worker receives the Master's request to execute a task, it sends an Ack Command back to the Master to signal that it has started running the Task.
- TASK_EXECUTE_RESPONSE: after the Worker finishes executing the Task, it sends a Response Command to the Master with the scheduling/execution result.
- TASK_KILL_RESPONSE: when the Master receives a request to stop a Task, it sends a TASK_KILL_REQUEST Command to the Worker, and the Worker then returns a TASK_KILL_RESPONSE Command to the Master.
- Start the scheduler and the Quartz timer.
- Register a ShutdownHook to release resources before the process exits.
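To make the registrations above concrete, here is a minimal sketch of what such a processor can look like. It assumes the 1.3.9 remote-module interface org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor with a process(Channel, Command) method; the handler body is illustrative only and is not the real TaskAckProcessor.

import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;

// Illustrative processor only; the real TaskAckProcessor deserializes the
// command body and updates the task instance state in the database/cache.
public class DemoTaskAckProcessor implements NettyRequestProcessor {

    @Override
    public void process(Channel channel, Command command) {
        // for this processor the command type would be CommandType.TASK_EXECUTE_ACK
        System.out.println("received command: " + command.getType()
                + " from " + channel.remoteAddress());
    }
}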
Master configuration file
master.listen.port=5678
# number of threads used to schedule process instances concurrently
master.exec.threads=100
# number of tasks each ProcessInstance is allowed to run
master.exec.task.num=20
# number of tasks dispatched per batch
master.dispatch.task.num=3
# the master selects a suitable worker to execute a task
# available algorithms: Random, RoundRobin, LowerWeight; the default is LowerWeight
master.host.selector=LowerWeight
# interval at which the master sends heartbeats to Zookeeper, in seconds
master.heartbeat.interval=10
# number of retries when the master fails to commit a task
master.task.commit.retryTimes=5
# retry interval when the master fails to commit a task
master.task.commit.interval=1000
# maximum cpu load average; the master only schedules tasks while the system load average stays below this value
# the default of -1 means: number of cpu cores * 2
master.max.cpuload.avg=-1
# memory reserved for other processes; the master only schedules while available system memory is above this value
# default is 0.3G
master.reserved.memory=0.3
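The last two settings gate scheduling on machine resources. The sketch below shows the idea behind such a check, assuming the documented semantics (-1 interpreted as cpu cores * 2); it is not the actual OSUtils.checkResource implementation, and availableMemoryGB is simply a value the caller supplies.

import java.lang.management.ManagementFactory;

// Rough sketch of a resource gate in the spirit of OSUtils.checkResource:
// scheduling is allowed only while load average and available memory are healthy.
public final class ResourceGateSketch {

    public static boolean canSchedule(double maxCpuLoadAvg, double reservedMemoryGB, double availableMemoryGB) {
        if (maxCpuLoadAvg <= 0) {
            // documented default: -1 means "number of cpu cores * 2"
            maxCpuLoadAvg = Runtime.getRuntime().availableProcessors() * 2;
        }
        // system load average over the last minute; may be -1 on some platforms
        double loadAvg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        boolean cpuOk = loadAvg < maxCpuLoadAvg;
        boolean memoryOk = availableMemoryGB > reservedMemoryGB;
        return cpuOk && memoryOk;
    }
}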
Master Scheduler Startup
MasterSchedulerService initialization method
public void init(){
// masterConfig.getMasterExecThreads() comes from master.exec.threads=100 in master.properties
// the core and maximum pool size of this thread pool are both 100
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
MasterSchedulerService startup method
public void run() {
logger.info("master scheduler started");
while (Stopper.isRunning()){
try {
// check the master's cpu load and memory to decide whether the master still has resources to schedule
// if it cannot schedule, sleep for 1 second
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if(!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// this is where the actual scheduling is performed
scheduleProcess();
}
} catch (Exception e) {
logger.error("master scheduler thread error", e);
}
}
}
MasterSchedulerService scheduling method
private void scheduleProcess() throws Exception {
InterProcessMutex mutex = null;
try {
// acquire the distributed lock, blocking until it is obtained
mutex = zkMasterClient.blockAcquireMutex();
// get the number of active threads in the thread pool
int activeCount = masterExecService.getActiveCount();
// make sure to scan and delete command table in one transaction
// fetch one command; the operations must all happen within a single transaction
Command command = processService.findOneCommand();
if (command != null) {
logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
try{
// get the ProcessInstance;
// this method uses the master.exec.threads setting and the active thread count to decide whether the processInstance can be scheduled
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
this.masterConfig.getMasterExecThreads() - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(
new MasterExecThread(
processInstance
, processService
, nettyRemotingClient
));
}
}catch (Exception e){
logger.error("scan command error ", e);
processService.moveToErrorCommand(command, e.toString());
}
} else{
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} finally{
// release the distributed lock
zkMasterClient.releaseMutex(mutex);
}
}
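blockAcquireMutex() is backed by Apache Curator's InterProcessMutex on ZooKeeper, so only one master scans the command table at a time. A standalone sketch of that pattern follows; the ZooKeeper address and lock path are placeholders, not necessarily the ones DolphinScheduler uses.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Sketch of the Curator distributed-lock pattern behind blockAcquireMutex()/releaseMutex().
public class MasterLockSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex mutex = new InterProcessMutex(client, "/demo/lock/masters");
        mutex.acquire();              // blocks until the lock is held
        try {
            // critical section: scan the command table here
        } finally {
            mutex.release();          // always release, mirroring the finally block above
        }
        client.close();
    }
}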
ProcessService's method for handling a Command
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
// construct the ProcessInstance here
ProcessInstance processInstance = constructProcessInstance(command, host);
//cannot construct process instance, return null;
if(processInstance == null){
logger.error("scan command, command parameter is error: {}", command);
moveToErrorCommand(command, "process instance is null");
return null;
}
// check whether the remaining thread count is at least the number of this ProcessDefinition plus all of its sub-processes
// if the check fails, the process instance state becomes waiting-thread and an empty process instance is returned
if(!checkThreadNum(command, validThreadNum)){
logger.info("there is not enough thread for this command: {}", command);
return setWaitingThreadProcess(command, processInstance);
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance);
this.setSubProcessParam(processInstance);
delCommandByid(command.getId());
return processInstance;
}
MasterExecThread initialization method
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
// master.exec.task.num in master.properties
int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
masterTaskExecNum);
this.nettyRemotingClient = nettyRemotingClient;
}
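ThreadUtils.newDaemonFixedThreadExecutor creates a fixed-size pool whose threads are named and marked as daemon threads. A JDK-only sketch of that idea (not the project's actual ThreadUtils code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a fixed-size pool of named daemon threads, in the spirit of
// ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum).
public final class DaemonExecutorSketch {

    public static ExecutorService newDaemonFixedThreadExecutor(String namePrefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // daemon threads do not block JVM shutdown
            return t;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }
}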
MasterExecThread startup method
public void run() {
// omitted...
try {
if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){
// complement-data (backfill) logic... not covered here
executeComplementProcess();
}else{
// execute the tasks of the process
executeProcess();
}
}catch (Exception e){
logger.error("master exec thread exception", e);
logger.error("process execute failed, process id:{}", processInstance.getId());
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setEndTime(new Date());
processService.updateProcessInstance(processInstance);
}finally {
taskExecService.shutdown();
}
}
private void executeProcess() throws Exception {
// pre-processing
prepareProcess();
// execution
runProcess();
// post-processing
endProcess();
}
private void runProcess(){
// start submitting from the root tasks
submitPostNode(null);
boolean sendTimeWarning = false;
while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){
// some code omitted...
// decide whether tasks can be submitted based on the cpu load average and memory
if(canSubmitTaskToQueue()){
submitStandByTask();
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
}
updateProcessInstanceState();
}
logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}
// get the tasks that can run in parallel
/**
* task 1 -> task 2 -> task3
* task 4 -> task 5
* task 6
* task 1, task 4 and task 6 can run in parallel
*/
private void submitPostNode(String parentNodeName){
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){
taskInstances.add(createTaskInstance(processInstance, taskNode,
dag.getNode(taskNode)));
}
// if previous node success , post node submit
for(TaskInstance task : taskInstances){
if(readyToSubmitTaskQueue.contains(task)){
continue;
}
if(completeTaskList.containsKey(task.getName())){
logger.info("task {} has already run success", task.getName());
continue;
}
if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
}else{
// add the task to the priorityQueue
addTaskToStandByList(task);
}
}
}
/**
* handling the list of tasks to be submitted
*/
private void submitStandByTask(){
try {
int length = readyToSubmitTaskQueue.size();
for (int i=0;i<length;i++) {
// take a task from the queue and submit it to a worker for execution
TaskInstance task = readyToSubmitTaskQueue.peek();
// first check whether all of the task's upstream dependencies have run successfully; only then is the task submitted
// if a dependency failed or has not been executed, the task is not submitted
DependResult dependResult = getDependResultForTask(task);
if(DependResult.SUCCESS == dependResult){
if(retryTaskIntervalOverTime(task)){
submitTaskExec(task);
removeTaskFromStandbyList(task);
}
}else if(DependResult.FAILED == dependResult){
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(task.getName(), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
// for some reasons(depend task pause/stop) this task would not be submit
removeTaskFromStandbyList(task);
logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
}
}
} catch (Exception e) {
logger.error("submit standby task error",e);
}
}
/**
* create the TaskExecThread
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
MasterBaseTaskExecThread abstractExecThread = null;
if(taskInstance.isSubProcess()){
abstractExecThread = new SubProcessTaskExecThread(taskInstance);
}else if(taskInstance.isDependTask()){
abstractExecThread = new DependentTaskExecThread(taskInstance);
}else if(taskInstance.isConditionsTask()){
abstractExecThread = new ConditionsTaskExecThread(taskInstance);
}else {
abstractExecThread = new MasterTaskExecThread(taskInstance);
}
Future<Boolean> future = taskExecService.submit(abstractExecThread);
activeTaskNode.putIfAbsent(abstractExecThread, future);
return abstractExecThread.getTaskInstance();
}
MasterBaseTaskExecThread
MasterBaseTaskExecThread is the parent class of SubProcessTaskExecThread, DependentTaskExecThread, ConditionsTaskExecThread and MasterTaskExecThread, and it implements the Callable interface.
SubProcessTaskExecThread
The task instance is not dispatched to a worker node. In the submitTask(TaskInstance taskInstance) method, a command creating a sub-process instance is added for the sub-workflow, and the waitTaskQuit method then loops, waiting for the sub-workflow to finish. The sub-workflow keeps running and updating its state, and only when it has fully completed is the task's state synchronized to the sub-workflow's state.
DependentTaskExecThread
A Dependent node is a dependency-check node. For example, if workflow A depends on yesterday's successful run of workflow B, the dependent node checks whether workflow B had a successful instance yesterday.
ConditionsTaskExecThread
Conditions is a conditional node that decides which downstream task to run based on the state of its upstream tasks. As of now, Conditions supports multiple upstream tasks but only two downstream branches. When there is more than one upstream task, complex upstream dependencies can be expressed with AND and OR operators.
MasterTaskExecThread
Dispatches the task instance to a worker node for execution and loops in the waitTaskQuit method, waiting for the task instance to finish; it exits once the task is done. Task types such as SQL and Shell fall into this category.
MasterBaseTaskExecThread initialization method
public MasterBaseTaskExecThread(TaskInstance taskInstance){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
initTaskParams();
}
MasterBaseTaskExecThread execution method
@Override
public Boolean call() throws Exception {
this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
return submitWaitComplete(); // implemented by each subclass
}
MasterBaseTaskExecThread common methods
submit()
protected TaskInstance submit(){
// number of retries for committing a task: master.task.commit.retryTimes=5
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
// retry interval (ms) when committing fails: master.task.commit.interval=1000
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
int retryTimes = 1;
boolean submitDB = false;
boolean submitTask = false;
TaskInstance task = null;
while (retryTimes <= commitRetryTimes){
try {
if(!submitDB){
// persist the TaskInstance to the database
task = processService.submitTask(taskInstance);
if(task != null && task.getId() != 0){
submitDB = true;
}
}
if(submitDB && !submitTask){
// dispatch the task to the Worker for execution
submitTask = dispatchTask(task);
}
if(submitDB && submitTask){
return task;
}
if(!submitDB){
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
}else if(!submitTask){
logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
}
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
logger.error("task commit to mysql and dispatcht task failed",e);
}
retryTimes += 1;
}
return task;
}
dispatchTask(TaskInstance task)
public Boolean dispatchTask(TaskInstance taskInstance) {
try{
// for sub-process, conditions and dependent tasks, return true directly; they are not submitted to a worker
if(taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
|| taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION){
logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
return true;
}
logger.info("task ready to submit: {}", taskInstance);
/**
* taskPriority
*/
TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
// put it into the TaskPriorityQueue (org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl);
// a consumer takes TaskInstances out of this queue and submits them to Workers for execution
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
return true;
}catch (Exception e){
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJson(taskInstance));
return false;
}
}
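Because the queue is a priority queue, dispatch order is driven by how entries compare. The sketch below illustrates one plausible ordering (process-instance priority first, then process-instance id, then task priority, then task id); the real comparator in TaskPriorityQueueImpl may differ in detail, so treat this purely as an illustration.

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Illustrative only: how task-priority entries can be ordered in a blocking priority queue.
final class TaskPrioritySketch {
    final int processInstancePriority;  // smaller value = higher priority
    final int processInstanceId;
    final int taskInstancePriority;
    final int taskInstanceId;

    TaskPrioritySketch(int processInstancePriority, int processInstanceId,
                       int taskInstancePriority, int taskInstanceId) {
        this.processInstancePriority = processInstancePriority;
        this.processInstanceId = processInstanceId;
        this.taskInstancePriority = taskInstancePriority;
        this.taskInstanceId = taskInstanceId;
    }

    static final Comparator<TaskPrioritySketch> ORDER =
            Comparator.comparingInt((TaskPrioritySketch t) -> t.processInstancePriority)
                    .thenComparingInt(t -> t.processInstanceId)
                    .thenComparingInt(t -> t.taskInstancePriority)
                    .thenComparingInt(t -> t.taskInstanceId);

    // a consumer thread can then take() entries in this order
    static final PriorityBlockingQueue<TaskPrioritySketch> QUEUE =
            new PriorityBlockingQueue<>(100, ORDER);
}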
MasterTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {
Boolean result = false;
// submit the task
this.taskInstance = submit();
if(this.taskInstance == null){
logger.error("submit task instance to mysql and queue failed , please check and fix it");
return result;
}
if(!this.taskInstance.getState().typeIsFinished()) {
// wait for the task execution result
result = waitTaskQuit();
}
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
logger.info("task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
return result;
}
waitTaskQuit()
public Boolean waitTaskQuit(){
// query new state
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
while (Stopper.isRunning()){
try {
if(this.processInstance == null){
logger.error("process instance not exists , master task exec thread exit");
return true;
}
// task instance add queue , waiting worker to kill
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
cancelTaskInstance();
}
if(processInstance.getState() == ExecutionStatus.READY_PAUSE){
pauseTask();
}
// task instance finished
if (taskInstance.getState().typeIsFinished()){
// if task is final result , then remove taskInstance from cache
// the implementation class of taskInstanceCacheManager is org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl
// the taskInstance is added to the taskInstanceCache when the ack and response Commands are received
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
break;
}
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
}
// updateProcessInstance task instance
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("exception",e);
if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
}
return true;
}
SubProcessTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {
Boolean result = false;
try{
// submit task instance
this.taskInstance = submit();
if(taskInstance == null){
logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");
return result;
}
setTaskInstanceState();
waitTaskQuit();
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
// at the end of the subflow , the task state is changed to the subflow state
if(subProcessInstance != null){
if(subProcessInstance.getState() == ExecutionStatus.STOP){
this.taskInstance.setState(ExecutionStatus.KILL);
}else{
this.taskInstance.setState(subProcessInstance.getState());
}
}
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
result = true;
}catch (Exception e){
logger.error("exception: ",e);
if (null != taskInstance) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
return result;
}
waitTaskQuit()
private void waitTaskQuit() throws InterruptedException {
logger.info("wait sub work flow: {} complete", this.taskInstance.getName());
if (taskInstance.getState().typeIsFinished()) {
logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
this.taskInstance.getName(),
this.taskInstance.getState(),
this.processInstance.getState());
return;
}
while (Stopper.isRunning()) {
// waiting for subflow process instance establishment
if (subProcessInstance == null) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
if(!setTaskInstanceState()){
continue;
}
}
subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed();
}
updateParentProcessState();
if (subProcessInstance.getState().typeIsFinished()){
break;
}
if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){
// parent process "ready to pause" , child process "pause"
pauseSubProcess();
}else if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
// parent Process "Ready to Cancel" , subflow "Cancel"
stopSubProcess();
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
ConditionsTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {
try{
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
logger.info("dependent task start");
// wait and evaluate the condition dependencies
waitTaskQuit();
// update the final dependency result
updateTaskState();
}catch (Exception e){
logger.error("conditions task run exception" , e);
}
return true;
}
waitTaskQuit()
private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()
);
for(TaskInstance task : taskInstances){
completeTaskList.putIfAbsent(task.getName(), task.getState());
}
// collect all dependency results
List<DependResult> modelResultList = new ArrayList<>();
for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){
List<DependResult> itemDependResult = new ArrayList<>();
for(DependentItem item : dependentTaskModel.getDependItemList()){
itemDependResult.add(getDependResultForItem(item));
}
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult);
}
// merge the dependency results according to the logical operator
conditionResult = DependentUtils.getDependResultForRelation(
dependentParameters.getRelation(), modelResultList
);
logger.info("the conditions task depend result : {}", conditionResult);
}
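DependentUtils.getDependResultForRelation merges a list of per-item results with AND/OR semantics. The following sketch captures that idea in simplified form; it is not the actual utility code, and the enum here is a stand-in for the project's DependResult.

import java.util.List;

// Simplified AND/OR merge of dependency results, in the spirit of
// DependentUtils.getDependResultForRelation; not the real implementation.
final class DependMergeSketch {

    enum Result { SUCCESS, FAILED, WAITING }

    static Result merge(String relation, List<Result> results) {
        if ("AND".equalsIgnoreCase(relation)) {
            // every branch must succeed; the first non-success decides the outcome
            for (Result r : results) {
                if (r != Result.SUCCESS) {
                    return r;
                }
            }
            return Result.SUCCESS;
        }
        // OR: a single successful branch is enough
        for (Result r : results) {
            if (r == Result.SUCCESS) {
                return Result.SUCCESS;
            }
        }
        return Result.FAILED;
    }
}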
DependentTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {
try{
logger.info("dependent task start");
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
initDependParameters();
waitTaskQuit();
updateTaskState();
}catch (Exception e){
logger.error("dependent task run exception" , e);
}
return true;
}
waitTaskQuit()
private Boolean waitTaskQuit() {
logger.info("wait depend task : {} complete", this.taskInstance.getName());
if (taskInstance.getState().typeIsFinished()) {
logger.info("task {} already complete. task state:{}",
this.taskInstance.getName(),
this.taskInstance.getState());
return true;
}
while (Stopper.isRunning()) {
try{
if(this.processInstance == null){
logger.error("process instance not exists , master task exec thread exit");
return true;
}
// some code omitted
// allDependentTaskFinish() waits until all dependent tasks have finished
if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){
break;
}
// update process task
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("exception",e);
if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
}
return true;
}
TaskPriorityQueueConsumer
@Override
public void run() {
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
while (Stopper.isRunning()){
try {
// number of tasks dispatched per batch: master.dispatch.task.num = 3
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
failedDispatchTasks.clear();
for(int i = 0; i < fetchTaskNum; i++){
if(taskPriorityQueue.size() <= 0){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// if not task , blocking here
// take a task from the queue
TaskPriority taskPriority = taskPriorityQueue.take();
// dispatch it to a worker for execution
boolean dispatchResult = dispatch(taskPriority);
if(!dispatchResult){
failedDispatchTasks.add(taskPriority);
}
}
if (!failedDispatchTasks.isEmpty()) {
// tasks that failed to dispatch are put back into the queue to be dispatched again
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If there are tasks in a cycle that cannot find the worker group,
// sleep for 1 second
if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}catch (Exception e){
logger.error("dispatcher task error",e);
}
}
}
/**
* dispatch task
*
* @param taskPriority taskPriority
* @return result
*/
protected boolean dispatch(TaskPriority taskPriority) {
boolean result = false;
try {
int taskInstanceId = taskPriority.getTaskId();
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
if (taskInstanceIsFinalState(taskInstanceId)){
// when task finish, ignore this task, there is no need to dispatch anymore
return true;
}else{
// dispatch the task
// supported dispatch strategies: lower-weight (lowest load first), random, and round-robin
result = dispatcher.dispatch(executionContext);
}
} catch (ExecuteException e) {
logger.error("dispatch error: {}",e.getMessage());
}
return result;
}
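As the comment above notes, dispatch supports lower-weight, random and round-robin host selection (configured by master.host.selector). A minimal round-robin sketch, purely illustrative and unrelated to the project's actual selector classes:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal round-robin host selection sketch; the real selectors also take
// worker groups into account and, for LowerWeight, the workers' reported load.
final class RoundRobinSelectorSketch<T> {
    private final AtomicInteger index = new AtomicInteger();

    T select(List<T> candidates) {
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalArgumentException("no worker hosts available");
        }
        // masking keeps the counter non-negative after integer overflow
        int next = (index.getAndIncrement() & Integer.MAX_VALUE) % candidates.size();
        return candidates.get(next);
    }
}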
Through this source-code analysis of Apache DolphinScheduler 1.3.9, we have gained a deeper understanding of the design and implementation of its core modules. The Master architecture ensures high availability and scalability of task scheduling, while cluster coordination via Zookeeper gives the system a strong fault-tolerance mechanism.
If you are interested in the Apache DolphinScheduler source code, you can dig further into the details of its task scheduling strategies, or build on it for your own business scenarios to make full use of DolphinScheduler's scheduling capabilities.
End of article.
Publication of this article is supported by WhaleOps (白鲸开源科技).