1.背景
flink本身给我们提供了大量的内置函数,已经能满足我们绝大部分的需求,但是如果确实是碰到了一些特殊的场景,无法满足我们的需求的时候,我们可以使用自定义函数来解决。
自定义函数大致可以分为标量函数(UDF函数)、表值函数(UDTF函数)、聚合函数(UDAGG函数)和表值聚合函数(UDTAGG函数)。
flink内置函数可通过官网进行查询,不同版本有不同的内置函数,当前我们使用的是Flink1.12,具体查询内置函数地址如下:
Apache Flink 1.12 Documentation: System (Built-in) Functions
2.标量函数(UDF函数)
自定义标量函数可以把 0 到多个标量值映射成 1 个标量值,数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。
定义标量函数
- 必须扩展org.apache.flink.Table.function中的基类ScalarFunction和实现(一个或多个)评估方法。
- 必须公开声明一个评估方法并命名为eval。评估方法的参数类型和返回类型也决定标量函数的参数和返回类型。可以通过实现名为eval的多个评估方法来重载评估方法。评估方法支持变量参数,比如eval(String…str)。评估方法必须是 public 的,而且名字必须是 eval。
示例代码,以下实现的是String类型日期字符串转换成时间戳:
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
public class getTimeMillis extends ScalarFunction {
private static final Logger LOG = LoggerFactory.getLogger(getTimeMillis.class);
/**
* @param input 输入参数
* @return 返回参数
*/
public Long eval(String input) {
Long output = 0000000000000L;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Date date = simpleDateFormat.parse(input);
output = date.getTime();
} catch (Exception e) {
LOG.error(e.toString());
}
return output;
}
}
默认情况下,评估方法的结果类型由Flink的类型提取工具决定。对于基本类型或简单pojo,这已经足够了。但是对于更复杂的、自定义的或复合类型,这可能是错误的。在这些情况下,可以通过通过重写ScalarFunction 的 getResultType()手动定义结果类型的类型信息。
3.生成JAR
在大多数情况下,必须先注册用户定义的函数,然后才能在查询中使用它。TableEnvironment通过调用registerFunction()方法来注册函数。使用第三方平台,可直接导入jar包进行使用。
我使用的是IDEA,IDEA导出JAR包过程如下所示:
1.右键项目,选择“Open Module Settings”;
2.选择Artifacts,点击“+”号,选择JAR,点击From modules with dependencies;
3.无main class可直接点击OK。
4.可根据自己的需要添加外部依赖包,点击OK;
5.项目编写完成后,点击Build ——> Build Artifacts,即可在out目录下看到生成的JAR包。