0基础学习PyFlink——用户自定义函数之UDF

news2024/11/18 19:28:59

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,
        input_types: Union[List[DataType], DataType, str, List[str]] = None,
        result_type: Union[DataType, str] = None,
        deterministic: bool = None, name: str = None, func_type: str = "general",
        udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

word_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  
    
def word_count():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])
    tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector('print')\
        .schema(sink_schema) \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])
    def colFunc(oneCol):
        return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

word_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  
    
def word_count():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])
    tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector('print')\
        .schema(sink_schema) \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])
    def colFunc(oneCol):
        return Row(oneCol.lower())
              
    tab_lower=tab_source.map(colFunc(col('word')))   
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

if __name__ == '__main__':
    word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)
    def rowFunc(row):
        return Row(row[0].lower())

    tab_lower=tab_source.map(rowFunc) 
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())
    def colFunc(oneCol):
        return oneCol.lower()
    
    tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()
    @udf(result_type=DataTypes.STRING())
    def rowFunc(row):
        return row[0].lower()

    tab_lower=tab_source.map(rowFunc).alias('lower_word')
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1138465.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于LSTM encoder-decoder模型实现英文转中文的翻译机器

前言 神经网络机器翻译(NMT, neuro machine tranlation)是AIGC发展道路上的一个重要应用。正是对这个应用的研究,发展出了注意力机制,在此基础上产生了AIGC领域的霸主transformer。我们今天先把注意力机制这些东西放一边,介绍一个对机器翻译…

[论文阅读]Point Density-Aware Voxels for LiDAR 3D Object Detection(PDV)

PDV Point Density-Aware Voxels for LiDAR 3D Object Detection 论文网址:PDV 论文代码:PDV 简读论文 摘要 LiDAR 已成为自动驾驶中主要的 3D 目标检测传感器之一。然而,激光雷达的发散点模式随着距离的增加而导致采样点云不均匀&#x…

云原生架构设计理论与实践

云原生架构设计理论与实践 云原生架构概述 云原生的背景 云原生定义和特征 云原生架构的设计原则 架构模式 服务化架构模式 Mesh化架构模式 Serverless模式 存储计算分离模式 分布式事务模式 可观测架构 事件驱动架构 云原生架构相关技术 容器技术 云原生微服务技术 无服务…

orm连接mysql

7.2 ORM ORM可以帮助我们做两件事 创建、修改、删除数据库中的表(不用写SQL语句)。无法创建数据库操作表中的数据(操作表中的数据)。 1.自己创建数据库 启动自己的mysql服务自带的工具创建数据库 create database gx_day5 DE…

CMake多文件构建初步

前面学习了cmake,不熟悉,只是记录了操作过程;下面再继续; 略有一点进步,增加一个代码文件,之前是1个代码文件; 如下图,prj是空文件夹, CMakeLists.txt如下;…

MySQL多表关联on和where速度对比实测谁更快

MySQL多表关联on和where速度对比实测谁更快 背景 今天发现有人在讨论:两张MySQL的数据表按照某一个字段进行关联的时候查询,我们使用on和where哪种查询方式更快。百闻不如一见,我们来亲自测试下。 先说结论 Where、对等查询的join速度基本…

vue使用smooth-signature实现移动端电子签字,包括横竖屏

vue使用smooth-signature实现移动端电子签字&#xff0c;包括横竖屏 1.使用smooth-signature npm install --save smooth-signature二.页面引入插件 import SmoothSignature from "smooth-signature";三.实现效果 四.完整代码 <template><div class&quo…

第13期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练 Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大型语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以…

asp.net学生考试报名管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net学生考试报名管理系统是一套完善的web设计管理系统系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使 用c#语言开发 应用技术&#xff1a;asp…

YouTrack 中如何设置邮件通知

在 YouTrack 中&#xff0c;默认是不会邮件通知的。 你可以为你的账号设置邮件通知。 设置的方法为单击用户属性&#xff0c;然后在弹出的小窗口中选择属性选项。 设置邮件通知 在通知 Tab 页面中&#xff0c;选择发送邮件的方式&#xff0c;默认这个选项是不选择的。 用户…

React之如何捕获错误

一、是什么 错误在我们日常编写代码是非常常见的 举个例子&#xff0c;在react项目中去编写组件内JavaScript代码错误会导致 React 的内部状态被破坏&#xff0c;导致整个应用崩溃&#xff0c;这是不应该出现的现象 作为一个框架&#xff0c;react也有自身对于错误的处理的解…

spring boot利用redis作为缓存

一、缓存介绍 在 Spring Boot 中&#xff0c;可以使用 Spring Cache abstraction 来实现缓存功能。Spring Cache abstraction 是 Spring 框架提供的一个抽象层&#xff0c;它对底层缓存实现&#xff08;如 Redis、Ehcache、Caffeine 等&#xff09;进行了封装&#xff0c;使得在…

vue实现多时间文字条件查询

// 搜索功能 // 获取搜索框内容 const dateOneref() const dateTworef() // 重新创建数组 const itemList ref([]); const query()>{console.log(formattedDateTime.value);console.log(list.value);itemList.value list.value.filter(item > {//获取数据框的内容是否与…

垃圾回收系统小程序

在当今社会&#xff0c;废品回收不仅有利于环境保护&#xff0c;也有利于资源的再利用。随着互联网技术的发展&#xff0c;个人废品回收也可以通过小程序来实现。本文将介绍如何使用乔拓云网制作个人废品回收小程序。 1. 找一个合适的第三方制作平台/工具&#xff0c;比如乔拓云…

C++之Linux syscall实例总结(二百四十六)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

进程和多线程

目录 进程 1. 如何管理进程 2. 进程调度 3. 内存管理 4. 进程间通信 多线程 线程和进程的关系&#xff1a; 线程安全问题 进程 一个正在运行的程序,就是一个 进程,进程是一个重要的 "软件资源",是由操作系统内核负责管理的。每个进程都对应一些资源,在上图中…

【面试经典150 | 栈】简化路径

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;字符串数组模拟栈 其他语言python3 写在最后 Tag 【栈】【字符串】 题目来源 71. 简化路径 题目解读 将 Unix 风格的绝对路径转化成更加简洁的规范路径。字符串中会出现 字母、数字、/、_、. 和 .. 这几种字符&#…

c语言之源码反码和补码

c语言源码反码和补码 c语言之源码反码和补码 c语言源码反码和补码一、源码反码补码的介绍二、源码反码补码例子三、源码反码补码练习 一、源码反码补码的介绍 原码、反码、补码是计算机中对数字的二进制表示方法。 原码&#xff1a;将最高位作为符号位&#xff08;0表示正&…

sipp3.6多方案压测脚本

概述 SIP压测工具sipp&#xff0c;免费&#xff0c;开源&#xff0c;功能足够强大&#xff0c;配置灵活&#xff0c;优点多。 有时候我们需要模拟现网的生产环境来压测&#xff0c;就需要同时有多个sipp脚本运行&#xff0c;并且需要不断的调整呼叫并发。 通过python脚本的子…

一文讲透 “中间层” 思想

作者&#xff1a;明明如月学长&#xff0c; CSDN 博客专家&#xff0c;大厂高级 Java 工程师&#xff0c;《性能优化方法论》作者、《解锁大厂思维&#xff1a;剖析《阿里巴巴Java开发手册》》、《再学经典&#xff1a;《EffectiveJava》独家解析》专栏作者。 热门文章推荐&…