映客基于Apache SeaTunnel 打造高效的一站式数据集成平台

news2024/12/29 9:51:22

背景

随着业务的增加,数据集成任务大量增长,越来越多的数据源的需要支持,原有的系统已经无法完全支撑现有体量。

file

现有的数据集成平台短板慢慢展现导致部分业务线无法快速对接。数据源的架构在变得繁多和复杂,数据应用也逐渐变得更加垂直和场景化,这也倒逼了现代数据架构飞速发展。从而数据集成已经从一项技术管理工作升级为系统工程。

整体方案

对于整套系统的实施,我们首先对以下核心事情的做了处理:

确定数据集成的目标和范围

目前公司有大量的任务是基于 Trino 和 Kyuubi 这两个开源组件进行数据查询的加速和一站式 SQL 的统一管理,由于有大量的任务已经接入到系统中,这需要新的系统直接适配以前的任务,并进行自动迁移工作。

选择适合的数据集成工具和技术

在选择数据集成工具和技术时,需要考虑与原有系统无缝集成的工作量以及可持续性扩展的效率。

选型可以参考这篇文章:https://mp.weixin.qq.com/s/nFF31Rc3E0ia5jAl2ibRPw

我们经过一些开源软件的对比,在底层支撑系统中我们选择了数据集成工具的新起之秀 Apache SeaTunnel 和 Dinky 这两个开源软件。

虽然在选择 Apache SeaTunnel 的时候,当时还未支持这两个组件,但是我们还是决定进行二次开发以便对其支持。

Apache SeaTunnel 集成

我们在 SeaTunnel 中提供了对 Trino 和 Kyuubi 的 JDBC ⽀ 持⽅式,⽬前只实现了对 SOURCE 端的⽀持。

在 SeaTunnel 中对组件的 JDBC ⽀持⽅式实现很简单,我们可以参考 MySQL 的实现⽅式来实现它。

构建⽀持 Trino 的 Dialect

Dialect 需要实现 JdbcDialect 和 JdbcDialectFactory 这两个接⼝类。

TrinoDialect 的实现代码如下:

@Override
public String dialectName() {
    return "Trino";
}
    
@Override
public JdbcRowConverter getRowConverter() {
    return new TrinoJdbcRowConverter();
}
    
@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
    return new TrinoTypeMapper();
}

PrestoDialectFactory 的实现代码:

@Override
public boolean acceptsURL(@NonNull String url) {
    // 通过 jdbc:presto 实现对 Presto 的⽀持
    return url.startsWith("jdbc:presto:") || url.startsWith("jdbc:trino:");
}
    
@Override
public JdbcDialect create() {
    return new TrinoDialect();
}

需要注意的是:我们在代码中也同时⽀持了 Presto

构建 Trino 数据类型转换器

数据类型转换器实现⽐较简单,核⼼就是对数据类型的⽀持,以下是部分实现代码:

@Override
public SeatunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
    String columnType = metadata.getColumnTypeName(colIndex).toUpperCase();
    // VARCHAR(x)      --->      VARCHAR
    if (columnType.indexOf("(") > -1) {
        columnType = columnType.split("\\(")[0];
    }
    int precision = metadata.getPrecision(colIndex);
    int scale = metadata.getScale(colIndex);
    switch (columnType) {
        case PRESTO_BOOLEAN:
            return BasicType.BOOLEAN_TYPE;
        case PRESTO_TINYINT:
            return BasicType.BYTE_TYPE;
        case PRESTO_INTEGER:
            return BasicType.INT_TYPE;
        case PRESTO_SMALLINT:
            return BasicType.SHORT_TYPE;
        case PRESTO_BIGINT:
            return BasicType.LONG_TYPE;
        case PRESTO_DECIMAL:
            return new DecimalType(precision, scale);
        case PRESTO_REAL:
            return BasicType.FLOAT_TYPE;
        case PRESTO_DOUBLE:
            return BasicType.DOUBLE_TYPE;
        case PRESTO_CHAR:
        case PRESTO_VARCHAR:
        case PRESTO_JSON:
        case PRESTO_ARRAY:
            return BasicType.STRING_TYPE;
        case PRESTO_DATE:
            return LocalTimeType.LOCAL_DATE_TYPE;
        case PRESTO_TIME:
            return LocalTimeType.LOCAL_TIME_TYPE;
        case PRESTO_TIMESTAMP:
            return LocalTimeType.LOCAL_DATE_TIME_TYPE;
        case PRESTO_VARBINARY:
        case PRESTO_BINARY:
            return PrimitiveByteArrayType.INSTANCE;
        //Doesn't support yet
        case PRESTO_MAP:
        case PRESTO_ROW:
        default:
            final String jdbcColumnName = metadata.getColumnName(colIndex);
            throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
                String.format(
                    "Doesn't support Trino type '%s' on column '%s'  yet.",
                    columnType, jdbcColumnName));
    }
}

需要注意的是:需要我们特殊处理 VARCHAR(x) 这种带有位数的数据类型。

 ⽀持 TIMESTAMP WITH TIME ZONE

如果我们在编写的 SQL 中指定了时区后,默认在 SeaTunnel 框架中是不⽀持的,我们需要修改从⽽⽀持它。

修改 org.apache.seatunnel.api.table.type.LocalTimeType ⽀持携带时区

public static final LocalTimeType<LocalDateTime>
LOCAL_DATE_TIME_TYPE_WITH_ZONE = new LocalTimeType<>(LocalDateTime.class, SqlType.TIMESTAMP);

修改对应于 Spark 或 Flink 中的 TypeConverterUtils 进⾏⽀持它。需要注意对时区的截取以及设置。

Kyuubi 的对接集成和 Trino ⽅式⼀致。

设计实践

以下为数据集成平台架构图:

file

通过架构图我们可以看出我们的数据集成平台功能还是比较完善的,核心分为:

  • 数据服务体系

  • 数据资产管理

  • 数据汇集

  • 数据研发

  • 数据监控等

通过各个模块的整合以及资源的调整我们可以使用平台做出一条完善的数据链路。

下图为任务的具体发布流程:

file

当客户端发起请求后,主节点会获取已经注册的 Nodes 列表,通过 Gateway 校验 API 是否可用,此时会获取到某个 Node 作为任务分发的主节点,在该节点中会自动配置当前任务的一些监管控制等逻辑,在任务执行期间会产生 Log DriverMonitor DriverResource Driver 等多个 Driver 驱动器用于管理和监控任务的生命周期。

当任务执行结束后,系统会根据用户指定的配置进行最终任务的状态分发并做销毁,从而一整条任务的流水线构建完成。

发布任务

file

平台提供了数据任务的一整套流水线,通过新任务构建我们可以支持在平台中发布一个新任务,该任务会集成监控,资源管理等功能。

以上是平台的一个任务展示列表,我们通过模拟一个新的任务来进行平台的体验。

在平台中我们的任务接入方式很简单,分为:

  • DSL 介入

  • SQL 接入 (实验功能)

以下是一个任务的 DSL 参数,他模拟通过查询 Presto 将数据写入到 ClickHouse 中,并提供给用户查询

{
  "appKey": "datateam",
  "deployMode": "yarn",
  "username": "demo",
  "taskName": "datateam_demo_first_task",
  "timeout": 900,
  "sources": [
    {
      "source": "Jdbc",
      "configure": [
        {
          "field": "host",
          "value": "localhost",
          "split": false
        },
        {
          "field": "port",
          "value": "8080",
          "split": false
        },
        {
          "field": "type",
          "value": "presto",
          "split": false
        },
        {
          "field": "user",
          "value": "default",
          "split": false
        },
        {
          "field": "query",
          "value": "select 'xxx' as name, 12 as age",
          "split": false
        },
        {
          "field": "result_table_name",
          "value": "datateam_demo_first_task_presto_source",
          "split": false
        },
        {
          "field": "fields",
          "value": [
            {
              "column": "name",
              "origin": "名称",
              "type": "string"
            },
            {
              "column": "age",
              "origin": null,
              "type": "int"
            }
          ],
          "split": true
        }
      ]
    }
  ],
  "sinks": [
    {
      "sink": "Clickhouse",
      "configure": [
        {
          "field": "host",
          "value": "127.0.0.1",
          "split": false
        },
        {
          "field": "port",
          "value": "9000",
          "split": false
        },
        {
          "field": "database",
          "value": "test",
          "split": false
        },
        {
          "field": "table",
          "value": "datateam_demo_first_task",
          "split": false
        },
        {
          "field": "columns",
          "value": [
            "name",
            "age"
          ],
          "split": false
        }
      ]
    }
  ],
  "hooks": [
    {
      "hook": "Kafka",
      "rule": "SUCCESS",
      "configure": [
        {
          "field": "format",
          "value": "JSON"
        }
      ]
    }
  ],
  "platform": "Data"
}

只需要这样一个简单的 JSON 配置即可实现查询 Presto 将数据写入到 ClickHouse 中, 并会将最终的写入结果以 JSON 形式发送到指定的 Kafka Hook 中。

通过以上配置平台生成一个任务并分发到集群中,并执行设置的内容,在该 DSL 中我们设置的是 SQL,平台会在集群中去执行我们输入的 SQL 内容。

当然平台提供还提供 SQL 方式接入,我们只需要编写一个简单的 SQL 即可实现任务的接入,以下是一个简单的任务 SQL

CREATE TASK `datateam_demo_first_task`
WITH INPUT Jdbc (
  url=`localhost`,
  port=`8080`,
  type=`presto`,
  username=`default`,
  result_table_name=`datateam_demo_first_task_presto_source`,
  fields={
    format=`name`|`名称`
  }
)

WITH OUTPUT Clickhouse(
  host=`127.0.0.1`,
  port=`8123`,
  database=default,
  columns=[`name`, `age`]
)

WITH HOOK Kafka(
   rule=`SUCCESS`,
   fields={
    format=`JSON`
   }
)

WITH QUREY
  select 'xxx' as name, 12 as age

系统会通过解析 SQL Node 将配置转化为可执行参数从而进行发布任务。

日志管理 Log Driver

file

系统中有丰富的任务日志管理,不仅仅包含提交机执行的详细日志还包含集群中运行的详细日志。

当任务发布后,系统默认会构建 Log Driver 他会去采集集群中当前任务的运行日志,直到任务运行结束 Log Driver 回随即销毁。Log Driver 销毁后系统默认会调用并解析 Log Aggregation 来获取集群最终运行状态,从而将其更新至底层存储中。

超时及重试机制 Monitor Driver

当任务发布成功后系统会启动一个 Monitor Driver, 他主要用于对该任务的的一些监控操作,其中最核心的就是超时和重试机制。

  • 超时机制 

用户可以指定 timeout 参数用来配置当前任务的超时时间,一般当系统中任务较多或节点负载较高时,在任务发布时系统会自动抽取相关任务一周内执行记录,通过分析该执行记录(执行消耗资源,消耗时间等)进行 timeout 参数的重新设定,从而适配当前任务的执行过程不受外部依赖所影响导致任务异常退出。

  •  重试机制 

重试机制一般不会被触发。他拥有两种模式,分别是自动手动。在自动模式中只有运行状态 FAILURE 并且异常信息触发系统默认指定重试规则才会被应用。手动模式中用户可以配置自定义重试规则。两种模式可以并存,默认手动模式会覆盖自动模式。

当然 Monitor Driver 负责的不只是超时和重试,还会负责任务执行中的一些流程。当然包含了整个任务的生命周期,从任务的构建到任务的结束以及数据的落地都有实时响应以及反馈。

资源管控 Resource Driver

每个任务拥有独立的 Resource Driver, 当任务执行完成后 Resource Driver 随即启动,他会采集当前任务在集群中所使用的资源信息,以下是一个资源图

file

我们可以清楚的看到该任务消耗的内存以及 CPU

未来展望

流式任务

目前平台大多都是批处理任务,但是会有一些流式任务,目前只是一些简单的方式实现,还没有完善的 Driver 体系,无法完整的监管其生命周期,后续会对该类型任务推出完善的 Driver 机制。

 优化接入方式

目前大部分任务对接还是基于 DSL 方式,这样导致数据分析人员以及数仓人员无法快速对接平台,虽然推出 SQL 方式但目前还是实验功能,后续将会完善并将其完全引入系统中。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2033390.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

opencv-python图像增强四:多曝光融合(方法一)

文章目录 一、简介&#xff1a;二、多曝光融合方案&#xff1a;三、算法实现步骤3.1 读取图像与曝光时间&#xff1a;3.2 计算响应曲线并合并3.3 色调映射 四&#xff1a;整体代码实现五&#xff1a;效果 一、简介&#xff1a; 在摄影和计算机视觉领域&#xff0c;高动态范围&…

关于IP子网掩码的解释

关于IP子网掩码的解释 1、掩码 10.1.1.0/27 掩码 10.1.1.0/27 表示一个子网&#xff0c;其中/27是子网掩码的表示方式&#xff0c;指的是前27位是网络位&#xff0c;剩下的5位是主机位&#xff0c;这种掩码意味着每个子网有32个IP地址(2^532)&#xff0c;其中包括一个网络地址…

GitHub提交PR(GitHub提交个人代码到社区)

GitHub提交PR&#xff08;GitHub提交个人代码到社区&#xff09; 1. 设置基本信息 确保你的本地 git 配置中的用户名和邮箱地址与你在 GitHub 账户中设置的信息一致。‌如果不一致&#xff0c;‌使用 git config --global user.name "Your Name" 和 git config --g…

三层架构与解耦——IoCDI机制【后端 7】

三层架构与解耦——IoC&DI机制 在软件开发领域&#xff0c;三层架构&#xff08;Controller、Service、Dao&#xff09;是一种广泛采用的架构模式&#xff0c;它通过将应用程序分为三个主要层次来组织代码&#xff0c;旨在提高代码的可维护性、复用性和可扩展性。而解耦&am…

读零信任网络:在不可信网络中构建安全系统17无控制器架构

1. 建立系统框图 1.1. 实现零信任网络的第一步重要工作是建立系统框图 1.2. 系统框图能够帮助我们透彻地理解内部网络和外部网络间的通信模式&#xff0c;有助于系统通信信道的设计 1.3. 对于现有的网络来说&#xff0c;建议首先利用日志工具来记录网络流量&#xff0c;然后…

2万多条初中历史题库ACCESS\EXCEL数据库

这段时间破解了中高学生知识题库&#xff0c;包含高&中英语题库、小&学英语题库、初&中地理题库、初&中历史题库、高&中历史题库、初&中生物题库&#xff0c;数据表结构都一样&#xff0c;今天发的这份是上万条的初中历史题库&#xff0c;截图包含所有…

CSS——伪元素:before

CSS——伪元素&:before 简单介绍&#xff1a; ::after和::before的使用很简单&#xff0c;可以认为其所在元素上存在一前一后的两个的元素&#xff0c;这两个元素默认是内联元素&#xff0c;但我们可以为其增添样式。::after和::before使用的时候一定要注意&#xff0c;必…

数据结构之二叉树详解——包含递归及迭代遍历方式

二叉树的种类 二叉树&#xff08;binary tree&#xff09;是一种非线性数据结构&#xff0c;代表“祖先”与“后代”之间的派生关系&#xff0c;体现了“一分为二”的分治逻辑。与链表类似&#xff0c;二叉树的基本单元是节点&#xff0c;每个节点包含值、左子节点引用和右子节…

计算机毕业设计 扶贫助农系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

Golang面试题六(GMP)

目录 1.Go线程实现模型 1:1 关系 N:1关系 M:N关系 2.GM模型 3.GMP模型 概念 模型简介 有关P和M的个数问题 P和M何时会被创建 4.调度器的设计策略 5.go func() 调度流程 6.调度器的生命周期 7.Go work stealing 机制 8.Go hand off 机制 9.Go 抢占式调度 9.Sys…

8.3.数据库基础技术-关系代数

并&#xff1a;结果是两张表中所有记录数合并&#xff0c;相同记录只显示一次。交&#xff1a;结果是两张表中相同的记录。差&#xff1a;S1-S2,结果是S1表中有而S2表中没有的那些记录。 笛卡尔积&#xff1a;S1XS2,产生的结果包括S1和S2的所有属性列&#xff0c;并且S1中每条记…

[C++][opencv]基于opencv实现photoshop算法色阶调整

【测试环境】 vs2019 opencv4.8.0 【效果演示】 【核心实现代码】 Levels.hpp #ifndef OPENCV2_PS_LEVELS_HPP_ #define OPENCV2_PS_LEVELS_HPP_#include "opencv2/core.hpp" #include "opencv2/imgproc.hpp" #include "opencv2/highgui.hpp&quo…

Re:从零开始的逆向笔记02day

1-C语言 参数传递 参数传递是通过堆栈的&#xff0c;传递的顺序是从右到左 函数返回值是存储在寄存器eax中 类型 char x -1; //0xFF 1111 1111 int y x; //0xFFFFFFFF 1111 1111 1111 1111 1111 1111 1111 1111 其余位为符号位unsigned char x -1; //0xFF 1111 1111 in…

云快充协议1.5版本的充电桩系统软件

介绍 小程序端&#xff1a;城市切换、附近电站、电桩详情页、扫码充电、充电中动态展示、订单支付、个人中心、会员充值、充值赠送、联系客服&#xff1b; 管理后台&#xff1a;充电数据看板、会员管理、订单管理、充值管理、场站运营、文章管理、财务管理、意见反馈、管理员管…

腾讯云COS和阿里云OSS在Springboot中的使用

引言&#xff1a;之前本来是用OSS做存储的&#xff0c;但是上线小程序发现OSS貌似消费比COS多一些&#xff0c;所以之前做了技术搬迁&#xff0c;最近想起&#xff0c;打算做个笔记记录一下&#xff0c;这里省去在阿里云注册OSS或腾讯云中注册COS应用了。 一、OSS 1、配置yml …

Linux 网络设备驱动

一.网络设备驱动框架 接收 将报文从设备驱动接受并送入协议栈 老API netif_if 编写网络设备驱动 步骤 1.注册一个网络设备 2.填充net_device_ops结构体 3.编写接收发送函数 // SPDX-License-Identifier: GPL-2.0-only /** This module emits "Hello, world"…

IOS 02 SnapKit 纯代码开发

SnapKit是一个Swift语言写的自动布局框架&#xff0c;可以运行到iOS&#xff0c;Mac系统上&#xff1b;OC版本的框架是Masonry&#xff0c;都是出自同一个团队。 用这个框架的目的是&#xff0c;用起来比系统自带的API方便&#xff0c;他内部也是对系统API进行了封装。 为什么…

房产中介小程序

本文来自&#xff1a;ThinkPHPFastAdmin房产中介小程序 - 源码1688 应用介绍 产中介小程序是一款基于ThinkPHPFastAdmin开发的原生微信小程序&#xff0c;为房地产中介提供房源管理、发布、报备客户、跟踪客户以及营销推广获客等服务的系统。 前端演示&#xff1a; 后台演示&am…

HarmonyOS应用开发者基础认证(三)

1、针对包含文本元素的组件&#xff0c;例如Text、Button、TextInput等&#xff0c;可以使用下列哪些属性&#xff1a;&#xff08;全选&#xff09; 答案&#xff1a; fontColor fontFamily fontSize fontWeight fontStyle 分析&#xff1a; 2、关于Tabs组件和TabContent组件&…

【高效笔记与整理的艺术】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…