Spring Boot 集成 Kettle

news2024/11/17 22:14:49

Kettle 简介

Kettle 最初由 Matt Casters 开发,是 Pentaho 数据集成平台的一部分。它提供了一个用户友好的界面和丰富的功能集,使用户能够轻松地设计、执行和监控 ETL 任务。Kettle 通过其强大的功能和灵活性,帮助企业高效地处理大规模数据集成任务。

主要组成部分
  1. Spoon
    • 用途:Spoon 是 Kettle 的图形化设计工具。用户可以使用 Spoon 设计和调试 ETL 转换和作业。
    • 功能:拖放式界面、预览数据、测试 ETL 流程、管理连接、编写脚本等。
  2. Pan
    • 用途:Pan 是一个命令行工具,用于执行由 Spoon 设计的 ETL 转换。
    • 功能:通过命令行执行转换、调度作业、集成到其他自动化流程中。
  3. Kitchen
    • 用途:Kitchen 是一个命令行工具,用于执行由 Spoon 设计的 ETL 作业。
    • 功能:通过命令行执行作业、调度作业、集成到其他自动化流程中。
  4. Carte
    • 用途:Carte 是一个轻量级的 Web 服务器,提供远程执行和监控功能。
    • 功能:远程执行和监控 ETL 转换和作业、查看日志、管理集群等。
  5. Repositories
    • 用途:存储和管理 ETL 转换和作业的地方。
    • 功能:可以使用数据库或文件系统作为存储库,支持版本控制和共享。
主要功能和特点
  1. 数据提取

    • 支持多种数据源,如关系数据库、文件(CSV、Excel、XML 等)、大数据平台(Hadoop、Hive 等)、云存储(Amazon S3、Google Drive 等)、Web 服务和 API 等。
  2. 数据转换

    • 丰富的转换步骤,包括数据清洗、数据聚合、数据过滤、数据排序、数据连接、数据拆分、数据类型转换等。
  3. 数据加载

    • 支持将数据加载到多种目标系统中,如关系数据库、大数据平台、文件系统、云存储等。
  4. 调度和自动化

    • 支持通过命令行工具(Pan 和 Kitchen)和调度器(如 cron 或 Windows 任务计划)进行调度和自动化执行。
  5. 扩展性

    • 提供了插件机制,用户可以编写自定义插件,扩展 Kettle 的功能。
    • 支持 JavaScript 和 Java 进行脚本编写,增强转换和作业的灵活性。
  6. 集群和并行处理

    • 支持集群模式,能够在分布式环境中并行处理大规模数据。
    • 提供了分布式 ETL 执行和负载均衡功能。
  7. 数据质量和数据治理

    • 提供了数据验证、数据一致性检查和数据校验功能,帮助确保数据的质量和一致性。
  8. 实时数据处理

    • 支持实时数据流处理,通过集成 Kafka、MQTT 等流处理平台,实现实时数据的提取、转换和加载。

集成 Kettle

将 Kettle(Pentaho Data Integration, PDI)集成到 Spring Boot 项目中,可以实现 ETL 流程的自动化和集成化处理。以下是详细的集成过程:

准备工作
  1. 下载 Kettle:从 Pentaho 官网下载 Kettle(PDI)的最新版本,并解压到本地目录。
  2. Spring Boot 项目:确保已有一个 Spring Boot 项目,或新建一个 Spring Boot 项目。
引入 Kettle 依赖

在 Spring Boot 项目的 pom.xml 文件中添加 Kettle 所需的依赖。你可以将 Kettle 的 JAR 文件添加到本地 Maven 仓库,或直接在项目中引入这些 JAR 文件。

<dependencies>
    <!-- Spring Boot 依赖 -->

    <!-- Kettle 依赖 -->
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-core</artifactId>
        <version>9.4.0.0-343</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-engine</artifactId>
        <version>9.4.0.0-343</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-dbdialog</artifactId>
        <version>9.4.0.0-343</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-vfs2</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- 根据需要添加其他 Kettle 依赖 -->
    
    <!-- 操作数据库数据时添加相应的数据库依赖 -->
    
</dependencies>
处理密码加密

resources 目录下创建 kettle-password-encoder-plugins.xml 文件,用于配置密码加密插件:

<password-encoder-plugins>

    <password-encoder-plugin id="Kettle">
        <description>Kettle Password Encoder</description>
        <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
    </password-encoder-plugin>

</password-encoder-plugins>

kettle-core依赖中org.pentaho.support.encryption.KettleTwoWayPasswordEncoder类实现了TwoWayPasswordEncoderInterface接口,用于处理密码的加密和解密操作。

添加 Spoon 的任务文件

在 Kettle(Pentaho Data Integration,PDI)中,作业(Job)和转换(Transformation)是两种核心的 ETL 组件,它们在设计和功能上有着本质的区别。

转换(Transformation)
  1. 数据处理流程:转换是一个数据处理流程,专注于数据的提取(Extract)、转换(Transform)和加载(Load)。
  2. 行级处理:转换以行级处理数据,每次处理一行数据,并将其传递给下一步骤。
  3. 任务文件为.ktr文件。
作业(Job)
  1. 任务管理和控制流程:作业是一个任务管理和控制流程,负责调度和控制一系列任务的执行顺序。
  2. 步骤级处理:作业以步骤为单位处理任务,每次执行一个步骤,然后根据条件决定执行下一个步骤。
  3. 任务文件为.kjb文件。
区别
  1. 转换处理数据行,作业处理任务步骤。
  2. 转换中的步骤是并行执行的,而作业中的步骤是顺序执行的。
  3. 转换侧重于数据的处理和转换,作业侧重于任务的调度和管理。
  4. 转换主要通过数据流控制,作业提供了丰富的逻辑控制(条件判断、循环、错误处理等)。
  5. 转换适用于复杂的数据处理流程,作业适用于任务调度和控制。

在 Spring Boot 项目的 resources 目录下,创建一个 kettle 目录,并将 Kettle 的任务文件(如 转换1.ktr)复制到该目录中。

编写 Kettle 服务类

创建一个服务类,用于执行 Kettle 转换或作业。

package com.example.kettletest.service.impl;

import com.example.kettletest.service.KettleJobService;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;

/**
 * @author 罗森
 * @date 2024/6/6 13:21
 */
@Service
public class KettleJobServiceImpl implements KettleJobService {
    @Override
    public void runTaskFile(String taskFileName) {
        // 初始化 Kettle 环境
        try {
            KettleEnvironment.init();
            EnvUtil.environmentInit();
        } catch (KettleException e) {
            throw new RuntimeException(e);
        }
        // 执行任务文件
        if (taskFileName.endsWith(".ktr")) {
            taskFileKTR(taskFileName);
        } else if (taskFileName.endsWith(".kjb")) {
            taskFileKJB(taskFileName);
        } else {
            throw new IllegalArgumentException("Unsupported file type: " + taskFileName);
        }
    }

    /**
     * 针对kjb文件的操作
     * @param taskFileName
     */
    public void taskFileKJB(String taskFileName) {
        try {
            // 获取资源文件路径
            ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName);
            File jobFile = resource.getFile();
            // 加载 KJB 文件
            JobMeta jobMeta = new JobMeta(jobFile.getAbsolutePath(), null);
            // 创建作业对象
            Job job = new Job(null, jobMeta);
            // 启动作业
            job.start();
            // 等待作业完成
            job.waitUntilFinished();

            if (job.getErrors() > 0) {
                System.out.println("There were errors during job execution.");
            } else {
                System.out.println("Job executed successfully.");
            }
        } catch (IOException | KettleXMLException e) {
            e.printStackTrace();
        }
    }

    /**
     * 针对ktr文件的操作
     * @param taskFileName
     */
    public void taskFileKTR(String taskFileName) {
        try {
            // 获取资源文件路径
            ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName);
            File transFile = resource.getFile();
            // 加载 KTR 文件
            TransMeta transMeta = new TransMeta(transFile.getAbsolutePath());
            // 创建转换对象
            Trans trans = new Trans(transMeta);
            // 启动作业
            trans.execute(null);
            // 等待作业完成
            trans.waitUntilFinished();

            if (trans.getErrors() > 0) {
                System.err.println("There were errors during Transformation execution.");
            } else {
                System.out.println("Transformation executed successfully!");
            }
        } catch (IOException | KettleException e) {
            e.printStackTrace();
        }
    }
}

常见问题解决办法

  1. 运行后报错信息为:Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath.

    **解决办法:**在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件。

  2. 运行后报错信息为:ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : A serious error occurred during job execution: 无法找到作业的开始点.

    **解决办法:**为Spoon制作的作业任务增加开始节点。

  3. 运行后报错信息为:Can't run transformation due to plugin missing.

    **解决办法:**此问题通常出现在涉及类似于导出excel文件、json文件时。在初始化 Kettle 环境之前指明相关插件的绝对路径(相关插件通常在Kettle本地解压文件夹中的plugins目录下),新增以下代码:

    StepPluginType.getInstance().getPluginFolders().add(new PluginFolder("E:\\Kettle\\pdi-ce-9.4.0.0-343\\data-integration\\plugins", false, true));
    

    将代码中的地址换成您本地的绝对地址。


(END)
by luosen.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242433.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kubernetes 10 问,测测你对 k8s 的理解程度

Kubernetes 10 问 假设集群有 2 个 node 节点&#xff0c;其中一个有 pod&#xff0c;另一个则没有&#xff0c;那么新的 pod 会被调度到哪个节点上&#xff1f; 应用程序通过容器的形式运行&#xff0c;如果 OOM&#xff08;Out-of-Memory&#xff09;了&#xff0c;是容器重…

Java项目实战II基于微信小程序的课堂助手(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在数字化教…

高光谱深度学习调研

综述 高光谱深度学习只有小综述&#xff0c;没有大综述。小综述里面场景分类、目标检测的综述比较多。 Wang C, Liu B, Liu L, et al. A review of deep learning used in the hyperspectral image analysis for agriculture[J]. Artificial Intelligence Review, 2021, 54(7)…

抖音热门素材去哪找?优质抖音视频素材网站推荐!

是不是和我一样&#xff0c;刷抖音刷到停不下来&#xff1f;越来越多的朋友希望在抖音上创作出爆款视频&#xff0c;但苦于没有好素材。今天就来推荐几个超级实用的抖音视频素材网站&#xff0c;让你的视频内容立刻变得高大上&#xff01;这篇满是干货&#xff0c;直接上重点&a…

人工智能之数学基础:数学在人工智能领域中的地位

人工智能&#xff08;AI&#xff09;是一种新兴的技术&#xff0c;它的目标是构建能够像人类一样思考、学习、推理和解决问题的智能机器。AI已经成为了许多行业的重要组成部分&#xff0c;包括医疗、金融、交通、教育等。而数学则是AI领域中不可或缺的基础学科。本文将阐述数学…

高翔【自动驾驶与机器人中的SLAM技术】学习笔记(十三)图优化SLAM的本质

一、直白解释slam与图优化的结合 我从b站上学习理解的这个概念。 视频的大概位置是1个小时以后&#xff0c;在第75min到80min之间。图优化SLAM是怎么一回事。 slam本身是有运动方程的&#xff0c;也就是运动状态递推方程&#xff0c;也就是预测过程。通过t1时刻&#xff0c…

STM32单片机设计防儿童人员误锁/滞留车内警报系统

目录 目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 1.电路图采用Altium Designer进行设计&#xff1a; 2.实物展示图片 三、程序源代码设计 四、获取资料内容 前言 近年来在车辆逐渐普及的情况下&#xff0c;由于家长的疏忽&#xff0c;将…

stm32——通用定时器时钟知识点

&#xff08;该图来自小破站 铁头山羊老师的stm32标准库教学&#xff09;

GPIO相关的寄存器(重要)

目录 一、GPIO相关寄存器概述 二、整体介绍 三、详细介绍 1、端口配置低寄存器&#xff08;GPIOx_CRL&#xff09;&#xff08;xA...E&#xff09; 2、端口配置高寄存器&#xff08;GPIOx_CRH&#xff09;&#xff08;xA...E&#xff09; 3、端口输入数据寄存器&#xff…

CSS基础也要进行模电实验

盒子阴影 圆角边框已经介绍过哩&#xff0c;现在先介绍一下盒子阴影的效果如何实现 CSS3中新增了盒子阴影&#xff0c;可以使用box-shadow属性为盒子添加阴影 这是固定的语法&#xff1a; text-shadow: h-shadow v-shadow blur color; 它有这些可选的值&#xff1a; 哦。 …

i春秋-登陆(sql盲注爆字段,.git缓存利用)

练习平台地址 竞赛中心 题目描述 先登陆再说 题目内容 就是一个登录框 测试登录 用户名&#xff1a;admin or 11# 密码&#xff1a;随便输 返回密码错误 用户名&#xff1a;随便输 密码&#xff1a;随便输 返回用户名不存在 这里就可以确定时一个bool盲注了 这里提供一个lik…

探索KubeVirt:如何利用InfiniBand提升虚拟机性能

在高性能计算&#xff08;HPC&#xff09;中&#xff0c;网络性能对于集群效率起着至关重要的作用。为了支持大规模并行计算&#xff0c;HPC集群通常依赖高带宽、低延迟的网络&#xff0c;而InfiniBand&#xff08;IB&#xff09;正是其中的首选技术。它能够提供超过100Gbps的带…

基于树莓派的边缘端 AI 目标检测、目标跟踪、姿态估计 视频分析推理 加速方案:Hailo with ultralytics YOLOv8 YOLOv11

文件大纲 加速原理硬件安装软件安装基本设置系统升级docker 方案Demo 测试目标检测姿态估计视频分析参考文献前序树莓派文章hailo加速原理 Hailo 发布的 Raspberry Pi AI kit 加速原理,有几篇文章介绍的不错 https://ubuntu.com/blog/hackers-guide-to-the-raspberry-pi-ai-ki…

小白进!QMK 键盘新手入门指南

经常玩键盘的伙伴应该都知道&#xff0c;现在的键盘市场可谓是百花齐放&#xff0c;已经不是之前的单一功能产品化时代。我们可以看到很多诸如&#xff1a;机械轴键盘、磁轴键盘、光轴键盘、电感轴键盘&#xff0c;以及可能会上市的光磁轴键盘&#xff0c;更有支持屏幕的、带旋…

《操作系统 - 清华大学》3 -3:连续内存分配:内存碎片与分区的动态分配

文章目录 0. 概述1. 内存碎片问题2. 动态分配3. 首次适配算法4. 最优适配算法5. 最差适配算法 0. 概述 内存分配是操作系统管理过程中很重要的环节&#xff0c;首先需要考虑的是一块连续区域分配的过程&#xff0c;这个过程中会有很多问题&#xff0c;首先比较关注的一个问题是…

vue内置指令和自定义指令

常见的指令&#xff1a; v-bind : 单向绑定解析表达式, 可简写为 :xxx v-model : 双向数据绑定 v-for : 遍历数组/对象/字符串 v-on : 绑定事件监听, 可简…

蓝桥杯备赛(持续更新)

16届蓝桥杯算法类知识图谱.pdf 1. 格式打印 %03d&#xff1a;如果是两位数&#xff0c;将会在前面添上一位0 %.2f&#xff1a;会保留两位小数 如果是long&#xff0c;必须在数字后面加上L。 2. 进制转化 2.1. 十进制转任意进制&#xff1a; 十进制转任意进制时&#xff…

vue 项目使用 nginx 部署

前言 记录下使用element-admin-template 改造项目踩过的坑及打包部署过程 一、根据权限增加动态路由不生效 原因是Sidebar中路由取的 this.$router.options.routes,需要在计算路由 permission.js 增加如下代码 // generate accessible routes map based on roles const acce…

TensorFlow 2.0 环境配置

官方文档&#xff1a;CUDA Installation Guide for Windows 官方文档有坑&#xff0c;windows的安装指南直接复制了linux的指南内容&#xff1a;忽略这些离谱的信息即可。 可以从官方文档知悉&#xff0c;cuda依赖特定版本的C编译器。但是我懒得为了一个编译器就下载整个visua…

【计算机网络】【传输层】【习题】

计算机网络-传输层-习题 文章目录 10. 图 5-29 给出了 TCP 连接建立的三次握手与连接释放的四次握手过程。根据 TCP 协议的工作原理&#xff0c;请填写图 5-29 中 ①~⑧ 位置的序号值。答案技巧 注&#xff1a;本文基于《计算机网络》&#xff08;第5版&#xff09;吴功宜、吴英…