文章目录
- 1、概述
- 2、示例
- 2.1、普通函数
- 2.2、富函数
- 2.2.1、获取富函数的运行时上下文
- 3、源码截取
- 3.1、RichFunction
- 3.2、RuntimeContext
1、概述
-
Rich Function(富函数)与普通函数相比,多了以下能力:
- 生命周期方法:`open` 和 `close`
- 获取函数运行时上下文的方法:`getRuntimeContext`
-
本文使用的版本:
- Flink:1.14.6
- Java:1.8
- Scala:2.12
2、示例
2.1、普通函数
`MapFunction` 接口继承了 `Function` 接口。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates an ordinary (non-rich) function: a plain {@link MapFunction} that squares each
 * element of a small bounded stream and prints the results. A plain MapFunction has no
 * open/close lifecycle hooks and no getRuntimeContext() method.
 */
public class H1 {

    /** Maps every input element to its square. */
    private static final class Square implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value * value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // A single parallel instance for every operator.
        environment.setParallelism(1);

        DataStreamSource<Integer> numbers = environment.fromElements(1, 2, 3);
        numbers.map(new Square()).print();

        // Nothing runs until the job is submitted here.
        environment.execute();
    }
}
测试结果:依次输出每个元素的平方,即 1、4、9。
2.2、富函数
`RichMapFunction` 抽象类继承了 `AbstractRichFunction` 抽象类(同时实现了 `MapFunction` 接口),而 `AbstractRichFunction` 抽象类实现了 `RichFunction` 接口。
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates a rich function: a {@link RichMapFunction} that squares each element of a small
 * bounded stream and, in addition to {@code map}, overrides the {@code open} and {@code close}
 * lifecycle hooks to print a message when its lifecycle starts and ends.
 */
public class H1 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1: one parallel map instance, so each lifecycle message is printed once.
        env.setParallelism(1);
        // Bounded source with three elements.
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        // Rich function: unlike a plain MapFunction it also has open/close lifecycle hooks.
        dss.map(new RichMapFunction<Integer, Integer>() {
            /** Lifecycle start hook of this function instance. */
            @Override
            public void open(Configuration parameters) {
                System.out.println("生命周期开始");
            }

            /** Lifecycle end hook of this function instance. */
            @Override
            public void close() {
                System.out.println("生命周期结束");
            }

            /** Returns the square of the input element. */
            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        // Submit and run the job.
        env.execute();
    }
}
测试结果:先输出“生命周期开始”,然后依次输出 1、4、9,最后输出“生命周期结束”。
2.2.1、获取富函数的运行时上下文
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates reading the runtime context of a rich function: each parallel instance of the
 * {@link RichMapFunction} obtains its {@link RuntimeContext} in {@code open} and prints the
 * index of the subtask it runs as.
 */
public class H1 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 2: the map operator runs as two parallel instances, each with its own
        // runtime context, so open/close are invoked once per instance (subtask index 0 and 1).
        env.setParallelism(2);
        // Bounded source with three elements.
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        // Rich function: provides lifecycle hooks and access to the runtime context.
        dss.map(new RichMapFunction<Integer, Integer>() {
            /** Lifecycle start hook; also prints which parallel subtask this instance is. */
            @Override
            public void open(Configuration parameters) {
                System.out.println("生命周期开始");
                // Obtain this instance's runtime context.
                RuntimeContext context = getRuntimeContext();
                System.out.println("子任务索引:" + context.getIndexOfThisSubtask());
            }

            /** Lifecycle end hook of this function instance. */
            @Override
            public void close() {
                System.out.println("生命周期结束");
            }

            /** Returns the square of the input element. */
            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        // Submit and run the job.
        env.execute();
    }
}
将并行度设置为 2 后的测试结果:map 算子有 2 个并行实例,因此 `open` 和 `close` 各被调用 2 次,分别输出子任务索引 0 和 1(两个实例的输出顺序不固定)。
3、源码截取
3.1、RichFunction
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;
/**
 * Base interface for rich functions. Compared with a plain {@link Function} it adds lifecycle
 * methods ({@code open}/{@code close}) and access to the function's runtime context.
 */
@Public
public interface RichFunction extends Function {
/**
 * Lifecycle method: initialization hook of the function.
 *
 * @param parameters configuration handed to the function (contents not shown in this excerpt)
 * @throws Exception implementations may propagate any exception
 */
void open(Configuration parameters) throws Exception;
/**
 * Lifecycle method: teardown hook of the function, the counterpart of {@link #open}.
 *
 * @throws Exception implementations may propagate any exception
 */
void close() throws Exception;
/**
 * Returns the function's runtime context. The context carries information such as the
 * parallelism, the job ID, the task name and the index of this subtask.
 */
RuntimeContext getRuntimeContext();
/**
 * Sets the function's runtime context. The framework calls this method when it creates a
 * parallel instance of the function.
 */
void setRuntimeContext(RuntimeContext t);
}
3.2、RuntimeContext
/**
 * RuntimeContext holds the runtime context information of a function.
 * Each parallel instance of a function has its own context object; through it the function
 * can access static information, accumulators, broadcast variables and state.
 *
 * <p>NOTE(review): this looks like an abridged excerpt of the upstream interface (for example,
 * no parallelism getter is shown). The package and import declarations for JobID, Accumulator,
 * the counters, Histogram, BroadcastVariableInitializer and the state types are omitted, so
 * this snippet does not compile on its own.
 */
@Public
public interface RuntimeContext {
// ------------------------------- static information -----------------------------------
JobID getJobId();
String getTaskName();
int getIndexOfThisSubtask();
int getAttemptNumber();
String getTaskNameWithSubtasks();
// ------------------------------------ accumulators -------------------------------------
<V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);
<V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
@PublicEvolving
IntCounter getIntCounter(String name);
@PublicEvolving
LongCounter getLongCounter(String name);
@PublicEvolving
DoubleCounter getDoubleCounter(String name);
@PublicEvolving
Histogram getHistogram(String name);
// --------------------------------- broadcast variables ---------------------------------
@PublicEvolving
boolean hasBroadcastVariable(String name);
<RT> List<RT> getBroadcastVariable(String name);
<T, C> C getBroadcastVariableWithInitializer(
String name, BroadcastVariableInitializer<T, C> initializer);
// ------------------------------ methods for accessing state ----------------------------
@PublicEvolving
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
@PublicEvolving
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
@PublicEvolving
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
@PublicEvolving
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
}