系列文章目录
一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
八、DataX源码分析-插件机制
文章目录
- 系列文章目录
- 前言
- 一、插件分类
- 插件目录结构
- 插件加载原理
前言
DataX的插件机制是其核心特性之一,它使得DataX能够灵活地适应各种不同的数据源的数据同步。这一机制主要基于插件开发框架,该框架主要包括Reader插件、Transformer插件、Writer插件。
DataX的插件机制还采用了框架+插件的架构。框架负责连接Reader和Writer插件,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。这种架构使得插件只需关心数据的读取或写入本身,而同步的共性问题则由框架来处理。
此外,DataX的插件机制还具有良好的扩展性和可维护性。开发者可以根据需要开发新的Reader或Writer插件来支持新的数据源类型,而无需修改DataX的核心框架代码。这种插件化的设计使得DataX能够适应不断变化的业务需求和技术环境。
在插件的加载和初始化方面,DataX使用了类似Java SPI(Service Provider Interface)的机制。它会在指定的插件目录中查找并加载插件,然后将其注册到插件注册中心。这样,当需要使用某个插件时,就可以从注册中心中获取其实例,并进行相应的操作。
总的来说,DataX的插件机制是一种非常灵活和可扩展的设计,它使得DataX能够适应各种不同的数据源和数据存储需求,同时也为开发者提供了丰富的扩展和定制化的可能性。
一、插件分类
按照功能分:
reader, 读插件,例如mysqlReader,从mysql读取数据
writer, 写插件。例如mysqlWriter,给mysql写入数据;
transformer, 中间结果转换,例如SubstrTransformer用于字符截取;
按照运行类型分:
Job级别的插件
Task级别的插件
插件目录结构
datax\plugin下分2个reader和writer目录,下面以mysql为例
plugin.json内容:
{
"name": "mysqlreader",
"class": "xxx.plugin.reader.mysqlreader.MysqlReader",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "xx"
}
插件加载原理
- DataX进程启动入口为com.alibaba.datax.core.Engineengine.entry()
public static void entry(final String[] args) throws Throwable {
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
String jobPath = cl.getOptionValue("job");
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
Configuration configuration = ConfigParser.parse(jobPath);
}
- 读取并解析插件配置
ConfigParser.parse(final String jobPath)传入job路径,该方法组装解析,最后返回一个Configuration对象,Configuration里解析出了reader,writer,handler等插件名称;提取完插件名称后,会去reader目录和writer目录,寻找插件的位置。 - 动态加载插件
插件的加载都是通过自定义类加载器JarLoader动态加载,提供插件相关Jar隔离的加载机制。插件的加载接口由LoadUtil类负责,当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。这个主要由ClassLoaderSwapper实现。 - JarLoader类
JarLoader 负责加载指定路径下的插件 JAR 文件。它会检查 JAR 文件的合法性、有效性以及是否包含必要的插件实现类。继承自URLClassLoader提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
/**
* 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
*/
public class JarLoader extends URLClassLoader{
public JarLoader(String[] paths) {
this(paths, JarLoader.class.getClassLoader());
}
public JarLoader(String[] paths, ClassLoader parent) {
super(getURLs(paths), parent);
}
private static URL[] getURLs(String[] paths) {
Validate.isTrue(null != paths && 0 != paths.length,
"jar包路径不能为空.");
List<String> dirs = new ArrayList<String>();
for (String path : paths) {
dirs.add(path);
JarLoader.collectDirs(path, dirs);
}
List<URL> urls = new ArrayList<URL>();
for (String path : dirs) {
urls.addAll(doGetURLs(path));
}
return urls.toArray(new URL[0]);
}
private static void collectDirs(String path, List<String> collector) {
if (null == path || StringUtils.isBlank(path)) {
return;
}
File current = new File(path);
if (!current.exists() || !current.isDirectory()) {
return;
}
for (File child : current.listFiles()) {
if (!child.isDirectory()) {
continue;
}
collector.add(child.getAbsolutePath());
collectDirs(child.getAbsolutePath(), collector);
}
}
private static List<URL> doGetURLs(final String path) {
Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");
File jarPath = new File(path);
Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
"jar包路径必须存在且为目录.");
/* set filter */
FileFilter jarFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar");
}
};
/* iterate all jar */
File[] allJars = new File(path).listFiles(jarFilter);
List<URL> jarURLs = new ArrayList<URL>(allJars.length);
for (int i = 0; i < allJars.length; i++) {
try {
jarURLs.add(allJars[i].toURI().toURL());
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INIT_ERROR,
"系统加载jar包出错", e);
}
}
return jarURLs;
}
}
- LoadUtil
LoadUtil 是一个工具类,用于辅助插件的加载和初始化过程。LoadUtil 类通常包含静态方法,这些方法简化了插件加载的逻辑,使得 DataX 的核心框架能够与具体的插件进行交互。
LoadUtil 的主要职责包括:
插件加载:LoadUtil 提供了加载插件的方法。这些方法会根据配置文件中指定的插件类型和名称,使用 Java 的反射机制来加载插件的类定义。加载过程可能包括查找类路径下的 JAR 文件、读取插件的元数据以及验证插件的合法性。
插件实例化:一旦插件类被加载,LoadUtil 会负责创建插件的实例。这通常涉及到调用插件类的无参构造函数,并返回该实例的引用。LoadUtil 会处理任何与实例化相关的异常,以确保在出现问题时能够给出适当的错误消息。
插件注册:加载并实例化插件后,LoadUtil 可能会将插件实例注册到一个全局的插件注册中心。这样,DataX 的其他部分就可以在需要时获取并使用这些插件实例。
配置传递:LoadUtil 还可能负责将配置文件中针对插件的配置参数传递给插件实例。这确保了插件能够根据用户的配置进行正确的初始化。
错误处理:如果在加载、实例化或配置插件过程中发生错误,LoadUtil 会负责处理这些错误。这可能包括记录日志、抛出异常或采取其他恢复措施。
public class LoadUtil {
private static final String pluginTypeNameFormat = "plugin.%s.%s";
private LoadUtil() {
}
private enum ContainerType {
Job("Job"), Task("Task");
private String type;
private ContainerType(String type) {
this.type = type;
}
public String value() {
return type;
}
}
/**
* 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别
* 具体pluginName,故使用pluginType.pluginName作为key放置在该map中
*/
private static Configuration pluginRegisterCenter;
/**
* jarLoader的缓冲
*/
private static Map<String, JarLoader> jarLoaderCenter = new HashMap();
/**
* 设置pluginConfigs,方便后面插件来获取
*
* @param pluginConfigs
*/
public static void bind(Configuration pluginConfigs) {
pluginRegisterCenter = pluginConfigs;
}
private static String generatePluginKey(PluginType pluginType,
String pluginName) {
return String.format(pluginTypeNameFormat, pluginType.toString(),
pluginName);
}
private static Configuration getPluginConf(PluginType pluginType,
String pluginName) {
Configuration pluginConf = pluginRegisterCenter
.getConfiguration(generatePluginKey(pluginType, pluginName));
if (null == pluginConf) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
String.format("DataX不能找到插件[%s]的配置.",
pluginName));
}
return pluginConf;
}
/**
* 加载JobPlugin,reader、writer都可能要加载
*
* @param pluginType
* @param pluginName
* @return
*/
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
String pluginName) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job);
try {
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
.newInstance();
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
return jobPlugin;
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format("DataX找到plugin[%s]的Job配置.",
pluginName), e);
}
}
/**
* 加载taskPlugin,reader、writer都可能加载
*
* @param pluginType
* @param pluginName
* @return
*/
public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,
String pluginName) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Task);
try {
AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz
.newInstance();
taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
return taskPlugin;
} catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
String.format("DataX不能找plugin[%s]的Task配置.",
pluginName), e);
}
}
/**
* 根据插件类型、名字和执行时taskGroupId加载对应运行器
*
* @param pluginType
* @param pluginName
* @return
*/
public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {
AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,
pluginName);
switch (pluginType) {
case READER:
return new ReaderRunner(taskPlugin);
case WRITER:
return new WriterRunner(taskPlugin);
default:
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format("插件[%s]的类型必须是[reader]或[writer]!",
pluginName));
}
}
/**
* 反射出具体plugin实例
*
* @param pluginType
* @param pluginName
* @param pluginRunType
* @return
*/
@SuppressWarnings("unchecked")
private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
PluginType pluginType, String pluginName,
ContainerType pluginRunType) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
try {
return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$"
+ pluginRunType.value());
} catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
public static synchronized JarLoader getJarLoader(PluginType pluginType,
String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));
if (null == jarLoader) {
String pluginPath = pluginConf.getString("path");
if (StringUtils.isBlank(pluginPath)) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format(
"%s插件[%s]路径非法!",
pluginType, pluginName));
}
jarLoader = new JarLoader(new String[]{pluginPath});
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader);
}
return jarLoader;
}
}
- ClassLoaderSwapper
ClassLoaderSwapper有一个属性storeClassLoader, 用于保存着当前线程的classLoader切换之前的ClassLoader。
/**
* 为避免jar冲突,比如hbase可能有多个版本的读写依赖jar包,JobContainer和TaskGroupContainer,就需要脱离当前classLoader去加载这些jar包,执行完成后,又退回到原来classLoader上继续执行接下来的代码
*/
public final class ClassLoaderSwapper {
private ClassLoader storeClassLoader = null;
private ClassLoaderSwapper() {
}
public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {
return new ClassLoaderSwapper();
}
/**
* 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader
*
* @param
* @return
*/
public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
this.storeClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
return this.storeClassLoader;
}
/**
* 将当前线程的类加载器设置为保存的类加载
* @return
*/
public ClassLoader restoreCurrentThreadClassLoader() {
ClassLoader classLoader = Thread.currentThread()
.getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.storeClassLoader);
return classLoader;
}
}