如何快速在 Apache DolphinScheduler 新扩展一个任务插件?

news2024/11/28 11:36:52

file

作者 | 代立冬

编辑 | Debra Chen

Apache DolphinScheduler 是现代数据工作流编排平台,具有非常强大的可视化能力,DolphinScheduler 致力于使数据工程师、分析师、数据科学家等数据工作者都可以简单轻松地搭建各种数据工作流,让数据处理流程更简单可靠。

DolphinScheduler 非常易于使用(easy to use),目前有四种创建工作流的方法:

  • 在 UI 界面上直接通过拖放任务的方式来创建任务
  • PyDolphinScheduler,通过 Python API 创建工作流,也就是 workflow as code 的方式
  • 编写 yaml 文件,通过 yaml 创建工作流(目前必须安装 PyDolphinScheduler)
  • 通过 Open API 的方式来创建工作流

以上 4 种总有一种方式适合您的场景!

得益于 DolphinScheduler 采用无中心化的整体架构设计,使得 DolphinScheduler 调度性能也是同类开源数据工作流编排平台的 5 倍以上,如果您正有这样的性能问题或者调度延时问题,也不妨试试 DolphinScheduler。

file DolphinScheduler界面

好的,接下来言归正题,有不少用户想在 DolphinScheduler 扩展新的任务插件支持(比如添加 Kettle),DolphinScheduler 的任务插件体系是基于 SPI 来进行任务插件扩展的。

什么是 SPI 服务发现?

SPI 是 Service Provider Interface 的缩写,是一种常见的服务提供发现机制,比如知名的 OLAP 引擎 Presto 也是使用 SPI 来扩展的。在 java.util.ServiceLoader 的文档里有比较详细的介绍,其抽象的概念是指动态加载某个服务实现。

比如 java.sql.Driver 接口,不同厂商可以针对同一接口做出不同的实现,比如 MySQL 和 PostgreSQL 都有不同的实现提供给用户,而 Java 的 SPI 机制可以为某个接口寻找服务实现。Java 中 SPI 机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

SPI 整体机制图如下: file

SPI 机制中有 4 个重要的组件 :

  • 服务接口 Service Interface
  • 服务接口实现:不同的服务提供方可以提供一个或多个实现;框架或者系统本身也可以提供默认的实现
  • 提供者注册 API(Provider Registration API),这是提供者用来注册实现的
  • 服务访问 API (Service Access API) ,这是调用方用来获取服务的实例的接口

Apache DolphinScheduler 从 2.0 版本开始引入 SPI。将 Apache DolphinScheduler 的 Task 看成一个执行服务,而我们需要根据使用者的选择去执行不同的服务,如果没有的服务,则需要我们自己扩充,我们只需要完成我们的 Task 具体实现逻辑,然后遵守 SPI 的规则,编译成 Jar 并上传到指定目录,就可以使用我们自己编写的 Task 插件来执行具体的任务了。

谁在使用它?

除了前面提到的 Presto 外,还有以下技术都使用到 SPI 技术:

1、Apache DolphinScheduler

  • Task
  • Datasource

2、Apache Flink

  • Flink sql connector,用户实现了一个 Flink-connector 后,Flink 也是通过 SPI 来动态加载的

3、SpringBoot

  • Spring boot spi

4、JDBC

  • JDBC4 也基于 SPI 的机制来发现驱动提供商了,可以通过META-INF/services/java.sql.Driver 文件里指定实现类的方式来暴露驱动提供者

5、更多

  • common-logging

DolphinScheduler SPI工作流程

file

如上图,Apache DolphinScheduler 中有 2 种 Task : 逻辑 Task 和物理 Task,逻辑 Task 指 Dependent Task,Switch Task 这种控制工作流逻辑的任务插件;物理 Task 是指 Shell Task,SQL Task ,Spark Task ,Python Task 等这种执行具体任务的 Task。

在 Apache DolphinScheduler 中,我们一般扩充的都是物理 Task,物理 Task 都是由 Worker 来调用并执行的,当启动 Worker 服务时,Worker 会来加载相应的实现了规则的 Task lib,HiveTask 被 Apache DolphinScheduler TaskPluginManage 加载了。SPI 的规则图上也有描述,也可以参考 java.util.ServiceLoader 类。

如何扩展一个任务插件?

创建 Maven 项目

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.dolphinscheduler \
    -DarchetypeArtifactId=dolphinscheduler-hive-client-task \
    -DarchetypeVersion=1.10.0 \
    -DgroupId=org.apache.dolphinscheduler \
    -DartifactId=dolphinscheduler-hive-client-task \
    -Dversion=0.1 \
    -Dpackage=org.apache.dolphinscheduler \
    -DinteractiveMode=false 

Maven 依赖

org.apache.dolphinscheduler
     dolphinscheduler-spi
     ${dolphinscheduler.lib.version}
     ${common.lib.scope}




     org.apache.dolphinscheduler
     dolphinscheduler-task-api
     ${dolphinscheduler.lib.version}
     ${common.lib.scope}

创建 Task 通道工厂(TaskChannelFactory)

org.apache.dolphinscheduler.spi.task.TaskChannel

插件实现以上接口即可。主要包含创建任务(任务初始化,任务运行等方法)、任务取消,如果是 yarn 任务,则需要实现 org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask。

我们在 dolphinscheduler-task-api 模块提供了所有任务对外访问的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比如告警模块,注册中心模块等,你可以详细阅读查看。

首先我们需要创建任务服务的工厂,其主要作用是帮助构建 TaskChannel 以及 TaskPlugin 参数,同时给出该任务的唯一标识,ChannelFactory 在 Apache DolphinScheduler 的 Task 服务组中,其作用属于是在任务组中的承上启下,交互前后端以及帮助 Worker 构建 TaskChannel。

package org.apache.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
public class HiveClientTaskChannelFactory implements TaskChannelFactory {
    /**
    * Create task channel, execute task through this channel
     * @return task channel
     */
    @Override
    public TaskChannel create() {
        return new HiveCliTaskChannel();
    }
    /**
    * Returns the global unique identifier of this task
     * @return task name
     */
    @Override
    public String getName() {
        return "HIVECLI";
    }
    /**
    * Parameters required for front-end pages
     * @return
     */
    @Override
    public List getParams() {
        return null;
    }
}

创建 TaskChannel

有了工厂之后,我们会根据工厂创建出 TaskChannel,TaskChannel 包含如下两个方法,一个是取消,一个是创建,目前不需要关注取消,主要关注创建任务。

   void cancelApplication(boolean status);
    /**
     * 构建可执行任务
     */
    AbstractTask createTask(TaskRequest taskRequest);
    public class HiveClientTaskChannel implements TaskChannel {
    @Override
    public void cancelApplication(boolean b) {
        //do nothing
    }
    @Override
    public AbstractTask createTask(TaskRequest taskRequest) {
        return new HiveClientTask(taskRequest);
    }
}

构建 Task 实现

通过 TaskChannel 我们得到了可执行的物理 Task,但是我们需要给当前 Task 添加相应的实现,才能够让Apache DolphinScheduler 去执行你的任务,首先在编写 Task 之前我们需要先了解一下 Task 之间的关系:

file

通过上图我们可以看到,基于 Yarn 执行任务的 Task 都会去继承 AbstractYarnTask,不需要经过 Yarn 执行的都会去直接继承 AbstractTaskExecutor,主要是包含一个 AppID,以及 CanalApplication setMainJar 之类的方法,想知道的小伙伴可以自己去深入研究一下,如上可知我们实现的 HiveClient 就需要继承 AbstractYarnTask,在构建 Task 之前,我们需要构建一下适配 HiveClient 的 Parameters 对象用来反序列化JsonParam。

package com.jegger.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import java.util.List;
public class HiveClientParameters extends AbstractParameters {
    /**
     * 用HiveClient执行,最简单的方式就是将所有SQL全部贴进去即可,所以我们只需要一个SQL参数
     */
    private String sql;
    public String getSql() {
        return sql;
    }
    public void setSql(String sql) {
        this.sql = sql;
    }
    @Override
    public boolean checkParameters() {
        return sql != null;
    }
    @Override
    public List getResourceFilesList() {
        return null;
    }
}

实现了 Parameters 对象之后,我们具体实现 Task,例子中的实现比较简单,就是将用户的参数写入到文件中,通过 Hive -f 去执行任务。

package org.apache.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class HiveClientTask extends AbstractYarnTask {
    /**
     * hive client parameters
     */
    private HiveClientParameters hiveClientParameters;
    /**
     * taskExecutionContext
     */
    private final TaskRequest taskExecutionContext;
    public HiveClientTask(TaskRequest taskRequest) {
        super(taskRequest);
        this.taskExecutionContext = taskRequest;
    }
    /**
     * task init method
     */
    @Override
    public void init() {
        logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
        this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);
        if (this.hiveClientParameters != null && !hiveClientParameters.checkParameters()) {
            throw new RuntimeException("hive client task params is not valid");
        }
    }
    /**
     * build task execution command
     *
     * @return task execution command or null
     */
    @Override
    protected String buildCommand() {
        String filePath = getFilePath();
        if (writeExecutionContentToFile(filePath)) {
            return "hive -f " + filePath;
        }
        return null;
    }
    /**
     * get hive sql write path
     *
     * @return file write path
     */
    private String getFilePath() {
        return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
    }
    @Override
    protected void setMainJarName() {
        //do nothing
    }
    /**
     * write hive sql to filepath
     *
     * @param filePath file path
     * @return write success?
     */
    private boolean writeExecutionContentToFile(String filePath) {
        Path path = Paths.get(filePath);
        try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
            writer.write(this.hiveClientParameters.getSql());
            logger.info("file:" + filePath + "write success.");
            return true;
        } catch (IOException e) {
            logger.error("file:" + filePath + "write failed.please path auth.");
            e.printStackTrace();
            return false;
        }
    }
    @Override
    public AbstractParameters getParameters() {
        return this.hiveClientParameters;
    }
}

遵守 SPI 规则

# 1,Resource下创建META-INF/services文件夹,创建接口全类名相同的文件
zhang@xiaozhang resources % tree ./
./
└── META-INF
    └── services
        └── org.apache.dolphinscheduler.spi.task.TaskChannelFactory
# 2,在文件中写入实现类的全限定类名
zhang@xiaozhang resources % more META-INF/services/org.apache.dolphinscheduler.spi.task.TaskChannelFactory 
org.apache.dolphinscheduler.plugin.task.hive.HiveClientTaskChannelFactory

打包和部署

## 1,打包
mvn clean install
## 2,部署
cp ./target/dolphinscheduler-task-hiveclient-1.0.jar $DOLPHINSCHEDULER_HOME/lib/
## 3,restart dolphinscheduler server

以上操作完成后,我们查看 worker 日志 tail -200f $Apache DolphinScheduler_HOME/log/Apache DolphinScheduler-worker.log

file

Apache DolphinScheduler 的插件开发就到此完成~涉及到前端的修改可以参考: Apache DolphinScheduler-ui/src/js/conf/home/pages/dag/_source/formModel/

  • NOTICE:目前任务插件的前端还没有实现,因此你需要单独实现插件对应的前端页面。

TaskChannelFactory 继承自 PrioritySPI,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 getIdentify 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 IllegalArgumentException。

如果任务插件存在类冲突,你可以采用 Shade-Relocating Classes(https://maven.apache.org/plugins/maven-shade-plugin/)来解决这种问题。

如果您有兴趣试试 Apache DolphinScheduler ,欢迎微信添加小助手 Leonard-ds 或加入 DolphinScheduler Slack: https://s.apache.org/dolphinscheduler-slack, 我将免费全力支持您!

  • 参考:
    • 极速开发扩充 Apache DolphinScheduler Task 类型 | 实用教程
    • https://blog.csdn.net/s1293678392/article/details/120048318

      本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1028497.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Python绘制多个股票的K线图

K线图是金融领域常用的技术分析工具,可以洞察地展示股票的开盘价、收盘价、最高价和最低价等信息。在投资决策中,对多个股票的走势进行对比分析是非常重要的。随着金融市场的发展,投资者对于多种股票的对比分析需求越来越高。传统的方式是通过…

tokio::net学习

tokio::net 该模块包含TCP/UDP/Unix网络类型,类似于标准库,可用于实现网络协议。 networking protocols Organization TcpListener and TcpStream provide functionality for communication over TCP UdpSocket provides functionality for communication over UDP UnixLi…

信创办公–基于WPS的PPT最佳实践系列 (项目8创建电子相册)

信创办公–基于WPS的PPT最佳实践系列 (项目8创建电子相册) 目录 应用背景操作步骤 应用背景 如果我们想把图片弄成相册,或者弄成一段有音乐的视频分享给朋友。我们可以利用PPT来制作。那我们如何用PPT制作电子相册或视频呢?可以跟…

21天学会C++:Day13----动态内存管理

CSDN的uu们,大家好。这里是C入门的第十三讲。 座右铭:前路坎坷,披荆斩棘,扶摇直上。 博客主页: 姬如祎 收录专栏:C专题 目录 1. 加深对内存四区的理解 2. new-delete 与 malloc-free 2.1 能否用 fre…

Django:一、创建项目、APP及启动Django

一、准备工具 Pycharm企业版 二、创建项目 打开Pycharm企业版,创建Django项目。 注意:①删除项目下的templates文件夹;②删除setting.py文件中的一行代码 默认文件介绍: 三、创建APP 点击Pycharm左下角Terminal,打…

基于Android+OpenCV+CNN+Keras的智能手语数字实时翻译——深度学习算法应用(含Python、ipynb工程源码)+数据集(四)

目录 前言总体设计系统整体结构图系统流程图 运行环境模块实现1. 数据预处理2. 数据增强3. 模型构建4. 模型训练及保存5. 模型评估6. 模型测试1)权限注册2)模型导入3)总体模型构建4)处理视频中的预览帧数据5)处理图片数…

区块链技术:解密去中心化的革命

文章目录 区块链的基础概念什么是区块链?区块链的核心原理1. 分布式账本2. 区块3. 加密技术4. 共识机制 区块链的工作原理区块链的交易过程区块链的安全性共识机制的作用 区块链的应用领域1. 金融服务2. 供应链管理3. 物联网4. 医疗保健5. 政府与公共服务 区块链的未…

图像练习OpenCV(01)

提取出里面最大矩形的四个顶点坐标 源图像 结果展示 代码 void getLine(std::vector<int>& data, int threshold) {for (int x 0; x < data.size(); x){if (0 data[x]){continue;}int maxValue 0, maxLoc -1, i -1;for (i x; i < data.size(); i){if …

【2023集创赛】Arm杯二等奖作品:基于Arm Cortex-M3的体感节奏音乐游戏机

本文为2023年第七届全国大学生集成电路创新创业大赛&#xff08;“集创赛”&#xff09;安谋科技杯全国二等奖作品分享&#xff0c;参加极术社区的【有奖征集】分享你的2023集创赛作品&#xff0c;秀出作品风采&#xff0c;分享2023集创赛作品扩大影响力&#xff0c;更有丰富电…

跑腿系统开发:构建实时任务分配算法的技术挑战

在跑腿系统中&#xff0c;实时任务分配算法是确保任务快速高效完成的关键因素之一。本文将介绍构建实时任务分配算法时可能面临的技术挑战&#xff0c;并提供一个简单的Python示例来解决这些挑战。 技术挑战&#xff1a; 实时数据处理&#xff1a; 跑腿系统需要处理大量的实时任…

Ganache本地测试网+cpolar内网穿透实现公网访问内网

文章目录 前言1. 本地环境服务搭建2. 局域网测试访问3. 内网穿透3.1 ubuntu本地安装cpolar内网穿透3.2 创建隧道3.3 测试公网访问 4. 配置固定二级子域名4.1 保留一个二级子域名4.2 配置二级子域名4.3 测试访问公网固定二级子域名 前言 网&#xff1a;我们通常说的是互联网&am…

K8S:Pod概念、分类及相关的策略

文章目录 一.pod相关概念&#xff11;.Pod基础概念&#xff12;.Kubrenetes集群中Pod两种使用方式&#xff13;.pause容器的Pod中的所有容器共享的资源&#xff14;.kubernetes中的pause容器主要为每个容器提供功能&#xff1a;&#xff15;.Kubernetes设计这样的Pod概念和特殊…

本地搭建CFimagehost私人图床——“cpolar内网穿透”

文章目录 1.前言2. CFImagehost网站搭建2.1 CFImagehost下载和安装2.2 CFImagehost网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

7.algorithm2e中while怎么使用

algorithm2e中while怎么使用 在 algorithm2e 宏包中&#xff0c;要使用 while 循环&#xff0c;您可以使用 \While 和 \EndWhile 命令来定义循环的开始和结束。以下是如何使用 while 循环的示例&#xff1a; \documentclass{article} \usepackage[linesnumbered,boxed]{algorit…

Mac电脑音视频播放器: Infuse for Mac中文

Infuse是一款流行的多媒体播放器应用程序&#xff0c;适用于iOS、tvOS和macOS平台。它由Firecore开发&#xff0c;旨在提供出色的媒体播放体验&#xff0c;并支持广泛的视频和音频格式。 以下是Infuse的一些主要功能和特点&#xff1a; 多媒体格式支持&#xff1a;Infuse支持…

ROS2 从头开始​​:第 1 部分 — 机器人操作系统简介

火星上的机器人&#xff08;AI生成图像&#xff09; 一、说明 ROS2是机器人的朋友&#xff0c;一个他们所依赖的平台&#xff0c;用于沟通、协调和控制&#xff0c;帮助他们实现目标。ROS2以DDS为核心&#xff0c;帮助机器人探索新世界、新任务、新可能性&#xff0c;是一个方…

代码随想录--链表-反转链表

题意&#xff1a;反转一个单链表。 示例: 输入: 1->2->3->4->5->NULL 输出: 5->4->3->2->1->NULL 双指针 public class Main {public static class ListNode {int val;ListNode next;ListNode(int x) {val x;}}public ListNode reverseList(L…

涨知识,关于代码签名证书10大常见问题解答

在当今互联网时代&#xff0c;各种软件程序充斥着这个网络世界&#xff0c;大大小小的软件层出不穷&#xff0c;如何让用户信任软件并下载软件&#xff0c;是众多软件开发公司需要解决的问题&#xff0c;由此代码签名证书应运而生&#xff0c;提供了软件程序的身份认证、完整性…

uni-app获取元素具体位置获取失败

场景&#xff1a;想要通过链接跳转传递catid&catid2类别id,商品类别id 跳到这一页左侧对应的类别栏上面,同时跳到右侧列表滚动到对应商品那一块区域。 遇到的问题&#xff1a;在for循环中通过绑定id获取不到商品列表的具体位置。 原因&#xff1a;在onReady函数和mounted函…

【Java】医院智能导诊系统源码:解决患者盲目就诊问题、降低患者挂错号比例

医院智能导诊系统解决患者盲目就诊问题&#xff0c;减轻分诊工作压力。降低患者挂错号比例&#xff0c;优化患者就诊流程&#xff0c;有效提高线上线下医疗机构接诊效率。患者可通过人体画像选择症状部位&#xff0c;了解对应病症信息和推荐就医科室。 一、医院智能导诊系统概述…