基于 Apache Dolphinscheduler3.1.9中的Task 处理流程解析

news2024/12/24 8:52:15

实现一个调度任务,可能很简单。但是如何让工作流下的任务跑得更好、更快、更稳定、更具有扩展性,同时可视化,是值得我们去思考得问题。

Apache DolphinScheduler是一个分布式和可扩展的开源工作流协调平台,具有强大的DAG可视化界面,广泛应用于数据集成、数据分析和大规模数据迁移。

Master 整体启动流程

    @PostConstruct
    public void run() throws SchedulerException {
        // init rpc server
        this.masterRPCServer.start();

        // install task plugin
        this.taskPluginManager.loadPlugin();

        // self tolerant
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        this.masterSchedulerBootstrap.init();
        // 处理任务的核心 重点是处理任务
        this.masterSchedulerBootstrap.start();

        // 事件执行启动
        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        this.schedulerApi.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (!ServerLifeCycleManager.isStopped()) {
                close("MasterServer shutdownHook");
            }
        }));
    }

上面的代码主要做的事情:

初始化rpc的服务端,也即netty的服务端,为处理请求做好铺垫
安装Task插件,task插件主要为业务需要集成的SPI信息
master注册客户端启动
初始化master定时引导
master定时引导启动
事件执行启动
失败执行线程启动
定时任务api启动

下面我们重点看这两段代码:

this.masterSchedulerBootstrap.start();
this.eventExecuteService.start();

this.masterSchedulerBootstrap.start()

任务的来源在t_ds_command表里面,因此需要先取出指令信息,然后将指令转处理实例,遍历处理实例,创建新的工作流线程。

(1) 将处理实例id和处理线程放入到processInstanceExecCacheManager中。

(2) 添加工作流运行状态和实例id放入到workflowEventQueue队列中。

因此可以看到消费的poolEvent,可以看到workflowEventQueue.poolEvent() 本质就是workflowEventLooper.start()启动消费。

此时我们可以看到workflowEventLooper.start(),处理放入到workflowEventQueue中的事件event,也即workflowEventQueue.take(),获取工作流处理器,处理事件。

Run中的核心处理:

刘亚洲

此时先执行工作流的流程,核心方法:workflowExecuteRunnable::call

workflowEventLooper.start()启动做的事情:

将任务放入到优先任务队列之后,就可以进行消费队列中的任务了。

ProcessInstance任务放入的核心

ProcessInstance 启动入口点为:org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#call

根据工作流线程状态可分为:

1)创建过程中的逻辑:

2)初始化DAG的过程:

3)初始化队列的过程:

4)工作流状态成功:

生成者消费者模型的产生是优先任务队列的放入和优先任务队列的消费。

因此可以看到两者在转换的过程之后基于Netty做了任务的转发操作,从而在Netty中做指令处理,从而完成消费,最终流转到具体的Task。

消费任务:TaskPriorityQueueConsumer

核心方法:this.batchDispatch(fetchTaskNum)

netty服务端消费任务消息

实质是放入任务线程,也即:

workerManager.offer(workerTaskExecuteRunnable)

处理消费在run方法里面:

waitSubmitQueue.take()

处理任务的核心:此时会具体流转到具体的任务,执行处理

此时完成任务的适配业务任务的处理,最终实现任务的处理。

this.eventExecuteService.start()

针对任务处理的状态进行处理。

stateEventHandler.handleStateEvent(this, stateEvent)

其主要过程是添加状态事件和消费状态事件,重点看队列的生产和消费。

分析的思路和上面的队列模式差不多,这里不展开了。

总结

从上面我们可以看到生产者消费者模型、线程模型在DS3.1.9版本中使用非常的多,也是我们去了解处理的思路的点。

同时对应任务的处理,为了保证任务的高效处理,使用了Netty来处理任务。

总体来说,代码写得还是很不错的,值得我们去学习。同时还使用了很多设计模式,比如SPI、工厂模式、状态模式等等。

参考:

dolphinscheduler文档:https://dolphinscheduler.apache.org/zh-cn github地址:https://github.com/apache/dolphinscheduler

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2255626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

物联网接入网关的数据安全和高效传输详解

物联网接入网关,作为连接物联网终端设备与云端或本地服务器的关键环节,不仅负责数据的汇聚与转发,更需确保数据在传输过程中的安全无虞与高效流畅。 一、数据安全:构筑坚实防线 1. 加密技术的应用 天拓四方物联网接入网关内置了…

双指针算法(超详细版)

希望大家多多关注,有三必回 1.双指针 1.1快慢双指针 快慢双指针常用来解决循环问题,或是查找中间节点 1.1.1循环链表(141. 环形链表 - 力扣(LeetCode)) 解题思路: 1.定义快慢指针fast和slo…

Rain后台权限管理系统,快速开发

这段时间一直没有更新,因为在写一个脚手架,今天Rain项目终于完工,已经发布到github,免费使用 项目github链接 https://github.com/Rain-hechang/Rain 简介 前端采用Vue3.x、Element UI。 后端采用Spring Boot、Spring Security、Redis &…

欧歌Web电视 1.2|全新修改版,新增更多频道,更稳定

欧歌Web电视App是一款功能强大的电视直播软件,通过WebView二次开发,对内置线路进行了优化和增加,让用户可以看到更多的频道。首次打开如果不会自动全屏,可以进入设置调整画面尺寸。该版本新增了多个地方频道和娱乐内容频道&#x…

嵌入式系统与移动设备开发

文章目录 1 嵌入式系统概述1.1 嵌入式系统基本概念1.1.1 嵌入式系统定义1.1.2 嵌入式系统的发展1.1.3 嵌入式系统的特点 1.2 嵌入式系统分类1.2.1 单个微处理器1.2.2 嵌入式处理器可扩展的系统1.2.3 复杂的嵌入式系统1.2.4 在制造或过程控制中使用的计算机系统 1.3 嵌入式处理器…

使用 Elastic 和 Amazon Bedrock 制作混合地理空间 RAG 应用程序

作者:来自 Elastic Udayasimha Theepireddy (Uday), Srinivas Pendyala, Ayan Ray 借助 Elasticsearch 及其向量数据库,你可以构建可配置的搜索和可信的生成式 AI (GenAI) 体验,这些体验可快速从原型扩展到生产。主要功能包括: 内…

【opencv入门教程】15. 访问像素的十四种方式

文章选自: 一、像素访问 一张图片由许多个点组成,每个点就是一个像素,每个像素包含不同的值,对图像像素操作是图像处理过程中常使用的 二、访问像素 void Samples::AccessPixels1(Mat &image, int div 64) {int nl imag…

Ansys Maxwell使用技巧

1、回到原点 点击Fit All 2、长方体做差 选中两个长方体, 点击Subtracct,就可以得到一个镂空的绕组。 3、电感仿真步骤 3.1 画磁芯 3.2 画绕组 3.3 加激励 选择截面积-右键绕组-Edit-Surface-Section-YZ 选择一个截面添加电流激励 3.4选材料 绕组一…

掌握谈判技巧,达成双赢协议

在当今竞争激烈且合作频繁的社会环境中,谈判成为了我们解决分歧、谋求共同发展的重要手段。无论是商业合作、职场交流,还是国际事务协商,掌握谈判技巧以达成双赢协议都具有极其关键的意义。它不仅能够让各方在利益分配上找到平衡点&#xff0…

MacOS 命令行详解使用教程

本章讲述MacOs命令行详解的使用教程,感谢大家观看。 本人博客:如烟花般绚烂却又稍纵即逝的主页 MacOs命令行前言: 在 macOS 上,Terminal(终端) 是一个功能强大的工具,它允许用户通过命令行直接与系统交互。本教程将详细介绍 macOS…

第十七章 使用 MariaDB 数据库管理系统

1. 数据库管理系统 数据库是指按照某些特定结构来存储数据资料的数据仓库。在当今这个大数据技术迅速崛起的年代,互联网上每天都会生成海量的数据信息,数据库技术也从最初只能存储简单的表格数据的单一集中存储模式,发展到了现如今存储海量…

11.17【大数据】Hadoop【DEBUG】

列出hdfs文件系统所有的目录和文件 主节点上 子结点 是一样的 *为什么能登进 slave 02 的主机,但是 master 当中依然显示 slave 02 为 DeadNode?* hadoop坏死节点的重启_hadoop3 子节点重启-CSDN博客 注意hadoop-daemon.sh 实际上位于 Hadoop 的 sbin 目录中,而不…

MetaGPT 安装

1. 创建环境 conda create -n metagpt python3.10 && conda activate metagpt2. 可编辑方式安装 git clone --depth 1 https://github.com/geekan/MetaGPT.git cd MetaGPT pip install -e .3. 配置 metagpt --init-config运行命令,在C盘位置C:\Users\325…

图的最小生成树(Kruskal算法,Prim算法)

无向图中的最短路径问题?No,最短路径不是最小生成树! 什么是最小生成树? 在一个无向连通图中,有一个子图连接所有顶点,并且权重和最小,那么他就是最小生成树。如果权重和不是最小的只能叫做生…

【Flink-scala】DataStream编程模型之水位线

DataStream API编程模型 1.【Flink-Scala】DataStream编程模型之 数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之 窗口计算-触发器-驱逐器 文章目录 DataStream API编程模型前言…

PHP RabbitMQ连接超时问题

问题背景 Error: The connection timed out after 3 sec while awaiting incoming data 看到这个报错,我不以为意,认为是我设置的超时时间不够导致的,那就设置长一点 Error: The connection timed out after 300 sec while awaiting incom…

【LeetCode热题100】BFS解决FloodFill算法

这篇博客主要记录了使用BFS解决FloodFill算法的几道题目&#xff0c;包括图像渲染、岛屿数量、岛屿的最大面积、被包围的区域。 class Solution {using PII pair<int, int>; public:vector<vector<int>> floodFill(vector<vector<int>>& im…

L2G3000-LMDeploy 量化部署实践

文章目录 LMDeploy 量化部署实践闯关任务环境配置W4A16 量化 KV cacheKV cache 量化Function call LMDeploy 量化部署实践闯关任务 环境配置 conda create -n lmdeploy python3.10 -y conda activate lmdeploy conda install pytorch2.1.2 torchvision0.16.2 torchaudio2.1.…

大数据新视界 -- 大数据大厂之 Hive 临时表与视图:灵活数据处理的技巧(上)(29 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

Ubuntu操作系统在Vmware中的安装、常用操作、最基础的知识、imx6ll基本开发环境配置

01-Ubuntu操作系统的安装 网盘搜索 “ubuntu18.04.zip”&#xff0c;下载下来之后用Vmware打开就行了。 我用的虚拟机是15.5.6&#xff0c;实测没有问题。 启动时用户名为book的密码为123456 提问&#xff1a;Ubuntu与Centos系统有何区别&#xff1f; 详情见 https://blog.cs…