DataX源码分析-插件机制

news2024/11/24 11:38:15

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
八、DataX源码分析-插件机制


文章目录

  • 系列文章目录
  • 前言
  • 一、插件分类
  • 插件目录结构
  • 插件加载原理


前言

DataX的插件机制是其核心特性之一,它使得DataX能够灵活地适应各种不同的数据源的数据同步。这一机制主要基于插件开发框架,该框架主要包括Reader插件、Transformer插件、Writer插件。

DataX的插件机制还采用了框架+插件的架构。框架负责连接Reader和Writer插件,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。这种架构使得插件只需关心数据的读取或写入本身,而同步的共性问题则由框架来处理。

此外,DataX的插件机制还具有良好的扩展性和可维护性。开发者可以根据需要开发新的Reader或Writer插件来支持新的数据源类型,而无需修改DataX的核心框架代码。这种插件化的设计使得DataX能够适应不断变化的业务需求和技术环境。

在插件的加载和初始化方面,DataX使用了类似Java SPI(Service Provider Interface)的机制。它会在指定的插件目录中查找并加载插件,然后将其注册到插件注册中心。这样,当需要使用某个插件时,就可以从注册中心中获取其实例,并进行相应的操作。

总的来说,DataX的插件机制是一种非常灵活和可扩展的设计,它使得DataX能够适应各种不同的数据源和数据存储需求,同时也为开发者提供了丰富的扩展和定制化的可能性。


一、插件分类

按照功能分:
reader, 读插件,例如mysqlReader,从mysql读取数据
writer, 写插件。例如mysqlWriter,给mysql写入数据;
transformer, 中间结果转换,例如SubstrTransformer用于字符截取;
按照运行类型分:
Job级别的插件
Task级别的插件

插件目录结构

datax\plugin下分2个reader和writer目录,下面以mysql为例
在这里插入图片描述
plugin.json内容:

{
    "name": "mysqlreader",
    "class": "xxx.plugin.reader.mysqlreader.MysqlReader",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "xx"
}

在这里插入图片描述

插件加载原理

  • DataX进程启动入口为com.alibaba.datax.core.Engineengine.entry()
    public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");
        BasicParser parser = new BasicParser();
        CommandLine cl = parser.parse(options, args);
        String jobPath = cl.getOptionValue("job");
        // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");
        Configuration configuration = ConfigParser.parse(jobPath);
    }
  • 读取并解析插件配置
    ConfigParser.parse(final String jobPath)传入job路径,该方法组装解析,最后返回一个Configuration对象,Configuration里解析出了reader,writer,handler等插件名称;提取完插件名称后,会去reader目录和writer目录,寻找插件的位置。
  • 动态加载插件
    插件的加载都是通过自定义类加载器JarLoader动态加载,提供插件相关Jar隔离的加载机制。插件的加载接口由LoadUtil类负责,当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。这个主要由ClassLoaderSwapper实现。
  • JarLoader类
    JarLoader 负责加载指定路径下的插件 JAR 文件。它会检查 JAR 文件的合法性、有效性以及是否包含必要的插件实现类。继承自URLClassLoader提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
/**
 * 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
 */
public class JarLoader extends URLClassLoader{
    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }

    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }

    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length,
                "jar包路径不能为空.");

        List<String> dirs = new ArrayList<String>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }

        List<URL> urls = new ArrayList<URL>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }

        return urls.toArray(new URL[0]);
    }

    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }

        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }

        for (File child : current.listFiles()) {
            if (!child.isDirectory()) {
                continue;
            }

            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }

    private static List<URL> doGetURLs(final String path) {
        Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");

        File jarPath = new File(path);

        Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
                "jar包路径必须存在且为目录.");

		/* set filter */
        FileFilter jarFilter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.getName().endsWith(".jar");
            }
        };

		/* iterate all jar */
        File[] allJars = new File(path).listFiles(jarFilter);
        List<URL> jarURLs = new ArrayList<URL>(allJars.length);

        for (int i = 0; i < allJars.length; i++) {
            try {
                jarURLs.add(allJars[i].toURI().toURL());
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.PLUGIN_INIT_ERROR,
                        "系统加载jar包出错", e);
            }
        }

        return jarURLs;
    }
}

  • LoadUtil
    LoadUtil 是一个工具类,用于辅助插件的加载和初始化过程。LoadUtil 类通常包含静态方法,这些方法简化了插件加载的逻辑,使得 DataX 的核心框架能够与具体的插件进行交互。
    LoadUtil 的主要职责包括:
    插件加载:LoadUtil 提供了加载插件的方法。这些方法会根据配置文件中指定的插件类型和名称,使用 Java 的反射机制来加载插件的类定义。加载过程可能包括查找类路径下的 JAR 文件、读取插件的元数据以及验证插件的合法性。
    插件实例化:一旦插件类被加载,LoadUtil 会负责创建插件的实例。这通常涉及到调用插件类的无参构造函数,并返回该实例的引用。LoadUtil 会处理任何与实例化相关的异常,以确保在出现问题时能够给出适当的错误消息。
    插件注册:加载并实例化插件后,LoadUtil 可能会将插件实例注册到一个全局的插件注册中心。这样,DataX 的其他部分就可以在需要时获取并使用这些插件实例。
    配置传递:LoadUtil 还可能负责将配置文件中针对插件的配置参数传递给插件实例。这确保了插件能够根据用户的配置进行正确的初始化。
    错误处理:如果在加载、实例化或配置插件过程中发生错误,LoadUtil 会负责处理这些错误。这可能包括记录日志、抛出异常或采取其他恢复措施。
public class LoadUtil {
    private static final String pluginTypeNameFormat = "plugin.%s.%s";

    private LoadUtil() {
    }

    private enum ContainerType {
        Job("Job"), Task("Task");
        private String type;

        private ContainerType(String type) {
            this.type = type;
        }

        public String value() {
            return type;
        }
    }

    /**
     * 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别
     * 具体pluginName,故使用pluginType.pluginName作为key放置在该map中
     */
    private static Configuration pluginRegisterCenter;

    /**
     * jarLoader的缓冲
     */
    private static Map<String, JarLoader> jarLoaderCenter = new HashMap();

    /**
     * 设置pluginConfigs,方便后面插件来获取
     *
     * @param pluginConfigs
     */
    public static void bind(Configuration pluginConfigs) {
        pluginRegisterCenter = pluginConfigs;
    }

    private static String generatePluginKey(PluginType pluginType,
                                            String pluginName) {
        return String.format(pluginTypeNameFormat, pluginType.toString(),
                pluginName);
    }

    private static Configuration getPluginConf(PluginType pluginType,
                                               String pluginName) {
        Configuration pluginConf = pluginRegisterCenter
                .getConfiguration(generatePluginKey(pluginType, pluginName));

        if (null == pluginConf) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
                    String.format("DataX不能找到插件[%s]的配置.",
                            pluginName));
        }

        return pluginConf;
    }

    /**
     * 加载JobPlugin,reader、writer都可能要加载
     *
     * @param pluginType
     * @param pluginName
     * @return
     */
    public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
                                                  String pluginName) {
        Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
                pluginType, pluginName, ContainerType.Job);

        try {
            AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
                    .newInstance();
            jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
            return jobPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX找到plugin[%s]的Job配置.",
                            pluginName), e);
        }
    }

    /**
     * 加载taskPlugin,reader、writer都可能加载
     *
     * @param pluginType
     * @param pluginName
     * @return
     */
    public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,
                                                    String pluginName) {
        Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
                pluginType, pluginName, ContainerType.Task);

        try {
            AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz
                    .newInstance();
            taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
            return taskPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX不能找plugin[%s]的Task配置.",
                            pluginName), e);
        }
    }

    /**
     * 根据插件类型、名字和执行时taskGroupId加载对应运行器
     *
     * @param pluginType
     * @param pluginName
     * @return
     */
    public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {
        AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,
                pluginName);

        switch (pluginType) {
            case READER:
                return new ReaderRunner(taskPlugin);
            case WRITER:
                return new WriterRunner(taskPlugin);
            default:
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format("插件[%s]的类型必须是[reader]或[writer]!",
                                pluginName));
        }
    }

    /**
     * 反射出具体plugin实例
     *
     * @param pluginType
     * @param pluginName
     * @param pluginRunType
     * @return
     */
    @SuppressWarnings("unchecked")
    private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
            PluginType pluginType, String pluginName,
            ContainerType pluginRunType) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
        try {
            return (Class<? extends AbstractPlugin>) jarLoader
                    .loadClass(pluginConf.getString("class") + "$"
                            + pluginRunType.value());
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    public static synchronized JarLoader getJarLoader(PluginType pluginType,
                                                      String pluginName) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);

        JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
                pluginName));
        if (null == jarLoader) {
            String pluginPath = pluginConf.getString("path");
            if (StringUtils.isBlank(pluginPath)) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format(
                                "%s插件[%s]路径非法!",
                                pluginType, pluginName));
            }
            jarLoader = new JarLoader(new String[]{pluginPath});
            jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
                    jarLoader);
        }

        return jarLoader;
    }
}
  • ClassLoaderSwapper

ClassLoaderSwapper有一个属性storeClassLoader, 用于保存着当前线程的classLoader切换之前的ClassLoader。

/**
 * 为避免jar冲突,比如hbase可能有多个版本的读写依赖jar包,JobContainer和TaskGroupContainer,就需要脱离当前classLoader去加载这些jar包,执行完成后,又退回到原来classLoader上继续执行接下来的代码
 */
public final class ClassLoaderSwapper {
    private ClassLoader storeClassLoader = null;

    private ClassLoaderSwapper() {
    }

    public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {
        return new ClassLoaderSwapper();
    }

    /**
     * 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader
     *
     * @param
     * @return
     */
    public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
        this.storeClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        return this.storeClassLoader;
    }

    /**
     * 将当前线程的类加载器设置为保存的类加载
     * @return
     */
    public ClassLoader restoreCurrentThreadClassLoader() {
        ClassLoader classLoader = Thread.currentThread()
                .getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.storeClassLoader);
        return classLoader;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1451928.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML | DOM | 网页前端 | 常见HTML标签总结

文章目录 1.前端开发简单分类2.前端开发环境配置3.HTML的简单介绍4.常用的HTML标签介绍 1.前端开发简单分类 前端开发&#xff0c;这里是一个广义的概念&#xff0c;不单指网页开发&#xff0c;它的常见分类 网页开发&#xff1a;前端开发的主要领域&#xff0c;使用HTML、CS…

活用 Composition API 核心函数,打造卓越应用(下)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

C#根据权重抽取随机数

&#xff08;游戏中一个很常见的简单功能&#xff0c;比如抽卡抽奖抽道具&#xff0c;或者一个怪物有多种攻击动作&#xff0c;按不同的权重随机出个攻击动作等等……&#xff09; 假如有三种物品 A、B、C&#xff0c;对应的权重分别是A&#xff08;50&#xff09;&#xff0c…

yolov8源码解读Detect层

yolov8源码解读Detect层 Detect层解读网络各层解读及detect层后的处理 关于网络的backbone,head&#xff0c;以及detect层后处理&#xff0c;可以参考文章结尾博主的文章。 Detect层解读 先贴一下全部代码,下面一一解读。 class Detect(nn.Module):"""YOLOv8 …

【开源】JAVA+Vue.js实现大学计算机课程管理平台

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 实验课程档案模块2.2 实验资源模块2.3 学生实验模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 实验课程档案表3.2.2 实验资源表3.2.3 学生实验表 四、系统展示五、核心代码5.1 一键生成实验5.2 提交实验5.3 批阅实…

如何选择一个适合自己的赛道

&#xff08;点击即可收听&#xff09; 最开始一定要先做好定位&#xff0c;也就是你做短视频的目的是什么&#xff1f;当然对大多数人来说&#xff0c;终极目的肯定是赚钱&#xff0c;但赚钱的速度是由定位决定的 如果你资金比较充裕&#xff0c;不急于赚钱&#xff0c;就可以…

配置oracle连接管理器(cman)

Oracle Connection Manager是一个软件组件&#xff0c;可以在oracle客户端上指定安装这个组件&#xff0c;Oracle连接管理器代理发送给数据库服务器的请求&#xff0c;在连接管理器中&#xff0c;我们可以通过配置各种规则来控制会话访问。 简而言之&#xff0c;不同于专用连接…

基于BP算法的SAR成像matlab仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 BP算法的基本原理 4.2 BP算法的优点与局限性 5.完整工程文件 1.课题概述 基于BP算法的SAR成像。合成孔径雷达&#xff08;SAR&#xff09;是一种高分辨率的雷达系统&#xff0c;能够在各种天气和光…

DS:八大排序之直接插入排序、希尔排序和选择排序

创作不易&#xff0c;感谢三连支持&#xff01;&#xff01; 一、排序的概念及运用 1.1 排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起 来的操作。稳定性&…

Java图形化界面编程——五子棋游戏 笔记

2.8.5 五子棋 接下来&#xff0c;我们使用之前学习的绘图技术&#xff0c;做一个五子棋的游戏。 注意&#xff0c;这个代码只实现了五子棋的落子、删除棋子和动画等逻辑实现&#xff0c;并没有把五子棋的游戏逻辑编写完整&#xff0c;比较简单易上手。 图片素材 package…

.NET Core MongoDB数据仓储和工作单元模式实操

前言 上一章节我们主要讲解了MongoDB数据仓储和工作单元模式的封装&#xff0c;这一章节主要讲的是MongoDB用户管理相关操作实操。如&#xff1a;获取所有用户信息、获取用户分页数据、通过用户ID获取对应用户信息、添加用户信息、事务添加用户信息、用户信息修改、用户信息删除…

每日OJ题_算法_递归③力扣206. 反转链表

目录 力扣206. 反转链表 解析代码 力扣206. 反转链表 206. 反转链表 LCR 024. 反转链表 难度 简单 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,…

Leetcode3010. 将数组分成最小总代价的子数组 I

Every day a Leetcode 题目来源&#xff1a;3010. 将数组分成最小总代价的子数组 I 题目描述&#xff1a; 给你一个长度为 n 的整数数组 nums 。 一个数组的代价是它的第一个元素。比方说&#xff0c;[1,2,3] 的代价是 1 &#xff0c;[3,4,1] 的代价是 3 。 你需要将 num…

HTML5+CSS3小实例:彩色拨动开关

实例:彩色拨动开关 技术栈:HTML+CSS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8" /><meta http-equiv="X-UA-Compatible" content="IE=edge" /><…

双输入宽带混合 Doherty-Outphasing功率放大器设计(2021.02 MTT)-从理论到ADS版图

基于双输入的宽带混合 Doherty-Outphasing功率放大器设计(2021.02 MTT)-从理论到ADS版图 原文: Wideband Two-Way Hybrid Doherty Outphasing Power Amplifier 发表于FEBRUARY 2021&#xff0c;在微波顶刊IEEE T MTT上面&#xff0c;使用的GAN CGH40010F 全部工程下载&#…

Covalent Network(CQT)与卡尔加里大学建立合作,共同推动区块链技术创新

Covalent Network&#xff08;CQT&#xff09;作为领先的 Web3 数据索引器和提供者&#xff0c;宣布已经与卡尔加里大学达成了具备开创性意义的合作&#xff0c;此次合作标志着推动区块链数据研究和可访问性的重要里程碑。卡尔加里大学是首个以验证者的身份加入 Covalent Netwo…

linux-firewalld防火墙端口转发

目的:通过统一地址实现对外同一地址暴露 1.系统配置文件开启 ipv4 端口转发 echo "net.ipv4.ip_forward 1" >> /etc/sysctl.confsysctl -p 2.查看防火墙配置端口转发之前的状态 firewall-cmd --statefirewall-cmd --list-all 3.开启 IP 伪装 firewall-cm…

TIM(Timer)定时中断 P1

难点&#xff1a;定时器级联、主从模式 一、简介&#xff1a; 1.TIM&#xff08;Timer&#xff09;定时器 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断 补充&#xff1a; { 定时器本质上是一个计数器&#xff0c;可以工作在定时或计数模式&…

视频生成模型作为世界模拟器

我们探索了在视频数据上大规模训练生成模型。具体来说&#xff0c;我们联合训练文本条件扩散模型&#xff0c;处理不同持续时间、分辨率和宽高比的视频和图像。我们利用一种在时空补丁上操作视频和图像潜码的transformer架构。我们最大的模型&#xff0c;Sora&#xff0c;能够生…

SQL中的各种连接的区别总结

前言 今天主要的内容是要讲解SQL中关于Join、Inner Join、Left Join、Right Join、Full Join、On、 Where区别和用法&#xff0c;不用我说其实前面的这些基本SQL语法各位攻城狮基本上都用过。但是往往我们可能用的比较多的也就是左右连接和内连接了&#xff0c;而且对于许多初学…