Datax 数据同步-使用总结(二)

news2025/1/16 5:57:15

一、前言

这部分主要记录 datax 实现增量同步的方案。

二、核心思路

结合datax 提供的preSql、 postSql以及占位符,外加另外一张表同步日志表来记录相关同步信息。

三、版本迭代

3.1 初版本

where tbq.opera_date > cast(date_format(DATE_SUB(NOW(), interval 5 minute), '%Y%m%d%H%i%s000') as unsigned)"

这个版本,是直接以执行时时间为时间戳。
缺点,显而易见。当同步时间比较久的时候,5 分钟就远远不够。

3.2 版本

阅读 datax 的使用说明里,对于 mysql 的写,支持 presql 和 postsql 的方式。

因此考虑新建一个表,

  1. 在同步之前,利用 preSql,往该表中插入一条数据记录,记录同步开始时间。
  2. 同步完成后,利用 postSql 更新当前同步的这条记录,记录同步结束时间
  3. 读取时,从该表中获取上次同步开始时间的数据,作为同步时间戳。
    最终 json 脚本变成如下
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            xxxx
			],
                        "connection": [
                            {
                                xxxx
                            }
                        ],
			"where":" update_date > (select l.sync_start_date from sys_sync_log l where l.sync_business_type = 'gongdan' and l.sync_result = 1 order by l.sync_start_date desc limit 1)",
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            xxx
			],
                        "connection": [
                          {
                                xxxx
                          }
                        ],
			"preSql":[
				"insert into sys_sync_log(sync_start_date,sync_result) values(now(),2)"
			],
			"postSql":[
				"update sys_sync_log l set l.sync_end_date = now(),l.sync_result = 1 where l.id = ( select t.id from (select l1.id from sys_sync_log l1 where l1.sync_result = 2 order by l1.sync_start_date desc limit 1) t )"
			],
			"writerMode":"replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

此版本相对于上个版本,时间戳的获取上,比较固定,能避免因为同步代码问题,导致时间戳获取不准。

3.3 版本

上述版本写的相对复杂,需要先查询当前同步记录之后,再更新同步结束时间。无法保证一致性,即preSql 的插入的记录和 postSql 更新记录,可能不是同一个记录。
再结合 datax 的占位符特性,可以将记录的主键由外部传入。
因此 json 脚本变成

{
	"preSql":[
		"insert into sys_sync_log(id,sync_start_date,sync_result) values('${logId}',now(),2)"
	],
	"postSql":[
		"update sys_sync_log l set l.sync_end_date = now(),l.sync_result = 1 where l.id = '${logId}'"
	],
}

其中 ${logId}为占位符
liunx 中通过 uuidgen 命令可以获取 uuid。
因此执行同步脚本时,参考如下命令执行即可

python ../bin/datax.py -p "-DlogId=`uuidgen`" ./ssss.json

其中-p “-DlogId=uuidgen” 为获取 uuid,并传给 sss.json中
这个版本,可以保证 preSql 和 postSql 处理的记录,是同一条。

四、扩展

应该还有更优方案,还需继续研究。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1012833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图论第四天|127. 单词接龙、841. 钥匙和房间、463. 岛屿的周长

127. 单词接龙 ★ 文档讲解 :代码随想录 - 127. 单词接龙 状态:开始学习。(★:需要多次回顾并重点回顾) 思路: 本题需要解决两个问题: 图中的线是如何连在一起的 题目中并没有给出点与点之间的…

【Robotframework+python】实现http接口自动化测试

前言 下周即将展开一个http接口测试的需求,刚刚完成的java类接口测试工作中,由于之前犯懒,没有提前搭建好自动化回归测试框架,以至于后期rd每修改一个bug,经常导致之前没有问题的case又产生了bug,所以需要…

长城汽车,能打“持久战”吗?

文丨智能相对论 作者丨沈浪 百年汽车工业史正在进入一个全新的发展阶段:油改电的变革仍在激化,智能化的探索才刚刚起步,汽车产品将以什么样的面貌展现在世人面前,市场格局又将迎来怎样的变化?无人可知。 然而&#…

【Linix-Day12-线程同步和线程安全】

线程同步 和 线程安全 线程同步 除了信号量和互斥锁(互斥锁和条件变量上次介绍过),还有两种方式同步 1.读写锁 当同时对一块内存读写时,会出现下列问题,故而引入读写锁 接口介绍: 1.int pthread_rwloc…

PostgreSQL 事务并发锁

文章目录 PostgreSQL 事务大家都知道的 ACID事务的基本使用保存点 PostgreSQL 并发并发问题MVCC PostgreSQL 锁机制表锁行锁 总结 PostgreSQL 事务 大家都知道的 ACID 在日常操作中,对于一组相关操作,通常要求要么都成功,要么都失败。在关系…

Windows PHP 将 WORD转PDF,执行完成后 释放进程

Windows PHP 将 WORD转PDF,执行完成后 释放进程 word转PDF清理任务进程 【附赠彩蛋】每次PHP执行完word转pdf之后,在任务进程中都会生成并残留WINWORD.EXE进程,时间久了,服务器就会越来原卡,本文完整的讲述怎么转PDF和转换之后的操作。 word转PDF /**$doc 传入完整的doc路…

flutter run长时间卡在Running Gradle task “assembleDebug“问题解决

1.下载离线gradle, 在android>>gradle>>wrapper 中找到gradle-wrappper.properties 可以看到要下载的gradle的版本 下载官方链接,更改url的版本号就好 Gradle | Thank you for downloading Gradle! 在android>>gradle>>wrapper 中找到gradle-wra…

【C++从0到王者】第三十二站:异常

文章目录 一、C语言传统的处理错误的方式二、C异常概念三、异常的使用四、异常的抛出与捕获1.异常的抛出原则2.在函数调用链中异常栈展开匹配原则 五、实际应用中的异常使用六、C标准库的异常体系七、异常规范八、异常安全九、异常的优缺点总结 一、C语言传统的处理错误的方式 …

计网第五章(运输层)(四)(TCP的流量控制)

一、基本概念 流量控制就是指让发送方的发送速率不要太快,使得接收方来得及接收。可以使用滑动窗口机制在TCP连接上实现对发送方的流量控制。 注意:之前在讨论可靠传输时,讨论过选择重传协议和回退N帧协议都是基于滑动窗口的机制上进行实现…

学生在线查询系统

在教育管理中,学生查询系统是一个必不可少的工具,它能够方便学生、家长和教师快速获取学生的各项信息。而易查分作为一个功能强大的在线查询工具,能够帮助教育机构快速搭建一个高效便捷的学生查询系统。通过注册易查分账号,创建查…

Java毕业设计 SSM SpringBoot 水果蔬菜商城

Java毕业设计 SSM SpringBoot 水果蔬菜商城 SSM 水果蔬菜商城 功能介绍 首页 图片轮播 关键字搜索商品 分类菜单 折扣大促销商品 热门商品 商品详情 商品评价 收藏 加入购物车 公告 留言 登录 注册 我的购物车 结算 个人中心 我的订单 商品收藏 修改密码 后台管理 登录 商品…

element ui - el-table 表头筛选

element ui - el-table 表头筛选 前言**场景**:根据表头筛选出表格中符合条件的数据;**效果**: 情况一:表格没有分页方法代码 前言 场景:根据表头筛选出表格中符合条件的数据; 效果: 筛选结果…

代码随想录--栈与队列-用栈实现队列

使用栈实现队列的下列操作: push(x) -- 将一个元素放入队列的尾部。 pop() -- 从队列首部移除元素。 peek() -- 返回队列首部的元素。 empty() -- 返回队列是否为空。 需要两个栈一个输入栈,一个输出栈,这里要注意输入栈和输出栈的关系。 i…

CSDN中,如何创建目录或标题

创建目录或标题 1.复制,自动生成目录2.复制,自动生成标题3.CSDN标准写法如下图 1.复制,自动生成目录 [TOC]或 [TOC](这里写目录标题) # 一级目录 ## 二级目录 ### 三级目录2.复制,自动生成标题 # 一级目录 ## 二级目录 ### 三级目…

Java 多种获取项目路径下的文件

目标文件放在项目的resources文件夹下 的 mytxt文件里面,文件名叫 file Test.txt: 其实可以看到,项目运行后,这个文件被丢到了target文件夹下: 拿到这个文件的 InputStream : 比如我们在FileUtil里面写个获…

懒人制作企业期刊的秘籍

企业期刊是展示企业文化、提升形象、传递信息的重要工具。但是,制作企业期刊需要投入大量的时间和精力,对于忙碌的企业来说是一项艰巨的任务。 所以肯定也有人需要一款不会花费大量时间就能制作出高级感的企业期刊,大家不妨试试FLBOOK在线制…

Feign远程接口调用

概述 目的:解决微服务调用问题。如何从微服务A调用微服务B提供的接口。 特性: 声明式语法,简化接口调用代码开发。像调用本地方法一样调用其他微服务中的接口。集成了Eureka服务发现,可以从注册中心中发现微服务。集成了Spring…

SpringBoot:返回响应,统一封装

说明 接口的返回响应,封装成统一的数据格式,再返回给前端。 返回响应,统一封装实体,数据结构如下。 代码 package com.example.core.model;import io.swagger.v3.oas.annotations.media.Schema; import lombok.*;/*** 返回响应…

英飞凌TC3xx--深度手撕HSM安全启动(四)--TC3xx HSM使能和配置技巧

上一章,我们简单聊了下英飞凌TC3xx的HSM的系统框架、相关UCB、Host和HSM通信模块。今天着重分析HSM的使能。 1. 系统引入HSM的思考 为什么要增加HSM 信息安全方面考虑,系统的安全启动、ECU之间安全数据的交互、ECU内部的敏感信息保存 TC3xx使能HSM后,HSM的代码应该…