0基础学习PyFlink——用户自定义函数之UDTF

news2024/11/20 3:26:00

大纲

  • 表值函数
  • 完整代码

在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF
在这里插入图片描述

表值函数

我们对比下UDF和UDTF

def udf(f: Union[Callable, ScalarFunction, Type] = None,
        input_types: Union[List[DataType], DataType, str, List[str]] = None,
        result_type: Union[DataType, str] = None,
        deterministic: bool = None, 
        name: str = None, 
        func_type: str = "general",
        udf_type: str = None
        ) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
def udtf(f: Union[Callable, TableFunction, Type] = None,
         input_types: Union[List[DataType], DataType, str, List[str]] = None,
         result_types: Union[List[DataType], DataType, str, List[str]] = None,
         deterministic: bool = None,
         name: str = None
         ) -> Union[UserDefinedTableFunctionWrapper, Callable]:

可以发现:

  • UDF比UDTF多了func_type和udf_type参数;
  • UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str];

特别是最后一点,可以认为是UDF和UDTF在应用上的主要区别。
换种更容易理解的说法是:UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值(行)。
举一个例子:

word_count_data = ["A", "B", "C", "a", "C"] 

我们希望统计上面这些字符的个数,以及小写后字符的个数。这样A的个数是1,a的个数是2(因为a算一个,A小写后又算一个)。C的个数是2,g的个数是2。
这就要求统计算法在遇到大写字母时,需要统计大小写两种字母;而遇到小写字母时,只需要统计小写字母。

    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)
    def rowFunc(row):
        if row[0].isupper():
            yield row[0]
            yield row[0].lower()
        else:
            yield row[0]

yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。
和调用UDF不同的是,需要使用flat_map来调用UDTF。flat即为“打平”,可以生动的理解为将多维降为一维。

    tab_trans=tab_source.flat_map(rowFunc)
    tab_trans.execute().print()
+--------------------------------+
|                             f0 |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

由于我们没有指定经过处理的值所属的字段名称,于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。

    tab_trans_alias=tab_trans.alias('trans_word')
    tab_trans_alias.execute().print()
+--------------------------------+
|                     trans_word |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

最后我们就可以用这个新的表做字数统计计算

    tab_trans_alias.group_by(col('trans_word')) \
        .select(col('trans_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()
+I[A, 1]
+I[a, 2]
+I[B, 1]
+I[b, 1]
+I[C, 2]
+I[c, 2]

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

word_count_data = ["A", "B", "C", "a", "C"]  
    
def word_count():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])
    tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector('print')\
        .schema(sink_schema) \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    
    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)
    def rowFunc(row):
        if row[0].isupper():
            yield row[0]
            yield row[0].lower()
        else:
            yield row[0]

    tab_trans=tab_source.flat_map(rowFunc)
    tab_trans.execute().print()
    tab_trans_alias=tab_trans.alias('trans_word')
    tab_trans_alias.execute().print()
    tab_trans_alias.group_by(col('trans_word')) \
        .select(col('trans_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

if __name__ == '__main__':
    word_count()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

macOS Sonoma 14.1正式版发布 改善Apple Music界面 新增保修状态显示

10月26日消息,苹果今天为 macOS Sonoma 推出了 14.1 版本更新,本更新主要改善了 Apple Music 界面,设置中新增保修状态,并修复了多项错误内容。 经过几周的用户测试,Apple 正式向所有 Mac 用户发布了 macOS Sonoma 14.…

【代码随想录01】数组总结

抄去吧,保存去吧!

p5.js 3D图形-立方体

本文简介 带尬猴,我嗨德育处主任 前面写了几篇 p5.js 文章 都还没涉及到3D图形,但其实 p5.js 是提供了基础的3D图形的。 本文就从最简单的立方体讲起,并做几个小demo和各位工友一起掌握立方体的用法。 立方体的基础用法 在 p5.js 里使用 b…

智慧矿山AI算法助力煤矿安全:人员越界识别精准迅速

煤矿作为一种危险性较高的工业领域,安全管理一直是煤矿企业的重要任务。传统煤矿安全管理主要依靠人工巡逻及视频监控等手段,但这些方法往往存在人力不足、盲区多等问题,无法实时监控和预警,难以有效避免事故的发生。随着人工智能…

如何在linux服务器上安装Anaconda与pytorch,以及pytorch卸载

如何在linux服务器上安装Anaconda与pytorch,以及pytorch卸载 1,安装anaconda1.1 下载anaconda安装包1.2 安装anaconda1.3 设计环境变量1.4 安装完成验证 2 Anaconda安装pytorch2.1 创建虚拟环境2.2 查看现存环境2.3 激活环境2.4 选择合适的pytorch版本下…

FL Studio水果2023体验版如何破解?

FL Studio是一款非常专业的水果工具,软件功能齐全,拥有编曲、剪辑、录音、混音等功能,可以满足用户的各种音乐制作需求。软件已经成功破解,全中文的软件界面,去除了试用时间限制,有需要的快来下载吧&#x…

《动手学深度学习 Pytorch版》 10.6 自注意力和位置编码

在注意力机制中,每个查询都会关注所有的键-值对并生成一个注意力输出。由于查询、键和值来自同一组输入,因此被称为 自注意力(self-attention),也被称为内部注意力(intra-attention)…

react-typescript-demo

1.使用 Context 来存储数据

rabbitMQ入门指南:管理页面全面指南及实战操作

文章目录 1. 引言2. RabbitMQ 管理页面的概览3. 探索 RabbitMQ 管理页面的主要功能3.1 连接3.2 通道3.3 交换机3.4 队列3.5 生产者3.6 消费者 4. RabbitMQ 的实战例子4.1 创建虚拟机4.2 连接和通道4.3 建立交换机4.3.1 交换机类型4.3.2 创建一个交换机 4.4 创建队列4.4.1 队列类…

海外广告投放保姆级教程,如何使用Quora广告开拓新流量市场?

虽然在Quora 上学习广告相对容易,但需要大量的试验和错误才能找出最有效的方法。一些广告技巧可以让您的工作更有效率。这篇文章将介绍如何有效进行quora广告投放与有价值的 Quora 广告要点,这将为您节省数万美元的广告支出和工作时间!往下看…

五大绩效指标,为企业新媒体矩阵管理注入新动力

⭐关注矩阵通服务号,探索企业新媒体矩阵搭建与营销策略 新媒体矩阵的建设运营是为了能实现企业确定的业务目标。 那如何才能让推动最后业务目标的实现? 对于企业来说,可以将业务目标拆解为短期绩效目标,通过适当调整短期指标来保证…

【Java 进阶篇】Java HTTP 请求消息详解

HTTP(Hypertext Transfer Protocol)是一种用于传输超文本的应用层协议,广泛用于构建互联网应用。在Java中,我们经常需要发送HTTP请求来与远程服务器进行通信。本文将详细介绍Java中HTTP请求消息的各个部分,包括请求行、…

spring-初识spring

初识spring Spring简介 初识spring一、Spring 特性二、IOC容器(反转控制)1、IOC容器1.1、IOC思想1.2、IOC容器在Spring中的实现 2、基于XML管理bean2.1入门案例2.2获取bean2.3依赖注入(DI)2.3.1set方法注入2.3.1构造器注入 2.4 为类类型属性赋值2.4.1 引用外部已声明bean2.4.2 …

一键同步,无处不在的书签体验:探索多电脑Chrome书签同步插件

说在前面 平时大家都是怎么管理自己的浏览器书签数据的呢?有没有过公司和家里的电脑浏览器书签不同步的情况?有没有过电脑突然坏了但书签数据没有导出,导致书签数据丢失了?解决这些问题的方法有很多,我选择自己写个chr…

数据结构与算法基础(青岛大学-王卓)(9)

终于迎来了最后一部分(排序)了,整个王卓老师的数据结构就算是一刷完成了,但是也才是数据结构的开始而已,以后继续与诸位共勉 😃 (PS.记得继续守护家人们的健康当然还有你自己的)。用三根美味的烤香肠开始吧。。。 文章目录 排序基…

高级深入--day41

用Pymongo保存数据 爬取豆瓣电影top250movie.douban.com/top250的电影数据,并保存在MongoDB中。 items.py class DoubanspiderItem(scrapy.Item):# 电影标题title scrapy.Field()# 电影评分score scrapy.Field()# 电影信息content scrapy.Field()# 简介info …

基于SSM的OA办公系统

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…

nginx浏览器缓存和上流缓存expires指令_nginx配置HTTPS

1.nginx控制浏览器缓存是针对于静态资源[js,css,图片等] 1.1 expires指令 location /static {alias/home/imooc;#设置浏览器缓存10s过期expires 10s;#设置浏览器缓存时间晚上22:30分过期expires @22h30m;#设置浏览器缓存1小时候过期expires -1h;#设置浏览器不缓存expires …

【JavaEE初阶】 线程安全的集合类

文章目录 🍀前言🌲多线程环境使用 ArrayList🚩自己使用同步机制 (synchronized 或者 ReentrantLock)🚩Collections.synchronizedList(new ArrayList);🚩使用 CopyOnWriteArrayList 🎍多线程环境使用队列&am…

通过jdk自制https证书并配置到nginx并配置http2

生成证书 这里使用自己生成的免费证书。在${JAVA_HOME}/bin 下可以看到keytool.exe,在改目录打开cmd然后输入: keytool -genkey -v -alias lgq.com -keyalg RSA -keystore d:/zj/ssl/fastfly.com.keystore -validity 3650生成证书过程中:【你的名字】对…