使用Apache SeaTunnel高效集成和管理SftpFile数据源

news2024/9/19 13:47:48

file

本文为Apache SeaTunnel已经支持的SftpFile Source Connector使用文档,旨在帮助读者理解如何高效地使用SFTP文件源连接器,以便轻松地使用Apache SeaTunnel集成和管理您的SftpFil数据源。

SftpFile 是指通过 SFTP(Secure File Transfer Protocol)协议进行文件操作的对象或组件。在网络编程和数据集成中,SFTPFile 通常用来表示和操作存储在远程 SFTP 服务器上的文件。SFTP 是一种安全的文件传输协议,基于 SSH(Secure Shell)协议,提供了加密的文件传输和远程文件操作功能。

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

  • 批处理
  • 列投影
  • 并行处理
  • 文件格式类型
    • 文本
    • CSV
    • JSON
    • Excel

描述

从 SFTP 文件服务器读取数据。

支持的数据源信息

使用 SftpFile 连接器,需要以下依赖项。可以通过 install-plugin.sh 下载,也可以从 Maven 中央仓库获取。

数据源支持的版本依赖项
SftpFile通用下载
  • 提示

如果你使用的是 Spark/Flink,请确保 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。

如果使用 SeaTunnel 引擎,安装 SeaTunnel 引擎时会自动集成 Hadoop JAR 包。可以在 ${SEATUNNEL_HOME}/lib 目录下检查这个 JAR 包是否存在。

为了支持更多的文件类型,我们做了一些妥协,所以在内部访问 Sftp 时我们使用了 HDFS 协议,这个连接器需要一些 Hadoop 依赖项,且仅支持 Hadoop 版2.9.X+ 版本。

数据类型映射

文件没有特定的类型列表,我们可以通过在配置中指定模式来指示要将哪个 SeaTunnel 数据类型转换为相应的数据。

SeaTunnel 数据类型
STRING
SHORT
INT
BIGINT
BOOLEAN
DOUBLE
DECIMAL
FLOAT
DATE
TIME
TIMESTAMP
BYTES
ARRAY
MAP

Source选项

名称类型必填默认值描述
host字符串-目标 SFTP 主机地址
port整数-目标 SFTP 端口号
user字符串-目标 SFTP 用户名
password字符串-目标 SFTP 密码
path字符串-源文件路径
file_format_type字符串-请查看下文的 #file_format_type
file_filter_pattern字符串-用于文件过滤的过滤器模式。
delimiter字符串\001字段分隔符,用于告诉连接器如何在读取文本文件时切割字段。默认 \001,与 Hive 的默认分隔符相同。
parse_partition_from_path布尔型true控制是否从文件路径中解析分区键和值。
例如,如果从路径中读取文件 oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26
那么文件中的每条记录将添加这两个字段:
name age
tyrantlucifer 26
提示:不要在模式选项中定义分区字段
date_format字符串yyyy-MM-dd日期类型的格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:
yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd,默认 yyyy-MM-dd
datetime_format字符串yyyy-MM-dd HH:mm:ss日期时间类型的格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:
yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss,默认 yyyy-MM-dd HH:mm:ss
time_format字符串HH:mm:ss时间类型的格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:
HH:mm:ss HH:mm:ss.SSS,默认 HH:mm:ss
skip_header_row_number长整型0跳过前几行,仅对 txt 和 csv 文件有效。
例如,设置如下:
skip_header_row_number = 2
那么 SeaTunnel 将跳过源文件的前两行。
sheet_name字符串-读取工作簿的工作表名称,仅在文件格式为 Excel 时使用。
schema配置项-请查看下文的 #schema
通用选项-Source 插件通用参数,请参考 Source通用选项 获取详细信息。

file_format_type [字符串]

支持以下文件类型: text csv parquet orc json excel 如果将文件类型指定为 json,需要配置 Schema 模式选项,向连接器说明如何解析数据为你需要所需的 Row。 例如:上游数据如下:

{"code":  200, "data":  "get success", "success":  true}

也可以将多个数据保存在一个文件中,并通过换行符进行分隔:

{"code":  200, "data":  "get success", "success":  true}
{"code":  300, "data":  "get failed", "success":  false}

需要按照以下方式配置 Schema:

schema {
    fields {
        code = int
        data = string
        success = boolean
    }
}

连接器将生成以下数据:

codedatasuccess
200获取成功true
如果将文件类型指定为 parquetorc,则无需指定模式选项,连接器可以自动查找上游数据的模式。
如果将文件类型指定为 textcsv,则可以选择是否指定模式信息或不指定。

例如,上游数据如下:

tyrantlucifer#26#male

如果不配置 Schema,Connector 将这样处理上游数据:

如果分配数据模式,则除了 CSV 文件类型外,还应分配选项 delimiter。

内容
tyrantlucifer#26#male
如果配置了数据 Schema,除了CSV文件类型,还需要配置选项分隔符。

需要配置 Schema 和分隔符如下:

delimiter = "#"
schema {
    fields {
        name = string
        age = int
        gender = string 
    }
}

连接器将生成以下数据: | 姓名 | 年龄 | 性别 | |---------------|-----|------| | tyrantlucifer | 26 | 男 |

Schema [配置项]

fields [配置项] 上游数据的 Schema。

如何创建 Sftp 数据同步任务

以下示例演示了如何创建一个数据同步任务,从 Sftp 读取数据并在本地客户端上打印出来:

# 设置要执行的任务的基本配置
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

# 创建连接到 Sftp 的源
source {
  SftpFile {
    host = "sftp"
    port = 22
    user = seatunnel
    password = pass
    path = "tmp/seatunnel/read/json"
    file_format_type = "json"
    result_table_name = "sftp"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          C_MAP = "map<string, string>"
          C_ARRAY = "array<int>"
          C_STRING = string
          C_BOOLEAN = boolean
          C_TINYINT = tinyint
          C_SMALLINT = smallint
          C_INT = int
          C_BIGINT = bigint
          C_FLOAT = float
          C_DOUBLE = double
          C_BYTES = bytes
          C_DATE = date
          C_DECIMAL = "decimal(38, 18)"
          C_TIMESTAMP = timestamp
        }
      }
    }
  }
}

# 控制台打印读取的 Sftp 数据
sink {
  Console {
    parallelism = 1
  }
}

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2146369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CC工具箱1.3.6更新_免费_120+工具

CC工具箱1.3.6更新【2024.9.11】 使用环境要求&#xff1a;ArcGIS Pro 3.0 一、下载链接 https://pan.baidu.com/s/1OJmO6IPtMfX_vob3bMtvEg?pwduh5r 二、使用方法 1、在下载链接中下载安装文件【CC工具箱1.3.6.esriAddinX】&#xff0c;直接点击安装即可。 2、安装好后打…

诸葛智能助力唐山银行、三峡媒体斩获「数智卓越企业」大奖

近日&#xff0c;2024爱分析第六届数据智能高峰论坛在北京成功举办&#xff0c;旨在深入探讨AI大模型与数据要素在企业智能化转型中的重要作用&#xff0c;分享前沿技术成果与头部企业实践经验。 诸葛智能作为数智化转型的优秀厂商&#xff0c;携手同样走在行业转型前沿的先锋…

网络爬虫的最佳实践:结合 set_time_limit() 与 setTrafficLimit() 抓取云盘数据

在数据爬取领域&#xff0c;百度云盘作为国内领先的云存储服务平台&#xff0c;拥有海量的用户和数据资源。因此&#xff0c;对于技术开发者而言&#xff0c;如何高效、稳定地对百度云盘中的公开资源进行数据抓取成为了一个值得探讨的课题。本文将结合 PHP 的 set_time_limit()…

PVE虚拟机被锁定locked解决方法

打开pve节点的shell&#xff0c;执行以下命令 qm unlock <VMID> 示例&#xff1a; qm unlock 112

Mobile net V系列详解 理论+实战(1)

Mobilenet 系列 论文精讲部分0.摘要1. 引文2. 引文3. MobileNet 模型架构3.0 卷积个人理解3.1 深度可分离卷积3.2 网络结构和训练3.3 宽度乘数&#xff1a;更细的模型 α3.4 分辨率乘数&#xff1a;降低表示的维度ρ 4. 实验4.1 模型选择4.2. 模型缩减超参数4.3. 细粒度识别4.4…

YOLOv9改进策略【卷积层】| HWD,引入`Haar小波变换`到下采样模块中,减少信息丢失

一、本文介绍 本文记录的是利用Haar小波下采样对YOLOv9网络进行改进的方法研究。传统的卷积神经网络中常用的最大池化、平均池化和步长为2的卷积等操作进行下采样可能会导致信息丢失&#xff0c;为了解决信息丢失问题&#xff0c;HWD作者受无损信息变换方法的启发&#xff0c;…

python本地进程通讯----共享内存变量

背景 最近在开发实践中&#xff0c;接触到了需要多进程开发的场景。众所周知&#xff0c;进程和线程最大的区别就在于&#xff1a;进程是资源分配的最小单位&#xff0c;线程是cpu调度的最小单位。对于多进程开发来说&#xff0c;每一个进程都占据一块独立的虚拟内存空间&#…

大数据新视界 --大数据大厂之探索ES:大数据时代的高效搜索引擎实战攻略

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

致计算机新生们

欢迎你们踏入计算机科学的世界&#xff0c;这是一个充满挑战与机遇的领域。在你们即将开始的大学旅程中&#xff0c;了解计算机专业的就业方向和行业现状是非常重要的。以下是一些关于计算机专业就业方向和行业现状的介绍&#xff0c;希望能够帮助你们更好地规划自己的未来。 …

土豆王国小乐队携手阿派朗创造力乐园,打造2024年okgo儿童音乐节

艺术与科技的完美融合&#xff0c;为首都少年儿童带来音乐盛宴 北京&#xff0c;2024年9月19日 —— 备受期待的2024年okgo儿童音乐节即将于9月21日至22日在北京阿派朗创造力乐园盛大开幕。这场由土豆王国小乐队与阿派朗创造力乐园联合举办的音乐节&#xff0c;旨在为首都及全国…

波分技术基础 -- WDM/OTN介绍

什么是WDM WDM&#xff08;Wavelength Division Multiplexing&#xff09;&#xff1a;波分复用技术&#xff0c;将不同波长的光信号复用到一根光纤中进行传送的方式&#xff08;每个波长承载一个业务信号&#xff09;&#xff0c;主要功能是传送和复用。在波分技术出现之前&am…

Gephi 0.9.2中文版百度云下载(附教程)

如大家所了解的&#xff0c;Gephi常用于各种图形和网络的可视化和探索&#xff0c;是最受欢迎的网络可视化软件之一。在生物科学领域&#xff0c;常用于基因共表达网络、蛋白互作网络、微生物相互关系网络等等类似的网络图形绘制。 目前用的比较多的版本为Gephi 0.9.2&#xf…

使用rust自制操作系统内核

一、系统简介 本操作系统是一个使用rust语言实现&#xff0c;基于32位的x86CPU的分时操作系统。 项目地址&#xff08;求star&#xff09;&#xff1a;GitHub - CaoGaorong/os-in-rust: 使用rust实现一个操作系统内核 详细文档&#xff1a;自制操作系统 语雀 1. 项目特性 …

深度学习自编码器 - 使用自编码器学习流形篇

序言 在数据科学的浩瀚宇宙中&#xff0c;深度学习如同一颗璀璨的星辰&#xff0c;引领着我们对复杂数据内在规律的探索。其中&#xff0c;自编码器作为深度学习家族中的一位独特成员&#xff0c;以其非凡的能力——通过无监督学习捕捉数据的有效表示&#xff0c;而备受瞩目。…

Tomcat_WebApp

Tomcat的目录的介绍 /bin&#xff1a; 这个目录包含启动和关闭 Tomcat 的脚本。 startup.bat / startup.sh&#xff1a;用于启动 Tomcat&#xff08;.bat 文件是 Windows 系统用的&#xff0c;.sh 文件是 Linux/Unix 系统用的&#xff09;。shutdown.bat / shutdown.sh&#xf…

Java 实现桌面烟花秀

前言 今天&#xff0c;我们将展示如何使用 Java Swing 创建一个烟花效果&#xff0c;覆盖整个桌面。我们将重点讲解如何在桌面上展示烟花、如何实现发射和爆炸效果&#xff0c;以及如何将这些效果整合到一个完整的程序中。 效果展示 如上图所示&#xff0c;我们在桌面实现了&…

【开源大模型生态9】百度的文心大模型

这张图展示了百度千帆大模型平台的功能架构及其与BML-AI开发平台和百度百舸AI异构计算平台的关系。以下是各个模块的解释&#xff1a; 模型广场&#xff1a; 通用大模型&#xff1a;提供基础的自然语言处理能力。行业大模型&#xff1a;针对不同行业的定制化模型。大模型工具链…

新的 MathWorks 硬件支持包支持从 MATLAB 和 Simulink 模型到高通 Hexagon 神经处理单元架构的自动化代码生成

MathWorks 今天宣布&#xff0c;推出针对 Qualcomm Hexagon™ 神经处理单元&#xff08;NPU&#xff09;的硬件支持包。该处理单元嵌入在 Snapdragon 系列处理器中。MathWorks 硬件支持包&#xff0c;则专门针对 Qualcomm Technologies 的 Hexagon NPU 架构进行优化&#xff0c…

基于SSM的“校园外卖管理系统”的设计与实现(源码+数据库+文档+开题报告)

基于SSM的“校园外卖管理系统”的设计与实现&#xff08;源码数据库文档开题报告) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 消费者系统结构图 商户系统结构图 管理员系统结构图 校…

数据脱敏 (Jackson + Hutool 工具包)

一、简介 系统使用 Jackson 序列化策略&#xff0c;对标注了 Sensitive 注解的属性进行脱敏处理 基于Hutool 脱敏案列&#xff1a; Retention(RetentionPolicy.RUNTIME) Target(ElementType.FIELD) JacksonAnnotationsInside// 表示只对有此注解的字段进行序列化 JsonSeriali…