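SeaTunnel Data Integration (1): Introduction and Installation
SeaTunnel Data Integration (2): Data Synchronization
SeaTunnel Data Integration (3): Multi-Table Synchronization
SeaTunnel Data Integration (4): Using Connectors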
In addition to its rich set of connectors, SeaTunnel provides transforms that make data conversion simpler, including Copy, Filter, FieldSelector, FieldMapper, DataFilter, TypeConverter, Replace, Split, FilterRowKind, SQL, and SQL Functions.
1、Copy
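Copies fields to new fields. In fields, each key is the name of the output field and each value is the source field it is copied from, so the job below copies region_name into a new field named region_name2.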
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    result_table_name = "base_region_01"
    query = "select * from base_region limit 4"
  }
}
transform {
  Copy {
    source_table_name = "base_region_01"
    result_table_name = "base_region_02"
    fields {
      id = id
      region_name = region_name
      region_name2 = region_name
    }
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "base_region_02"
    query = "insert into base_region(id,region_name,region_name2) values(?,?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_copy.conf
2、Filter
Filters fields. fields is the list of fields to keep; any field not in the list is removed.
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    result_table_name = "t_user_01"
    query = "select * from t_user"
  }
}
transform {
  Filter {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    fields = [id, name]
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_02"
    query = "insert into ods_t_user(id,name) values(?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_filter.conf
3、FieldSelector
FieldSelector example:
transform {
  FieldSelector {
    fields = ["id", "name", "age"]
  }
}
This configuration keeps only the "id", "name", and "age" fields of the source data for subsequent processing.
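The same projection can be expressed with the SQL transform described in section 10. The fragment below is a sketch, not the author's configuration: it assumes the upstream table is the FakeSource table fake defined in the FilterRowKind job of section 9 (fields id, name, age), and the output name fake_selected is hypothetical.

```hocon
transform {
  Sql {
    # assumed upstream table: the FakeSource table "fake" from section 9
    source_table_name = "fake"
    # hypothetical name for the projected table
    result_table_name = "fake_selected"
    # keep only id, name and age
    query = "select id, name, age from fake"
  }
}
```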
4、FieldMapper
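FieldMapper example:
transform {
  FieldMapper {
    field_mapper = {
      id = id
      name = new_name
      age = new_age
    }
  }
}
Each key in field_mapper is an original field name and each value is that field's name in the output. This configuration keeps id, renames name to new_name and age to new_age, and drops every field that is not listed, so FieldMapper can remove, rename, and reorder fields in one step.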
5、DataFilter
DataFilter example:
transform {
  DataFilter {
    condition = "age >= 18"
  }
}
This configuration keeps only the rows that satisfy the condition "age >= 18".
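A row filter like this can also be written as a WHERE clause in the SQL transform from section 10. This is a minimal sketch that assumes the upstream table is the FakeSource table fake from section 9 (fields id, name, age); fake_adult is a hypothetical output name.

```hocon
transform {
  Sql {
    # assumed upstream table: the FakeSource table "fake" from section 9
    source_table_name = "fake"
    # hypothetical name for the filtered table
    result_table_name = "fake_adult"
    # keep only rows whose age is at least 18
    query = "select * from fake where age >= 18"
  }
}
```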
6、TypeConverter
TypeConverter example:
transform {
  TypeConverter {
    field_conversion {
      name {
        from = "string"
        to = "integer"
      }
      age {
        from = "string"
        to = "integer"
      }
    }
  }
}
This configuration converts the "name" and "age" fields of the source data from string to integer.
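Type conversion can also be done with CAST in the SQL transform from section 10 (CAST is listed under System Functions in section 11). A sketch under stated assumptions: t_raw is a hypothetical upstream table whose age column holds numeric strings, and t_typed is a hypothetical output name. Only columns whose values are actually numeric should be cast to INT.

```hocon
transform {
  Sql {
    # hypothetical upstream table whose "age" column is a numeric string
    source_table_name = "t_raw"
    # hypothetical name for the converted table
    result_table_name = "t_typed"
    # cast age from string to integer, pass the other columns through
    query = "select id, name, CAST(age AS INT) as age from t_raw"
  }
}
```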
7、Replace
Replaces content in a string field. In the job below, every "%" in the name field is replaced with an empty string, i.e. removed.
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    result_table_name = "t_user_01"
    query = "select * from t_user"
  }
}
transform {
  Replace {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    replace_field = "name"
    pattern = "%"
    replacement = ""
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_02"
    query = "insert into ods_t_user(id,name,birth,gender) values(?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_replace.conf
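Replace can also match a regular expression instead of a literal string. The transform block below is a sketch that assumes the is_regex and replace_first options of the Replace transform in recent SeaTunnel releases (check them against your version); it collapses each run of whitespace in name into a single space and could take the place of the transform block in the job above.

```hocon
transform {
  Replace {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    replace_field = "name"
    # assumed option: interpret pattern as a regular expression
    is_regex = true
    # HOCON "\\s+" yields the regex \s+ (one or more whitespace characters)
    pattern = "\\s+"
    replacement = " "
    # assumed option: replace every match, not only the first
    replace_first = false
  }
}
```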
8、Split
Splits one field into multiple fields. Here birth is split on "-" into birth_y, birth_m, and birth_d, which are added to the row alongside the original fields.
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    result_table_name = "t_user_01"
    query = "select * from t_user"
  }
}
transform {
  Split {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    separator = "-"
    split_field = "birth"
    output_fields = [birth_y, birth_m, birth_d]
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "t_user_02"
    query = "insert into ods_t_user_y_m_d(id,name,birth,gender,birth_y,birth_m,birth_d) values(?,?,?,?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_split.conf
9、FilterRowKind
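Filters rows by RowKind: include_kinds keeps only the listed kinds, and exclude_kinds drops them.
transform {
  FilterRowKind {
    include_kinds = ["DELETE"]
  }
}
This configuration keeps only the rows of the source data marked as "DELETE". The complete job below instead uses exclude_kinds to drop INSERT rows from a FakeSource table and prints the remaining rows to the console.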
env {
  job.mode = "BATCH"
}
source {
  FakeSource {
    result_table_name = "fake"
    row.num = 100
    schema = {
      fields {
        id = "int"
        name = "string"
        age = "int"
      }
    }
  }
}
transform {
  FilterRowKind {
    source_table_name = "fake"
    result_table_name = "fake1"
    exclude_kinds = ["INSERT"]
  }
}
sink {
  Console {
    source_table_name = "fake1"
  }
}
10、SQL
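The SQL transform runs on an in-memory SQL engine, so transformation logic can be expressed with SQL functions and the capabilities of the SQL engine.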
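A minimal Sql transform, sketched against the FakeSource table fake from section 9 (fields id, name, age); fake_sql is a hypothetical output name. The query name in FROM must match source_table_name, and one statement can project, compute new values, and filter rows.

```hocon
transform {
  Sql {
    # assumed upstream table: the FakeSource table "fake" from section 9
    source_table_name = "fake"
    # hypothetical name for the result table
    result_table_name = "fake_sql"
    # append a suffix to name, add 1 to age, keep rows with id > 0
    query = "select id, CONCAT(name, '_v2') as name, age + 1 as age from fake where id > 0"
  }
}
```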
11、SQL Functions
| String Functions | Numeric Functions | Time and Date Functions | System Functions |
|---|---|---|---|
| ASCII | ABS | CURRENT_DATE | CAST |
| BIT_LENGTH | ACOS | CURRENT_TIME | COALESCE |
| CHAR_LENGTH / LENGTH | ASIN | CURRENT_TIMESTAMP / NOW | IFNULL |
| OCTET_LENGTH | ATAN | DATEADD / TIMESTAMPADD | NULLIF |
| CHAR / CHR | COS | DATEDIFF | |
| CONCAT | COSH | DATE_TRUNC | |
| CONCAT_WS | COT | DAYNAME | |
| HEXTORAW | SIN | DAY_OF_MONTH | |
| RAWTOHEX | SINH | DAY_OF_WEEK | |
| INSERT | TAN | DAY_OF_YEAR | |
| LOWER / LCASE | TANH | EXTRACT | |
| UPPER / UCASE | MOD | FORMATDATETIME | |
| LEFT | CEIL / CEILING | HOUR | |
| RIGHT | EXP | MINUTE | |
| LOCATE / INSTR / POSITION | FLOOR | MONTH | |
| LPAD | LN | MONTHNAME | |
| RPAD | LOG | PARSEDATETIME / TO_DATE | |
| LTRIM | LOG10 | QUARTER | |
| RTRIM | RADIANS | SECOND | |
| TRIM | SQRT | WEEK | |
| REGEXP_REPLACE | PI | YEAR | |
| REGEXP_LIKE | POWER | | |
| REGEXP_SUBSTR | RAND / RANDOM | | |
| REPEAT | ROUND | | |
| REPLACE | SIGN | | |
| SOUNDEX | TRUNC | | |
| SPACE | | | |
| SUBSTRING / SUBSTR | | | |
| TO_CHAR | | | |
| TRANSLATE | | | |
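For example, the following Sql transform applies LOWER to the "name" field:
transform {
  Sql {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    query = "select id, LOWER(name) as name from t_user_01"
  }
}
This configuration keeps id and outputs the "name" field of the source data in lowercase.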
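Functions from the table can be nested and combined in a single query. A sketch under stated assumptions: t_user_01 is the table read by the Jdbc source in the earlier jobs, its name column is a string and its birth column is a string in year-month-day form (as the Split example suggests), and t_user_03 is a hypothetical output name.

```hocon
transform {
  Sql {
    source_table_name = "t_user_01"
    # hypothetical name for the result table
    result_table_name = "t_user_03"
    # trim and uppercase name, record its length, take the year part of birth
    query = "select id, UPPER(TRIM(name)) as name, CHAR_LENGTH(name) as name_len, SUBSTRING(birth, 1, 4) as birth_y from t_user_01"
  }
}
```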
SQL UDFs (user-defined functions) are also supported.