目录
-
- 概要
- 整体架构流程
- 技术名词解释
- 技术细节
- 小结
概要
在现代数据处理和ETL(提取、转换、加载)流程中,Kettle(Pentaho Data Integration, PDI)作为一种强大的开源ETL工具,被广泛应用于各种数据处理场景。然而,将Kettle集成到Spring Boot项目中时,开发者常常面临诸多挑战。目前网上常见的集成方法大多采用依赖的方式,直接在Spring Boot项目的pom.xml
文件中引入Kettle的相关依赖。
然而,这种集成方式存在诸多问题。首先,Kettle底层采用了OSGI(Open Services Gateway Initiative)模块化组件加载方式,这使得一些大数据组件无法完整集成。其次,Kettle自身引用了大量第三方包,导致依赖冲突频繁出现,严重影响项目的稳定性和可维护性。
为了解决这些问题,我尝试了多种集成方法,并找到了一种相对完善的解决方案。该方案的核心思想是通过自定义的加载机制绕过Kettle的OSGI限制,同时对依赖进行精细化管理,避免不必要的冲突。
集成策略
这种集成方式是基于了解Springboot的类加载以及Kettle的类加载方式,找到的一种能够两者都兼容起来的加载方式。
Spring Boot的类加载机制基于JDK的类加载器层级结构,通过LaunchedURLClassLoader
支持嵌套JAR文件的加载,并优化了类加载顺序,避免依赖冲突。而Kettle的OSGI机制则通过动态加载和隔离插件模块,实现灵活的功能扩展。传统集成方式中,Spring Boot的类加载器与Kettle的OSGI机制相互冲突,导致部分组件无法正常加载。
因此分析得出Springboot的类加载是自己实现的一套基于LaunchedURLClassLoader的加载,而Kettle的类加载主要是依赖的系统的类加载器。这样的情况下我们可以采用替换系统类加载器的方式实现一套自定义的类加载器对kettle的类加载进行单独处理。
技术细节
这里我们实现一个自定义的类加载器,让他继承URLClassLoader这个加载器,重新实现这个类加载器中的一些方法。
在实现中让类加载单独加载Kettle下的lib和classes目录。
实现这个自定义的类加载器以后,我们还要在启动脚本中添加一些参数,用来替换系统的类加载器
java -Djava.system.class.loader=custloader.CustUrlClassloader -cp ./custloader-0.0.1-SNAPSHOT.jar;“${springboo启动jar包}” -Dloader.path=./config,./lib
至于kettle和Springboot之间的交互,还需要定义一套交互接口,在Springboot启动的时候通过反射的方式来启动Kettle,需要写一个第三方的代理包用于实现交互接口进行Springboot和Kettle的交互实现。下面是部分关键代码
public void initStart(String[] args,java.util.Map<String,String> databaseinfo) {
ClassLoader classLoader=ClassLoader.getSystemClassLoader();//获取系统加载器
System.out.println(classLoader.getClass().getName());
if(classLoader.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) {
String tip_home = System.getProperty(KETTLE_HOME);
String libsPath = tip_home + File.separator + "lib";
String classesPath=tip_home + File.separator + "classes";
File classesFile=new File(classesPath);
if(classesFile.exists()) {
System.out.println("load classes libraries from: " + classesPath);
//LoadJarsHelper.loadClasses(classesPath);
LoadJarsHelper.loadClasses(classesPath);
}
System.out.println("load libs-jars from: " + libsPath);
//LoadJarsHelper.loadJars(libsPath);
File libsPathFile=new File(libsPath);
if(libsPathFile.exists()) {
LoadJarsHelper.loadJars(libsPath);
}
String os = System.getProperty("os.name");
String bit=System.getProperty("sun.arch.data.model");
String swtpath=tip_home+File.separator +"libswt/win64";
if(os.toLowerCase().startsWith("win")){
if(bit.equalsIgnoreCase("32")) {
swtpath=tip_home+File.separator +"libswt/win32";
}else {
swtpath=tip_home+File.separator +"libswt/win64";
}
}else if (os.startsWith("Mac OS")){
swtpath=tip_home+File.separator +"libswt/osx64";
}else {
if(bit.equalsIgnoreCase("32")) {
swtpath=tip_home+File.separator +"libswt/linux/x86";
}else {
swtpath=tip_home+File.separator +"libswt/linux/x86_64";
}
}
System.out.println("load swt from: " + swtpath);
File swtpathFile=new File(swtpath);
if(swtpathFile.exists()) {
LoadJarsHelper.loadJars(swtpath);
}
}
Thread kettleThread=new Thread(new Runnable() {
public void run() {
try {
Thread.currentThread().setContextClassLoader(classLoader);
Class<?> clazz = classLoader.loadClass("xx.xx.xx.KettleService");
IKettleService instance = (IKettleService) clazz.newInstance();
Method mainMethod = clazz.getMethod("start", String[].class,Map.class);
if(args.length>1) {
List<String> objs=new ArrayList<String>();
for(int i=1;i<args.length;i++) {
if(i==1) {
if(!args[i].equals("")) {
objs.add(args[i]);
}
}else {
objs.add(args[i]);
}
}
String[] o=objs.toArray(new String[objs.size()]);
mainMethod.invoke(instance, (Object)o,databaseinfo);
}else {
String[] objs=new String[0];
mainMethod.invoke(instance, (Object)objs,databaseinfo);
}
KettleService.setDatabaseManager(instance.getDatabaseManager());//接口实现
KettleService.setJobManager(instance.getJobManager());
KettleService.setMetaManager(instance.getMetaManager());
KettleService.setModelManager(instance.getModelManager());
KettleService.setRepositoryDirectoryManager(instance.getRepositoryDirectoryManager());
KettleService.setTransManager(instance.getTransManager());
KettleService.setLogManager(instance.getLogManager());
KettleService.setJmxService(instance);
KettleService.addLogListener();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NoSuchMethodException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InstantiationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
kettleThread.start();
}
小结
通过上述技术细节,我成功实现了一种兼容Spring Boot和Kettle的类加载机制。这种集成方式不仅解决了传统依赖集成的诸多弊端,还实现了Kettle功能的完整扩展,为Spring Boot项目中的数据处理提供了更灵活、高效的解决方案。