seatunnel数据集成(四)转换器使用

news2025/1/27 13:06:46

seatunnel数据集成(一)简介与安装
seatunnel数据集成(二)数据同步
seatunnel数据集成(三)多表同步
seatunnel数据集成(四)连接器使用


seatunnel除了丰富的连接器类型,其转换器也能够让数据转换更加简单,包括Copy,Filter,FieldSelector,FielMapper,DATa Filter,TypeConverter,Replace,Split,FilterRowKind,SQL,SQL Functions等。

1、Copy

将字段复制到新字段。

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name = "base_region_01"
        query = "select * from base_region limit 4"
    }
}

transform {
  Copy {
    source_table_name = "base_region_01"
    result_table_name = "base_region_02"
    fields {
      id = id
      region_name = region_name
      region_name2 = region_name
    }
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "base_region_02"
    query = "insert into base_region(id,region_name,region_name2) values(?,?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_copy.conf

2、Filter

筛选字段。

需要保留的字段列表。不在列表中的字段将被删除

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name = "t_user_01"
        query = "select * from t_user"
    }
}

transform {
  Filter {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    fields = [id, name]
  }
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        source_table_name = "t_user_02"
        query = "insert into ods_t_user(id,name) values(?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_filter.conf

3、FieldSelector

字段选择器(Field Selector)转换器示例:

transform {
  FieldSelector {
    fields = ["id", "name", "age"]
  }
}

这个示例配置将只选择源数据中的 "id"、"name" 和 "age" 字段进行后续处理。

4、FieldMapper

  1. 字段映射器示例:
transform {
  FieldMapper {
    mappings {
      source_field = "source_value"
      target_field = "target_value"
    }
  }
}

这个示例配置将把源数据中的 "source_field" 字段的值映射为 "target_field" 字段的值

5、DataFilter

数据过滤器示例

transform {
  DataFilter {
    condition = "age >= 18"
  }
}

这个示例配置将只保留满足条件 "age >= 18" 的数据行。

6、TypeConverter

  1. 数据类型转换器示例:
transform {
  TypeConverter {
    field_conversion {
      name {
        from = "string"
        to = "integer"
      }
      age {
        from = "string"
        to = "integer"
      }
    }
  }
}

这个示例配置将把源数据中的 "name" 和 "age" 字段的数据类型从字符串(string)转换为整数(integer)。

7、Replace

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name = "t_user_01"
        query = "select * from t_user"
    }
}

transform {
  Replace {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    replace_field = "name"
    pattern = "%"
    replacement = ""
  }
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        source_table_name = "t_user_02"
        query = "insert into ods_t_user(id,name,birth,gender) values(?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_replace.conf

8、Split

将一个字段拆分为多个字段。

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        result_table_name = "t_user_01"
        query = "select * from t_user"
    }
}

transform {
  Split {
    source_table_name = "t_user_01"
    result_table_name = "t_user_02"
    separator = "-"
    split_field = "birth"
    output_fields  = [birth_y, birth_m, birth_d]
  }
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        source_table_name = "t_user_02"
        query = "insert into ods_t_user_y_m_d(id,name,birth,gender,birth_y,birth_m,birth_d) values(?,?,?,?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_split.conf

9、FilterRowKind

按 RowKind 筛选数据

transform {
  FilterRowKind {
    row_kind = "DELETE"
  }
}

这个示例配置将只保留源数据中标记为 "DELETE" 的行。


env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 100
    schema = {
      fields {
        id = "int"
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FilterRowKind {
    source_table_name = "fake"
    result_table_name = "fake1"
    exclude_kinds = ["INSERT"]
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
}

10、SQL

SQL转换使用内存SQL引擎,我们可以通过SQL函数和SQL引擎的能力来实现转换任务。

11、SQL Functions

String Functions

Numeric Functions

Time and Date Functions

System Functions

ASCII

ABS

CURRENT_DATE

CAST

BIT_LENGTH

ACOS

CURRENT_TIME

COALESCE

CHAR_LENGTH / LENGTH

ASIN

CURRENT_TIMESTAMP / NOW

IFNULL

OCTET_LENGTH

ATAN

DATEADD / TIMESTAMPADD

NULLIF

CHAR / CHR

COS

DATEDIFF

CONCAT

COSH

DATE_TRUNC

CONCAT_WS

COT

DAYNAME

HEXTORAW

SIN

DAY_OF_MONTH

RAWTOHEX

SINH

DAY_OF_WEEK

INSERT

TAN

DAY_OF_YEAR

LOWER / LCASE

TANH

EXTRACT

UPPER / UCASE

MOD

FORMATDATETIME

LEFT

CEIL / CEILING

HOUR

RIGHT

EXP

MINUTE

LOCATE / INSTR / POSITION

FLOOR

MONTH

LPAD

LN

MONTHNAME

RPAD

LOG

PARSEDATETIME / TO_DATE

LTRIM

LOG10

QUARTER

RTRIM

RADIANS

SECOND

TRIM

SQRT

WEEK

REGEXP_REPLACE

PI

YEAR

REGEXP_LIKE

POWER

REGEXP_SUBSTR

RAND / RANDOM

REPEAT

ROUND

REPLACE

SIGN

SOUNDEX

TRUNC

SPACE

SUBSTRING / SUBSTR

TO_CHAR

TRANSLATE

如:

transform {
  SqlFunction {
    function = "LOWER"
    field = "name"
  }
}

这个示例配置将源数据中的 "name" 字段转换为小写字母形式。

除此之外,还支持SQL UDF 函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1445362.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【大厂AI课学习笔记】【1.6 人工智能基础知识】(3)神经网络

深度学习是机器学习中一种基于对数据进行表征学习的算法。观测值(例如一幅草莓照片)可以使用 多种方式来表示,如每个像素强度值的向量,或者更抽象地表示成一系列边、特定形状的区域等。 深度学习的最主要特征是使用神经网络作为计算模型。神经网络模型 …

python+flask+django医院预约挂号病历分时段管理系统snsj0

技术栈 后端:python 前端:vue.jselementui 框架:django/flask Python版本:python3.7 数据库:mysql5.7 数据库工具:Navicat 开发软件:PyCharm . 第一,研究分析python技术&#xff0c…

《UE5_C++多人TPS完整教程》学习笔记8 ——《P9 访问 Steam(Acessing Steam)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P9 访问 Steam(Acessing Steam)》 的学习笔记,该系列教学视频为 Udemy 课程 《Unreal Engine 5 C Multiplayer Shooter》 的中文字幕翻译版,UP主(也是译者&…

使用UMAP降维可视化RAG嵌入

大型语言模型(LLMs)如 GPT-4 已经展示了出色的文本理解和生成能力。但它们在处理领域特定信息方面面临挑战,比如当查询超出训练数据范围时,它们会产生错误的答案。LLMs 的推理过程也缺乏透明度,使用户难以理解达成结论…

Win10截图的四种方式

截图不一定要依靠通讯软件,现在系统自己就带有这些功能。 1.Win Shift S组合键:选择微信截图,部分截图,随心所欲; 2.Win W组合键:呼出屏幕右侧的工作区,选择屏幕草图,支持裁剪、编辑…

flask+python高校学生综合测评管理系统 phl8b

系统包括管理员、教师和学生三个角色; 。通过研究,以MySQL为后端数据库,以python为前端技术,以pycharm为开发平台,采用vue架构,建立一个提供个人中心、学生管理、教师管理、课程类型管理、课程信息管理、学…

网络安全工程师技能手册(附学习路线图)

关键词:网络安全入门、渗透测试学习、零基础学安全、网络安全学习路线 安全是互联网公司的生命,也是每位网民的基本需求。现在越来越多的人对网络安全感兴趣,愿意投奔到网络安全事业之中,这是一个很好的现象。 很多对网络安全感…

线程池7个参数描述

所谓的线程池的 7 大参数是指&#xff0c;在使用 ThreadPoolExecutor 创建线程池时所设置的 7 个参数&#xff0c;如以下源码所示&#xff1a; public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable&…

游泳时可以听歌的耳机有哪些?戴游泳耳机有哪些好处?

游泳和跑步在某种程度上相似&#xff0c;特别是在短距离冲刺时&#xff0c;大脑似乎变得空白&#xff0c;而在中长距离的有氧运动中&#xff0c;身体感到疲劳&#xff0c;但大脑却异常清晰&#xff0c;时间却显得格外漫长。如何打发时间&#xff0c;让游泳锻炼变得不无聊&#…

「深度学习」dropout 技术

一、工作原理 1. 正则化网络 dropout 将遍历网络的每一层&#xff0c;并设置消除神经网络中节点的概率。 1. 每个节点保留/消除的概率为0.5: 2. 消除节点&#xff1a; 3. 得到一个规模更小的神经网络&#xff1a; 2. dropout 技术 最常用&#xff1a;反向随机失活 "…

Ubuntu Desktop - Terminal 输出全部选中 + 复制

Ubuntu Desktop - Terminal 输出全部选中 复制 1. Terminal2. Terminal 最大化3. Edit -> Select All4. Copy & PasteReferences 1. Terminal 2. Terminal 最大化 3. Edit -> Select All 4. Copy & Paste Edit -> Copy or Shift Ctrl C Edit -> Paste…

线程-线程的创建方式与线程池基础知识

创建线程有四种方式&#xff0c;继承Thread类、实现Runnable接口、实现Callable接口、线程池创建线程&#xff0c;常用的还是线程池创建线程。 1.继承Thread类 自定义类MyThread&#xff08;叫什么都行&#xff09;去extends Thread 重写里面的run方法&#xff0c;new MyThr…

2024-02-12 Unity 编辑器开发之编辑器拓展3 —— EditorGUI

文章目录 1 GUILayout2 EditorGUI 介绍3 文本、层级、标签、颜色拾取3.1 LabelField3.2 LayerField3.3 TagField3.4 ColorField3.5 代码示例 4 枚举选择、整数选择、按下按钮4.1 EnumPopup / EnumFlagsField4.2 IntPopup4.3 DropdownButton4.4 代码示例 5 对象关联、各类型输入…

【北邮鲁鹏老师计算机视觉课程笔记】05 Hough 霍夫变换

【北邮鲁鹏老师计算机视觉课程笔记】05 Hough 霍夫变换 1 投票策略 考虑到外点率太高 ①让直线上的每一点投票 ②希望噪声点不要给具体的任何模型投票&#xff0c;即噪声点不会有一致性的答案 ③即使被遮挡了&#xff0c;也能把直线找出来 参数空间离散化 直线相当于就是m,b两…

【Web】vulhub Fastjson反序列化漏洞复现学习笔记

目录 1.2.24 RCE CVE-2017-18349 复现流程 原理分析 1.2.47 RCE CNVD-2019-22238 复现流程 原理分析 漏洞探测 1.2.24 RCE CVE-2017-18349 复现流程 vulhub启动靶场 用marshalsec启动LDAP/RMI服务 java -cp marshalsec-0.0.3-SNAPSHOT-all.jar marshalsec.jndi.LDAPRef…

使用securecrt+xming通过x11访问ubuntu可视化程序

windows使用securecrtxming通过x11访问ubuntu可视化程序 windows机器IP&#xff1a;192.168.9.133 ubuntu-desktop20.04机器IP&#xff1a;192.168.9.190 windows下载xming并安装 按照图修改xming配置 开始->xming->Xlaunch 完成xming会在右下角后台运行 windows在…

Vue源码系列讲解——模板编译篇【一】(综述)

目录 1. 前言 2. 什么是模板编译 3. 整体渲染流程 4. 模板编译内部流程 4.1 抽象语法树AST 4.2 具体流程 5. 总结 1. 前言 在前几篇文章中&#xff0c;我们介绍了Vue中的虚拟DOM以及虚拟DOM的patch(DOM-Diff)过程&#xff0c;而虚拟DOM存在的必要条件是得先有VNode&…

Oracle11g安装配置详细教程

Oracle Database 11g是一款广泛使用的关系型数据库管理系统&#xff0c;它为企业级的应用提供了强大的数据管理功能。本文将详细介绍如何在Windows环境下安装和配置Oracle 11g。 准备工作 系统要求&#xff1a;确保你的系统满足安装Oracle 11g的最低要求。对于Oracle 11g Rele…

linux系统下vscode portable版本的python环境搭建003:venv

这里写自定义目录标题 python安装方案一. 使用源码安装&#xff08;有[构建工具](https://blog.csdn.net/ResumeProject/article/details/136095629)的情况下&#xff09;方案二.使用系统包管理器 虚拟环境安装TESTCG 本文目的&#xff1a;希望在获得一个新的系统之后&#xff…

Oracle的学习心得和知识总结(三十二)|Oracle数据库数据库回放功能之论文四翻译及学习

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《Oracle Database SQL Language Reference》 2、参考书籍&#xff1a;《PostgreSQL中文手册》 3、EDB Postgres Advanced Server User Gui…