初识Kettle插件

news2024/9/23 17:14:54

问题反馈

如有问题可通过微信公众号“假装正经的程序员”反馈

前言

由于kettle设计的特殊性,kettle的处理流程均是通过插件组装的形式来进行工作,因此kettle插件开发是目前kettle二次开发的核心内容。

插件类型

  1. 转换步骤插件:在kettle转换中使用的步骤,用来处理数据行;
  2. 作业项插件:在kettle作业中使用的作业项,用来实现某个任务;
  3. 分区方法插件:利用输入字段的值指定自己的区分规则;
  4. 数据库类型插件:用来扩展不同的数据库类型;
  5. 资源库类型插件:可以把kettle元数据保存为自定义类型或格式;

插件id

这是一个字符串数组,用来唯一标识一个插件。因为旧的插件可以被新的插件代替,一个插件可以有多个ID。再大多数情况下,插件只使用一个单一的字符串,如TableInput是“表输入”步骤的ID,MYSQL是MYSQL数据类型的ID。

内部插件加载

当kettle环境初始化以后,插件注册系统首先加载所有的内部对象,kettle读取下面的配置文件来加载内部对象,这些配置文件位于Kettle的.jar文件中。

  1. kettle-steps.xml:内部转换步骤
  2. kettle-job-entries.xml:内部作业项
  3. kettle-partition-plugins.xml:内部分区类型
  4. kettle-database-types.xml:内部数据库类型
  5. kettle-repositories.xml:内部资源库类型

外部插件加载

插件注册系统加载了所有的内部对象后,就要搜索可用的外部插件。通过浏览plugins/目录的各个子目录下的.jar文件来完成。它搜索特定的Kettle annotations来判断一个类是否是插件。

因为在内部对象加载后才加载插件,所以插件会替代相同ID的已加载的内部对象。例如,你创建了插件,插件的ID是TableInput,就可以替换Kettle标准的“表输入”步骤。这个功能可以让你用插件替换Kettle内置的步骤。可以通过子类继承的方式,直接扩展已有步骤的某些功能。

转换步骤插件

转换步骤插件包括了四个Java类,它们分别实现四个接口

  1. StepMetaInterface:对外提供步骤的元数据并处理串行化;
  2. StepInterface:根据上面接口提供的元数据来实现步骤的具体功能;
  3. StepDataInterface:用来存储步骤的临时数据、文件句柄等;
  4. StepDialogInterface:Spoon里的图形界面,用来编辑步骤的元数据;

上述接口均匀对于的Base类实现了大部分功能,如BaseStepMeta、BaseStep、BaseStepData、BaseStepDialog。

StepMetaInterface

负责步骤里所有和元数据相关的任务。

基础方法

和元数据相关的工作包括:

元数据和XML(或资源库)之间的序列化和反序列化
    getXML()和loadXML()
    saveRep()和readRep()

描述输出字段
    getFields()

检查元数据是否正确
    check()

获取步骤相应的SQL语句,使步骤可以正确运行
    getSQLStatements()

给元数据设置默认值
    setDefault()

完成对数据库的影响分析
    analyseImpact()

描述各类输入和输出流
    getStepIOMeta()
    searchInfoAndTargetSteps()
    handleStreamSelection()
    getOptionalStreams()
    resetStepIoMeta()

导出元数据资源
    exportResources()
    getResourceDependencies()

描述使用的库
    getUsedLibraries()

描述使用的数据库连接
    getUsedDatabaseConnections()

描述这个步骤需要的字段(通常是一个数据库表)
    getRequiredFields()

描述步骤是否具有某些功能
    supportsErrorHandling()
    excludeFromRowLayoutVerification()
    excludeFromCopyDistributeVerification()

高级方法

这个接口还定义了几个方法来说明这四个接口如何结合到一起

String getDialogClassName():用来描述实现了StepDialogInterface接口的对话框类的名字。如果这个方法返回了null,调用类会根据实现了StepMetaInterface接口的类的类名和包名来自动生成对话框类的名字。

StepInterface getStep():创建一个实现了StepDataInterface接口的类。

StepDataInterface getStepData():创建一个实现了StepDataInterface接口的类。

注册方式

kettle-steps.xml

在kettle的原生插件中通过kettle-engine模块resources下目录下的kettle-steps.xml文件注册

注册内容如下

<step id="Normaliser"> <description>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.RowNormaliser</description> <classname>org.pentaho.di.trans.steps.normaliser.NormaliserMeta</classname> <category>i18n:org.pentaho.di.trans.step:BaseStep.Category.Transform</category> <tooltip>i18n:org.pentaho.di.trans.step:BaseStep.TypeTooltipDesc.RowNormaliser</tooltip> <iconfile>ui/images/NRM.svg</iconfile> <documentation_url>Products/Row_Normaliser</documentation_url> <cases_url/> <forum_url/> </step>

@Step

除了通过kettle-steps.xml的方式注册插件外,kettle还提供了更灵活的方式来注册插件,即通过@Step注解的方式。

kettle-steps.xml适合基于原生kettle代码改造的情况下进行内置插件处理,而@Step则更适合外置插件的处理。

@Step(id = "RowDenormaliser",name = "RowDenormaliser.name",
        categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.Transform",
        image= "HelloWorld.png",
        i18nPackageName="com.xxx.plugin.trans.steps.rowdenomaliser")
public class RowDenormaliserStepMeta extends BaseStepMeta implements StepMetaInterface {
    private static Class<?> PKG = RowDenormaliserStep.class; //for i18n
//TODO 
}

这段代码中的@Step注解是用来通知Kettle的插件系统的:这个类是一个步骤类型的插件。在该注解中可以指定插件的ID、图标、国际化的包、本地化的名称、类别、描述。其中后三项是资源文件里的Key,需要在资源文件里设置真正的值。i18nPackageName指定了资源文件的包名,即我们的资源文件在xxx.plugin.trans.steps.rowdenomaliser包下面。

StepDataInterface

用来维护步骤的执行状态,以及存储临时对象。例如,把输入行的元数据、数据库连接、输入输出流等存储在这个对象里。

StepDialogInterface

用来提供一个用户界面,用户通过这个界面输入元数据(转换参数)。用户界面就是一个对话框。这个接口里面包含了类型open()和setRepository()等几个简单的方法。

StepInterface

这个类读取上个步骤传来的数据行,利用StepMetaInterface对象定义的元数据,逐行转换和处理上个步骤传来的数据。Kettle引擎直接使用这个接口里的很多方法执行转换过程,但大部分方法都已经由BaseStep类实现了。

常见需要重载方法

通常开发人员只需要重载其中的几个方法。

  • init():步骤初始化方法,用来初始化一个步骤。初始化的结果是一个true或者false的Boolean值。如果你的步骤没有任何初始化的工作,可以不用重载这个方法。
  • dispose():如果有需要释放的资源,可以在dispose()方法里释放,例如可以关闭数据库连接、释放文件、清除缓存等。在转换的最后Kettle引擎会调用这个方法。如果没有需要释放或清除的资源,可以不用重载这个方法。
  • processRow():这个方法,是步骤实际工作的地方。只要这个方法返回true,转换引擎就会重复调用这个方法。

常见功能

从上个步骤获取一行数据
  • getRows():该方法从上一个步骤获取一行数据,如果没有更多要获取的数据行,这个方法就会返回null。如果前面的步骤不能及时提供数据,这个方法就会阻塞,直到有可用的数据行。这样这个步骤的速度就会降低,也会影响到转换里的其他步骤的速度。从性能上考虑,该方法不提供数据行的元数据,只提供上个步骤的输出的数据。可以使用getInputRowMeta()方法获取元数据,元数据只获取一次即可,所以在first代码块里获取元数据。
告知其他步骤无输出数据
  • setOutputDone():用来通知其他的步骤,本步骤已经没有输出数据行。下个步骤如果再调用getRow()方法就会返回null,转换也不再调用processRow()方法。
数据传递到下个步骤
  • putRow():如果要把数据传递到下一个步骤,要使用该方法。除了输出数据,还要输出RowMetaInterface元数据。构造输出行的元数据结构只能构造一次,因为所有输出数据行的结构都是一样的,产生了输出行以后,元数据结构就不能再变化。所以输出行的元数据结构也在first代码块里构造。
String value="demonstrate putRow"
Object[] outputRowData = RowDataUtil.allocateRowData(row,data.outputRowMeta.size(),value);
putRow(data.outputRowMeta, outputRowData);
从指定步骤读取数据行
  • getRowFrom():如果你想从前面的某个特定的步骤读取数据行,则可以使用该方法
RowSet inputRowSet = findInputRowSet("自定义行转列");
Object[] rowFrom = getRowFrom(inputRowSet);
logBasic("-------stepName:{0},rowFrom:{1}","自定义行转列",JacksonUtil.object2Json(rowFrom));

从上面这个方法能够看出,整个转换都是以一行一行数据来的,所以在单个步骤内获取的数据与之前步骤的数据是存在关联性的。

特定步骤中写入数据行
  • putRowTo():如果你想把数据写入到某个特定的步骤,则可以使用该方法
RowSet inputRowSet = findOutputRowSet("自定义行转列");
putRowTo(getInputRowMeta(),r,inputRowSet);

从本方法和上面的方法可以看出,输入和输出的RowSet对象只需要各获得一次即可,这样才更有效率。

错误处理步骤写入数据行
  • putError():如果你想让你的步骤支持错误处理,而且元数据类返回的supportErrorHandling()方法返回了true,就可用把数据输出到错误处理步骤里。
try{
    ……
} catch (KettleException e) {
    if (getStepMeta().isDoingErrorHandling()) {
        errorMessage = e.toString();
        putError(getInputRowMeta(), r, 1, errorMessage, errorFieldName, "ISU001");
   }else{
        logError(BaseMessages.getString(PKG, "InsertUpdate.Log.ErrorInStep"), e);
        setErrors(1);
        stopAll();
        setOutputDone(); // signal end to receiver(s)
        return false; 
   }
}

从这个例子可以看到,这段代码把错误的行数、错误字段名、消息错误编码都传递给错误步骤,错误处理的其他工作都自动完成了。

识别一个步骤拷贝

因为一个步骤可以有多份拷贝同时执行,有时需要识别出正在使用的是哪个步骤拷贝,可以用下面几个方法

  • getCopy():获取拷贝号。拷贝号可以唯一标识出步骤的一个拷贝,拷贝号的取值范围是0-N,N=getStepMeta().getCopies()-1。
  • getUniqueStepNrAcrossSlaves():获得在集群模式下运行的步骤拷贝号。
  • getUniqueStepCountAcrossSlaves():获取在集群模式下运行的步骤拷贝总数。

通过这些方法可以把一个步骤的工作分配给多份拷贝去完成。例如“CSV文件输入”和“固定宽度文件输入”步骤里都有并行读取文件的选项,这样可以把读取文件的工作放在多个拷贝里或集群里来完成。

结果反馈

在调用getRow()和putRow()方法时,引擎会自动计算两类度量值,读行数和写行数。这两类度量值可以在界面或日志中记录下来,以监控程序运行的状态。下面几个方法用来操作这两类度量值。

  • incrementLinesRead():增加从前面步骤读取到的行数。
  • incrementLinesWritten():增加写入到后面步骤的行数。
  • incrementLinesInput():增加从文件、数据库、网络等资源读取到的行数。
  • incrementLinesOutput():增加写入到文件、数据库、网络等资源的行数。
  • incrementLinesUpdated():增加更新的行数。
  • incrementLinesSkipped():增加跳过的数据行的行数。
  • incrementLinesRejected():增加拒绝的数据行的行数。

这些度量值用来说明步骤执行的情况。可以在Spoon的转换度量面板里看到,也可以存到日志数据库表里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1906438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索TXE、TC、RXNE标志位在串口通信中的轮询与中断应用

浅谈一下STM32串口中断之TXE,TC,RXNE标志位 之前做一个项目&#xff0c;用到了串口中断&#xff0c;但是对TXE、TC和RXNE标志位的作用和使用方法不是很清楚&#xff0c;导致在调试过程中遇到了一些问题。通过查阅相关资料和实际操作&#xff0c;我对这三个标志位有了更深入的了…

GD 32中断系统实现

1.0 中断的概念 中断&#xff1a;简单来说就是打断的意思&#xff0c;在计算机系统中CPU在执行一个操作的时候&#xff0c;有一个比当前任务更为紧急的任务需要执行,cpu暂停当前任务转而去执行更为紧急任务的操作&#xff0c;执行完更为紧急任务之后再返回来执行原来未执行完的…

FullCalendar日历组件集成实战(18)

背景 有一些应用系统或应用功能&#xff0c;如日程管理、任务管理需要使用到日历组件。虽然Element Plus也提供了日历组件&#xff0c;但功能比较简单&#xff0c;用来做数据展现勉强可用。但如果需要进行复杂的数据展示&#xff0c;以及互动操作如通过点击添加事件&#xff0…

两年经验前端带你重学前端框架必会的ajax+node.js+webpack+git等技术 Day2

前端框架必会的&#xff08;ajaxnode.jswebpackgit&#xff09;个人学习心得作业及bug记录 Day2 你好,我是Qiuner. 为帮助别人少走弯路和记录自己编程学习过程而写博客 这是我的 github https://github.com/Qiuner ⭐️ ​ gitee https://gitee.com/Qiuner &#x1f339; 如果本…

LDR6282-显示器:从技术革新到视觉盛宴

显示器&#xff0c;作为我们日常工作和娱乐生活中不可或缺的一部分&#xff0c;承载着将虚拟世界呈现为现实图像的重要使命。它不仅是我们与电子设备交互的桥梁&#xff0c;更是我们感知信息、享受视觉盛宴的重要窗口。显示器在各个领域的应用也越来越广泛。在办公领域&#xf…

物料主数据BAPI 无法写入扩展(增强)字段问题

在使用BAPI_MATERIAL_SAVEDATA 去创建物料时&#xff0c;因为有增强字段。这时候需要通过extensionin 字段 进行赋值。 https://community.sap.com/t5/application-development-discussions/bapi-material-savedata-extensionin-dec-type-dump/m-p/11760099 但是赋值后仍然没…

不用SMTP实现联系表单提交后发信到指定邮箱

不使用SMTP&#xff0c;如何确保联系表单能安全发送至指定邮箱&#xff1f; 尽管SMTP是实现电子邮件发送的常用方法&#xff0c;但有时我们可能希望绕过SMTP&#xff0c;直接通过其他方式发送邮件到指定邮箱。AokSend将探讨如何在不用SMTP的情况下&#xff0c;实现联系表单提交…

基于mmap的读写工具封装案例

文章目录 注意事项C封装示例添加构造函数重载以支持追加模式支持文件大小动态变化异常安全性和资源泄漏预防提供更高级的数据访问接口示例代码改进 在很多高性能应用中&#xff0c;直接使用内存映射文件&#xff08;mmap&#xff09;进行文件的读写操作可以显著提高效率&#x…

【C++】stack和queue的模拟实现 双端队列deque的介绍

&#x1f525;个人主页&#xff1a; Forcible Bug Maker &#x1f525;专栏&#xff1a; STL || C 目录 &#x1f308;前言&#x1f525;stack的模拟实现&#x1f525;queue的模拟实现&#x1f525;deque&#xff08;双端队列&#xff09;deque的缺陷 &#x1f308;为什么选择…

从无计划到项目管理高手,只需避开这两大误区!

在项目管理的过程中&#xff0c;制定计划是不可或缺的一环。然而&#xff0c;在实践中&#xff0c;我们往往会遇到两种常见的误区&#xff0c;这些误区不仅阻碍了计划的有效实施&#xff0c;还可能让我们在追求目标的道路上迷失方向。 误区一&#xff1a;认为没有什么可计划的…

充电桩开源平台,开发流程有图有工具

慧哥充电桩开源平台产品研发流程是确保产品从概念阶段到市场推广阶段的有序进行的关键。以下是对您给出的步骤的详细解释和建议&#xff1a; 设计业务流程: 在这一步&#xff0c;团队需要确定产品的核心功能、目标用户以及如何满足用户需求。进行市场调研&#xff0c;了解竞争…

无线网卡怎么连接台式电脑?让上网更便捷!

随着无线网络的普及&#xff0c;越来越多的台式电脑用户希望通过无线网卡连接到互联网。无线网卡为台式电脑提供了无线连接的便利性&#xff0c;避免了有线网络的束缚。本文将详细介绍无线网卡怎么连接台式电脑的四种方法&#xff0c;包括使用USB无线网卡、内置无线网卡以及使用…

探索企业信用巅峰:3A企业认证的魅力与价值

在现代商业环境中&#xff0c;企业的信用和信誉是其发展的核心要素之一。3A企业认证作为信用评级的最高等级&#xff0c;正在吸引越来越多企业的关注。究竟什么是3A企业认证&#xff1f;它为什么对企业如此重要&#xff1f;本文将深入探讨3A企业认证的独特魅力和巨大价值。 3A企…

使用vue3-treeselect问题

1.当vue3-treeselect是单选时&#xff0c;使用watch监听绑定value&#xff0c;无法监听到值清空 对照后将:value改为v-model&#xff0c;如图 2.使用vue3-treeselect全部清空按钮如何置空select的值&#xff0c;使用watch监听 多选&#xff1a;pageInfo.officeName(val) {// …

mybatis中的标签

在MyBatis中&#xff0c;除了基本的SQL映射功能外&#xff0c;还有许多用于动态SQL构建的标签。这些标签允许我们根据不同的条件和需求构建复杂的SQL语句。主要的动态SQL标签包括<if>, <choose>, <when>, <otherwise>, <trim>, <set>, <…

加密与安全_ 解读非对称密钥解决密钥配送问题的四个方案

文章目录 Pre对称密钥的死穴 - 经典的密钥配送问题什么是非对称密钥非对称密钥解决密钥配送问题的四个方案共享密钥密钥分配中心&#xff08;KDC&#xff09;Diffie-Hellman 密钥交换体系公钥密码体系RSA算法 Pre 对称密钥的死穴 - 经典的密钥配送问题 假设 Alice 和 Bob 两个人…

什么是海外仓管理自动化?策略及落地实施步骤指南

作为海外仓的管理者&#xff0c;你每天都面临提高海外仓运营效率、降低成本和满足客户需求的问题。海外仓自动化管理技术为这些问题提供了不错的解决思路&#xff0c;不过和任何新技术一样&#xff0c;从策略到落地实施&#xff0c;都有一个对基础逻辑的认识过程。 今天我们整…

安装 node.js 完整教程

安装 官方下载地址: https://nodejs.org/en/ 下载LTS版本&#xff08;长期稳定版本&#xff09; 安装可以更改安装路径 其余的都是选择 下一步, 安装 安装完成查看 node -v 查看node的版本 npm -v 查看npm的版本(新版的node安装自带安装npm) 配置环境变量 在nodejs文件夹…

关于SQL NOT IN判断失效的情况记录

1.准备测试数据 CREATE TABLE tmp_1 (val integer);CREATE TABLE tmp_2 (val integer, val2 integer);INSERT INTO tmp_1 (val) VALUES (1); INSERT INTO tmp_1 (val) VALUES (2); INSERT INTO tmp_2 (val) VALUES (1); INSERT INTO tmp_2 (val, val2) VALUES (NULL,0);2.测…

基于Address Sanitizer实现Android NDK的内存错误检测DEMO

1.简介 基于Address Sanitizer实现Android NDK的内存错误检测Demo。 ps:适用于Android 13&#xff08;API 级别 33&#xff09;以下的设备&#xff0c;Android 14&#xff08;API 级别 34&#xff09;或更高版本的 ARM64设备推荐使用HWAddress Sanitizer配置更简单。 GitHub源…