NIFI实现数据库数据增量同步

news2024/11/27 0:31:00

说明

nifi版本:1.23.2(docker镜像)

需求背景

将数据库中的数据同步到另一个数据库中,要求对于新增的数据和历史有修改的数据进行增量同步

模拟数据

建表语句

源数据库和目标数据库结构要保持一致,这样可以避免后面单独转换

-- 创建测试表
CREATE TABLE `sys_user` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',
  `age`  int NOT NULL DEFAULT 0 COMMENT '年龄',
  `gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

测试数据

-- 模拟数据
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据1', 20, 1);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据2', 21, 1);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据3', 21, 0);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据4', 18, 0);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据5', 22, 1);

完整测试数据

配置数据库连接池

在画布空白位置鼠标右键,选择Configure

新增配置 

在弹出的界面点击+号,添加新的数据库连接池配置,如果已经有了配置该步骤可以跳过

 在弹出的界面筛选对应类型的连接池,我这里选择DBCPConnectionPool,然后点击ADD

点击刚才新添加的那一条数据右侧的小齿轮,进行连接池相关的配置

配置连接池相关属性 

主要配置以下几个内容,其他的根据情况决定是否需要修改,密码输入后是不会显示的

校验属性

校验配置是否正确,点击右上角的对钩,然后在弹出的界面点击VERIFY进行验证

 验证通过会全部显示绿色,如果某一条不通过会有提示,最后点击APPLY

(可选操作)给配置起个名字

为了方便后续使用,给连接池起个名字,要不然以后配置多了会分不清

激活连接池的配置

点击右侧的闪电标志激活配置,在新的页面中点击ENABLE激活,最后点击CLOSE关闭

已激活的配置

同理增加目标数据库的连接池配置,步骤和上面是一样的这里不再重复了,最终配置好后会有两个连接池的配置。如下:

获取数据库表数据

添加处理器:QueryDatabaseTable

点击工具栏的Processor,拖拽到画布中,筛选QueryDatabaseTable处理器,然后点击ADD添加到画布中

配置处理器:QueryDatabaseTable

双击处理器,切换到PROPERTIES选项卡,配置以下内容

Maximum-value Columns(最大值列):官方文档是这么解释的:以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每一列的最大值。使用多个列意味着列列表的顺序,并且每列的值预计比前几列的值增加得更慢。因此,使用多个列意味着列的分层结构,通常用于对表进行分区。此处理器可用于仅检索自上次检索以来添加/更新的那些行。请注意,某些 JDBC 类型(如 bit/boolean)不利于保持最大值,因此这些类型的列不应列在此属性中,并且将导致处理过程中的错误。如果未提供列,则将考虑表中的所有行,这可能会对性能产生影响。注意:为给定表使用一致的最大值列名非常重要,这样增量提取才能正常工作。
支持表达式语言:true

校验属性

给处理器起个名字,表示当前整个工作流的作用

拆分数据

添加处理器:SplitAvro

配置处理器:SplitAvro

双击处理器,切换到PROPERTIES选项卡,所有内容默认即可

数据入库

添加处理器:PutDatabaseRecord

配置处理器:PutDatabaseRecord

双击处理器,切换到PROPERTIES选项卡

新增Record Reader

配置AvroReader

点击右侧的箭头,在弹出的界面选择刚才配置的Reader,然后点击右侧的小齿轮

 在弹出的界面根据自己的需要自行配置,这里按照默认的配置即可

 激活Reader

点击右侧的闪电标志进行激活

 激活后的状态变为Enabled

其他配置

校验属性

连接所有处理器

连接处理器

连接QueryDatabaseTable和SplitAvro两个处理器,勾选For Relationships下的success

连接SplitAvro和PutDatabaseRecord两个处理器,勾选For Relationships下的split

处理SplitAvro处理器的告警

双击SplitAvro处理器,切换到RELATIONSHIPS,勾选下面的两个选项,然后点击APPLY

 处理PutDatabaseRecord处理器的告警

双击PutDatabaseRecord处理器,切换到RELATIONSHIPS,勾选下面的选项,然后点击APPLY

 完整配置

 启动所有处理器

QueryDatabaseTable处理器默认是一分钟执行一次的,可以在SCHEDULING选项卡下面进行配置,这里按照默认的时间来执行

 在画布空白位置鼠标右键选择Start启动所有的处理器

 

查看目标数据库数据

等待一分钟后查看目标数据库数据,发现源数据库的5条数据被同步到了目标数据库

 修改源数据库的数据

UPDATE sys_user SET is_deleted = 1 WHERE id = 1;
UPDATE sys_user SET is_deleted = 1 WHERE id = 4;
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据6', 22, 1);

再次查看目标数据库数据

等待处理器执行后,查看目标数据库数据发现新的数据已经被同步过去

 可以看到最后一个处理器最终由8条记录流入

结束语

以上便是使用NIFI增量同步数据库数据的全过程,如果有什么疑问欢迎评论区进行评论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/976783.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是数字体验平台

数字体验平台是一种基于数字技术的工具或系统,旨在提供用户与产品、服务或品牌进行交互和体验的场所。它通过结合多种数字化技术,如人工智能、大数据、云计算、物联网等,为用户提供个性化、多维度的体验。 数字体验平台可以为企业或组织搭建一…

Flutter实现ControlExecutor进行多个异步任务执行时监听状态并可指定最后执行的异步并在指定的异步执行完毕后结束executor并回调。

1.场景 当有多个接口请求时,且接口调用不是同时进行时,而且接口调用有可能时链式的,中间也有可能加入别的逻辑,但是需要在第一个接口调用时打开等待框,在最后一个接口调用完成时关闭等待框类似需求时,可以…

Spark 6:Spark SQL DataFrame

SparkSQL 是Spark的一个模块, 用于处理海量结构化数据。 SparkSQL是用于处理大规模结构化数据的计算引擎 SparkSQL在企业中广泛使用,并性能极好 SparkSQL:使用简单、API统一、兼容HIVE、支持标准化JDBC和ODBC连接 SparkSQL 2014年正式发布,当…

java调用js文件的两种方法(支持V8引擎)

前言 对java逆向感兴趣的盆友可以关注我以前的文章,有图片验证码识别、AES、SHA256等各种加密的java实现,不定时更新常用算法和加密,欢迎一起交流讨论! 在日常逆向中,一些前端的加密代码用java复现出来比较难&#xff…

使用工作流快速开发平台,做好企业数据资源管理!

面对越来越繁忙的业务,很多客户朋友不知道用什么样的平台才能快速处理好企业内部数据,做出更有利于企业发展的经营决策。其实,低代码技术平台、工作流快速开发平台是可以让企业减少重复工作,提高工作效率,实现流程化办…

中断处理原理:接口及按键驱动

一、什么是中断 一种硬件上的通知机制,用来通知CPU发生了某种需要立即处理的事件 分为: 内部中断 CPU执行程序的过程中,发生的一些硬件出错、运算出错事件(如分母为0、溢出等等),不可屏蔽外部中断 外设发…

自动计算比例 计算属性 computed @input=“rate“

<el-col :span"12"><el-form-item label"当年累计实收租金:" prop"cumulativeRent"><el-inputv-model"createForm.cumulativeRent"input"rate"clearable:disabled"value 2"><template slot…

如何将MySQL中指定的表结构同步到人大金仓数据库

场景 刚开始做数据库适配的时候,这是一个棘手的问题,因为MySQL的库里,表结构,字段都是最新的,但是金仓的库,全是旧版本的表结构。需要把我们模块的表结构,同步到金仓中。 虽然金仓有数据库同步工具,但是直接把所有表都给同步过来,难免会影响到其他模块。 然后…

HCIP学习-IPv6

目录 前置学习内容 IPv6解决的一些IPv4的缺陷 无限的地址 层次化的地址结构 即插即用 简化报文头部 IPv4和IPv6报头比较 端到端的网络罗完整性 安全性增强 挣钱QoS特性 IPv6地址介绍 格式 首选格式 压缩格式 内嵌IPv4地址格式的IPv6地址格式 IPv6的网络前缀和接…

TRICONEX 8312 数字输入模块

Triconex 8312 数字输入模块是一种用于工业自动化和控制系统的模块&#xff0c;通常用于监测和采集数字信号。以下是Triconex 8312 数字输入模块的一些常见产品特点&#xff1a; 多通道输入&#xff1a;8312 模块通常具有多个数字输入通道&#xff0c;允许同时监测多个数字信号…

【Linux】多线程2——线程互斥与同步/多线程应用

文章目录 1. 线程互斥1.1 问题引入1.2 线程互斥的相关概念1.3 互斥量mutex1.4 互斥量实现原理1.5 死锁 2. 线程安全和可重入函数3. 线程同步3.1 同步概念3.2 条件变量 4. 生产消费模型4.1 基于阻塞队列的cp模型4.2 基于环形队列的cp模型POSIX信号量 5. 线程池5.1 互斥量RAII版本…

总结/笔记-vue中的插槽(默认插槽、具名插槽、作用域插槽)

问题&#xff1a; 遇到了一个插槽&#xff0c;写法为 #default ”{ row }“ 插槽知识点&#xff1a; 定义 插槽&#xff0c;用于 在组件中 引用外部组件或自定义组件的内容。 即 子组件中提供给父组件使用的一个占位符&#xff0c;父组件可以在这个占位符中填充任何模板代…

【TI毫米波雷达笔记】ADCBuf外设初始化配置及驱动(以IWR6843AOP为例)

【TI毫米波雷达笔记】ADCBuf外设初始化配置及驱动&#xff08;以IWR6843AOP为例&#xff09; ADCBuf是为mmwave服务的 在配置之前需要配置好mmwave #include < ti/drivers/ADCBuf.h>对应mmwave studio&#xff1a; ADCBuf模块上电 调用&#xff1a; ADCBuf_init();A…

bash: cmake: command not found...+++++++lsb_release: command not found

一 .bash: cmake: command not found… centos中安装那个cmake。 1、问题 [rootPC3 home]# cmake bash: cmake: command not found... Similar command is: make当前系统&#xff1a; [rootPC3 home]# lsb_release -a LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx…

dex2oat编译模式、触发场景、命令强制执行

dex2oat简单理解就是把delvik虚拟机的可执行文件dex转化成AndroidRuntime虚拟机的可执行文件oat。 Android T版本由PKMS下发命令、native层进程installd负责具体执行dex2oat操作。installd回去调用dex2oat64完成编译工作&#xff0c;可以将dex2oat64理解成一个程序。源码路径&…

本地使用GFPGAN进行图像人脸修复

人脸修复 1.下载项目和权重文件2.部署环境3.下载权重文件4.运行代码5.网页端体验 首先来看一下效果图 1.下载项目和权重文件 https://github.com/iptop/GFPGAN-for-Video.git2.部署环境 根据README文件部署好环境&#xff0c;额外还需要&#xff1a; cd GFPGAN-1.3.8 pyt…

在Qt创建的UI中放一个显示点云的窗口(PCL+QT5)

1、首先在Qt Designer创建UI后&#xff0c;拖一个Widget窗口出来 2、在对象查看器中右击该Widget&#xff0c;选择提升窗口部件&#xff0c;如下操作&#xff1a; 3、把UI转出来放在VS项目中&#xff0c;其中你的UI代码头文件会自带QVTKOpenGLNativeWidget.h&#xff0c;当然你…

企业专题片的优势

企业专题片可以通过生动、感人的方式传达企业形象和信息&#xff0c;引起观众的共鸣和兴趣。它是一种强有力的营销工具&#xff0c;能够提升品牌形象、增加产品或服务的认知度&#xff0c;并在激烈的市场竞争中突显企业的实力和成果。企业专题片具有多个好处和影响&#xff0c;…

【C语言】入门——数组

目录 ​编辑 1.一维数组的创建和初始化 1.1一维数组的创建&#xff1a; 1.2 一维数组的初始化 &#xff1a; 2.二维数组的创建的初始化 2.1二维数组的创建&#xff1a; 2.2二维数组的初始化&#xff1a; 3.数组越界 4.数组的使用 4.1一维数组的使用&#xff1a; 4.2二维…

1、PostgreSQL数据库的历史和特性简介

PostgreSQL是一个开源的关系型数据库管理系统&#xff08;DBMS&#xff09;&#xff0c;它具有强大的功能和广泛的可扩展性&#xff0c;被广泛用于各种规模的应用程序和项目中。 一、PostgreSQL 的发展历史 PostgreSQL数据库的来历可以追溯到20世纪80年代末和90年代初。最早由…