Flink SQL自定义标量函数(Scalar Function)

news2025/1/12 4:09:38

使用场景: 标量函数即 UDF,⽤于进⼀条数据出⼀条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.ScalarFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参、出参都是直接体现在 eval 函数的签名中

开发案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

/**
 * 输入数据: 
 * nc -lk 88888
 * a,1
 *
 * 输出结果:
 * res1=>:3> +I[97]
 * res2=>:3> +I[97]
 * res3=>:3> +I[97]
 */
public class ScalarFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, String>> tpStream = source.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String input) throws Exception {
                return new Tuple2<>(input.split(",")[0], input.split(",")[1]);
            }
        });

        Table table = tEnv.fromDataStream(tpStream, "id,name");

        tEnv.createTemporaryView("SourceTable",table);

        // 在 Table API ⾥不经注册直接调⽤函数
        Table res1 = tEnv.from("SourceTable").select(call(HashFunction.class, $("id")));

        // 注册函数
        tEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);

        // 在 Table API ⾥调⽤注册好的函数
        Table res2 = tEnv.from("SourceTable").select(call("HashFunction", $("id")));

        // 在 SQL ⾥调⽤注册好的函数
        Table res3 = tEnv.sqlQuery("SELECT HashFunction(id) FROM SourceTable");

        tEnv.toDataStream(res1).print("res1=>");
        tEnv.toDataStream(res2).print("res2=>");
        tEnv.toDataStream(res3).print("res3=>");

        env.execute();
    }

    public static class HashFunction extends ScalarFunction {
        // 接受任意类型输⼊,返回 INT 型输出
        public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
            return o.hashCode();
        }
    }
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1192812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端训练营:1v1私教,帮你拿到满意的offer

Hello&#xff0c;大家好&#xff0c;我是 Sunday。 熟悉我的小伙伴都知道&#xff0c;我最近这几年一直在做前端教育相关的工作。因为这类工作的原因&#xff0c;让我深刻的感受到这几年整个互联网行业的变化。 大量的公司裁员&#xff0c;导致找工作的人急速增加&#xff0…

解压游戏资源,导出游戏模型

游戏中有很多好看的角色&#xff0c;地图等等资源。 你有没有想过&#xff0c;把他们导出到自己的游戏中进行魔改又或则玩换肤等操作呢&#xff1f; 相信很多同学都喜欢拳皇中的角色&#xff0c; 那么我们今天就拿拳皇15举例子&#xff0c;导出他的资源。 首先要先安装好这个…

算法:穷举,暴搜,深搜,回溯,剪枝

文章目录 算法基本思路例题全排列子集全排列II电话号码和字母组合括号生成组合目标和组合总和优美的排列N皇后有效的数独解数独单词搜索黄金矿工不同路径III 总结 算法基本思路 穷举–枚举 画出决策树设计代码 在设计代码的过程中&#xff0c;重点要关心到全局变量&#xff…

软文推广优化技巧:如何写出有创意的文案

今天媒介盒子要给大家分享的干货内容就是&#xff1a;如何写出有创意的文案。 时代背景会改变&#xff0c;大众的趣味焦点也会转移&#xff0c;同样再好的文案也会失效&#xff0c;但文案背后的触发机制不会变。下面是能够使广告文案起作用的关键因素&#xff1a; 一、 研究产…

偶数科技亮相2023中国程序员节——数据库技术高峰论坛

2023年10月24日&#xff0c;由中国软件行业协会主办的“中国程序员节”在北京、深圳、宁波多地同时召开&#xff0c;其中数据库技术高峰论坛在北京举办&#xff0c;偶数科技亮相本次论坛并分享了题为《大模型、实时需求推动湖仓平台走向开放》的主题演讲。 国际局势复杂、科技竞…

面包机上架亚马逊美国站UL1026测试报告办理

面包机&#xff08;又称烤面包机&#xff09;是一种家用电器&#xff0c;用于制作面包、烤饼等食品。在亚马逊美国站销售面包机时&#xff0c;可能需要提供 UL 报告以确保产品安全性。UL1026 是适用于面包机的美国安全标准。 面包机UL1026报告是按照美国国家电气规范NFPA 70所规…

Django生鲜蔬菜采购系统-计算机毕设 附源码 24033

Django生鲜蔬菜采购系统 目 录 摘要 1 绪论 1.1 研究背景 1.2国内外研究现状 1.3论文结构与章节安排 2 生鲜蔬菜采购系统系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 操作可行性分析 2.2 系统流程分析 2.2.1 数据流程 3.3.2 业务流…

Bean_AOP

Bean 源码 https://github.com/cmdch2017/Bean_IOC.git 获取Bean对象 BeanFactory Bean的作用域 第三方Bean需要用Bean注解 比如消息队列项目中&#xff0c;需要用到Json的消息转换器&#xff0c;这是第三方的Bean对象&#xff0c;所以不能用Component&#xff0c;而要用B…

【Mysql】查询mysql的版本

目录 cmd命令查询 mysql -- help(命令&#xff09; mysql -u root -p(命令&#xff09; 数据库管理工具查询 select version(); cmd命令查询 mysql -- help(命令&#xff09; mysql -u root -p(命令&#xff09; 执行该命令并且输入数据库密码 数据库管理工具查询 selec…

C++之旅(学习笔记)第6章 基本操作

C之旅&#xff08;学习笔记&#xff09;第6章 基本操作 6.1 基本操作 class X{ public:X(Sometype); // "普通的构造函数": 创建一个对象X(); // 默认构造函数X(const X&); // 拷贝构造函数X(X&&); // 移动构造函数X& operator(const …

广州华锐互动:VR互动实训内容编辑器助力教育创新升级

随着科技的飞速发展&#xff0c;教育领域也正在经历一场深刻的变革。其中&#xff0c;虚拟现实(VR)技术为教学活动提供了前所未有的便利和可能性。在诸多的VR应用中&#xff0c;VR互动实训内容编辑器无疑是最具潜力和创新性的一种。广州华锐互动开发的这款编辑器以其独特的功能…

HelloGitHub 社区动态,开启新的篇章!

今天这篇文章是 HelloGitHub 社区动态的第一篇文章&#xff0c;所以我想多说两句&#xff0c;聊聊为啥开启这个系列。 我是 2016 年创建的 HelloGitHub&#xff0c;它从最初的一份分享开源项目的月刊&#xff0c;现如今已经成长为 7w Star 的开源项目、1w 用户的开源社区、全网…

Xshell安装+使用教程

简介 Xshell 是一个强大的安全终端模拟软件&#xff0c;它支持SSH1, SSH2, 以及Microsoft Windows 平台的TELNET 协议。Xshell 通过互联网到远程主机的安全连接以及它创新性的设计和特色帮助用户在复杂的网络环境中享受他们的工作。 Xshell可以在Windows界面下用来访问远端不…

linux中使用arthas进行jvm内存分析

1. 安装下载 首先在官方github地址选择合适的版本&#xff0c;下载后上传到对于服务器。 使用unzip arthas-bin.zip 解压文件。进入目录中&#xff0c;执行./install-local.sh进行安装。执行完成后提示succeed&#xff0c;即可使用。 2. 启动 进入目录&#xff0c;执行java…

Python大语言模型实战-利用ChatDev框架自动开发一个游戏软件(附完整教程)

实现功能 ChatDev一个由多智能体协作框架&#xff0c;是一个虚拟软件公司&#xff0c;在人类 “用户” 指定一个具体的任务需求后&#xff0c;不同角色的智能体将进行交互式协同&#xff0c;以生产一个完整软件&#xff08;包括源代码、环境依赖说明书、用户手册等&#xff09…

智汇云舟入选IDC《中国智慧园区解决方案2023年厂商评估》报告

近日&#xff0c;全球领先的市场研究和咨询公司IDC发布报告《中国智慧园区解决方案2023年厂商评估》。报告内&#xff0c;IDC对中国市场具有代表性、且符合评估入围门槛要求的智慧园区解决方案厂商进行了综合评估。智汇云舟凭借在产品、技术等方面的综合优势&#xff0c;与大华…

从事人力资源相关工作,必须要有人力资源证书吗?

人力资源证书不是HR必备&#xff0c;但高含金量的HR证书确实是个加分项&#xff0c;有时候门槛有时候就是一证之隔。 作为人力资源从业者或者打算从事人力行业的同学&#xff0c;如果有意向考证的不妨看看&#xff0c;有哪些证书可以让你真正学以致用&#xff1f;哪些证书可以…

伦敦金冬令时开市时间怎样调整

在刚刚过去的一周&#xff0c;欧美的金融市场已经正式进入了冬令时&#xff0c;这对伦敦金市场的交易时间也产生了影响。由于美国于今年11月5日(星期日&#xff09;开始正式实施冬令时间&#xff0c;所以香港的伦敦金平台的交易时间也随之而有所调整。 从今年11月6日开始&#…

makefile的基础使用

1、建一个目录: mkdir Makefile/makefile(两个任意一个就可以) 2、用vim打开 3、在makefile里面的写法&#xff1a; 目标文件 : 依赖文件 >小例子: test:test.c [tab]依赖关系 gcc -o test test.c 4、…

win10 下 ros + Qt 工程CMakeLists.txt

win10 下 ros Qt 工程CMakeLists.txt 系统&#xff1a;win10 ros: melodic Qt: 5.12.12 源码目录: D:\workspace\catkin_qt 示例代码 https://github.com/ncnynl/ros-qt.git 由于示例代码是Qt4 &#xff0c;目前我是用QT5,所以CMakeLists.txt 修改如下 CMakeLists.txt #####…