海豚调度调优 | 正在运行的工作流(DAG)如何重新拉起失败的任务(Task)

news2024/11/24 4:46:58

💡  本系列文章是DolphinScheduler由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。

*祝开卷有益。 *

本系列教程基于 DolphinScheduler 2.0.5 做的优化。(稳定版推荐使用3.1.9

file

先抛出问题

1.场景描述

工作流 A 正在运行,里面有很多节点,依赖关系比较复杂。凌晨用户接到报警,a 节点失败了,此时其他分支的任务还在运行。此时工作流是不会是失败的,需要等待其他分支的任务运行结束,整个工作流才会失败。

失败的任务:是失败状态且重试次数用完了。

2.目前的处理流程

目前的做法是,先 kill 工作流,等待工作流变成失败状态,然后再点击恢复失败按钮,即可把失败的节点重新拉起来。

画外音:kill 工作流我也做了优化,后面会有文章介绍,kill之后,工作流会变成失败状态,这样做是为了可以恢复失败。

3.困惑

这种做法会影响正在运行的任务,强制把正在跑的任务 kill 掉,对一些运行时间比较久的任务来说,会降低执行效率。跑的好好的,被干掉了,恢复失败,又得重新跑。

这非常不划算。

优化建议:如何在不停止工作流的情况下,单独把失败的节点重新拉起来呢?

解决方案

后端优化:

分析了工作流启动、停止、恢复失败等操作类型 Master 和 Worker 的原理,打算新增一个操作类型、命令类型枚举值:RUN_FAILED_ONLY。

优化后的大致流程如下:

  1. 用户在页面上点击按钮,提交 executeType  = RUN_FAILED_ONLY、processInstanceId=xxxx的请求。

  2. API服务收到请求,判断是 RUN_FAILED_ONLY 操作,就封装一个 StateEventChangeCommand 命令,进行RPC请求。

  3. Master服务的 StateEventProcessor 监听到命令,提交给 StateEventResponseService ,它负责找到对应的工作流 WorkflowExecuteThread ,然后把这个stateEvent 给这个 WorkflowExecuteThread.

  4. WorkflowExecuteThread处理 stateEvent,判断这个 stateEvent 的 StateEventType  是 RUN_FAILED_ONLY_EVENT ,进行下面的处理:

找到改工作流失败且重试次数用完的任务列表,然后依次处理它们的执行记录(标记为失效,从失败列表移除,添加到待提交队列),最后提交到等待队列。

后端流程结束。

前端页面优化:

比较简单:新增一个按钮,文案是【重新失败节点】,在工作流列表上展示,用户可以点击。

源码

其中,新增了三个枚举类的值: 

org.apache.dolphinscheduler.api.enums.ExecuteType 

RUN_FAILED_ONLY

org.apache.dolphinscheduler.common.enums.CommandType

RUN_FAILED_ONLY(44, "run failed only");

org.apache.dolphinscheduler.common.enums.StateEventType

RUN_FAILED_ONLY_EVENT(4, "run failed only event");

几个关键步骤的代码,为了方便查看,上下文的代码、涉及改动的方法也会贴出来。

提示:序号对应上面的流程。

②  org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl#execute

switch (executeType) {
    case REPEAT_RUNNING:
        result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
        break;
    case RECOVER_SUSPENDED_PROCESS:
        result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
        break;
    // 新增 9-11 行代码
    case RUN_FAILED_ONLY:
        result = sendRunFailedOnlyMsg(processInstance, CommandType.RUN_FAILED_ONLY);
        break;
    case START_FAILURE_TASK_PROCESS:
        result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
        break;
    case STOP:
        if (processInstance.getState() == ExecutionStatus.READY_STOP) {
            putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
        } else {
            result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
        }
        break;
    case PAUSE:
        if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
            putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
        } else {
            result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
        }
        break;
    default:
        logger.error("unknown execute type : {}", executeType);
        putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");

        break;
}

sendRunFailedOnlyMsg 方法的逻辑,封装 stateEventChangeCommand,提交 RPC 请求。

 /**
     * send msg to master, run failed only
     *
     * @param processInstance process instance
     * @param commandType command type
     * @return update result
     */
    private Map<String, Object> sendRunFailedOnlyMsg(ProcessInstance processInstance, CommandType commandType) {
        Map<String, Object> result = new HashMap<>();
        String host = processInstance.getHost();
        String address = host.split(":")[0];
        int port = Integer.parseInt(host.split(":")[1]);
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0,StateEventType.RUN_FAILED_ONLY_EVENT
        );
        stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
        putMsg(result, Status.SUCCESS);
        return result;
    }

这里也给 StateEventChangeCommand 家了一个状态类型的字段,对应枚举类:StateEventType

方便下游判断状态。

查看下面 ③ 处的代码,用这个状态赋值给 stateEvent 的 type。

③ org.apache.dolphinscheduler.server.master.processor.StateEventProcessor#process 用上面 ② 处的 StateEventType,赋值给 stateEvent 的 type,然后提交给 stateEventResponseService 的 BlockingQueueeventQueue 队列。

@Override
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));

        StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class);
        StateEvent stateEvent = new StateEvent();
        stateEvent.setKey(stateEventChangeCommand.getKey());
        if (stateEventChangeCommand.getSourceProcessInstanceId() != stateEventChangeCommand.getDestProcessInstanceId()) {
            stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        } else {
            stateEvent.setExecutionStatus(stateEventChangeCommand.getSourceStatus());
        }
        stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
        stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
        // TODO 修改
        StateEventType stateEventType = stateEventChangeCommand.getStateEventType();
        if (stateEventType != null){
            stateEvent.setType(stateEventType);
        }else {
            StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
            stateEvent.setType(type);
        }

        logger.info("received command : {}", stateEvent);
        stateEventResponseService.addResponse(stateEvent);
    }

StateEventResponseWorker 线程一直扫描这个队列,拿到 stateEvent,找到要处理的工作流对应的 WorkflowExecuteThread 线程,把这个事件提交给 WorkflowExecuteThread 线程。

 /**
 * task worker thread
 */
class StateEventResponseWorker extends Thread {

    @Override
    public void run() {

        while (Stopper.isRunning()) {
            try {
                // if not task , blocking here
                StateEvent stateEvent = eventQueue.take();
                persist(stateEvent);
            } catch (InterruptedException e) {
                logger.warn("persist task error", e);
                Thread.currentThread().interrupt();
                break;
            }
        }
        logger.info("StateEventResponseWorker stopped");
    }
}

private void persist(StateEvent stateEvent) {
    try {
        if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) {
            writeResponse(stateEvent, ExecutionStatus.FAILURE);
            return;
        }

        WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId());
        workflowExecuteThread.addStateEvent(stateEvent);
        writeResponse(stateEvent, ExecutionStatus.SUCCESS);
    } catch (Exception e) {
        logger.error("persist event queue error, event: {}", stateEvent, e);
    }
}

④WorkflowExecuteThread 内部循环扫描事件列表。

private void handleEvents() {
    while (!this.stateEvents.isEmpty()) {

        try {
            StateEvent stateEvent = this.stateEvents.peek();
            if (stateEventHandler(stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } catch (Exception e) {
            logger.error("state handle error:", e);

        }
    }
}

stateEventHandler 处理 RUN_FAILED_ONLY_EVENT 类型的事件,处理方法是:runFailedHandler

private boolean stateEventHandler(StateEvent stateEvent) {
        logger.info("process event: {}", stateEvent.toString());

        if (!checkStateEvent(stateEvent)) {
            return false;
        }
        boolean result = false;
        switch (stateEvent.getType()) {
            case RUN_FAILED_ONLY_EVENT:
                result = runFailedHandler(stateEvent);
                break;
            case PROCESS_STATE_CHANGE:
                result = processStateChangeHandler(stateEvent);
                break;
            case TASK_STATE_CHANGE:
                result = taskStateChangeHandler(stateEvent);
                break;
            case PROCESS_TIMEOUT:
                result = processTimeout();
                break;
            case TASK_TIMEOUT:
                result = taskTimeout(stateEvent);
                break;
            default:
                break;
        }

        if (result) {
            this.stateEvents.remove(stateEvent);
        }
        return result;
    }

runFailedHandler 的内部逻辑如下:找到改工作流失败且重试次数用完的任务列表,然后依次处理它们的执行记录(标记为失效,从失败列表移除,添加到待提交队列),最后提交到等待队列。

private boolean runFailedHandler(StateEvent stateEvent) {
    try {
        logger.info("process:{} will do {}", processInstance.getId(), stateEvent.getExecutionStatus());
        // find failed tasks with max retry times and init these tasks
        List<Integer> failedList = processService.queryTaskByProcessIdAndStateWithMaxRetry(processInstance.getId(), ExecutionStatus.FAILURE);
        logger.info("run failed task size is : {}", failedList.size());
        for (Integer taskId : failedList) {
            logger.info("run failed task id is : {}", taskId);
            TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
            taskInstance.setFlag(Flag.NO);
            // remove it from errorTaskList
            errorTaskList.remove(Long.toString(taskInstance.getTaskCode()));
            processService.updateTaskInstance(taskInstance);
            // submit current task nodes
            if (readyToSubmitTaskQueue.contains(taskInstance)) {
                continue;
            }
            logger.info("run failed task ,submit current task nodes : {}", taskInstance.toString());
            addTaskToStandByList(taskInstance);
        }
        submitStandByTask();
//            updateProcessInstanceState();
    } catch (Exception e) {
        logger.error("process only run failed task error:", e);
    }
    return true;
}

最终效果

再次回到文章开头的场景:

工作流 A 正在运行,里面有很多节点,依赖关系比较复杂。凌晨用户接到报警,a 节点失败了,此时其他分支的任务还在运行。

此时用户可以直接点击【重新拉起失败任务】按钮,失败的任务就会重新进入等待队列,后续流程就像任务正常运行一样,也会继续拉起下游任务。

画外音:本次优化简化了失败任务运维的复杂度,提高了效率。

作者从1.x开始使用海豚调度,那是还叫做 Easy Scheduler,是一个忠实用户,我们基于 2.x版本做了很多内部的改造,后续会分享出来,同样社区也推荐大家使用3.1.9版本,这是相对比较稳定的版本。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1846294.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp 打包 H5 实现在 uniapp 打包 APP 的 webview 通信

一、前言 遇到 uniapp 打包的 APP 在 webview 内嵌入 uniapp 打包的 H5 页面的需求&#xff0c;并实现通信。本篇主要总结了如何实现并总结遇到的问题&#xff0c;希望可以帮助大家减少负担。 实现需求主要有三个地方需要处理&#xff1a; index.html 的打包配置导入 uni.we…

SVN学习(003 svn安装和配置subversive)

尚硅谷SVN高级教程(svn操作详解) 总时长 4:53:00 共72P 此文章包含第30p-第p41的内容 介绍 安装 字符集记得先改成utf-8 这里无svn的插件 打开应用插件市场&#xff08;需要有网络&#xff09; 重启 svn文件夹 安装svn连接器 这两个用哪个都行 svn版本号是1.6 所以…

代码随想录算法训练营第六十六天 |101.孤岛的总面积、102.沉没孤岛、103.水流问题、104.建造最大岛屿

101.孤岛的总面积 文字讲解&#xff1a;101. 孤岛的总面积 | 代码随想录 解题思路 本题要求找到不靠边的陆地面积&#xff0c;那么我们只要从周边找到陆地然后 通过 dfs或者bfs 将周边靠陆地且相邻的陆地都变成海洋&#xff0c;然后再去重新遍历地图 统计此时还剩下的陆地就…

360°环绕拍摄图片的作用 欧保图 ORBITVU

360环绕拍摄图片在电商中的应用及其优势 ORBITVU 波兰 欧保图 360环绕拍摄图片是一种动态的产品展示方式&#xff0c;它允许消费者从不同角度全方位地查看产品&#xff0c;这种交互式的体验大大增强了用户的购物体验。以下是360环绕拍摄图片在电商中的一些具体应用及其带来的优…

FreeRTOS学习 -- 时间管理

在使用 FreeRTOS 的过程中通常会在一个任务函数中使用延时函数对这个任务延时&#xff0c;当执行延时函数的时候会进行任务切换&#xff0c;并且此任务就会进入阻塞态&#xff0c;直到延时完成&#xff0c;任务重新进入就绪态。 FreeRTOS 延时函数 1、函数 vTaskDelay() 在F…

Ubuntu下载QT5.8安装包-bestswinger课程

最近在看UP的QT开发课&#xff0c;真的找了巨久这个安装包&#xff0c;谁都不想在安装上花太多时间。。出一版小小教程吧&#xff5e; 首先打开qt download官网&#xff0c;5.8好像在镜像网站上没有看到&#xff0c;所以我最后还是老老实实官网了&#xff0c;而且5.8会小一点 …

90%网络工程师还是搞不清三层交换机是啥?

6月29日&#xff0c;HCIA新开班啦什么是三层交换机&#xff1f; 首先&#xff0c;我们先来解释一下三层交换机是什么。简单来说&#xff0c;它就是网络中的一个小老板&#xff0c;负责管理数据的流动。与普通的网络设备不同&#xff0c;三层交换机不仅能认识「MAC地址」&#…

小程序使用接口wx.getLocation配置

开通时需详细描述业务&#xff0c;否则可能审核不通过 可能需要绑定腾讯位置服务&#xff0c;新建应该&#xff0c;绑定到小程序 配置 权限声明&#xff1a;在使用wx.getLocation前&#xff0c;需要在app.json的permission字段中声明对用户位置信息的使用权限&#xff0c;并提…

可编程非线性RCD负载原理与应用

可编程非线性RCD负载&#xff08;Resistor-Capacitor-Diode&#xff09;是一种电子元件&#xff0c;其电阻、电容和二极管的特性可以通过编程进行控制和调整。这种负载广泛应用于电力系统、通信设备、电子设备等领域&#xff0c;具有很高的实用价值。 RCD负载的基本原理是利用电…

【CT】LeetCode手撕—160. 相交链表

目录 题目1- 思路2- 实现⭐160. 相交链表——题解思路 3- ACM 实现 题目 原题连接&#xff1a;160. 相交链表 1- 思路 模式识别&#xff1a;相交链表 ——> 判断是否相交 思路 保证 headA 是最长的那个链表&#xff0c;之后对其开始依次遍历 2- 实现 ⭐160. 相交链表—…

大厂晋升学习方法一:海绵学习法

早晨 30 分钟 首先&#xff0c;我们可以把起床的闹钟提前 30 分钟&#xff0c;比如原来 07:30 的闹钟可以改为 07:00。不用担心提前 30 分钟起床会影响休息质量&#xff0c;习惯以后&#xff0c;早起 30 分钟不但不会影响一天的精力&#xff0c;甚至可能反而让人更有精神。早起…

leetcode 二分查找·系统掌握 猜数字大小

题目&#xff1a; 题解&#xff1a; 使用最经典普通二分即可 int guessNumber(int n) {long l0,rn,mid;while(l<r){mid(rl)>>1;if(guess(mid)0)return mid;else if(guess(mid)-1)rmid-1;else lmid1;}return 0;}

kylin v10 离线安装chrome centos离线安装chrome linux离线安装谷歌浏览器

1. 先用自己联网的计算机&#xff0c;下载离线安装包&#xff0c;浏览器输入链接下载安装包&#xff1a; https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm 1.2. 信创环境不用执行下面&#xff0c;因为没网 1.3. 若为阿里云服务器&#xff0c;或服…

基于自主发明专利的杰林码哈希算法、对称加密算法和无损压缩算法的可文件追踪管控且支持linux和windows的文件压缩包工具SDK和JLM PACK软件介绍

基于自主发明专利的杰林码哈希算法、对称加密算法和无损压缩算法的可文件追踪管控且支持linux和windows的文件压缩包工具SDK1.0版发布&#xff0c;下载链接为&#xff1a; JLM PACK SDK和软件的官方网站 注意测试授权证书yesine_jlmpack_test.license的有效期到2024年12月&am…

【AI副业指南】用AI做心理测试图文号,单月稳赚7000+(附详细教程)

大家好&#xff0c;我是画画的小强 因为AI的出现&#xff0c;很多自媒体副业项目变得简单容易上手&#xff0c;也给予很多想要在业余时间变现的朋友更丰富的项目选择。 今天分享的赛道绝对颠覆大家的认知&#xff0c;本期将叫大家如何通过AI在自媒体平台上做心理测试账号。 …

湖南(焦点小组)源点咨询 用户座谈会现场访谈之一点心得

湖南源点调研认为&#xff1a;访谈前&#xff0c;务必先明确最核心的目的。 一些初学的同学都知道&#xff0c;访谈首先要准备一个提纲&#xff0c;的确这是一个基础工作。但我在实际情况更常见到的是&#xff1a; 访谈提纲看似全面&#xff0c;各种维度都去问&#xff0c;但…

防晒服饰「进化论」:从标准到当代人的OOTD

【潮汐商业评论/原创】 “我&#xff0c;我老公&#xff0c;我儿子&#xff0c;身上80%都被‘防晒’承包了。我怕光老化&#xff0c;我老公怕热&#xff0c;我儿子容易晒脱皮。买蕉下&#xff0c;是因为看重它的标准&#xff0c;而且此‘防晒’已非彼‘防晒’了。”宝妈Timy正…

使用VisualBox+Vagrant搭建Centos虚拟机环境

1.下载并安装VisualBox&#xff1b; 2.下载并安装Vagrant; 3.打开cmd窗口&#xff0c;执行命令vagrant init centos/7&#xff0c;初始化centos环境&#xff0c;该步骤受网络带宽影响&#xff0c;可能挂级30分钟到1个小时&#xff1b; 4.启动虚拟机&#xff1a;vagrant up&…

如何使用LiveTargetsFinder生成实时活动主机URL列表

关于LiveTargetsFinder LiveTargetsFinder是一款功能强大的实时活动主机生成工具&#xff0c;该工具可以为广大研究人员以自动化的形式生成可供分析和测试的实时活动主机URL列表&#xff0c;并通过MassDNS、Masscan和Nmap自动过滤出无法访问的主机。 我们只需要提供一个域名作…

使用mysql的binlog进行数据恢复

1.mysql安装环境 在你本地电脑windows上建一个和生产环境一样的mysql版本 我的是 mysql5.7.43 安装教程可以自行上网搜&#xff08;这里不做介绍&#xff09; 可参考&#xff1a; 1.1安装路径 我的mysql安装路径&#xff1a; D:\mysql\mysql-5.7.43-winx64\bin * 1.2my.in…