Nifi脚本组件ExecuteScript 的使用(一)

news2024/11/19 4:30:45

ExecuteScript 组件的基本使用

前面已经介绍过Nifi中基本的数据流程,这里介绍一下最为常用的一个组件,ExecuteScript processor,顾名思义ExecuteScript组件是一组以自定义脚本为主体的组件,意思就是,可以在该组件内部进行脚本语言编写,以此来完成数据的处理,相较于其他固定的Process组件的预设操作和要求,脚本更为便捷和自由。

通过组件商城搜索ExecuteScript可以搜索到相关的脚本组件,这里Nifi内置了多种脚本语言的支持,包括python、js、groovy等,打开组件后,SETTING和SCHEDULING配置就不用多说了,通用组件的配置,直接打开特有的PROPERTIES配置页签,可以看到如上图中的几个配置项。

  • Script File : 这里可以通过文件路径,直接加载脚本文件读取,和Script Body 配置二选一即可
  • Script Body:实时脚本代码,可以直接在此处编写相应的脚本语言
  • Failure Strategy:这里是当脚本中出现异常或错误时,数据的流出,一般选择transfer to failure即可
  • Additional classpath: 如果编写脚本过程中脚本需要依赖别的类库,则这里需要指定jar包(以groovy为例)
  • 最后一个SQL.masql为个人自定义的参数,可以在脚本中使用,后续说到Controller Service时会介绍,这里不过多解释

ExecuteScript 组件的脚本编写

看过了具体的组件,实际使用中是直接在ScriptBody中进行脚本语言编写,ExecuteScript进行脚本编程之前,需要先熟悉一些概念。

session: 是对processor的ProcessSession属性的引用。session允许在 flow files 执行下面的操作: create(), putAttribute(), transfer(), 像 read() 和 write()一样。

context: 是对 ProcessContext 的引用。可以用于检索 processor 的属性, 关系, Controller Services, 和 StateManager。

log: 是对ComponentLog的引用。用于 log 消息到 NiFi系统, 如 log.info('Hello world!')。

REL_SUCCESS: 这是对 "success" relationship 的引用。这是从父类 (ExecuteScript)的静态变量基础来的, 但是一些引擎(如 Lua)不允许引用静态成员, 只是一个为了方便的变量。

REL_FAILURE: 这是对 "failure" relationship 的引用。与 REL_SUCCESS 一样, 这是从父类 (ExecuteScript)的静态变量基础来的, 但是一些引擎(如 Lua)不允许引用静态成员, 只是一个为了方便的变量。

Dynamic Properties: 任何在ExecuteScript定义的动态属性都作为变量集合到 PropertyValue 对象,对应于dynamic property。允许获得property的 String 值 , 通过NiFi表达式进行求值,获得相应的类型 (如 Boolean, 等等)。 因为动态属性名称成为脚本里的变量名, 你需要了解所选的脚本引擎的变量命名属性。 例如, Groovy 不允许变量名中提供 (.) , 所以,如果 "my.property"作为动态属性将会报错。

与这些变量名的交互通过 NiFi Java API进行, 下面的每一个案例将讨论相应的API调用。 下面的案例执行不同的函数操作 flow files, 如 reading/writing 属性, 转换为 relationship, logging, 等等。需要注意,这里的例子只是一些片段。举例来说, 如果使用session.get()从队列中获取 flow file , 必须转换为 relationship 或者移除, 否则将会引发错误。代码片段应该是平面化的而且保持清晰,没有容易引起混乱的代码,仅用于演示概念,从而让工作简单。

读取管道中数据(flow file)

进行脚本编写处理数据第一步就是需要从前置管道中获取数据到ExecuteScript组件中。

从session中获得flow file

数据进入管道后,如何对接组件为ExecuteScript,则数据是以Flow File的形式存在,获取数据就是获取Flow File。

需求:从队列中获得输入flow file,执行 ExecuteScript 并进行处理。

方法:使用 session对象的get(). 该方法返回FlowFile,是下一个最高优先级的FlowFile用于处理. 如果没有 FlowFile 用于处理, 该方法将返回 null. 注意, 如果一个持续的FlowFiles流进入processor,也可能会返回null. 这在多个并发任务处理时会发生,此时其他任务已获得了 FlowFiles. 如果脚本要求有一个 FlowFile才能继续处理, 如果是session.get()得到null应该立即返回。

例子

Groovy:

flowFile = session.get()
if(!flowFile) return

Jython:

flowFile = session.get()
if (flowFile != None):

Javascript:

var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}

JRuby:

flowFile = session.get()
if flowFile != nil
# All processing code goes here
end

得到多个 flow files

需求:从queue(s)获得多个flow files用于ExecuteScript处理

方法:使用session对象的get(maxResults) 方法. 该方法从工作队列中返回最多 maxResults 个FlowFiles . 如果没有 FlowFiles 可用, 一个空的list 将被返回 (而不是返回 null).。

例子

Groovy:

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    flowFileList.each { flowFile ->
    // Process each FlowFile here
    }
}

Jython:

flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList:
        # Process each FlowFile here

Javascript:

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    for each (var flowFile in flowFileList) {
        // Process each FlowFile here
    }
}

JRuby:

flowFileList = session.get(100)
if !(flowFileList.isEmpty())
    flowFileList.each { |flowFile|
        # Process each FlowFile here}
end

创建一个新的 flow files

创建新Flow Files

需求:创建一个新的 FlowFile 发送到下一步的 processor

方法:使用session的 create() 方法. 该方法返回 FlowFile 对象, 以用于后续的处理操作。

例子

Groovy:

flowFile = session.create()
// Additional processing here

Jython:

flowFile = session.create()
# Additional processing here

Javascript:

var flowFile = session.create();
// Additional processing here

JRuby:

flowFile = session.create()
# Additional processing here

从父级FlowFile创建新的 FlowFile

需求:从已有的 FlowFile 创建新的flow file 发送到下一步的 processor

方法:使用session的 create(parentFlowFile) 方法,该方法获得父级 FlowFile 的引用,然后返回新的派生 FlowFile 对象。新创建的 FlowFile 除UUID之外将继承父级的所有属性,同时该方法将自动创建一个 起源 FORK 事件或 起源 JOIN 事件,在 ProcessSession被提交的时候,取决于FlowFiles 是否从同一个parent创建。

例子

Groovy:

flowFile = session.get()
if(!flowFile) return
    newFlowFile = session.create(flowFile)
    // Additional processing here

Jython:

flowFile = session.get()
if (flowFile != None):
    newFlowFile = session.create(flowFile)
    # Additional processing here

Javascript:

var flowFile = session.get();
if (flowFile != null) {
    var newFlowFile = session.create(flowFile);
    // Additional processing here
}

JRuby:

flowFile = session.get()
if flowFile != nil
    newFlowFile = session.create(flowFile)
    # Additional processing here
end

flow file 的attributes操作

从 flow file 得到属性

需求:获得flow file 的属性。

方法:使用FlowFile对象getAttribute(attributeKey) 。 该方法对于给定的attributeKey返回一个字符串值 , 如果没有找到相应的key就返回null. 下面的例子演示返回FlowFile的 "filename" 属性。

例子

Groovy:

flowFile = session.get()

if(!flowFile) return
myAttr = flowFile.getAttribute('filename')

Jython:

flowFile = session.get()

if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
    # implicit return at the end

Javascript:

var flowFile = session.get()

if (flowFile != null) {
    var myAttr = flowFile.getAttribute('filename')
}

JRuby:

flowFile = session.get()

if flowFile != nil
myAttr = flowFile.getAttribute('filename')
end

添加属性到 flow file

需求:在已有的 flow file 上添加自己的属性。

方法:使用session对象的 putAttribute(flowFileattributeKeyattributeValue) 方法。 该方法更新给定的 FlowFile's 属性,使用给出的 key/value 对来进行。

注意:对象的 "uuid" 属性是固定的,并且不能修改; 如果key被命名为 "uuid", 将被忽略.

这里的FlowFile 对象是不可改变的; 这意味着,如果通过API更新了 FlowFile 的属性 (或其它的改变了) , 你将得到一个新版的FlowFile的新的引用。当转换FlowFiles到relationships时这是非常重要的。你必须保持对FlowFile的最新版本的引用, 你必须转换或者移除所有的FlowFiles的最后版本, 否则执行时将会得到错误信息。经常情况下, 该用于存储 FlowFile 引用变量将会被最后返回的版本覆盖 (中间的 FlowFile 应用将会被自动抛弃). 在这个例子中,你可以看到当添加属性时重用flowFile引用的技术。注意到当前的flowFile引用被传递给putAttribute() 方法. 这个结果FlowFile具有命名为 'myAttr'值为 'myValue'的属性。如果你有一个对象,可以序列化为String. 最终, 请注意如果你添加了多个属性, 最好创建一个Map,然后使用 putAllAttributes() 方法来进行赋值。

例子

Groovy:

flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Jython:

flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    # implicit return at the end

Javascript:

var flowFile = session.get();
if (flowFile != null) {
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}

JRuby:

flowFile = session.get()
if flowFile != nil
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end

添加多个属性到一个flow file

需求:向 flow file 添加多个自定义属性。。

方法:使用 session对象的putAllAttributes(flowFileattributeMap) 方法。该方法更新给定的FlowFile's 属性,以 key/value 对的方式存储在Map中返回。

注意:对象的 "uuid" 属性是固定的,并且不能修改; 如果key被命名为 "uuid", 将被忽略.

该技术创建了一个 Map (aka dictionary in Jython, hash in JRuby) 用于更新,然后调用putAllAttributes() 。这比对putAttribute() 对每一个 key/value 遍历效率更高, 这将导致对每一个属性调用时 FlowFile 都需要创建一个副本 (查看上面 FlowFile 不变性的讨论)。下面例子中的Map包含两个条目: myAttr1 和 myAttr2, 设为 '1' 并且第二个为 String (附着到方法签名,对key和value都要求 String)。 注意到session.transfer() 在这里并未指定 (因此下面的代码片段并不能工作), 查看下面的方法。

例子

Groovy:

attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)

Jython:

attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
    # implicit return at the end

Javascript:

var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()

if (flowFile != null) {
    flowFile = session.putAllAttributes(flowFile, attrMap)
}

JRuby:

attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get()
if flowFile != nil
flowFile = session.putAllAttributes(flowFile, attrMap)
end

转换 flow file

转移一个flow file 到 relationship

需求:在处理完flow file (new or incoming)之后, 你希望将flow file转移到 relationship ("success" or "failure"). 在这个简单的例子中,让我们假定有一个变量叫做 "errorOccurred", 用于指示在哪种 relationship下 FlowFile 将被转移。

方法:使用session对象的transfer(flowFilerelationship) 方法。基于给定的relationship,该方法将给定的FlowFile发送到适合的目标处理器队列。如果relationship通向不止一个目标,FlowFile的状态将被复制 ,从而每一个目标都将收到一个 FlowFile的拷贝,因此也将具有唯一的标识符UUID。

注意:最后,ExecuteScript将执行session.commit() 以进行操作的提交。你不需要在脚本内部执行session.commit() 来执行提交操作。

例子

Groovy:

flowFile = session.get()

if(!flowFile) return

// Processing occurs here
if(errorOccurred) {
    session.transfer(flowFile, REL_FAILURE)
}
else {
    session.transfer(flowFile, REL_SUCCESS)
}

Jython:

flowFile = session.get()

if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end

Javascript:

var flowFile = session.get();

if (flowFile != null) {
    // All processing code goes here
    if(errorOccurred) {
        session.transfer(flowFile, REL_FAILURE)
    }
    else {
        session.transfer(flowFile, REL_SUCCESS)
    }
}

JRuby:

flowFile = session.get()
if flowFile != nil
    # All processing code goes here
    if errorOccurred
        session.transfer(flowFile, REL_FAILURE)
    else
        session.transfer(flowFile, REL_SUCCESS)
    end
end

日志 Logging

发送消息到 log并制定日志级别

需求:希望报告一些事件、消息并通过日志框架写入。

方法: 使用 log 的方法(), trace(), debug(), info(), 或 error() 完成。这些方法可以是单个的字符串或者字符串数组对象, 或字符串后面跟着Throwable的对象数组。第一个用于简单消息. 第二个用于一些动态对象(值)的log。在消息字符串中使用 "{}" 进行引用。这些用于对对象数组进行求值,当消息读到 "Found these things: {} {} {}" 并且 Object array 是 ['Hello',1,true], 那么logged 消息将是 "Found these things: Hello 1 true",第三种logging方法带一个 Throwable 参数, 这在例外被捕捉到并且希望日志记录时使用。

例子

Groovy:

log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

Jython:

from java.lang import Object
from jarray import array

objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)

Javascript:

var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);

objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)

JRuby:

log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1900011.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用特殊的参数和符号来改变图像的风格、质量、比例

🪂🌹 /imagine prompt: 一朵白色的花,透明的花瓣,深如海水,晶莹剔透+露珠,8K,HD,常春藤,花卉,热带植物 --v 5 --ar 2:1 --c 80 --s 500 -v 5切换模型版本5--ar 2:1宽高比例为2:1--c 80混沌值为80--s 500样式值为500🧑🏼‍🎨Midjourney 动漫和插画风格 🌹…

Java视频点播网站

作者介绍:计算机专业研究生,现企业打工人,从事Java全栈开发 主要内容:技术学习笔记、Java实战项目、项目问题解决记录、AI、简历模板、简历指导、技术交流、论文交流(SCI论文两篇) 上点关注下点赞 生活越过…

Redis的八种数据类型介绍

Redis 是一个高性能的键值存储,它支持多种丰富的数据类型。每种数据类型都有其特定的用途和底层实现。下面我将介绍 Redis 支持的主要数据类型及其背后的数据结构。 本人这里还有几篇详细的Redis用法文章,可以用来进阶康康! 1. 字符串 (Stri…

Python数据分析-分子数据分析和预测

一、设计背景 分子结构设计与性质计算对研发新型高能量密度材料具有重要意义。机器学习作为一种大数据计算模型,可以避免复杂、危险的实验,大幅提高研发效率、降低设计和计算成本。本文基于机器学习的方法以及通过构建神经网络,实现对高能量…

网络基础:IS-IS协议

IS-IS(Intermediate System to Intermediate System)是一种链路状态路由协议,最初由 ISO(International Organization for Standardization)为 CLNS(Connectionless Network Service)网络设计。…

TP8/6 子域名绑定应用

原www.xxx.com/admin改为admincms.xxx.com config/app.php 官方文档:ThinkPHP官方手册

fastadmin 如何给页面添加水印

偶然发现fastadmin框架有个水印插件&#xff0c;看起来漂亮&#xff0c;就想也实现这样的功能&#xff0c;看到需要费用。但是现成的插件需要费用&#xff0c;自己动手丰衣足食。说干就干。 1. 找到watermark.js &#xff0c;放到assets/js/ 下面 2.具体页面引入 <script…

基于单片机的粉尘检测报警防护系统研究

摘要 &#xff1a; 粉尘检测是环境保护的重要环节&#xff0c;传统的粉尘检测防护系统的预防方式较为单一。本文设计了一种基于单片机的粉尘检测报警防护系统&#xff0c;能有效地检测粉尘浓度&#xff0c;进行多种方式的报警防护&#xff0c;以保证工作人员的生命健康和安全。…

平价猫粮新选择!福派斯鲜肉猫粮,让猫咪享受美味大餐!

福派斯鲜肉猫粮&#xff0c;作为一款备受铲屎官们青睐的猫粮品牌&#xff0c;凭借其卓越的品质和高性价比&#xff0c;为众多猫主带来了健康与美味的双重享受。接下来&#xff0c;我们将从多个维度对这款猫粮进行解析&#xff0c;让各位铲屎官更加全面地了解它的魅力所在。 1️…

强大的文档编辑工具——坤Tools正式版 V0.4.4【免费的Word转PDF、PDF转Word、替换内容、转换图片、合并图片工具】

在这个信息爆炸的时代&#xff0c;我们每个人都像是一名勇敢的探险家&#xff0c;在茫茫的数据海洋中寻找着属于自己的宝藏。 软件链接&#xff1a;吾爱原创 | 全功能批量处理器&#xff0c;绿色版本&#xff01; 今天给大家带来一款功能强大的文档编辑工具——坤Tools正式版…

孙溟㠭篆刻《睡片原谅一切,醒来不问过往》

孙溟㠭篆刻《睡片原谅一切&#xff0c;醒来不问过往》 佛陀言&#xff1a;睡前原谅一切&#xff0c;醒来不问过往&#xff0c;珍惜所有不期而遇&#xff0c;看淡所有不辞而别甲辰夏溟㠭于寒舍小窗下刊。

Hadoop-15-Hive 元数据管理与存储 Metadata 内嵌模式 本地模式 远程模式 集群规划配置 启动服务 3节点云服务器实测

章节内容 上一节我们完成了&#xff1a; Hive中数据导出&#xff1a;HDFSHQL操作上传内容至Hive、增删改查等操作 背景介绍 这里是三台公网云服务器&#xff0c;每台 2C4G&#xff0c;搭建一个Hadoop的学习环境&#xff0c;供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…

KIVY 3D Rotating Monkey Head¶

3D Rotating Monkey Head — Kivy 2.3.0 documentation KIVY 3D Rotating Monkey Head kivy 3D 旋转猴子头 This example demonstrates using OpenGL to display a rotating monkey head. This includes loading a Blender OBJ file, shaders written in OpenGL’s Shading…

机器学习筑基篇,​Ubuntu 24.04 快速安装 PyCharm IDE 工具,无需激活!

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] Ubuntu 24.04 快速安装 PyCharm IDE 工具 描述&#xff1a;虽然在之前我们安装了VScode&#xff0c;但是其对于使用Python来写大型项目以及各类配置还是比较复杂的&#xff0c;所以这里我们还是推…

docker buildx 交叉编译设置

dockerd配置文件 /etc/docker/daemon.json设置&#xff1a; rootubuntu:/etc/docker# cat daemon.json {"insecure-registries":["localhost:5000","127.0.0.1:5000","172.16.67.111:5000"],"features": {"buildkit&…

三、数据库系统(考点篇)

1、三级模式一两级映像 内模式&#xff1a;管理如何存储物理的 数据 &#xff0c;对数据的存储方式、优化、存放等。 模式&#xff1a;又称为概念模式&#xff0c; 就是我们通常使用的表这个级别 &#xff0c;根据应用、需求将物理数据划分成一 张张表。 外模式&#xff1a;…

springboot出租房租赁系统-计算机毕业设计源码80250

摘 要 随着城市化进程的不断推进&#xff0c;人口流动日益频繁&#xff0c;住房租赁需求逐渐增加。为了更好地满足人们对住房租赁服务的需求&#xff0c;本论文基于Spring Boot框架&#xff0c;设计并实现了一套出租房租赁系统。 首先&#xff0c;通过对市场需求和现有系统的调…

职升网:中级统计师是否属于中级职称?

中级统计师确实属于中级职称。 在统计专业人员的职称体系中&#xff0c;中级统计师占据了重要的位置&#xff0c;它属于中级职称范畴。这个职称体系包括初级、中级、高级和正高级四个层次&#xff0c;每个层次都对应着不同的专业技术岗位等级。初级职称只设助理级&#xff0c;…

Podman 深度解析

目录 引言Podman 的定义Podman 的架构Podman 的工作原理Podman 的应用场景Podman 在 CentOS 上的常见命令实验场景模拟总结 1. 引言 随着容器化技术的发展&#xff0c;Docker 已成为容器管理的代名词。然而&#xff0c;由于 Docker 的一些局限性&#xff0c;如需要守护进程和 …

昇思25天学习打卡营第08天|模型训练

模型训练 模型训练一般分为四个步骤&#xff1a; 构建数据集。定义神经网络模型。定义超参、损失函数及优化器。输入数据集进行训练与评估。 现在我们有了数据集和模型后&#xff0c;可以进行模型的训练与评估。 ps&#xff1a;这里的训练和Stable Diffusion中的训练是一样…