NIFI实现JSON转SQL并插入到数据库表中

news2025/1/8 5:45:19

说明

本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI

需求背景

现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库建表语句。

json数据

[
    {
        "name": "张三",
        "age": 23,
        "gender": 1
    },{
        "name": "李四",
        "age": 24,
        "gender": 1
    },{
        "name": "小红",
        "age": 18,
        "gender": 0
    }
]

建表语句

CREATE TABLE `sys_user` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',
  `age`  int NOT NULL DEFAULT 0 COMMENT '年龄',
  `gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

json数据中的属性名和数据库字段的名要一一对应,要不然后期还得做转换,比较麻烦

创建文件流

添加处理器:GetFile

点击工具栏的Processor,拖拽到画布中

筛选GetFile,点击ADD添加到画布中

配置GetFile处理器

双击添加的处理器,弹出对应的配置界面

可选操作)点击SETTINGS选项,在Name中输入处理器的名称:获取文件内容

点击SCHEDULING,在Run Schedule中输入定时器的时间,这里设置每10秒运行一次,如果不设置后面运行处理器的时候会无限循环运行

 点击PROPERTIES选项

 配置PROPERTIES,分别填写Input DirectoryFile FilterKeep Source File,其他选项默认即可。

说明:博主的NIFI是使用docker安装的,容器的数据全部挂载到了宿主机中,NIFI的HOME默认是在/opt/nifi/nifi-current,挂载到宿主机的路径为:/root/data/nifi/nifi-current。所以Input Directory中填写的路径/opt/nifi/nifi-current/mydata/file 实际对应宿主机路径为:/root/data/nifi/nifi-current/mydata/file,到时候把测试文件放到宿主机的/root/data/nifi/nifi-current/mydata/file下面即可

将文件放到对应的目录下

说明:mydata/file中的所有文件需要有读写的权限,否则后面读取文件会报错

修改权限:

chmod +777 /root/data/nifi/nifi-current/mydata/file

(可选操作)测试处理器配置是否成功

 添加LogAttribute处理器

连接处理器

将鼠标放到第一个处理器上,然后点击出现的箭头,将其拖拽到第二个处理器中,等待线条由红色变为绿色后,松开鼠标即可。

在弹出的界面中勾选success,然后点击ADD

 第一个显示红色方框的代表当前处理器可以正常使用;第二个出现黄色三角感叹号的代表当前处理器有问题,双击第二个处理器。

在弹出的界面选择RELATIONSHIPS选项卡,在success下勾选terminate,最后点击APPLY

说明:success下面的两个选项:terminate和retry分别代表着当前处理器执行成功的操作

terminate代表成功后终止,retry代表成功后继续尝试

可以看到黄色的三角变成了红色的方框,表示当前处理器没问题了。

运行处理器

运行处理器有两种方式,第一种是一个一个单独运行另一种是直接运行全部

第一种

鼠标放到第一个处理器中然后右键,可以看到有一堆选项,这里运行处理器可以选择Sart或者Run Once,为了方便调试,这里选择Run Once即只运行一次

点击Run Once之后可以看到,在处理器的右上角多了一个标志,这个代表当前有几个线程在运行中

 当处理器的任务执行结束后可以看到两个处理器的连接处会显示当前有几个队列,以及队列数据总的大小

将鼠标放到两个处理器的连接处,鼠标右键,选择List queue

 在弹出的界面中可以看到等待中的队列列表

选择其中一个队列,点击左上角的提示,可以看到上一个处理器(GetFile)的一些信息,包括一些属性啊什么的,这个可以自己去看,这里不再仔细说明。点击OK可以关闭当前的弹框

 点击某一个队列的右上角,第一个可以下载当前的内容,中间的小眼睛可以查看队列中的数据

 点击小眼睛,可以看到文件中的内容显示在了页面中,默认是original,也可以选择formatted和hex

 运行第二个处理器(LogAttribute),同样的鼠标放到处理器上,然后选择Run Once即可

然后可以在nifi的日志中看到打印了一些日志,主要包括了处理器的属性和内容

说明:如果要想打印出文件的内容,LogAttribute处理器需要选择以下内容

正常打印数据说明GetFile处理器配置的没问题

Json数组分隔

添加处理器:SplitJson

 配置SplitJson处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

JsonPath Expression(JSON 路径表达式):指定要提取的 JSON 对象的路径。例如,如果要提取根级别的 JSON 对象,可以将路径设置为 $

连接处理器

将GetFile处理器和SplitJson处理器连接起来,勾选For Relationships,然后选择ADD

可选操作)测试处理器配置是否成功

将SplitJson处理器和LogAttribute处理器连接,连接处理器中的For Relationships选择split

 此时发现SplitJson处理器还在告警,双击SplitJson处理器,选择RELATIONSHIPS,按照如图勾选

此时所有的处理器已正常显示

开启所有的处理器(在画布空白处鼠标右键,点击Start),查看nifi容器的日志,可以看到此时日志打印出来的不再是整个文件的内容,而是单独一条一条json数据

停止所有处理器(画布空白处鼠标右键,选择Stop),清空队列中的数据,在连接处鼠标右键,选择Empty queue

Json转为SQL

添加处理器:ConvertJSONToSQL

 配置ConvertJSONToSQL处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

配置JDBC Connection Pool

Value下面点击,选择Create new service

根据自己的情况选择对应的services,我这里选择的是默认的 

点击最后面的右箭头

点击右侧的小齿轮

切换到SETTINGS选项卡,给驱动起个名字,方便以后识别

切换到PROPERTIES选项卡 ,配置数据库相关参数,其他按照默认的即可

校验参数配置是否正确,点击右上角的对号

校验通过会出现绿色对钩,如果配置不对会有对应提示,最后点击APPLY

开启JDBC的配置,点击闪电符号,在弹出的界面点击ENABLE,最后点击CLOSE

 最后可以看到state已经变为Enabled,点击右上角的X关闭

到此JDBC的配置结束

配置Statement Type

再次双击处理器,配置Statement Type,选择INSERT,代表生成的是INSERT语句

配置Table Name

校验配置是否正确

最后点击APPLY

连接处理器

将SplitJson处理器和ConvertJSONToSQL处理器进行连接,Relationships选择split

可选操作)测试处理器配置是否成功

这里跳过测试,如果需要测试自己的配置是否正确的,可以自行将处理器和LogAttribute处理器进行连接进行测试,以下是博主自己的测试结果,做个参考,最后面会打印生成的SQL语句

执行生成的SQL

添加处理器:PutSQL

配置PutSQL处理器

双击处理器,在PROPERTIES选项卡中配置以下内容,其他内容默认即可

 

 校验配置是否正确

最后点击APPLY

连接处理器

将ConvertJSONToSQL处理器和PutSQL处理器进行连接,Relationships选择sql

 处理PutSQL处理器的告警

双击处理器,在RELATIONSHIPS选项卡配置勾选以下内容

完整的配置结果

包含四个处理器,依次为GetFile=>SplitJson=>ConvertJSONToSQL=>PutSQL

 开启所有的处理器

数据库是否有数据

可以看到现在的数据库里面还是没有数据的

 开启处理器

在画布的空白位置,鼠标右键选择Start

开启后可以看到所有的处理器左上角都显示为绿色三角,表示处理器已经启动了,过十几秒再看处理器,发现已经有数据流入

 查看数据库数据

此时数据库已经有数据插入,重复数据是因为每隔10秒执行一次任务,就会读取一次文件,然后重复往数据库插入数据,如果不想让数据不停插入数据库,可以将GetFile中的PROPERTIES下的Keep Source File设置为false即可(此操作需要停止处理器才能够设置)

结束语

NIFI学习需要花费一定的时间去仔细研究,它里面内置了大概300多个处理器,每个处理器实现的功能都不一样,配置也都不同。博主也正在不断地学习中,后续也会不断分享关于NIFI的内容,如果有什么疑问欢迎评论区进行评论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/988516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

传输层协议 --TCP报文格式详细介绍

一、 TCP协议格式 TCP如何将报头与有效载荷进行分离? 当TCP从底层获取到一个报文后,虽然TCP不知道报头的具体长度,但报文的前20个字节是TCP的基本报头,并且这20字节当中涵盖了4位的首部长度。 因此TCP是这样分离报头与有效载荷的…

Java——》ThreadLocal

推荐链接: 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…

ChatGPT:深度学习和机器学习的知识桥梁

目录 ChatGPT简介 ChatGPT的特点 ChatGPT的应用领域 ChatGPT的工作原理 与ChatGPT的交互 ChatGPT的优势 ChatGPT在机器学习中的应用 ChatGPT在深度学习中的应用 总结 近年来,随着深度学习技术的不断发展,自然语言处理技术也取得了显著的进步。其…

软件设计模式(二):工厂、门面、调停者和装饰器模式

前言 在这篇文章中,荔枝将会梳理软件设计模式中的四种:工厂模式、Facade模式、Mediator模式和装饰器Decorator模式。其中比较重要的就是工厂模式和装饰器模式,工厂模式在开发中使用的频数比较高。希望荔枝的这篇文章能讲清楚哈哈哈哈&#xf…

OpenCV(三十一):形态学操作

​​​​​​1.形态学操作 OpenCV 提供了丰富的函数来进行形态学操作,包括腐蚀、膨胀、开运算、闭运算等。下面介绍一些常用的 OpenCV 形态学操作函数: 腐蚀操作(Erosion): erode(src, dst, kernel, anchor, iteration…

【LeetCode】剑指 Offer <二刷>(6)

目录 题目:剑指 Offer 12. 矩阵中的路径 - 力扣(LeetCode) 题目的接口: 解题思路: 代码: 过啦!!! 题目:剑指 Offer 13. 机器人的运动范围 - 力扣&#…

docker-compose安装mysql

基于docker-compose快速安装mysql 目录 一、目录结构 1、 docker-compose.yml 2、 my.cnf 3、error.log 二、执行安装 三、连接使用 一、目录结构 1、 docker-compose.yml version: 3 services:mysql:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7 #…

Kubernetes核心概念

Kubernetes是一个工业级的容器编排平台。 Kubernetes核心功能 服务发现和负载均衡。容器的自动装箱,调度(scheduling),按照容器的规格(需要的cpu和内存等)确定一个容器存放到集群中的哪一个机器上。进行自…

创建vue3项目并引用elementui

1.创建vu3项目&#xff1a; 执行命令 npm create vuelatest 2.终端会出现如下选项&#xff0c;不确定的直接enter键进入下一步&#xff1b; 3.然后再执行下方命令&#xff1a; cd <your-project-name> npm install4.安装依赖成功后引入elementui,执行命令&#xff1a…

高压电容器的内部结构是什么样的?

高压电容器的内部结构取决于其具体的设计和用途&#xff0c;但通常包括以下主要组件&#xff1a; 电介质&#xff1a;电介质是高压电容器内部的核心部分。它通常由绝缘材料制成&#xff0c;如聚丙烯薄膜、聚酯薄膜、陶瓷或其他高绝缘性材料。电介质的选择取决于电容器的电压等级…

盘点那些有高级客服分配功能的软件系统

过去&#xff0c;很多企业虽然有服务意识&#xff0c;但并不强烈&#xff0c;现在客户需求以及团队运营的发展推动着企业在客户管理的方式上采用更有效的服务方式来应对实际的变化&#xff0c;尤其是客服行业&#xff0c;所以很多企业在客服的管理和分配上下尽了功夫&#xff0…

Unity的UI管理器

1、代码 public class UIManager {private static UIManager instance new UIManager();public static UIManager Instance > instance;//存储显示着的面板脚本&#xff08;不是面板Gameobject&#xff09;&#xff0c;每显示一个面板就存入字典//隐藏的时候获取字典中对…

【自动化测试】之PO模式介绍及案例

目录 概念 PO三层模式&#xff1a; 1. 构建基础的 BasePage 对象层 2. 构建首页的 Page 层&#xff08;操作层&#xff09; 3.构建业务层 常用断言方法&#xff1a; 4. 构建用例集&#xff0c;执行文件&#xff0c;输出自动化测试报告 测试报告模板 概念 PO&#xff08…

【C++】详解红黑树并模拟实现

前言&#xff1a; 上篇文章我们一起学习了AVL树比模拟实现&#xff0c;我们发现AVL树成功地把时间复杂度降低到了O(logN)。但是同时我们不难发现一个问题&#xff0c;在构建AVL树中我们也付出了不小的代价&#xff0c;频繁的旋转操作导致效率变低。为了解决这个问题&#xff0c…

使用Fastchat部署vicuna大模型

FastChat是一个用于训练、提供服务和评估基于大型语言模型的聊天机器人的开放平台。其核心特点包括&#xff1a; 最先进模型&#xff08;例如 Vicuna&#xff09;的权重、训练代码和评估代码。一个分布式的多模型提供服务系统&#xff0c;配备 Web 用户界面和与 OpenAI 兼容的…

算法通关村第十七关:黄金挑战-跳跃游戏问题

黄金挑战-跳跃游戏问题 1. 跳跃游戏 LeetCode 55 https://leetcode.cn/problems/jump-game/ 思路分析 关键是判断能否到达终点&#xff0c;不用管每一步跳跃到哪里&#xff0c;而是尽可能的跳跃到最远的位置 看最多能覆盖到哪里&#xff0c;只要不断更新能覆盖的距离&#x…

【狂神】Spring5笔记(一)之IOC

目录 首页&#xff1a; 1.Spring 1.1 简介 1.2 优点 2.IOC理论推导 3.IOC本质 4.HelloSpring ERROR 5.IOC创建对象方式 5.1、无参构造 这个是默认的 5.2、有参构造 6.Spring配置说明 6.1、别名 6.2、Bean的配置 6.3、import 7.DL依赖注入环境 7.1 构造器注入 …

[JAVA] byte与int的类型转换案例剖析

总结&#xff1a; ①没有byte的字面值&#xff0c;赋值时需要强制转换类型 ②涉及运算&#xff0c;系统自动进行类型升级&#xff0c;由此用final修饰&#xff0c;代表这是一个不会更改值的常量&#xff0c;通过编译 感受&#xff1a;还是用int吧&#xff0c;自动类型转换太复…

VB:顺序查找

VB&#xff1a;顺序查找 Private Sub Command1_Click()Dim i%, m%Dim x(1 To 10) As SingleFor i 1 To 10x(i) Val(InputBox("请输入"))Next im seqSearch(x, 10)If (m 1) ThenPrint "已找到"ElsePrint "未找到"End If End Sub Function se…

为什么在线客服系统的消息撤回功能是有必要的?

如今在日常工作和沟通中&#xff0c;很多企业都在使用在线客服系统跟客户进行线上交流和协作。然而有时候客服可能会不小心发送错误的消息或包含敏感信息的消息&#xff0c;人们在现实的沟通交流中是不会真实存在“说出去的话还能收回来”的情况&#xff0c;但这是在网络上&…