技术干货|如何利用 ChunJun 实现数据离线同步?

news2024/12/23 16:12:49

ChunJun 是⼀款稳定、易⽤、⾼效、批流⼀体的数据集成框架,基于计算引擎 Flink 实现多种异构数据源之间的数据同步与计算。ChunJun 可以把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从⽽为企业提供全⾯的数据共享,目前已在上千家公司部署且稳定运⾏。

在之前,我们曾经为大家介绍过如何利用 ChunJun 实现数据实时同步(点击看正文),本篇将为大家介绍姊妹篇,如何利⽤ ChunJun 实现数据的离线同步。

ChunJun 离线同步案例

离线同步是 ChunJun 的⼀个重要特性,下⾯以最通⽤的 mysql -> hive 的同步任务来介绍离线同步。

配置环境

找⼀个空⽬录,接下来要配置 Flink 和 ChunJun 的环境,下⾯以 /root/chunjun_demo/ 为例⼦。

● 配置 Flink

下载 Flink

wget "http://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz"
tar -zxvf chunjun-dist.tar.gz

● 配置 ChunJun

#下载 chunjun, 内部依赖 flink 1.12.7
wget https://github.com/DTStack/chunjun/releases/download/v1.12.8/chunjun-dist-1.12-SNAPSHOT.tar.gz
#新创建⼀个⽬录
mkdir chunjun && cd chunjun
#解压到指定⽬录
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz

解压好的 ChunJun 有如下⽬录: bin chunjun-dist chunjun-examples lib

● 配置环境变量

#配置 Flink 环境变量
echo "FLINK_HOME=/root/chunjun_demo/flink-1.12.7" >> /etc/profile.d/sh.local
#配置 Chunjun 的环境变量
echo "CHUNJUN_DIST=/root/chunjun_demo/chunjun/chunjun-dist" >> /etc/profile.d/sh.local
#刷新换新变量
. /etc/profile.d/sh.local

● 在 Yarn 上⾯启动 Flink Session

#启动 Flink Session
bash $FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_DIST -d

输出如下:

echo "stop" | $FLINK_HOME/bin/yarn-session.sh -id application_1683599622970_0270
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
yarn application -kill application_1683599622970_0270

下⾯提交任务会⽤到 Flink Session 这个 Yarn Application Id (application_1683599622970_0270)。

● 其他配置

如果⽤ parquet 格式,需要把 flink-parquet_2.12-1.12.7.jar 放⼊到 flink/lib 下⾯, 在上⾯的例⼦中,需要放到 $FLINK_HOME/lib ⾥⾯。

file

提交任务

● 在 MySQL 准备数据

-- 创建⼀个名为ecommerce_db的数据库,⽤于存储电商⽹站的数据
CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 创建⼀个名为orders的表,⽤于存储订单信息
CREATE TABLE IF NOT EXISTS orders (
 id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主键
 order_id VARCHAR(50) NOT NULL, -- 订单编号,不能为空
 user_id INT NOT NULL, -- ⽤户ID,不能为空
 product_id INT NOT NULL, -- 产品ID,不能为空
 quantity INT NOT NULL, -- 订购数量,不能为空
 order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
-- 订单⽇期,默认值为当前时间戳,不能为空
);
-- 插⼊⼀些测试数据到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES ('ORD123', 1, 101, 2),
       ('ORD124', 2, 102, 1),
       ('ORD125', 3, 103, 3),  
       ('ORD126', 1, 104, 1),
       ('ORD127', 2, 105, 5);

select * from chunjun.orders;       

如果没有 MySQL 的话,可以⽤ docker 快速创建⼀个。

docker pull mysql:8.0.12
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.12

● 创建 Hive 表

CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 创建⼀个名为orders的表,⽤于存储订单信息
CREATE TABLE IF NOT EXISTS chunjun.orders (
 id INT,
 order_id VARCHAR(50),
 user_id INT,
 product_id INT,
 quantity INT,
 order_date TIMESTAMP
)
 STORED AS PARQUET;
-- 查看 hive 表,底层的 HDFS ⽂件位置,下⾯的 SQL 结果⾥⾯ Location 字段,就是 HDFS ⽂件的位置。
desc formatted chunjun.orders;
-- Location: hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders
-- ⼀会配置同步任务的时候会⽤到 hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders

● 在当前⽬录( /root/chunjun_demo/ ) 配置⼀个任务 mysql_hdfs.json

vim mysql_hdfs.json 输⼊如下内容:

{
"job": {
"content": [
 {
"reader": {
"parameter": {
"connection": [
 {
"schema": "chunjun",
"jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ],
"table": [ "orders" ]
 }
 ],
"username": "root",
"password": "123456",
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ]
 },
"name": "mysqlreader"
 },
"writer": {
"parameter": {
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 },
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ],
"writeMode": "overwrite",
"encoding": "utf-8",
"fileType": "parquet",
"fullColumnName":
 [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"],
"fullColumnType":
 [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ]
 },
"name": "hdfswriter"
 }
 }
 ],
"setting": {
"errorLimit": {
"record": 0
 },
"speed": {
"bytes": 0,
"channel": 1
 }
 }
 }
}

因为我们要将 MySQL 同步到 Hive ⾥⾯,但是如果直接同步 Hive 的话,内部会⽤ jdbc,⽽ jdbc 的效率不⾼,因此我们可以直接把数据同步到 Hive 底层的 HDFS 上⾯,所以 writer ⽤到了 hdfswriter。脚本解析如下:

{
"job": {
"content": [
 {
"reader": {
"parameter": {
"connectionComment": "数据库链接, 数据库, 表, 账号, 密码",
"connection": [
 {
"schema": "chunjun",
"jdbcUrl": [ "jdbc:mysql://172.16.85.200:3306/chunjun" ],
"table": [ "orders" ]
 }
 ],
"username": "root",
"password": "123456",
"columnComment": "要同步的列选择, 可以选择部分列",
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ]
 },
"nameComment" : "source 是 mysql",
"name": "mysqlreader"
 },
"writer": {
"parameter": {
"pathComment": "HDFS 上⾯的路径, 通过 hive 语句的 desc formatted 查看",
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfigComment": "是 hdfs ⾼可⽤最基本的配置, 在 Hadoop 配置⽂件 hdfs-site.xml 可以找到",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 },
"columnComment": "要同步的列选择, 可以选择部分列",
"column": [
 { "name": "id", "type": "INT" },
 { "name": "order_id", "type": "VARCHAR" },
 { "name": "user_id", "type": "INT" },
 { "name": "product_id", "type": "INT" },
 { "name": "quantity", "type": "INT" },
 { "name": "order_date", "type": "TIMESTAMP" }
 ],
"writeModeComment": "覆盖写⼊到 hdfs 上⾯的⽂件, 可选 overwrite, append(默认模式)",
"writeMode": "overwrite",
"encoding": "utf-8",
"fileTypeComment": "可选 orc, parquet, text",
"fileType": "parquet",
"fullColumnNameComment": "全部字段,有时候 column ⾥⾯同步部分字段,但是⼜需要有全部字段的格式,例如 fileType : text ",
"fullColumnName": [ "id", "order_id", "user_id", "product_id", "quantity", "order_date"], 
"fullColumnTypeComment": "全部字段的类型",
"fullColumnType": [ "INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP" ]
 },
"nameComment" : "sink 是 hdfs",
"name": "hdfswriter"
 }
 }
 ],
"setting": {
"errorLimit": {
"record": 0
 },
"speed": {
"bytes": 0,
"channel": 1
 }
 }
 }
}

● 提交任务

bash chunjun/bin/chunjun-yarn-session.sh -job mysql_hdfs.json -confProp
{\"yarn.application.id\":\"application_1683599622970_0270\"}

● 查看任务

file file

任务同步完成, 可以看⼀下 HDFS 上⾯的数据。

file

查看⼀下 Hive 表的数据。

file

注意, 如果是分区的 Hive 表,需要⼿动刷新⼀下 Hive 的元数据, 使⽤ MSCK 命令。(MSCK 是 Hive 中的⼀个命令,⽤于检查表中的分区,并将其添加到 Hive 元数据中)

MSCK REPAIR TABLE my_table;

ChunJun 离线同步原理解析

HDFS 文件同步原理

· 对于⽂件系统,同步的时候会先把⽂件写⼊到 path + [filename] ⽬录⾥⾯的 .data 的⽂件⾥⾯,如果任务失败,那么 .data ⾥⾯的⽂件不会⽣效。

· 在 TaskManager 上⾯所有 task 任务结束的时候,会在 JobManager 执⾏ FinalizeOnMaster 的 finalizeGlobal ⽅法, 最终会调⽤到 moveAllTmpDataFileToDir , 把 .data ⾥⾯的⽂件移除到 .data 的上⼀层。

public interface FinalizeOnMaster {

/**
The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished.
Params:parallelism – The parallelism with which the format or functions was run.
Throws:IOException – The finalization may throw exceptions, which may cause the job to abort.
*/
void finalizeGlobal(int parallelism) throws IOException; 
}
// 在 JobManager 执⾏
@Override
protected void moveAllTmpDataFileToDir() {
if (fs == null) {
openSource();
 }
String currentFilePath = "";
try {
Path dir = new Path(outputFilePath);
Path tmpDir = new Path(tmpPath);

FileStatus[] dataFiles = fs.listStatus(tmpDir);
for (FileStatus dataFile : dataFiles) {
currentFilePath = dataFile.getPath().getName();
fs.rename(dataFile.getPath(), dir);
LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
 }
fs.delete(tmpDir, true);
 } catch (IOException e) {
throw new ChunJunRuntimeException(
String.format(
"can't move file:[%s] to dir:[%s]", currentFilePath, outputFilePath),
e);
 }
}

增量同步

增量同步主要针对某些只有 Insert 操作的表,随着业务增⻓,表内数据越来越多。如果每次都同步整表的话,消耗的时间和资源会⽐较多。因此需要⼀个增量同步的功能,每次只读取增加部分的数据。

● 实现原理

其实现原理实际上就是配合增量键在查询的 sql 语句中拼接过滤条件,⽐如 where id > ? ,将之前已经读取过的数据过滤出去。

增量同步是针对于两个及以上的同步作业来说的。对于初次执⾏增量同步的作业⽽⾔,实际上是整表同步,不同于其他作业的在于增量同步作业会在作业执⾏完成后记录⼀个 endLocation 指标,并将这个指标上传到 prometheus 以供后续使⽤。

除第⼀次作业外,后续的所有增量同步作业都会取上⼀次作业的 endLocation 做为本次作业的过滤依据(startLocation)。⽐如第⼀次作业执⾏完后,endLocation 为10,那么下⼀个作业就会构建出例如 SELECT id,name,age from table where id > 10 的 SQL 语句,达到增量读取的⽬的。

● 使用限制

· 只有 RDB 的 Reader 插件可以使⽤

· 通过构建SQL过滤语句实现,因此只能⽤于RDB插件

· 增量同步只关⼼读,不关⼼写,因此只与Reader插件有关

· 增量字段只能为数值类型和时间类型

· 指标需要上传到 prometheus,⽽ prometheus 不⽀持字符串类型,因此只⽀持数据类型和时间类型,时间类型会转换成时间戳后上传

· 增量键的值可以重复,但必须递增

· 由于使⽤ '>' 的缘故,要求字段必须递增

断点续传

断点续传是为了在离线同步的时候,针对⻓时间同步任务如超过1天,如果在同步过程中由于某些原因导致任务失败,从头再来的话成本⾮常⼤,因此需要⼀个断点续传的功能从任务失败的地⽅继续。

● 实现原理

· 基于 Flink 的 checkpoint,在 checkpoint 的时候 会存储 source 端最后⼀条数据的某个字段值,sink 端插件执⾏事务提交。

· 在任务失败,后续通过 checkpoint 重新运⾏时,source 端在⽣成 select 语句的时候将 state ⾥的值作为条件拼接进⾏数据的过滤,达到从上次失败位点进⾏恢复。

file · jdbcInputFormat 在拼接读取 SQL 时,如果从 checkpoint 恢复的 state 不为空且 restoreColumn 不为空,则此时会将 checkpoint ⾥的 state 作为起点开始读取数据。

● 适用场景

通过上述原理我们可以知道 source 端必须是 RDB 类型插件,因为是通过 select 语句拼接 where 条件进⾏数据过滤达到断点续传的,同时断点续传需要指定⼀个字段作为过滤条件,且此字段要求是递增的。

· 任务需要开启 checkpoint

· reader 为 RDB 的插件均⽀持且 writer ⽀持事务的插件(如 rdb filesystem 等),如果下游是幂等性则 writer 插件也不需要⽀持事务

· 作为断点续传的字段在源表⾥的数据是递增的,因为过滤条件是 >

《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

同时,欢迎对大数据开源项目有兴趣的同学加入我们,一起交流最新开源技术信息,号码:30537511,项目地址:https://github.com/DTStack

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/543841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

针对电子企业的仓储需求,提出WMS仓储管理系统解决方案

随着电子行业的快速发展,仓储管理已经成为电子企业日常运营中不可或缺的一环。然而,由于缺乏有效的仓储管理系统,电子企业经常面临库存不准确、库存滞销等问题。这就是电子企业仓储管理面临的严重问题,引出了需要提出一套有效的仓…

【每日一题Day211】LC1079活字印刷 | 回溯 计数dp

活字印刷【LC1079】 你有一套活字字模 tiles,其中每个字模上都刻有一个字母 tiles[i]。返回你可以印出的非空字母序列的数目。 **注意:**本题中,每个活字字模只能使用一次。 我反正是写的相当暴力 计数回溯 思路: 为了构成不同的…

win10设置notepad++默认打开txt - 两步解决

第一步:Notepad注册.txt 以管理员的方式打开notepad 步骤:打开设置 -> 首选项 -> 文件关联 双击.txt .txt移动到注册框即可 第二步 设置Notepad默认打开 按照以下步骤将Notepad设置为默认打开.txt文件: 右键单击任何一个.txt文件…

车规级MCU芯片

作为车辆控制的核心器件,MCU主要用于车身控制、驾驶控制、信息娱乐和驾驶辅助系统。 8位MCU :提供低端控制功能:风扇控制、空调控制、雨刷、天窗、 车窗升降、低端仪表盘、集线盒、座椅控制、门控模块。 16位MCU :提供中端控制功能:用于动力…

生物信息学——用好源代码的技巧与心法

如果你是一名科研人员,在研究的过程中需要用到代码,那么你可能不需要像专业码农那样从头到尾一句一句去写完整的,而是可以将网上的一段符合应用场景的现成代码拿过来直接用。 这听起来是不是很简单?然而实际上... 目前&#xff0c…

Tuxera NTFS2023苹果电脑专用磁盘读写软件

Tuxera NTFS for Mac是苹果上专门的NTFS磁盘读写工具,帮助Mac用户解决苹果操作系统读写U盘、硬盘等NTFS格式分区的磁盘的困难。其实,最早读写NTFS软件是Tuxera NTFS-3G,但是当时是开源的(直接让用户免费使用的)&#x…

ubuntu1804替换系统的cups后,启动cups时报错 undefined symbol:_cupsMessageSave。。。

开发环境: Ubuntu18.04 cups-2.2.7 最终要将cups-2.2.7替换为cups-.2.3.3 好,在编译完cups后,对系统的cups进行替换,,此操作已完成。。。。 接下来,启动cups,发现启动失败。。 紧接着执行 jo…

《Java 核心技术面试》课程笔记(九)

对比 Hashtable、HashMap、TreeMap 有什么不同? 典型回答 Hashtable、HashMap、TreeMap 都是最常见的⼀些 Map 实现,是以键值对的形式存储和操作数据的容器类型。Hashtable 是早期 Java 类库提供的⼀个哈希表实现,本身是同步的,…

GE H201TI 全系统自检和自诊断

Hydran 201Ti是一个小型在线预警发射器。它永久安装在变压器上,将为工作人员提供各种故障气体复合值的单一ppm读数,以提醒他们潜在的问题。 可以下载该值,并且可以将警报设置在预定水平,以提醒人员并能够监控发展中的故障状况。 …

作为一位php程序员应具要有那些能力

最近,随着信息技术的发展,更多的人开始关注PHP程序员的职业发展,并渴望成为一名高薪稳定就业的PHP程序员。但是,想要成为一名优秀的PHP程序员,并不仅仅需要掌握PHP的语言基础,还需要具备其他的技能和能力。…

本地代码提交到gitee

提交步骤 注意:该步骤需要使用git工具,请提前下载 参考文章1:如何将本地代码上传到 gitee 该博客包含了gitee创建仓库流程 参考文章2:Git push命令报hint: Updates were rejected because the remote contains work that you do问…

【论文阅读笔记】Federated Unlearning with Knowledge Distillation

个人阅读笔记,如有错误欢迎指出 Arxiv 2022 [2201.09441] Federated Unlearning with Knowledge Distillation (arxiv.org) 问题: 法律要求客户端有随时要求将其贡献从训练中消除的权利 让全局模型忘记特定客户的贡献的一种简单方法是从头开始对模型进…

【机器视觉4】双目立体视觉标定

双目立体视觉标定的目的是标定左、右两个摄像机之间的坐标转换关系。 双目立体视觉的标定过程:采用MATLAB图像处理和计算机视觉库中的 Stereo Camera Calibrator(SCC)来标定双目立体视觉系统中左、右摄像机并获得左右摄像机的内参矩阵 M 1 M_1 M1​、 M 2 M_2 M2​…

Midjourney|文心一格prompt教程[技巧篇]:生成多样性、增加艺术风格、图片二次修改、渐进优化、权重、灯光设置等17个技巧等你来学

Midjourney|文心一格prompt教程[技巧篇]:生成多样性、增加艺术风格、图片二次修改、渐进优化、权重、灯光设置等17个技巧等你来学 1.技巧一:临摹 我认为学习图片类的 prompt,跟学习画画是类似的,最好的学习方法不是直接用模板。…

【FMC155】2 路14-bit、500MSPS/1GSPS/1.25GSPS 直流耦合ADC 同步采集FMC 子卡模块(AD9680)中文资料

板卡概述 FMC155 是一款基于VITA57.1 标准的,实现2 路14-bit、500MSPS/1GSPS/1.25GSPS 直流耦合ADC 同步采集FMC 子卡模块。该模块遵循VITA57.1 规范,可直接与FPGA 载卡配合使用,板卡ADC 器件采用ADI 的AD9680 芯片,该芯片具有两…

结合Sqoop练习一下columns、where和query参数

1、前期的数据准备 1》创建一个学生表 create table student(id char(30),name char(30),age int,phone char(100),address char(100)); 2》插入数据 insert into student values("1001","zhanghuan","21","1111","guiyang&q…

wpf工程中加入Hardcodet.NotifyIcon.Wpf生成托盘

1、在项目中用nuget引入Hardcodet.NotifyIcon.Wpf。如下图所示。 2、在App.xaml中创建托盘界面&#xff0c;代码是写在 App.xaml 里面 注意在application中一定要加入这一行代码&#xff1a; xmlns:tb"http://www.hardcodet.net/taskbar" 然后在<Application.Re…

科研热点| 慎投!3本期刊被剔除SCIE, 5月SCIE/SSCI目录已更新 (附下载)~

2023年5月18日&#xff0c;科睿唯安更新了Web of Science核心期刊目录&#xff0c;此次更新后SCIE期刊目录共包含9503本期刊&#xff0c;SSCI期刊目录共包含3557本期刊。此次SCIE & SSCI期刊目录更新&#xff0c;与上次更新&#xff08;2023年4月&#xff09;相比&#xff…

NestedFormer:用于脑肿瘤分割的嵌套模态感知Transformer

文章目录 NestedFormer: Nested Modality-AwareTransformer for Brain Tumor Segmentation摘要方法Global Poolformer EncoderNested Modality-Aware Feature AggregationModality-Sensitive Gating 实验结果 NestedFormer: Nested Modality-AwareTransformer for Brain Tumor …

谈一谈CMDB

大家好&#xff0c;我是易安&#xff01;今天我们谈一谈运维相关的话题&#xff0c;配置管理&#xff0c;专业一点就叫作 CMDB&#xff08;Configuration Management DataBase&#xff09;。 概念 CMDB并不是一个新概念&#xff0c;它源于ITIL&#xff08;Information Technol…