必看!S3File Sink Connector 使用文档

news2024/11/28 4:45:22

file

S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。

描述

将数据输出到 AWS S3 文件系统。

提示:

如果您使用的是 Spark/Flink,在使用此连接器之前,必须确保您的 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。

如果您使用的是 SeaTunnel Engine,它会在您下载和安装 SeaTunnel Engine 时自动集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目录下确认这个 JAR 包是否存在。

主要特性

  • 仅一次语义

默认情况下,我们使用 2PC 提交来确保 "仅一次语义"。

  • 文件格式类型
    • 文本 (text)
    • CSV
    • Parquet
    • ORC
    • JSON
    • Excel

选项

名称类型必需默认值备注
pathstring-
bucketstring-
fs.s3a.endpointstring-
fs.s3a.aws.credentials.providerstringcom.amazonaws.auth.InstanceProfileCredentialsProvider
access_keystring-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
access_secretstring-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
custom_filenamebooleanfalse是否需要自定义文件名
file_name_expressionstring"${transactionId}"仅在 custom_filename 为 true 时使用
filename_time_formatstring"yyyy.MM.dd"仅在 custom_filename 为 true 时使用
file_format_typestring"csv"
field_delimiterstring'\001'仅在 file_format 为 text 时使用
row_delimiterstring"\n"仅在 file_format 为 text 时使用
have_partitionbooleanfalse是否需要处理分区
partition_byarray-仅在 have_partition 为 true 时使用
partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"仅在 have_partition 为 true 时使用
is_partition_field_write_in_filebooleanfalse仅在 have_partition 为 true 时使用
sink_columnsarray当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段
is_enable_transactionbooleantrue
batch_sizeint1000000
compress_codecstringnone
common-optionsobject-
max_rows_in_memoryint-仅在 file_format 为 Excel 时使用
sheet_namestringSheet${Random number}仅在 file_format 为 Excel 时使用

path [string]

目标目录路径是必需的。

bucket [string]

S3 文件系统的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 协议,此参数应为 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端点

fs.s3a.aws.credentials.provider [string]

认证 s3a 的方式。目前我们仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider

关于凭证提供程序的更多信息,您可以参考 Hadoop AWS 文档

access_key [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws。

access_secret [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws。

hadoop_s3_properties [map]

如果需要添加其他选项,可以在这里添加并参考此 链接

hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
   }

custom_filename [boolean]

是否自定义文件名。

file_name_expression [string]

仅在 custom_filenametrue 时使用

file_name_expression 描述了将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now}${uuid},例如 test_${uuid}_${now}${now} 代表当前时间,其格式可以通过指定选项 filename_time_format 来定义。

请注意,如果 is_enable_transactiontrue,我们将在文件名的开头自动添加${transactionId}_

filename_time_format [string]

仅在 custom_filenametrue 时使用

file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式列于下表中:

符号描述
y
M
d月中的天数
H一天中的小时 (0-23)
m小时中的分钟
s分钟中的秒数

file_format_type [string]

我们支持以下文件类型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel

请注意,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt

field_delimiter [string]

数据行中列之间的分隔符。仅在 file_format 为 text 时需要。

row_delimiter [string]

文件中行之间的分隔符。仅在 file_format 为 text 时需要。

have_partition [boolean]

是否需要处理分区。

partition_by [array]

仅在 have_partitiontrue 时使用。

基于选定字段对分区数据进行分区。

partition_dir_expression [string]

仅在 have_partitiontrue 时使用。

如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。

默认的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一个分区字段,v0 是第一个分区字段的值。

is_partition_field_write_in_file [boolean]

仅在 have_partitiontrue 时使用。

如果 is_partition_field_write_in_filetrue,分区字段及其值将写入数据文件中。

例如,如果您想要写入 Hive 数据文件,其值应为 false

sink_columns [array]

需要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。 字段的顺序决定了实际写入文件的顺序。

is_enable_transaction [boolean]

如果 is_enable_transaction 为 true,我们将确保在写入目标目录时数据不会丢失或重复。

请注意,如果 is_enable_transactiontrue,我们将在文件头部自动添加 ${transactionId}_

目前仅支持 true

batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_sizecheckpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,当文件中的行数大于 batch_size 时,写入器将写入文件。如果 checkpoint.interval 较小,则在新的检查点触发时,写入器将创建一个新文件。

compress_codec [string]

文件的压缩编解码器及其支持的详细信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none

提示:Excel 类型不支持任何压缩格式。

常见选项

请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。

max_rows_in_memory [int]

当文件格式为 Excel 时,可以缓存在内存中的数据项的最大数量。

sheet_name [string]

工作簿的工作表名称。

示例

对于文本文件格式,具有 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/text"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction=true
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/parquet"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "parquet"
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/orc"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "orc"
  }

更新日志

2.3.0-beta 2022-10-20

  • 添加 S3File Sink 连接器

    2.3.0 2022-12-30

  • Bug修复

    • 修复了以下导致数据写入文件失败的错误:
      • 当上游字段为空时会抛出 NullPointerException
      • Sink 列映射失败
      • 从状态中恢复写入器时直接获取事务失败 (3258)
  • 功能

    • 支持 S3A 协议 (3632)
      • 允许用户添加额外的 Hadoop-S3 参数
      • 允许使用 S3A 协议
      • 解耦 Hadoop-AWS 依赖
    • 支持设置每个文件的批处理大小 (3625)
    • 设置 S3 AK 为可选项 (3688)

下一版本

  • [优化]支持文件压缩(3699)

    本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1028593.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot druid多数据源配置,及druid监控

基础配置&#xff1a; springboot2.x版本 jdk1.8 依赖&#xff1a; <dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version> </dependency> &…

分享一个基于微信小程序的高校图书馆预约座位小程序 图书馆占座小程序源码 lw 调试

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

Linux高性能服务器编程 学习笔记 第五章 Linux网络编程基础API

我们将从以下3方面讨论Linux网络API&#xff1a; 1.socket地址API。socket最开始的含义是一个IP地址和端口对&#xff08;ip&#xff0c;port&#xff09;&#xff0c;它唯一表示了使用TCP通信的一端&#xff0c;本书称其为socket地址。 2.socket基础API。socket的主要API都定…

打开泰坦陨落提示msvcp120.dll丢失怎么办?三个解决方法快速解决

首先&#xff0c;我们来了解一下msvcr120.dll是什么文件。msvcr120.dll是一个动态链接库文件&#xff0c;它是Microsoft Visual C 2013 Redistributable中的一个组件。这个文件对于一些软件的运行是非常重要的&#xff0c;如果缺失或损坏&#xff0c;就会导致软件无法正常运行。…

从淘宝数据分析产品需求(商品销量总销量精准月销)

淘宝数据分析总体来说可以分为商品分析、客户分析、地区分析、时间分析四大维度(参考数据雷达的分析思路)。在这里我重点说商品分析。 在淘宝上开店的竞争还是非常激烈的&#xff0c;随便拿出一个单品就有很多竞品存在&#xff0c;所以做起来还是很难的&#xff0c;而想要在众…

Git:利用Git模拟企业级项目管理

文章目录 基础知识Git分支设计规范master分支release分支develop分支feature分支hotfix分支 模拟进行企业级项目管理 本篇主要总结的是企业级开发模型以及利用Git模拟企业级别的项目管理方式 基础知识 前面已经进行了全部的关于Git的各项操作&#xff0c;那么Git是作用于企业…

<Altium Designer>向PCB导入网表(.NET)

目录 01 AD PCB导入网表(.NET) 添加.NET文件到AD工程 通过show Differences操作导入器件 02 文章总结 大家好&#xff0c;这里是程序员杰克。一名平平无奇的嵌入式软件工程师。 硬件工程师使用的是Cadence的OrCAD画原理图&#xff0c;输出的是.NET网表&#xff0c;而杰克使…

自动化测试---选择框

radio框选择选项&#xff0c;直接用WebElement的click方法&#xff0c;模拟用户点击就可以了。 比如, 我们要在下面的html中&#xff1a; 1.先打印当前选中的老师名字 2.再选择 小雷老师 <div id"s_radio"><input type"radio" name"teach…

在静态方法中访问@Value注入的静态变量!!

一、 静态变量 static修饰的成员变量&#xff0c;称为静态成员变量&#xff0c;静态成员变量最大的特性&#xff1a;不属于某个具体的对象&#xff0c;是所有对象所共享的 简单来说&#xff1a;在某些类的对象中存在一些相同的成员变量&#xff0c;那么这种成员变量就可以设置…

解决VSCode下载速度很慢

这是VSCode的官网&#xff1a; Visual Studio Code - Code Editing. Redefined 按照官网的下载链接&#xff0c;速度实在是感人&#xff01; 解决办法也很简单&#xff0c;把链接换为CDN加速的链接 把下载链接中的az764295.vo.msecnd.net 替换为&#x1f449; vscode.cdn.azu…

MySQL数据库入门到精通1--基础篇(MySQL概述,SQL)

1. MySQL概述 1.1 数据库相关概念 目前主流的关系型数据库管理系统&#xff1a; Oracle&#xff1a;大型的收费数据库&#xff0c;Oracle公司产品&#xff0c;价格昂贵。 MySQL&#xff1a;开源免费的中小型数据库&#xff0c;后来Sun公司收购了MySQL&#xff0c;而Oracle又收…

压电换能器的工作原理和应用(功率放大器)

在日常生活中&#xff0c;可能会遇到很难测量的物理量&#xff0c;例如施加在金属上的机械应力、温度、压力水平等……对于所有这些应用&#xff0c;需要一种能够以我们熟悉的单位和校准来测量这些未知量的设备&#xff0c;而比较常用的设备是换能器。 换能器是一种电气设备&am…

华为OD机试 - 计算面积 - 逻辑分析(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…

491. 递增子序列

题目链接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 思路&#xff1a; 注意的点&#xff1a;1.是在原有的序列里找递增的子序列 示例 2&#xff1a; 输入&#xff1a;nums [4,4,3,2,1] 输出&#xff1a;[[4,4]] 记一…

业务上云的容器排障与思考

1 前言 此前我们部门已经完成了业务上云的目标&#xff0c;而随着业务请求量的激增&#xff0c;上云应用系统也面临着一些复杂的故障和挑战。 下文我就结合最近的容器排障工作&#xff0c;跟大家一起探讨如何优化系统的性能、扩展性和容错能力&#xff0c;为读者提供参考和借鉴…

Python从入门到放弃系列教程01

Python从入门到放弃系列教程01 第一章 01 初识Python Python的起源 1989年&#xff0c;为了打发圣诞节假期&#xff0c;吉多范罗苏姆&#xff08;龟叔&#xff09;决定开发一个新的解释程序&#xff08;Python雏形&#xff09;&#xff0c;1991年&#xff0c;第一个Python解…

QT支持的平台

简述&#xff1a; Qt是一个商业和开源许可的跨平台应用程序和UI框架。它由Qt公司与Qt项目社区一起在开源治理模式下开发。 使用Qt&#xff0c;您可以编写一次GUI应用程序&#xff0c;然后将它们部署到桌面&#xff0c;移动和嵌入式操作系统中&#xff0c;而无需重写源代码。 Qt…

【医学影像数据处理】 Dicom 文件格式处理汇总

在医学影像的数据存储领域&#xff0c;是存在一定的行业标准的。X光、CT机器等等医疗器械等生产企业&#xff0c;会依据行业标准&#xff0c;对采集的数据进行规范化的存储。 这里面就包括了大名鼎鼎的DICOM 3.0协议&#xff0c;上述的摄影形式大部分也都是以这种形式进行存储…

Python实战:用多线程和多进程打造高效爬虫

文章目录 &#x1f34b;引言&#x1f34b;为什么要使用多线程和多进程&#xff1f;&#x1f34b;线程的常用方法&#x1f34b;线程锁&#xff08;也称为互斥锁或简称锁&#xff09;&#x1f34b;小案例&#x1f34b;实战---手办网&#x1f34b;总结 &#x1f34b;引言 在网络爬…

【JavaSpring】Aop的通知类型,获取数据

AOP 通知描述了抽取的共性功能&#xff0c;根据共性功能抽取的位置不同&#xff0c;最终运行代码时要将其加入到合理的位置 前置通知 Pointcut("execution(void org.example.dao.BookDao.update())")private void pt() {}Before("pt()")public void befo…