FlinkAPI开发之自定义函数UDF

news2024/9/21 14:47:07

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 实现自定义接口FilterFunction
        DataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {
            @Override
            public boolean filter(Orders orders) throws Exception {
                //过滤金额大于10000元的订单
                if (orders.getOrder_amount() > 50) {
                    return true;
                } else {
                    return false;
                }

            }
        });
        streamOperator.print();
        environment.execute();
    }
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;

public class OrderFilter implements FilterFunction<Orders> {
    @Override
    public boolean filter(Orders orders) throws Exception {
        //过滤金额大于10000元的订单
        if (orders.getOrder_amount() > 50) {
            return true;
        } else {
            return false;
        }
    }
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 返回类型记得修改为 DataStream
        DataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());
        operator.print();
        environment.execute();
    }
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明
        DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);
        streamOperator.print();
        environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        ordersDataStreamSource.print();
        // TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型
        DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
            }

            @Override
            public String map(Orders orders) throws Exception {
                return orders.getOrder_date().toString()+"字符串";
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
            }
        });
        operator.print();
        environment.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1365938.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

接口芯片选型分析 四通道差分驱动可满足ANSI TIA/EIA-422-B 和ITU V.11 的要求 低功耗,高速率,高ESD

四通道差分驱动可满足ANSI TIA/EIA-422-B 和ITU V.11 的要求 低功耗&#xff0c;高速率&#xff0c;高ESD。 其中GC26L31S可替代AM26LS31/TI&#xff0c;GC26L32S替代AM26LS32/TI&#xff0c;GC26E31S替代TI的AM26LV31E

第04章_数组

第04章_数组 讲师&#xff1a;尚硅谷-宋红康&#xff08;江湖人称&#xff1a;康师傅&#xff09; 官网&#xff1a;http://www.atguigu.com 本章专题脉络 1、数组的概述 1.1 为什么需要数组 需求分析1&#xff1a; 需要统计某公司50个员工的工资情况&#xff0c;例如计算…

第十四章 this关键字介绍和使用

文章目录 一、this是哪里二、常见的this指向2.1 默认绑定2.2 隐式绑定2.3 显示绑定2.4 构造函数绑定 三、函数的方法四、练习&#xff1a;相关文章&#xff1a; 一、this是哪里 this在英文中的含义是【这】。那么【这】是【哪】&#xff1f;this关键字一般存在于函数中&#x…

vue3 img图片怎么渲染

在 Vue3 中加载图片&#xff08;img&#xff09;src地址时&#xff0c;出现无法加载问题。网上很多都建议使用 require 加载相对路径&#xff0c;如下&#xff1a; <img :src"require(../assets/img/icon.jpg)"/>但是按照这种方式加载又会报错如下&#xff1a;…

简单易懂的PyTorch线性层解析:神经网络的构建基石

目录 torch.nn子模块Linear Layers详解 nn.Identity Identity 类描述 Identity 类的功能和作用 Identity 类的参数 形状 示例代码 nn.Linear Linear 类描述 Linear 类的功能和作用 Linear 类的参数 形状 变量 示例代码 nn.Bilinear Bilinear 类的功能和作用 B…

Open CV 图像处理基础:(一)Open CV 在windows环境初始化和 Java 动态库加载方式介绍

Open CV 在windows环境初始化和 Java 动态库加载方式介绍 Open CV是一个开源的计算机视觉和机器学习软件库&#xff0c;它提供了一系列的工具和程序库&#xff0c;让用户能够进行复杂的图像处理和计算机视觉任务。在Java中使用OpenCV涉及到环境初始化和动态库加载。以下是一些…

什么是消费增值?如何做到增值?

在当今的商业世界&#xff0c;消费观念正在经历一场深刻的变革。传统的消费模式中&#xff0c;消费者购买商品后&#xff0c;交易即结束&#xff0c;消费者与商品的关系仅停留在使用层面。然而&#xff0c;随着消费增值模式的出现&#xff0c;这一观念正在被颠覆。这一模式将消…

蓝牙网关在物联网领域三大应用

蓝牙网关在物联网的应用主要包括物联网室内定位、物联网数据采集、物联网连接控制三大应用领域&#xff0c;以下对三大应用领域做详细解释。 一、物联网蓝牙室内定位 蓝牙网关在室内定位的应用包括人员定位和资产设备定位两大方向。 1、人员定位 蓝牙网关安装于室内的特定地…

Fragstats景观格局指数计算入门教程

土地利用以及景观格局是当前全球环境变化研究的重要组成部分及核心内容&#xff0c;其对区域的可持续发展以及区域土地管理有非常重要的意义。通过对土地利用时空变化规律进行分析可以更好的了解土地利用变化的过程和机制&#xff0c;并且通过调整人类社会经济活动&#xff0c;…

【AI视野·今日NLP 自然语言处理论文速览 第七十一期】Fri, 5 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Fri, 5 Jan 2024 Totally 28 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers LLaMA Pro: Progressive LLaMA with Block Expansion Authors Chengyue Wu, Yukang Gan, Yixiao Ge, Zeyu Lu, …

安科瑞智慧型动态无功补偿的工业应用——安科瑞赵嘉敏

摘要&#xff1a;低压配电系统的无功补偿是电能质量治理的重要环节。在传统无功补偿中&#xff0c;响应速度较慢&#xff0c;补偿电流呈阶梯式&#xff0c;存在过补或欠补的现象&#xff0c;有时未必能到达理想的效果。为了解决这一问题&#xff0c;人们提出了一种无功补偿综合…

明基、书客、松下护眼台灯怎么样?三款热门台灯真实测评

近年来学生近视的现象越来越严重了&#xff0c;而且近视的年龄也越来越小了&#xff0c;不少还没开始上小学的孩子&#xff0c;就已经戴上了厚厚的近视眼镜。而那些高年级的学生更是近视的重灾区&#xff0c;不仅需要高强度的学习和长时间用眼&#xff0c;而且每晚都需要学习到…

cuttag和chip-seq的区别?

Cut&Tag&#xff08;Cleavage Under Targets and Tagmentation&#xff09;和ChIP-Seq&#xff08;Chromatin Immunoprecipitation Sequencing&#xff09;都是用于研究蛋白质与DNA相互作用的生物技术。它们在技术原理和应用方面有一些关键的区别。 1.ChIP-Seq测序 1.1 …

Linux 部署 AI 换脸

我使用的系统是 Ubuntu 20.04 文章实操主要分为以下几个部分 1、python 环境安装 2、下载 FaceFusion 上传服务器 3、创建 python 虚拟环境 4、下载 FaceFusion 依赖&#xff08;这里的命令执行时间会很长&#xff0c;够你睡午觉了&#xff09; 5、运行 FaceFusion 6、开…

基于SSM的基金投资交易管理网站的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

Java学习笔记(五)——时间相关类

文章目录 JDK7以前时间相关类Date 时间类阅读源码练习 SimpleDateFormat 格式化时间作用构造方法常用方法日期和时间模式练习 Calendar 日历获取Calendar对象的方法Calendar常用方法 JDK8新增时间相关类变化Date类ZoneId&#xff1a;时区Instant&#xff1a;时间戳ZoneDateTime…

ECharts 实现省份在对应地图的中心位置

使用 ECharts 下载的中国省市区的json文件不是居中的(如下图所示)&#xff0c;此时需要修改json文件中的 cp 地理位置&#xff0c;设置成每个省份的中心位置 {"type": "FeatureCollection","features":[{ "type": "Feature"…

C++ 手写堆 || 堆模版题:堆排序

输入一个长度为 n 的整数数列&#xff0c;从小到大输出前 m 小的数。 输入格式 第一行包含整数 n 和 m 。 第二行包含 n 个整数&#xff0c;表示整数数列。 输出格式 共一行&#xff0c;包含 m 个整数&#xff0c;表示整数数列中前 m 小的数。 数据范围 1≤m≤n≤105 &…

护眼灯色温多少合适?盘点合适色温的护眼台灯

有了孩子&#xff0c;就等于同时有了软肋和铠甲&#xff0c;也总是在自己的能力范围内&#xff0c;把最好的东西给他。当孩子开始学习知识后更是如此&#xff0c;能力范围内最好的教育资源、最好的学习环境&#xff0c;以及各种与之配套的学习用具。护眼台灯在这时候就安排上了…

热钱涌向线控底盘!XYZ全栈集成引领新风向

在车身、底盘部分&#xff0c;中央计算区域控制带动传统车控、底盘及动力控制ECU市场迎来新一轮技术升级和域融合窗口期。线控制动、转向及空气悬架&#xff0c;正在加速与智能驾驶融合并进一步提升驾乘体验。 12月13-15日&#xff0c;2023&#xff08;第七届&#xff09;高工…