大数据Flink(一百二十):Flink SQL自定义函数(UDF)

news2025/1/11 22:58:06

文章目录

Flink SQL自定义函数(UDF)

一、概述

二、​​​​​​​自定义标量函数(UDSF)

三、​​​​​​​​​​​​​​自定义聚合函数(UDAF)

四、 ​​​​​​​​​​​​​​自定义表值函数(UDTF)


Flink SQL自定义函数(UDF)

Flink全托管支持在SQL作业中使用Python自定义函数,Flink支持以下3类自定义函数:UDSF(User Defined Scalar Function)、UDAF(User Defined Aggregation Function)、UDTF(User Defined Table-valued Function)。

一、概述

在资料udf函数中可以看到udx.zip压缩包,将其解压后可以看到有以下文件:

其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。

进入阿里云Flink开发平台,点击左侧导航栏SQL开发,点击左侧的函数页签,单击注册UDF,将udx.zip上传,如下图所示。

点击确定后,Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。可以看到这里识别出了三个函数。 

点击创建函数,可以看到函数页签下出现了udx目录,下面有三个自定义函数,此时自定义函数创建完成。

二、​​​​​​​​​​​​​​自定义标量函数(UDSF)

  • 自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。

udfs.py内容如下:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]

 说明:

  • sub_string定义了获取每条数据中从begin~end位的字符的代码;
  • 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;
  • 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;

进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下:

CREATE TABLE function_udf(
  a VARCHAR,
  b INT,
  c INT
) WITH (
  'connector' = 'socket',
  'hostname' = '178.23.146.213',
  'port' = '9999',
  'format' = 'csv'
);
  • 查询语句如下
SELECT sub_string(a,2,5)
FROM function_udf;
  • 在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.
  • 在ecs向9999发送数据
123|456,4,2
12|3456,7,1

结果如下:

查询结果是function_udf表中a字段每行字符串的第3-5个字符。

三、​​​​​​​​​​​​​​自定义聚合函数(UDAF)

  • 自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。

udafs.py内容如下:

from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf

class WeightedAvg(AggregateFunction):

    def create_accumulator(self):
        # Row(sum, count)
        return Row(0, 0)

    def get_value(self, accumulator: Row) -> float:
        if accumulator[1] == 0:
            return 0
        else:
            return accumulator[0] / accumulator[1]

    def accumulate(self, accumulator: Row, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator: Row, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight

weighted_avg = udaf(f=WeightedAvg(),
                    result_type=DataTypes.DOUBLE(),
                    accumulator_type=DataTypes.ROW([
                        DataTypes.FIELD("f0", DataTypes.BIGINT()),
                        DataTypes.FIELD("f1", DataTypes.BIGINT())]))

说明:

  • 该示例中,weighted_avg定义了当前数据和历史数据求含权重的均值的代码。
  • 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,
  • 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;
  • create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义;需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义。

仍然使用function_udf表,查询语句如下:

SELECT weighted_avg(b,c)
FROM function_udf;

选中查询语句运行之后,向9999端口依次发送数据,如下:

123|456,4,2

12|3456,7,1

 

查询结果是以c字段为权重的b字段当前数据和历史数据的均值。

四、 ​​​​​​​​​​​​​​自定义表值函数(UDTF)

自定义表值函数(UDTF),将0个、1个或多个标量值作为输入参数(可以是变长参数)。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似,但与标量函数不同。

udtfs.py内容如下:

from pyflink.table import DataTypes
from pyflink.table.udf import udtf

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):
    splits = s.split("|")
    yield splits[0], splits[1]

说明:

  • 该示例中,split定义了将一行字符串按照竖线(|)分割成多列字符串的代码。
  • 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function。
  • 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型。

 仍然使用function_udf表,查询语句如下:

SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
  • 选中查询语句运行之后,向9999端口发送数据,如下
123|456,4,2
12|3456,7,1

结果:

查询结果中,会将function_udf表中每行字符串的a字段按照竖线(|)分割成d,e两列。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2142882.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

re题(20)BUUCTF [GWCTF 2019]pyre

BUUCTF在线评测 (buuoj.cn) Python解包及反编译: PyInstaller Extractoruncompyle6 - 知乎 (zhihu.com) python撤消: Pycharm撤销操作和代码跳转后退回操作以及消除波浪线操作快捷键_pycharm怎么反撤销-CSDN博客 把.pyc文件变成py文件 把.py文件用记事本打开 cod…

supermap iclient3d for cesium模型沿路径移动

可以直接settimeout隔一段时间直接设置位置属性,但是得到的结果模型不是连续的移动,如果想要连续的移动,就需要设置一个时间轴,然后给模型传入不同时间时的位置信息,然后就可以了。 开启时间轴 let start Cesium.Jul…

负载均衡:从理论到实践 ---day04

负载均衡 负载均衡1.什么是负载均衡2.负载均衡的分类硬件负载均衡软件负载均衡选择 3.引入负载均衡的好处 第一个Ribbon实例步骤1:步骤2:步骤3:步骤4: 问题1. 负载均衡的主要目标是什么?2. 负载均衡器的作用是什么&…

【云岚到家-即刻体检】-day07-2-项目介绍及准备

【云岚到家-即刻体检】-day07-2-项目介绍及准备 1 项目介绍1)项目简介2)界面原型3)实战目标 2 搭建实战环境1)服务端2)管理端前端工程3)用户端前端工程4)测试 3 熟悉项目代码1)接口文…

Linux操作系统面试题记录

一、进程与线程 1.并发和并行的区别 并发:一个cpu处理器处理多个任务; 并行:多个cpu处理器处理多个任务; 2.进程和线程是什么?区别?何时用线程何时用进程? Linux中其实没有进程线程之分&…

面试官:讲一讲Spring MVC源码解析

好看的皮囊千篇一律、有趣的灵魂万里挑一 文章持续更新,可以微信搜索【小奇JAVA面试】第一时间阅读,回复【资料】获取福利,回复【项目】获取项目源码,回复【简历模板】获取简历模板,回复【学习路线图】获取学习路线图。…

驱动器磁盘未格式化恢复实战

驱动器磁盘未格式化的深度剖析 在日常的数字生活中,驱动器作为数据存储的重要载体,承载着用户无数的珍贵资料。然而,当遇到“驱动器中的磁盘未被格式化”的提示时,这份平静往往会被瞬间打破。这一状况不仅让用户感到困惑和焦虑&a…

JZ2440开发板——S3C2440的UART的使用

以下内容源于韦东山课程的学习与整理,如有侵权请告知删除。 一、UART硬件简介 UART,全称是“Universal Asynchronous Receiver Transmitter”,即“通用异步收发器”,也就是我们日常说的“串口”。 它在嵌入式中用途非常广泛&…

LabVIEW提高开发效率技巧----VI服务器和动态调用

VI服务器(VI Server)和动态调用是LabVIEW中的两个重要功能,可以有效提升程序的灵活性、模块化和可扩展性。通过这两者的结合,开发者可以在运行时动态加载和调用VI(虚拟仪器),实现更为复杂的应用…

【 html+css 绚丽Loading 】 000052 璇玑转轮

前言:哈喽,大家好,今天给大家分享今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 &#x1f…

实时数仓3.0DWD层

实时数仓3.0DWD层 DWD层设计要点:9.1 流量域未经加工的事务事实表9.1.1 主要任务9.1.2 思路9.1.3 图解9.1.4 代码 9.2 流量域独立访客事务事实表9.2.1 主要任务9.2.2 思路分析9.2.3 图解9.2.4 代码 9.3 流量域用户跳出事务事实表9.3.1 主要任务9.3.2 思路分析9.3.3 …

全面掌握 Jest:从零开始的测试指南(下篇)

在上一篇测试指南中,我们介绍了Jest 的背景、如何初始化项目、常用的匹配器语法以及钩子函数的使用。这一篇篇将继续深入探讨 Jest 的高级特性,包括 Mock 函数、异步请求的处理、Mock 请求的模拟、类的模拟以及定时器的模拟、snapshot 的使用。通过这些技…

办了房屋抵押经营贷,空壳公司不怕被查吗?续贷不上怎么办?

很多有房的朋友,想必都办理过抵押经营贷款。但是,当办完房屋抵押经营贷款之后,钱到手了,别光顾着乐呵,贷后管理可是门大学问,稍有不慎,麻烦就找上门了。咱得确保资金用得对路,征信亮…

windows 使用wsl安装docker

前言 很多情况下代码开发需要依赖 Linux 系统,比如安装 Docker 容器来实现代码隔离,然而问题是大部分同学的电脑都是 Windows 系统,这时就会出现大量报错,经历过的同学一定是边踩坑边落泪。 如何免费拥有一台 Linux 服务器呢&…

什么是即时通讯平台

在当今数字化时代,高效的沟通和协作是企业成功的关键。为了满足企业的沟通需求,即时通讯平台应运而生。WorkPlus作为一款企业级即时通讯平台,提供了丰富的功能和安全性,助力企业实现高效协作、数字化办公以及推动业务发展。本文将…

为什么直播要用RTMP?

为什么要选RTMP 直播使用RTMP(Real-Time Messaging Protocol)协议的原因主要有以下几点: 1. 低延迟特性 RTMP被设计为实时消息传递协议,通过优化传输机制,可以实现较低的传输延迟。这对于直播来说至关重要&#xff…

LeetCode_sql_day26(184,1549,1532,1831)

描述 184.部门工资最高的员工 表: Employee ----------------------- | 列名 | 类型 | ----------------------- | id | int | | name | varchar | | salary | int | | departmentId | int | -----------------…

22章 开发高效算法

1.编写一个程序,提示用户输入一个字符串,然后显示最大连续递增的有序子字符串。分析你的程序的时间复杂度。 import java.util.Scanner;public class Test {public static void main(String[] args) {System.out.println("请输入字符串&#xff1a…

这个公司可以做点什么呢?

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不…

国学盛典 致敬先贤 《老子与道德经》纪录片研讨会在北京善品堂国学馆圆满落幕

9月10日,《老子与道德经》纪录片研讨会在北京善品堂国学馆圆满落幕。中国著名表演艺术家、曾饰演央视86版电视剧《西游记》中“孙悟空”的六小龄童先生与两百余人传统文化传播者、践行者、爱好者齐聚一堂,共同交流。本次会议由中国文化促进会福文化工作委…