Nifi中的Controller Service

news2024/11/15 15:46:08

Service简介

首先Nifi中的Controller Service 和我们MVC概念中的Controller Service不是一个概念,Nifi中的Controller Service更像是和Processor同级的一个概念,它和Processor在我个人的使用经验来理解的话就是它是预制好的各种服务,可以被Processor引用或者支撑Processor,例如一个SQL读取的Processor,它得需要JDBC的连接,才能访问数据库。这里Controller Service 就可以是一个JDBC的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发Processor一样,根据自己的业务需求,进行自定义的Controller Service 开发。

当我们使用某些依赖Service的组件(Processor)时,在配置中会出现选择Service或者创建新的Service的情况,这里的Service即是Nifi的Controller Service,一旦创建新的,则会生成一个以Group为范围的 “全局” Service对象,这时,再有依赖同类型Service的Processor时,可以直接选中:

 

 Controller Service的配置

单独查看Controller Service 可以从面板空白处,右键Configure来看,如下图:

 这是一个JDBC的连接池Service,它包含的属性有名称、类型、简介、启用状态、操作;从操作中可以看到配置该Service需要填写基本的各类属性;其中,Service是有启停状态的,如果想修改Service的属性内容,必须先保证该Service是停用状态,然后点击配置标识,则进入配置页面,它的配置和Processor的差不多,通过页签区别,共有三个页签:SETTING(基础属性)、PROPERTIES(使用属性)、COMMENT(页签):

SETTING 基础属性

 基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此Service的Processor 列表

PROPERTIES 使用属性

核心的业务配置,此标签页的配置项根据不同的Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是JDBC的连接池,所以基本需要连接数据库所需的URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar包路径 ,这里不少Service可能都需要第三方的jar包依赖才可以使用,长期使用或生产环境下,建议将所有jar资源集中放在统一路径下。

COMMENT 页签 

一个提供Service使用说明的页签,可根据自己实际需求,补充使用Service的用法以及描述

Service 的使用范围

 在Nifi的基本使用中的Group的使用介绍,Group同时也对Services起作用,如果我们在一个Nifi的最外层的平面上 新增Controller Service,那么这些Service的作用域是整个Nifi的任何位置,如果我们在某个Group内创建Controller Service, 那么这个Controller Service 仅在Group范围内可以被引用,Nifi的这种机制也是方便Service的使用和维护

 

全局参数配置

类似于 数据库连接池、Kafka、Redis等各种组件的连接池、客户端Client的Service在实际的使用中会非常多,由此配置的Service也会非常多,于是就会产生很多次的反复配置URL、账号这一系列重复的内容,由于Nifi的特性,这些Service又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个group来回跳转、调整不同的Service的Configure;为应对此类问题,Nifi 提供了全局配置的机制来弥补。

 使用变量前:

这里的 URL、Driver Class Name、Database User在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变得,可能只需要配置一遍就好,不需要每次创建Service都写一遍;所以可以这里可以使用上下文变量(Parameter Context)

首先,打开Parameter Context,创新一组新的变量:

 

 

 

 

之后进入Service 的管控面板(空白处右键选择Configure),先选择变量组:

 

再进入 CONTROLLER SERVICES 对Service的配置进行修改,将具体的RUL、Driver-name、user等参数,全部使用变量替换(变量使用‘#’符 )

 

DBCPConnectionPool的使用样例

下面将使用Nifi 实现一个简单的Demo,从Mysql数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用ExecuteSQL组件,读取Mysql中的数据,根据上文描述,创建一个DBCPConnectionPool 的Service,然后启动 :

 添加 ExecuteSQL组件,配置相关内容,根据自定义编写的SQL读取数据库内容:

随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用groovy处理数据,所以选择转换为JSON进行处理,实际使用可以根据自身情况选择转换器:

添加 ExecuteGroovyScript 组件,使用groovy脚本对数据进行处理,groovy的脚本内容如下: 

 

groovy内容:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;



def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
   //在这里可以对数据进行处理
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}





/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}
/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

 最后使用LogMessage组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等

 

在ExcuseGroovyScript组件中使用Service

 在 ExcuseGroovyScript 组件内部使用groovy脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript组件支持可以引入Service,提供用户编写的groovy脚本内部使用Service;

首先需要在ExcuteGroovyScript组件的PROPERTIES  配置中新增属性:

 

这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 ‘SQL.’ 或者 'CTL.'作为名称前缀时,则能够使用Service,后续的属性值则会变成Service的选择。

在groovy的代码中,则可以通过 SQL.mysql.{method}的方式,调用Service的方法,在ExcuseScript组件中配合脚本语言进行数据的操作:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;



def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
    def mapdic = [:]
    //使用Service查询数据库
    SQL.mysql.eachRow("SELECT id,value FROM tb_dic_detail WHERE u_status = 1 "){
       row->
           mapdic.put(row.id.toString(),row.value.toString());    }
    
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}



/*****************************************************************公共函数*********************************************************************/

/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}
/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1920516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java入门1.5.0

前言&#xff1a; 在java入门1.4.0中&#xff0c;我们快速构建了一个基于Maven管理的Spring boot3项目&#xff0c;对基本文件结构有了初步的认知&#xff0c;创建了git仓库 正片: 看山是山&#xff0c;看山不是山&#xff0c;看山还是山&#xff0c;下面两段代码很好了验证这…

51单片机嵌入式开发:9、 STC89C52RC 操作LCD1602技巧

STC89C52RC 操作LCD1602技巧 1 代码工程2 LCD1602使用2.1 LCD1602字库2.2 巧妙使用sprintf2.3 光标显示2.4 写固定长度的字符2.5 所以引入固定长度写入方式&#xff1a; 3 LCD1602操作总结 1 代码工程 承接上文&#xff0c;在原有工程基础上&#xff0c;新建关于lcd1602的c和h…

逐步实践复现 SELF-RAG

SELF-RAG 简介 SELF-RAG&#xff08;Self-Reflective Retrieval-Augmented Generation&#xff09;是一种检索增强生成&#xff08;RAG&#xff09;的框架&#xff0c;它通过自我反思学习检索、生成和批判&#xff0c;以提高大型语言模型&#xff08;LLM&#xff09;的质量和真…

谷粒商城实战笔记-28-前端基础-技术栈简介

文章目录 一&#xff0c;学习目标1&#xff0c;VSCode的使用2&#xff0c;开发语言ES6的学习目标3&#xff0c;Node.js的学习目标4&#xff0c;Vue的学习目标5&#xff0c;Babel的学习目标6&#xff0c;webpack的学习目标 二&#xff0c;前后端技术栈的比较 本节的主要内容是介…

KEIL下载芯片包记录

第一步 第二步 第三步

oracle 23ai新的后台进程bgnn介绍

前言 昨天发文研究了哪些oracle 后台不能杀 具体文章如下链接 oracle哪些后台进程不能杀&#xff1f;-CSDN博客 其中23ai中新增了一个后台进程bgnn 但是在oracle 23ai database reference中并没有找到该后台进程 有点不甘心就开了个SR&#xff0c;找oracle 官方来看看这个后…

Go语言---定时器

定时器 Timer-只响应一次 Timer 是一个定时器&#xff0c;代表未来的一个单一事件&#xff0c;可以告诉 timer 要等待多长时间&#xff0c;它提供一个 channel&#xff0c;在将来的那个时间那个 channel 提供了一个时间值。 2s后&#xff0c;往timer.C写数据&#xff0c;有…

智慧教育解决方案PPT(44页)

1. 教育信息化1.0与2.0 教育信息化1.0注重全体教师和学生的教学与学习应用&#xff0c;以及数字校园建设。2.0则强调宽带网络、优质资源和网络学习空间的普及&#xff0c;提高信息化应用水平和师生信息素养&#xff0c;建立教育资源和管理公共服务平台&#xff0c;推动“互联网…

【系统架构设计师】九、软件工程(项目管理|进度管理|软件配置管理|软件质量管理|软件风险管理 )

目录 十四、项目管理 14.1 软件进度管理 14.1.1 工作分解结构 14.1.2 Gantt 图 和 PERT 图 14.1.3 关键路径法 14.1.4 浮动时间 14.2 软件配置管理 14.3 软件质量管理 14.4 软件风险管理 相关推荐 历年真题练习 十四、项目管理 软件项目管理的对象是软件工程项目。…

3.Softmax回归

回归和分类 回归估计一个连续值 分类预测一个离散类别 Softmax回归实际是一个分类问题 从回归到多类分类 对类别进行一位有效编码 y [ y 1 , y 2 , ⋯ , y n ] T y[y_1,y_2,\cdots,y_n]^T y[y1​,y2​,⋯,yn​]T,如果是第i类&#xff0c;则值为1&#xff0c;否则为0 使用…

摸鱼大数据——Kafka——Kafka的shell命令使用

Kafka本质上就是一个消息队列的中间件的产品&#xff0c;主要负责消息数据的传递。也就说学习Kafka 也就是学习如何使用Kafka生产数据&#xff0c;以及如何使用Kafka来消费数据 topics操作 注意: 创建topic不指定分区数和副本数,默认都是1个 分区数可以后期通过alter增大,但是…

k8s集群离线部署

K8s离线部署 环境 目标 k8s离线部署 步骤 部署docker 详情见文章&#xff1a;《离线安装docker及后端项目离线打包》 https://blog.csdn.net/qq_45371023/article/details/140279746?spm1001.2014.3001.5501 所用到的所有文件在&#xff1a; 链接&#xff1a;https://pan…

摸鱼大数据——Kafka——Kafka的集群搭建

1、软件安装 搭建Kafka集群 1、下载安装 安装包下载地址&#xff1a;https://kafka.apache.org/download 2、将Kafka的安装包上传到虚拟机&#xff0c;并解压 cd /export/software/ tar -xzvf kafka_2.12-2.4.1.tgz -C ../server/ 配置软连接: cd /export/server ln -s kaf…

Debezium日常分享系列之:Debezium 3.0.0.Alpha1 Released

Debezium日常分享系列之&#xff1a;Debezium 3.0.0.Alpha1 Released 一、重大改变Java 和 Maven 要求已更改 二、新的特征和提高MongoDB 三、更多内容 Debezium 3 的第一个预发布版本 3.0.0.Alpha1。这个版本虽然比正常的预版本要小&#xff0c;但高度关注几个关键点&#xff…

【漏洞复现】Splunk Enterprise for Windows 任意文件读取漏洞 CVE-2024-36991

声明&#xff1a;本文档或演示材料仅用于教育和教学目的。如果任何个人或组织利用本文档中的信息进行非法活动&#xff0c;将与本文档的作者或发布者无关。 一、漏洞描述 Splunk Enterprise 是一款强大的机器数据管理和分析平台&#xff0c;广泛应用于企业中&#xff0c;用于实…

【单片机毕业设计选题24058】-基于嵌入式的智慧酒店管理系统设计与实现

系统功能: 系统分为主机端和从机端&#xff0c;主机端主动向从机端发送信息和命令&#xff0c;从机端 收到主机端的信息后回复温湿度和光照强度信息。 从机端操作&#xff1a; 从机端上电后显示“欢迎使用智慧酒店系统请稍后”两秒后进入正常显示界面。 第一行显示系统状态…

文心快码——百度研发编码助手

介绍 刚从中国互联网大会中回来&#xff0c;感受颇深吧。百度的展商亮相了文心快码&#xff0c;展商人员细致的讲解让我们一行了解到该模型的一些优点。首先&#xff0c;先来简单介绍一下文心快码吧。 文心快码&#xff08;ERNIE Code&#xff09;是百度公司推出的一个预训练…

Go语言---并发编程之channel(双channel,单channel)以及应用实例(生产者消费者、打印机模型)

Channel goroutine 运行在相同的地址空间&#xff0c;因此访问共享内存必须做好同步。goroutine 通过通信来共享内存&#xff0c;而不是其享内存来通信。 引用类型 channel 是CSP 模式的具体实现,用于多个 goroutine 通讯。其内部实现了同步&#xff0c;确保并发安全。 chan…

【Linux】磁盘性能压测-FIO工具

一、FIO工具介绍 fio&#xff08;Flexible I/O Tester&#xff09;是一个用于评估计算机系统中 I/O 性能的强大工具。 官网&#xff1a;fio - fio - Flexible IO Tester 注意事项&#xff01; 1、不要指定文件系统名称&#xff08;如/dev/mapper/centos-root)&#xff0c;避…

vue + echart 饼形图

图表配置&#xff1a; import { EChartsOption, graphic } from echarts import rightCircle from /assets/imgs/index/right_circle.png export const pieOption: EChartsOption {title: {text: 100%,subtext: 游客加量,left: 19%,top: 42%,textStyle: {fontSize: 24,color:…