Flink SQL自定义表值函数(Table Function)

news2024/12/27 3:07:44

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * 输入数据:
 * nc -lk 8888
 * a,bb,cc
 * 
 * 输出结果:
 * 
 * res1=>:5> +I[a,bb,cc, a, 1]
 * res1=>:7> +I[a,bb,cc, cc, 2]
 * res1=>:6> +I[a,bb,cc, bb, 2]
 * res8=>:4> +I[a,bb,cc, a, 1]
 * res8=>:5> +I[a,bb,cc, bb, 2]
 * res8=>:6> +I[a,bb,cc, cc, 2]
 * res4=>:3> +I[a,bb,cc, cc, 2]
 * res4=>:1> +I[a,bb,cc, a, 1]
 * res4=>:2> +I[a,bb,cc, bb, 2]
 * res7=>:8> +I[a,bb,cc, bb, 2]
 * res7=>:1> +I[a,bb,cc, cc, 2]
 * res7=>:7> +I[a,bb,cc, a, 1]
 * res2=>:2> +I[a,bb,cc, cc, 2]
 * res2=>:8> +I[a,bb,cc, a, 1]
 * res2=>:1> +I[a,bb,cc, bb, 2]
 * res6=>:1> +I[a,bb,cc, cc, 2]
 * res6=>:7> +I[a,bb,cc, a, 1]
 * res6=>:8> +I[a,bb,cc, bb, 2]
 * res3=>:6> +I[a,bb,cc, bb, 2]
 * res3=>:7> +I[a,bb,cc, cc, 2]
 * res3=>:5> +I[a,bb,cc, a, 1]
 * res5=>:7> +I[a,bb,cc, bb, 2]
 * res5=>:8> +I[a,bb,cc, cc, 2]
 * res5=>:6> +I[a,bb,cc, a, 1]
 */
public class TableFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        Table table = tEnv.fromDataStream(source, "field");

        tEnv.createTemporaryView("SourceTable", table);

        // 在 Table API ⾥可以直接调⽤ UDF
        Table res1 = tEnv.from("SourceTable")
                .joinLateral(call(SplitFunction.class, $("field")))
                .select($("field"), $("word"), $("length"));

        Table res2 = tEnv
                .from("SourceTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("field")))
                .select($("field"), $("word"), $("length"));


        // 在 Table API ⾥重命名 UDF 的结果字段
        Table res3 = tEnv.from("SourceTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("field")))
                .as("myField", "newWord", "newLength")
                .select($("myField"), $("newWord"), $("newLength"));

        // 注册函数
        tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

        // 在 Table API ⾥调⽤注册好的 UDF
        Table res4 = tEnv
                .from("SourceTable")
                .joinLateral(call("SplitFunction", $("field")))
                .select($("field"), $("word"), $("length"));


        Table res5 = tEnv
                .from("SourceTable")
                .leftOuterJoinLateral(call("SplitFunction", $("field")))
                .select($("field"), $("word"), $("length"));

        // 在 SQL ⾥调⽤注册好的 UDF
        Table res6 = tEnv.sqlQuery(
                "SELECT field, word, length " +
                        "FROM SourceTable, LATERAL TABLE(SplitFunction(field))");

        Table res7 = tEnv.sqlQuery(
                "SELECT field, word, length " +
                        "FROM SourceTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");


        // 在 SQL ⾥重命名 UDF 字段
        Table res8 = tEnv.sqlQuery(
                "SELECT field, newWord, newLength " +
                        "FROM SourceTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");

        tEnv.toDataStream(res1).print("res1=>");
        tEnv.toDataStream(res2).print("res2=>");
        tEnv.toDataStream(res3).print("res3=>");
        tEnv.toDataStream(res4).print("res4=>");
        tEnv.toDataStream(res5).print("res5=>");
        tEnv.toDataStream(res6).print("res6=>");
        tEnv.toDataStream(res7).print("res7=>");
        tEnv.toDataStream(res8).print("res8=>");

        env.execute();
    }

    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
        public void eval(String str) {
            for (String s : str.split(",")) {
                // 输出结果
                collect(Row.of(s, s.length()));
            }
        }
    }
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1194902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是进程等待?

什么是进程等待 在了解进程等待之前&#xff0c;我们要回顾一下什么是僵尸进程&#xff1a;是指一个已经终止执行的进程&#xff0c;但其父进程还没有通过 wait() 系统调用来获取该进程的退出状态信息。当一个进程正常退出或者被终止时&#xff0c;其所占用的系统资源会被操作…

基于SSM的智能物业管理网站的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

rabbitMq虚拟主机概念

虚拟主机是RabbitMQ中的一种逻辑隔离机制&#xff0c;用于将消息队列、交换机以及其他相关资源进行隔离。 在RabbitMQ中&#xff0c;交换机&#xff08;Exchange&#xff09;用于接收生产者发送的消息&#xff0c;并根据特定的路由规则将消息分发到相应的队列中。而虚拟主机则…

使用 pubsub-js 进行消息发布订阅

npm 包地址 github 包地址 pubsub-js 是一个轻量级的 JavaScript 基于主题的消息订阅发布库 &#xff0c;压缩后小于1b。它具有使用简单、性能高效、支持多平台等优点&#xff0c;可以很好地满足各种需求。 功能特点&#xff1a; 无依赖同步解耦ES3 兼容。pubsub-js 能够在…

快速修复因相机断电导致视频文件打不开的问题

3-5 本文主要解决因相机突然断电导致拍摄的视频文件打不开的问题。 在日常工作中&#xff0c;有时候需要使用相机拍摄视频&#xff0c;比如现在有不少短视频拍摄的需求&#xff0c;如果因电池突然断电的原因&#xff0c;导致拍出来的视频播放不了&#xff0c;这时候就容易出大…

el-table 多表格弹窗嵌套数据显示异常错乱问题

1、业务背景 使用vueelement开发报表功能时&#xff0c;需要列表上某列的超链接按钮弹窗展示&#xff0c;在弹窗的el-table列表某列中再次使用超链接按钮点开弹窗&#xff0c;以此类推多表格弹窗嵌套&#xff0c;本文以弹窗两次为例 最终效果如下示例页面 2、具体实现和问题…

hub.docker访问不了的问题(一步解决)

暂时我也不清楚&#xff0c;但是下面这个网址可以用(可以先用着)Docker Hub Container Image Library | App Containerization (axlinux.top)https://hub.axlinux.top/

美格智能5G RedCap模组顺利完成中国联通5G物联网OPENLAB开放实验室认证

近日&#xff0c;美格智能5G RedCap模组SRM813Q顺利通过中国联通5G物联网OPENLAB开放实验室端到端的测试验收&#xff0c;并获得OPENLAB实验室的认证证书。这标志着该模组产品各项性能均已符合RedCap商用标准&#xff0c;为5G RedCap规模商用奠定了坚实基础。 中国联通5G物联网…

使用WinDbg分析软件突然崩溃的问题

为了测试windbg有多么牛逼&#xff0c;所以仅仅只是测试一下&#xff0c;属于事后诸葛亮型&#xff0c;也只是为了验证一下&#xff0c;把此方法学会即可。 模拟场景&#xff1a; 软件运行后&#xff0c;点击按钮&#xff0c;直接崩溃掉&#xff0c;什么提示都没有。因此&…

统计学习笔记第 1 部分:Hoeffding 的不等式推导与模拟

照片由Unsplash上的Luca Bravo拍摄 1&#xff1a;背景与动机 霍夫丁不等式是数理统计和机器学习 (ML) 中的一个重要的集中不等式&#xff0c;广泛应用于统计学习理论等理论领域以及强化学习等应用领域。 我注意到&#xff0c;在机器学习社区的一些地方&#xff0c;通常将 Hoeff…

图数据库Neo4j详解

文章目录 第一章 图和Neo4j1.1 图数据库概念1.1.1 图论起源1.1.2 节点-关系及图1.1.3 图数据库1.1.4 图数据库分类1.1.4 图数据库应用场景1.1.5 与关系型数据库对比1.1.6 图数据库优势 1.2 Neo4j介绍1.2.1 Neo4j是什么1.2.2 Neo4j特点1.2.3 Neo4j的优势1.2.4 Neo4j的限制1.2.5 …

网络安全(黑客)-高效自学

首先给大家简单介绍一下网络安全&#xff1a; 1.什么是网络安全&#xff1f; 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、…

eNsp使用技巧

文章目录 显示网格对齐到网络水平对齐和垂直对齐 显示所有接口添加文本进入CLI界面数据抓包方式一方式二 显示网格 对齐到网络 水平对齐和垂直对齐 显示所有接口 添加文本 进入CLI界面 数据抓包 方式一 方式二

No source control providers registered

使用vscode时碰到这个问题 git扩展没启动

Linux前言

目录 Linux的应用场景 Linux的应用现状 Linux的版本 操作系统 什么是Linux操作系统&#xff1f; 为什么要用操作系统&#xff1f; 上篇我们介绍了Linux的历史背景和安装环境。 Linux的应用场景 因为Linux操作系统是开源&#xff0c;所以它流向各个领域。 场景1&…

【有限元方法】Newton-Raphson Method

Newton-Raphson Method Linear vs Nonlinear Analysis: At this point, we can conduct a linear analysis no problem ∫ ∑ i , j 1 3 σ i j ε i j ∗ d v ∫ t n ⋅ u ∗ d s ∫ ρ b ⋅ u ∗ d v ⇒ ∫ e [ B ] T [ C ] [ B ] d x ⏟ k e u e ∫ ∂ e [ N ] T t n …

2022最新版-李宏毅机器学习深度学习课程-P50 BERT的预训练和微调

模型输入无标签文本&#xff08;Text without annotation&#xff09;&#xff0c;通过消耗大量计算资源预训练&#xff08;Pre-train&#xff09;得到一个可以读懂文本的模型&#xff0c;在遇到有监督的任务是微调&#xff08;Fine-tune&#xff09;即可。 最具代表性是BERT&…

Arcgis连接Postgis数据库(Postgre入门十)

效果 步骤 1、矢量数据首先有在postgis数据库中 这个postgis数据库中的一个空间数据&#xff0c;数据库名称是test3&#xff0c;数据表名称是test 2、Arcgis中连接postgis数据库中 3、成功连接 可以将数据拷贝或导入到gdb数据库中

Python---练习:把8名讲师随机分配到3个教室

案例&#xff1a;把8名讲师随机分配到3个教室 列表嵌套&#xff1a;有3个教室[[],[],[]]&#xff0c;8名讲师[A,B,C,D,E,F,G,H]&#xff0c;将8名讲师随机分配到3个教室中。 分析&#xff1a; 思考1&#xff1a;我们第一间教室、第二间教室、第三间教室&#xff0c;怎么表示…

FPGA运算

算数运算中&#xff0c;输入输出的负数全用补码来表示&#xff0c;例如用三位小数位来表示的定点小数a-1.625和b-1.375。那么原码分别为a6b‘101101, b6b101011, 补码分别是a6’b110011&#xff0c;b6‘b110101&#xff1b; 如果想在fpga中实现a*b&#xff0c;则需要将a和b用补…