在现代企业中,数据已经成为核心资产,基于开源数据集成平台SeaTunnel,工程师如何高效地连接和管理这些数据源,直接关系到企业的竞争力和运营效率。
本文将给大家介绍如何通过 JDBC PostgreSQL 数据源连接器,在 SeaTunnel 平台中实现高效的数据处理与集成,并详细解析其关键功能和使用场景。
支持的引擎
在数据集成和处理的过程中,选择合适的引擎至关重要。JDBC PostgreSQL 数据源连接器支持以下引擎:
- Spark: 适用于大规模数据处理和实时流处理。
- Flink: 强大的流式数据处理引擎,适合需要低延迟和高吞吐量的场景。
- SeaTunnel Zeta: 专为数据集成和处理设计的轻量级引擎,提供高效、灵活的解决方案。
使用依赖
对于 Spark/Flink 引擎
使用 Spark 或 Flink 引擎时,需要确保将 JDBC 驱动程序 jar 包 放置在 ${SEATUNNEL_HOME}/plugins/
目录中。
对于 SeaTunnel Zeta 引擎
使用 SeaTunnel Zeta 引擎时,请将 JDBC 驱动程序 jar 包 放置在 ${SEATUNNEL_HOME}/lib/
目录中。
关键功能
JDBC PostgreSQL 数据源连接器在数据处理过程中,提供了一系列关键功能,帮助企业高效地管理和利用数据:
- 批量处理 (Batch): 支持大规模数据的批量读取和处理。
- 流处理 (Stream): 当前尚未支持流式数据处理。
- 精确一次 (Exactly-Once): 确保数据处理的精确一致性,避免重复和数据丢失。
- 列投影 (Column Projection): 允许用户选择和投影特定的列,以优化数据读取的性能。
- 并行处理 (Parallelism): 支持数据的并行读取和处理,提高处理效率。
- 用户定义的拆分 (User-Defined Split): 支持用户定义的拆分策略,灵活处理不同的数据分片需求。
数据源信息
连接器支持不同版本的 PostgreSQL 数据源,每个版本可能使用不同的驱动程序类。以下是支持的数据源信息:
数据源 | 支持的版本 | 驱动程序 | 连接 URL | Maven 下载链接 |
---|---|---|---|---|
PostgreSQL | 各版本依赖使用不同的驱动程序类 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 下载 |
PostgreSQL | 若需操作 GEOMETRY 类型数据 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 下载 |
数据库依赖
请下载与您的数据源相对应的 Maven 支持列表,并将其复制到 $SEATUNNEL_HOME/plugins/jdbc/lib/
目录中。例如,对于 PostgreSQL 数据源,请将 postgresql-xxx.jar
文件复制到上述目录。
数据类型映射
JDBC PostgreSQL 连接器提供了丰富的数据类型支持,将 PostgreSQL 数据类型映射到 SeaTunnel 的数据类型:
PostgreSQL 数据类型 | SeaTunnel 数据类型 |
---|---|
BOOL | BOOLEAN |
_BOOL | ARRAY |
BYTEA | BYTES |
_BYTEA | ARRAY |
INT2, SMALLSERIAL | SMALLINT |
_INT2 | ARRAY |
INT4, SERIAL | INT |
_INT4 | ARRAY |
INT8, BIGSERIAL | BIGINT |
_INT8 | ARRAY |
FLOAT4 | FLOAT |
_FLOAT4 | ARRAY |
FLOAT8 | DOUBLE |
_FLOAT8 | ARRAY |
NUMERIC (指定列大小 > 0) | DECIMAL (指定列大小,获取指定列的小数点右边的数字个数) |
NUMERIC (指定列大小 < 0) | DECIMAL (38, 18) |
BPCHAR, CHARACTER, VARCHAR, TEXT | STRING |
_BPCHAR, _CHARACTER, _VARCHAR, _TEXT | ARRAY |
TIMESTAMP(s), TIMESTAMPTZ(s) | TIMESTAMP(s) |
TIME(s), TIMETZ(s) | TIME(s) |
DATE | DATE |
选项
名称 | 类型 | 必填 | 默认值 | 描述 |
---|---|---|---|---|
url | String | 是 | - | JDBC 连接的 URL。例如:jdbc:postgresql://localhost:5432/test |
driver | String | 是 | - | 连接到远程数据源的 JDBC 类名,如果使用 PostgreSQL,值为 org.postgresql.Driver |
user | String | 否 | - | 连接实例的用户名 |
password | String | 否 | - | 连接实例的密码 |
query | String | 是 | - | 查询语句 |
connection_check_timeout_sec | Int | 否 | 30 | 验证连接的数据库操作完成的等待时间(秒) |
partition_column | String | 否 | - | 并行处理的分区列名,只支持数值类型主键,且只能配置一个列 |
partition_lower_bound | BigDecimal | 否 | - | 分区列的最小值,如果未设置,SeaTunnel 将查询数据库获取最小值 |
partition_upper_bound | BigDecimal | 否 | - | 分区列的最大值,如果未设置,SeaTunnel 将查询数据库获取最大值 |
partition_num | Int | 否 | 作业并行度 | 分区数量,只支持正整数,默认值为作业并行度 |
fetch_size | Int | 否 | 0 | 对于返回大量对象的查询,可以配置行获取大小以提高性能,减少满足选择条件所需的数据库命中次数。0 表示使用 JDBC 默认值 |
properties | Map | 否 | - | 其他连接配置参数,当 properties 和 URL 存在相同参数时,优先级由驱动程序的具体实现决定,例如在 MySQL 中,properties 优先于 URL |
并行读取
JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用某些规则拆分表中的数据,然后交给读者读取。读者的数量由 parallelism
选项决定。
拆分键规则
- 如果
partition_column
不为空,将用于计算拆分。该列必须是 支持的拆分数据类型。 - 如果
partition_column
为空,SeaTunnel 将从表中读取架构并获取主键和唯一索引。如果主键和唯一索引中有多个列,支持的拆分数据类型 中的第一列将用于拆分数据。例如,表有主键(nn guid, name varchar),因为guid
不在 支持的拆分数据类型 中,因此将使用列name
进行数据拆分。
支持的拆分数据类型
- 字符串
- 数字(int、bigint、decimal 等)
- 日期
相关拆分选项
split.size
每个拆分包含多少行,当读取表时,捕获的表将拆分为多个拆分。
split.even-distribution.factor.lower-bound
不推荐使用
块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即(MAX(id) - MIN(id) + 1)/ 行数),则表块将优化为均匀分布。否则,如果分布因子较小,表将被视为不均匀分布,并在估计的分片数超过 sample-sharding.threshold
值时使用基于采样的分片策略。默认值为 0.05。
split.even-distribution.factor.upper-bound
不推荐使用
块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即```markdown
PostgreSQL 数据源连接器:在 SeaTunnel 中的应用与优势
在现代企业中,数据已经成为核心资产,如何高效地连接和管理这些数据源,直接关系到企业的竞争力和运营效率。本文将深入探讨如何通过 JDBC PostgreSQL 数据源连接器,在 SeaTunnel 平台中实现高效的数据处理与集成,并详细解析其关键功能和使用场景。
支持的引擎
在数据集成和处理的过程中,选择合适的引擎至关重要。JDBC PostgreSQL 数据源连接器支持以下引擎:
- Spark: 适用于大规模数据处理和实时流处理。
- Flink: 强大的流式数据处理引擎,适合需要低延迟和高吞吐量的场景。
- SeaTunnel Zeta: 专为数据集成和处理设计的轻量级引擎,提供高效、灵活的解决方案。
使用依赖
对于 Spark/Flink 引擎
使用 Spark 或 Flink 引擎时,需要确保将 JDBC 驱动程序 jar 包 放置在 ${SEATUNNEL_HOME}/plugins/
目录中。
对于 SeaTunnel Zeta 引擎
使用 SeaTunnel Zeta 引擎时,请将 JDBC 驱动程序 jar 包 放置在 ${SEATUNNEL_HOME}/lib/
目录中。
关键功能
JDBC PostgreSQL 数据源连接器在数据处理过程中,提供了一系列关键功能,帮助企业高效地管理和利用数据:
- 批量处理 (Batch): 支持大规模数据的批量读取和处理。
- 精确一次 (Exactly-Once): 确保数据处理的精确一致性,避免重复和数据丢失。
- 列投影 (Column Projection): 允许用户选择和投影特定的列,以优化数据读取的性能。
- 并行处理 (Parallelism): 支持数据的并行读取和处理,提高处理效率。
- 用户定义的拆分 (User-Defined Split): 支持用户定义的拆分策略,灵活处理不同的数据分片需求。
数据源信息
连接器支持不同版本的 PostgreSQL 数据源,每个版本可能使用不同的驱动程序类。以下是支持的数据源信息:
数据源 | 支持的版本 | 驱动程序 | 连接 URL | Maven 下载链接 |
---|---|---|---|---|
PostgreSQL | 各版本依赖使用不同的驱动程序类 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 下载 |
PostgreSQL | 若需操作 GEOMETRY 类型数据 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 下载 |
数据库依赖
请下载与您的数据源相对应的 Maven 支持列表,并将其复制到 $SEATUNNEL_HOME/plugins/jdbc/lib/
目录中。例如,对于 PostgreSQL 数据源,请将 postgresql-xxx.jar
文件复制到上述目录。
数据类型映射
JDBC PostgreSQL 连接器提供了丰富的数据类型支持,将 PostgreSQL 数据类型映射到 SeaTunnel 的数据类型:
PostgreSQL 数据类型 | SeaTunnel 数据类型 |
---|---|
BOOL | BOOLEAN |
_BOOL | ARRAY |
BYTEA | BYTES |
_BYTEA | ARRAY |
INT2, SMALLSERIAL | SMALLINT |
_INT2 | ARRAY |
INT4, SERIAL | INT |
_INT4 | ARRAY |
INT8, BIGSERIAL | BIGINT |
_INT8 | ARRAY |
FLOAT4 | FLOAT |
_FLOAT4 | ARRAY |
FLOAT8 | DOUBLE |
_FLOAT8 | ARRAY |
NUMERIC (指定列大小 > 0) | DECIMAL (指定列大小,获取指定列的小数点右边的数字个数) |
NUMERIC (指定列大小 < 0) | DECIMAL (38, 18) |
BPCHAR, CHARACTER, VARCHAR, TEXT | STRING |
_BPCHAR, _CHARACTER, _VARCHAR, _TEXT | ARRAY |
TIMESTAMP(s), TIMESTAMPTZ(s) | TIMESTAMP(s) |
TIME(s), TIMETZ(s) | TIME(s) |
DATE | DATE |
选项
名称 | 类型 | 必填 | 默认值 | 描述 |
---|---|---|---|---|
url | String | 是 | - | JDBC 连接的 URL。例如:jdbc:postgresql://localhost:5432/test |
driver | String | 是 | - | 连接到远程数据源的 JDBC 类名,如果使用 PostgreSQL,值为 org.postgresql.Driver |
user | String | 否 | - | 连接实例的用户名 |
password | String | 否 | - | 连接实例的密码 |
query | String | 是 | - | 查询语句 |
connection_check_timeout_sec | Int | 否 | 30 | 验证连接的数据库操作完成的等待时间(秒) |
partition_column | String | 否 | - | 并行处理的分区列名,只支持数值类型主键,且只能配置一个列 |
partition_lower_bound | BigDecimal | 否 | - | 分区列的最小值,如果未设置,SeaTunnel 将查询数据库获取最小值 |
partition_upper_bound | BigDecimal | 否 | - | 分区列的最大值,如果未设置,SeaTunnel 将查询数据库获取最大值 |
partition_num | Int | 否 | 作业并行度 | 分区数量,只支持正整数,默认值为作业并行度 |
fetch_size | Int | 否 | 0 | 对于返回大量对象的查询,可以配置行获取大小以提高性能,减少满足选择条件所需的数据库命中次数。0 表示使用 JDBC 默认值 |
properties | Map | 否 | - | 其他连接配置参数,当 properties 和 URL 存在相同参数时,优先级由驱动程序的具体实现决定,例如在 MySQL 中,properties 优先于 URL |
并行读取
JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用某些规则拆分表中的数据,然后交给读者读取。读者的数量由 parallelism
选项决定。
拆分键规则
- 如果
partition_column
不为空,将用于计算拆分。该列必须是 支持的拆分数据类型。 - 如果
partition_column
为空,SeaTunnel 将从表中读取架构并获取主键和唯一索引。如果主键和唯一索引中有多个列,支持的拆分数据类型 中的第一列将用于拆分数据。例如,表有主键(nn guid, name varchar),因为guid
不在 支持的拆分数据类型 中,因此将使用列name
进行数据拆分。
支持的拆分数据类型
- 字符串
- 数字(int、bigint、decimal 等)
- 日期
相关拆分选项
split.size
每个拆分包含多少行,当读取表时,捕获的表将拆分为多个拆分。
split.even-distribution.factor.lower-bound
不推荐使用
块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即(MAX(id) - MIN(id) + 1)/ 行数),则表块将优化为均匀分布。否则,如果分布因子较小,表将被视为不均匀分布,并在估计的分片数超过 sample-sharding.threshold
值时使用基于采样的分片策略。默认值为 0.05。
split.even-distribution.factor.upper-bound
不推荐使用
块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即(MAX(id) - MIN(id) + 1)/ 行数),则表块将优化为均匀分布。否则,如果分布因子更大,则如果预估的分片数超过 指定的值,则该表将被视为分布不均匀,并且将使用基于采样的分片策略sample-sharding.threshold。默认值为 100.0。
split.sample-sharding.threshold
此配置指定触发采样分片策略的估计分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-bound
和 chunk-key.even-distribution.factor.lower-bound
指定的范围,并且估计的分片数(计算为近似行数/块大小)超过此阈值时,将使用采样分片策略。这有助于更高效地处理大数据集。默认值为 1000 分片。
split.inverse-sampling.rate
采样分片策略中使用的采样率的倒数。例如,如果此值设置为 1000,则表示在采样过程中应用 1/1000 的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。对于非常大的数据集,较低的采样率是首选。默认值为 1000。
partition_column [string]
用于拆分数据的列名。
partition_upper_bound [BigDecimal]
扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_lower_bound [BigDecimal]
扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_num [int]
不推荐使用,正确的方法是通过
split.size
控制拆分数量
需要拆分成多少个分片,仅支持正整数。默认值为作业并行度。
提示
如果表不能拆分(例如,表没有主键或唯一索引,并且未设置
partition_column
),它将以单一并发运行。使用
table_path
替换query
进行单表读取。如果需要读取多个表,请使用table_list
。
任务示例
简单示例:
此示例在测试 "数据库" 中查询 type_bin 'table' 16 数据,并以单并行方式查询其所有字段。您还可以指定要查询的字段并最终输出到控制台。
# 定义运行环境
env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = "root"
password = "test"
query = "select * from source limit 16"
}
}
transform {
# 请访问 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
通过 partition_column 并行
使用您配置的分片字段并行读取查询表中的数据。如果您想读取整个表,可以这样做。
env {
parallelism = 4
job.mode = "BATCH"
}
source{
jdbc{
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = "root"
password = "test"
query = "select * from source"
partition_column= "id"
partition_num = 5
}
}
sink {
Console {}
}
通过主键或唯一索引并行
配置 table_path 将开启自动拆分,您可以配置 split.* 以调整拆分策略。
env {
parallelism = 4
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
table_path = "test.public.AllDataType_1"
query = "select * from public.AllDataType_1"
split.size = 10000
}
}
sink {
Console {}
}
并行边界
指定查询上限和下限的数据会更高效。根据您配置的上限和下限读取数据源会更高效。
source{
jdbc{
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = "root"
password = "test"
query = "select * from source"
partition_column= "id"
# 返回的表名称
result_table_name = "jdbc"
partition_lower_bound = 1
partition_upper_bound = 50
partition_num = 5
}
}
多表读取
配置 table_list
将开启自动拆分,您可以配置 split.*
以调整拆分策略
env {
job.mode = "BATCH"
parallelism = 4
}
source {
Jdbc {
url="jdbc:postgresql://datasource01:5432/demo"
user="iDm82k6Q0Tq+wUprWnPsLQ=="
driver="org.postgresql.Driver"
password="iDm82k6Q0Tq+wUprWnPsLQ=="
"table_list"=[
{
"table_path"="demo.public.AllDataType_1"
},
{
"table_path"="demo.public.alldatatype"
}
]
#where_condition= "where id > 100"
split.size = 10000
#split.even-distribution.factor.upper-bound = 100
#split.even-distribution.factor.lower-bound = 0.05
#split.sample-sharding.threshold = 1000
#split.inverse-sampling.rate = 1000
}
}
sink {
Console {}
}
通过正确配置 PostgreSQL JDBC 源连接器,企业可以在复杂的数据环境中高效地管理和处理数据。
使用 SeaTunnel 的灵活性和强大的功能,用户可以轻松实现数据的并行处理和高效查询,从而在数据驱动的业务决策中获得更大的优势。
本文由 白鲸开源科技 提供发布支持!