得物自建 DTS 平台的技术演进 | 精选

news2025/1/27 12:52:36

0 前言

DTS是数据传输平台(Data Transfer Platform的缩写)

随着得物App的用户流量增长,业务选择的数据库越来越多样化,异构数据源之间的数据同步需求也逐渐增多。为了控制成本并更好地支持业务发展,我们决定自建DTS平台。本文主要从技术选型、能力支持与演化的角度出发,分享了在DTS平台升级过程中获得的经验,并提供一些参考。

1 技术选型

DTS的主要目标是支持不同类型的数据源之间的数据交互,包括关系型数据库(RDBMS)、NoSQL数据库、OLAP等,同时整合了数据库配置管理、数据订阅、数据同步、数据迁移、DRC双活数据同步支持、数据巡检、监控报警、统一权限等多个模块,以构建安全、可扩展、高可用的数据架构平台。

1.1 能力对比

1.2 DTS 1.0 - 以 canal/otter/datax 作为执行引擎

1.3 为什么要切换到Flink?

为了支持多种读端数据源和写端数据源,需要一个统一数据处理框架,以减少重复组件和提高开发效率。同时数据源类型和组件的维护难度与复杂度呈线性增长,现有的组件需要统一维护到一个项目中。

Canal和Otter等组件的社区活跃度低,很长时间没有得到维护更新。因此,需要选择一个新的、活跃的框架。此外,现有组件也无法有效支持全量+增量一体化的操作。

因此,使用一个统一的数据处理框架,能够同时支持多种读端数据源和写端数据源,以及全量+增量一体化的功能,是必要的。这样能够降低组件的维护难度和复杂度,提高开发效率。

通过DTS 2.0,我们希望将canal/otter/datax演化为一个任务执行框架+管理平台,能够为后续大量数据源迭代提速。

1.4 DTS 2.0 以Flink作为执行引擎

现有的开发流程:

  • 统一的任务执行框架,集成flink并引入connectors根据配置组装出具体的DTS任务
  • 维护并研发新的 connector

当我们需要支持新的数据源, 首先将数据源相关插件维护在connector中,接着在执行框架中引入需要的组件,其中存在大量的可复用的功能,这样就做到了connector及功能组件复用的效果。

2 DTS 现有能力

3 我们做了什么?

3.1 DTS Connectors框架 - 数据源支持提速

在Flink CDC基础上实现的全量/增量任务同步框架,基本的架构如下

其中Connector中分别实现了Flink提供的SourceFunction和SinkFunction函数,分别负责从读端读取数据,往写端写入数据,因此一个Connector可同时存在于上游或者下游。

任务的启动流程:

a. 任务的Main函数如下所示, 根据如下的Json文件加载到对应的Connector中的SourceFactory或者SinkFactory来构造对应的DataStream。

DataStream是Flink中提供的数据流操作类
public class Main {    public static void main(String[] args) throws Exception {
        // 解析参数        ParameterTool parameterTool = ParameterTool.fromArgs(args);        String[] parsedArgs = parseArgs(parameterTool);
        Options options = new OptionParser(parsedArgs).getOptions();        options.setJobName(options.getJobName());
        // 执行任务        StreamExecutionEnvironment environment =                EnvFactory.createStreamExecutionEnvironment(options);        exeJob(environment, options);    }

任务Json配置:

{  "job":{    "content":{      "reader":{        "name":"binlogreader",        "parameter":{          "accessKey":"",          "binlogOssApiUrl":"",          "delayBetweenRestartAttempts":2000,          "fetchSize":1,          "instanceId":"",          "rdsPlatform":"",          "restartAttempts":5,          "secretKey":"",          "serverTimezone":"",          "splitSize":1024,          "startupMode":"LATEST_OFFSET"        }      },      "writer":{        "name":"jdbcwriter",        "parameter":{          "batchSize":10000,          "concurrentWrite":true,          ],          "dryRun":false,          "dumpCommitData":false,          "errorRecord":0,          "flushIntervalMills":30000,          "poolSize":10,          "retries":3,          "smallBatchSize":200        }      }    },
  }}

b. 我们提供了两个抽象工厂类,SourceFactory, SinkFactory, 其中的createSource, createSink便是子工厂需要实现的方法,不同的数据源实现不同。

public abstract class SourceFactory<T> {    public abstract DataStream<T> createSource();}public abstract class SinkFactory<T> {    public abstract void createSink(DataStream<T> rowData) throws Exception;}

c. 接下来,我们只需要实现对应的子工厂方法就可以了

public class BinlogSourceFactory extends AbstractJdbcSourceFactory {    @Override    public DataStream<TableRowData> createSource() {
        List<String> tables = this.binlogSourceConf.getConnection().getTable();        Set<String> databaseList = new HashSet<>(2);
        // 使用对应的Connector构建DataStream    }}

d. 通用能力函数:RateLimitFunction, BinlogPositionFunction 其中分别实现了对应的任务能力,例如限流,任务位点保存等。

public class RateLimiterMapFunction<T> extends RichMapFunction<T, T> {

    private transient FlinkConnectorRateLimiter rateLimiter;

    @Override    public T map(T value) throws Exception {        if (rateLimiterEnabled) {            rateLimiter.acquire(1);        }        return value;    }

当任务所需的函数都创建完成后,任务就真正开始运行了。

收益:

3.2 RDS日志获取

DTS通过提供增量和全量同步能力为业务提供数据同步功能,但在增量订阅/同步任务执行过程中,可能会遇到一些异常情况。其中,以下三种情况需要特别处理:

  • Binlog可用性

云厂商的数据库实例本地binlog有效期8小时,过期部分进行OSS备份。MySQL业务高峰期或者DDL变产生大量的binlog,  DTS任务尝试获取过期数据失败,任务因此中断。因此,DTS支持了本地binlog+OSS备份binlog的获取及切换,保障日志可用性。

  • 数据库 实例主从切换

RDS经常会发生主备节点切换,在切换的过程中要保证数据不丢。由于切换前后两个数据库实例 Binlog 文件一般都是不一致的,此时任务位点记录方式是 BinlogPosition 模式,则在切换之后任务需要自动进行 Binlog 对齐操作,进而保证数据的完整性。将新数据实例上的位点查询时间戳提前1-2分钟即可。

  • 读实例订阅支持

DTS任务binlog dump连接数过多造成主库压力及影响DDL变更,因此需要支持读库订阅。云厂商的读库不提供备份,在读库日志过期时需要切换到主库进行读取。

3.3 全量增量一体化功能

全量增量一体化是指先同步存量数据,待存量结束之后再开始同步增量数据。其中也加入了增量阶段的OSS备份日志获取。但存量阶段依然存在一些问题,需要进一步改造优化。

3.4 数据源接入- starrocks, postgres等

支持从mysql同步到starrocks和postgres, 在任务执行框架的基础上,只需要开发starrocks-connector, postgres connector支持对应的数据源即可。其中的其他能力,像多表同步、分库分表等场景都可以达到复用的效果。

3.5 JBDC写入改造

脚本扩展和动态表名路由:

数据合并和多线程写入:

3.6 监控告警

DTS任务需要采集flink任务指标,主要包括任务延迟、各个算子阶段的写入速率,算子被压及使用率等。其中 任务延迟需要接入告警服务,于是我们选择了引入redis来缓存任务的延迟时间,再上报到告警服务来完成飞书的消息和电话告警。

4 最佳实践

4.1 0000-00-00 00:00:00时间戳的问题

MySQL的时间戳允许为0000-00-00 00:00:00, 在Flink任务中通常会被转换为null, 导致写入下游数据源失败, 因此需要做特殊标记对于不同的数据源做不同的转化保证写入的正切行。

4.2 Flink CDC 任务 serverId唯一性

Flink CDC source 会伪装成 MySQL slave节点,为了保证数据的准确性,每个slave必须拥有唯一的serverId来标记该slave的唯一性。因此在flink cdc的任务中我们为每一个任务分配了一个唯一的serverId区间(范围区间是为了支持多并行度)。

4.3 Flink任务数据序列化瓶颈

在flink任务中使用DataStreamAPI并使用比较复杂的数据结构进行传输时,算子之间的序列化成本较高,两个方向,一是建立更为高效的数据结构进行传输,二是开启flink对象复用,并尽可能减少不同并行度之间的数据传输。

5 未来演进

DTS作为一个数据同步平台主要功能是尽可能提供高效的数据源同步功能,助力于多变的业务场景。

5.1 基于Flink SQL的ETL任务管理

流式数据处理除了现有的DataStream API还存在SQL的形式,SQL作为一种通用的语言,对于数据相关的业务同学极大的降低了学习成本。而通过Flink SQL可以做到的ETL流式数据加工也能解决一些复杂业务场景的处理逻辑,将业务逻辑转化为DAG的流式处理图,通过拖拽的方式也能方便使用,FLINK SQL的演进方向能够和现有的Flink DataStream API互补。

应用场景:ETL强大的流式数据转换处理能力大幅提升数据集成效率,也能建实时报表体系,提高分析效率,同时也可以应用于一些实时大屏的场景。

5.2 统一技术栈

将现有的DTS能力都迁移到Flink平台上,保持统一的技术栈,能够极大的降低维护成本。现有遗留的双向同步、数据比对等能力需要做进一步的改造和迁移,符合整体技术收敛的趋势。

6 总结

本文主要分享了以下几个方面:Flink相比现有的技术栈带来的收益,切换到Flink以后的迭代方向及架构功能上的变更、带来新的问题如何解决,以及未来的一些迭代方向,希望能让大家有所收获。

*文/风子

本文属得物技术原创,更多精彩文章请看:得物技术官网

未经得物技术许可严禁转载,否则依法追究法律责任!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/748970.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt6之通用文件格式.dat

dat文件&#xff0c;在Windows中大量存在&#xff0c;到处都有。如下图edge浏览器安装目录下就有一个dat文件&#xff0c;如果你强行打开&#xff0c;发现它是乱码的。 一、什么是dat文件 DAT 文件格式只是一种通用格式&#xff0c;它在文件中包含任何类型的数据&#xff0c;它…

使用 TensorRT、卡尔曼滤波器和 SORT 算法进行实时对象检测和跟踪:第 1 部分训练模型

实时物体检测和跟踪在监控、自动驾驶和机器人等各种应用中至关重要。这些任务需要能够实时处理高分辨率视频流的高效算法。近年来,基于深度学习的目标检测算法(例如YOLO、SSD和Faster R-CNN)在图像和视频中的目标检测和定位方面显示出了令人印象深刻的结果。然而,这些算法的…

JVM面试题详解

JVM介绍 JVM是什么&#xff1f; JVM由哪些部分组成&#xff1f;运行流程是什么&#xff1f; JVM组成 什么是程序计数器 你能给我详细的介绍Java堆吗&#xff1f; 什么是虚拟机栈&#xff1f; 堆栈的区别是什么&#xff1f; 能不能解释一下方法区 你听过直接内存吗 类加载器 …

tx视频 wx小程序 视频缓存方案

本文所有教程及源码、软件仅为技术研究。不涉及计算机信息系统功能的删除、修改、增加、干扰,更不会影响计算机信息系统的正常运行。不得将代码用于非法用途,如侵立删!tx视频 wx小程序 视频缓存方案 环境 win10CharlesInternet Download Manager抓包分析 搜索关键词可以很容…

《生活教育》期刊简介及投稿邮箱

《生活教育》期刊简介及投稿邮箱 《生活教育》杂志创办于1934&#xff0c;是中华人民共和国教育部主管的国家重点学术期刊&#xff0c;国家级期刊&#xff0c;中国知网全文收录G4期刊&#xff0c;它的理论是陶行知教育思想的主线和重要基石&#xff0c;陶行知的教育理论&#…

【力扣算法06】之 _2544_ 交替数字和- python

文章目录 问题描述示例1示例 2示例 3提示 思路分析代码分析完整代码运行示例效果截图 完结 问题描述 给你一个正整数 n 。n 中的每一位数字都会按下述规则分配一个符号&#xff1a; 最高有效位 上的数字分配到 正 号。 剩余每位上数字的符号都与其相邻数字相反。 返回所有数字及…

这5款多 GPU 渲染器提升渲染能力,赶紧用起来

多 GPU 渲染的强大功能可以使您的渲染能力增加一倍、三倍或更多。使用合适的多GPU渲染引擎可以帮助您在短时间内以更快的速度获得高质量的产品。什么是真正的多 GPU 渲染&#xff1f;最好的多 GPU 渲染引擎是什么&#xff1f; 什么是多 GPU 渲染和多 GPU 渲染引擎&#xff1f;…

编译mysql8.0.33遇见错误libstdc++.so.6: version CXXABI_1.3.9 not found

1 现象 在执行make编译mysql8.0.33时&#xff0c;遇见报错信息&#xff1a; /lib64/libstdc.so.6: version CXXABI_1.3.9 not found&#xff0c; 查看版本发现只是1.3.7。 2 解决方案 2.1 解决方案1 安装glibc-2.18。 2.1 解决方案2 执行如下指令&#xff0c;将新文件替…

[微信小程序] movable-view 可移动视图容器 - 范围问题

movable-view 可移动视图容器 可移动视图容器&#xff0c;在页面中可以拖拽滑动。movable-view必须在 movable-area 组件中&#xff0c;并且必须是直接子节点 <view><movable-area style"width: 750rpx;height: 200rpx;background-color: gainsboro;">&l…

【力扣算法10】之 7. 整数反转 python

文章目录 问题描述示例1示例2示例3示例4提示 思路分析代码分析完整代码详细分析运行效果截图调用示例运行结果 完结 问题描述 给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。 如果反转后整数超过 32 位的有符号整数的范围 [−231, 231 − 1] &…

(Onenet)STM32L+BC20+MQTT协议传输温湿度,ADC,电压,GPS数据到Onenet物联网平台

1、材料准备 准备以下材料 2、设备连接 2.1 插入物联网卡 首先把BC20核心板从开发板上拆下来 然后将物联卡放置在BC20核心板内 物联卡放置完成将BC20核心板重新插入到开发板内&#xff08;注意不要弄错方向&#xff09; 同时接入天线 2.2 连接ST-Link仿真器 用3条杜邦线接…

python接口自动化(三十四)-封装与调用--函数和参数化(详解)

简介 前面虽然实现了参数的关联&#xff0c;但是那种只是记流水账的完成功能&#xff0c;不便于维护&#xff0c;也没什么可读性&#xff0c;随着水平和技能的提升&#xff0c;再返回头去看前边写的代码&#xff0c;简直是惨不忍睹那样的代码是初级入门的代码水平都达不到。接下…

股票开户手续费最低的券商有哪些?揭秘券商成本佣金,交易费率原来可以这么低!

哪个券商手续费最低实际上是没有这种说法的&#xff0c;想要低手续费一定要联系线上客户经理办理开户&#xff0c;是可以和线上客户经理协商降低手续费的&#xff0c;一定要通过线上客户经理特殊渠道开户&#xff0c;才能享受低手续费账户&#xff0c;还可以办理到vip账户&…

Python中可以用三种方法判断文件是否存在

目录 前言1.使用os模块2.使用Try语句3. 使用pathlib模块尾语 &#x1f49d; 前言 大家早好、午好、晚好吖 ❤ ~欢迎光临本文章 通常在读写文件之前&#xff0c;需要判断文件或目录是否存在&#xff0c;不然某些处理方法可能会使程序出错。 所以最好在做任何操作之前&#xff…

spring复习:(22)实现了BeanNameAware等Aware接口的bean,相应的回调方法是在哪里被调用的?

AbstractAutowireCapableBeanFactory的doCreateBean用来创建bean, 其中调用了initializeBean方法对bean进行初始化 initializeBean包含如下代码&#xff1a; 而invokeAwareMethods代码如下&#xff1a; 可见其分别判断是否实现了BeanNameAware接口、BeanClassLoaderAware接口…

【Linux指令sudo与Linux编译器-gcc/g++】

目录 一、sudo1.1为什么我用不了sudo命令1.2如何将普通用户添加到信任列表里面 二、Linux编译器-gcc/g使用2.1背景知识2.2gcc如何完成上面的4个功能&#xff1f;2.3函数库 一、sudo sudo是linux系统管理指令&#xff0c;是允许系统管理员让普通用户执行一些或者全部的root命令…

金融行业数据安全相关法律法规及政策汇总

如今随着大数据和“互联网”等新兴技术的拓展&#xff0c;数据的作用也在不断凸显&#xff0c;金融业是产生和积累数据量最大、数据类型最丰富的行业之一&#xff0c;数据安全与个人信息保护在新时代也面临新的风险与挑战。 根据《中国银行保险报》与亚信网络安全产业技术研究院…

CCPC 2022济南站+2020威海站+2019秦皇岛站+2022绵阳站(部分)(暑假集训)

目录 Tower Frozen Scoreboard Identical Parity​编辑 Best Carry Player Stack Sort Invoker MUV LUV EXTRA Forest Program Angle Beats Catch You Catch Me Tower 思路&#xff1a; 提前进行/2的预处理操作&#xff0c;操作之后 &#xff0c;在记录下要达到一样的步骤…

JavaFX 基础

1.JavaFX应用的程序基本结构 窗口(stage)&#xff1a;窗口里面可以设置场景&#xff0c;但是一次只能显示一个场景。 场景(scene)&#xff1a;场景可以添加UI组件图的树形结构。 根节点(parent)&#xff1a;根节点设置为一个布局&#xff0c;放置不同的节点(组件)。可以在根节…

备战秋招 | 笔试强训2

目录 一、选择题 二、编程题 三、选择题题解 四、编程题题解 一、选择题 1、使用printf函数打印一个double类型的数据&#xff0c;要求&#xff1a;输出为10进制&#xff0c;输出左对齐30个字符&#xff0c;4位精度。以下哪个选项是正确的&#xff1f; A. %-30.4e B. %4.…