转换算子(Transformation)
数据源读入数据之后,就是各种转换算子的操作,将一个或者多个DataSream转换为新的DataSteam,并且Flink可以针对一条流进行转换处理,也可以进行分流或者河流等多流转换操作,从而组成复杂的数据流拓扑。
1. 基本转换算子
这里介绍的都是最基本的转换算子,在官方文档会有更多的算子介绍
1.1 Map(映射)
------- | 内容 |
---|---|
描述: | 在 DataStream 上应用映射转换。转换为DataStream 的每个元素调用一个 MapFunction 。每个 MapFunction 调用只返回一个元素。用户还可以扩展 RichMapFunction 以访问org.apache.flink.api.common.functions.RichFunction 接口提供的其他功能。 |
参数: | DataStream 的每个元素调用的 MapFunction 。 |
返回值: | SingleOutputStreamOperator 转换后的数据流 |
总结: | 数据转换,一一映射 |
图示:
源码:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
TypeInformation<R> outType =
TypeExtractor.getMapReturnTypes(
clean(mapper), getType(), Utils.getCallLocationName(), true);
return map(mapper, outType);
}
public <R> SingleOutputStreamOperator<R> map(
MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
可以看到参数是一个MapFunction
,然后通过实现的map方法去一一返回对应的元素。
实例:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 从集合中读取数据
ArrayList<String> list = new ArrayList<>();
list.add("one");
list.add("two");
list.add("three");
// 3. 读取数据
DataStreamSource<String> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.STRING_TYPE_INFO);
// 4. map操作
SingleOutputStreamOperator<String> mapStreamOperator = stringDataStreamSource.map(string -> string + " yes");
mapStreamOperator.print();
// 5. 执行程序
env.execute();
}
这里的public class SingleOutputStreamOperator<T> extends DataStream<T> {}
可以看出map 是将一个 DataStream 转换成另一个 DataStream 是完全正确的。
1.2 filter(过滤)
------- | 内容 |
---|---|
描述: | 对 DataStream 应用过滤器转换。转换为 DataStream 的每个元素调用 FilterFunction ,并仅保留函数返回 true 的那些元素。过滤函数返回 false 的元素。用户还可以扩展 RichFilterFunction 以访问 org.apache.flink.api.common.functions.RichFunction 接口提供的其他功能。 |
参数: | 为 DataStream 的每个元素调用的 FilterFunction |
返回值: | SingleOutputStreamOperator 转换后的数据流 |
总结: | 筛选数据 |
图示:
源码:
public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
}
这里的参数是一个FilterFunction
,然后通过实现的filter
方法判断是否返回改元素。
实例:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 从集合中读取数据
ArrayList<String> list = new ArrayList<>();
list.add("one");
list.add("two");
list.add("three");
// 3. 读取数据
DataStreamSource<String> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.STRING_TYPE_INFO);
// 4. filter操作
SingleOutputStreamOperator<String> filterStreamOperator = stringDataStreamSource.filter(string -> string.contains("o"));
filterStreamOperator.print();
// 5. 执行程序
env.execute();
}
进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。
1.3 FlatMap(扁平映射)
------- | 内容 |
---|---|
描述: | 在 DataStream 上应用 `FlatMap 转换。转换为 DataStream 的每个元素调用 FlatMapFunction。每个 FlatMapFunction 调用都可以返回任意数量的元素,包括无元素。用户还可以扩展 RichFlatMapFunction 以访问 org.apache.flink.api.common.functions.RichFunction 接口提供的其他功能。 |
参数: | 为 DataStream 的每个元素调用的 FlatMapFunction |
返回值: | SingleOutputStreamOperator 转换后的数据流 |
总结: | 是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素 |
图示:
源码:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType =
TypeExtractor.getFlatMapReturnTypes(
clean(flatMapper), getType(), Utils.getCallLocationName(), true);
return flatMap(flatMapper, outType);
}
public <R> SingleOutputStreamOperator<R> flatMap(
FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
这里的参数是一个FlatMapFunction
,然后通过实现的flatMap
方法来处理返回 0 个、1 个或多个结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。
实例:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 从集合中读取数据
ArrayList<String> list = new ArrayList<>();
list.add("one,yes");
list.add("two,no");
list.add("three");
// 3. 读取数据
DataStreamSource<String> stringDataStreamSource = env.fromCollection(list, BasicTypeInfo.STRING_TYPE_INFO);
// 4. flatMap操作
SingleOutputStreamOperator<String> flatMapStreamOperator = stringDataStreamSource.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
if (s.contains(",")) {
for (String str : s.split(",")) {
collector.collect(str);
}
} else {
collector.collect(s + ", go");
}
}).returns(BasicTypeInfo.STRING_TYPE_INFO);
// 这里的returns要指定返回类型 因为类型擦出不知道Collector返回的泛型是什么类型
flatMapStreamOperator.print();
// 5. 执行程序
env.execute();
}
要注意要指定返回类型.returns(BasicTypeInfo.STRING_TYPE_INFO);
2.聚合算子(Aggregation)
基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。
2.1 KeyBy(按键分区)
对于Flink而言,DataStream是没有直接进行聚合的API的,因此对海量数据处理做聚合一定要先做分区处理,这样才能提高效率。而分区就是通过KeyBy
来完成的。
标题 | 内容 |
---|---|
描述: | 它创建一个新的 KeyedStream ,它使用提供的Key 来划分其操作员状态。 |
参数: | 用于提取分区键的 KeySelector |
返回值: | 具有分区状态的 DataStream (即 KeyedStream ) |
总结: | 根据Key 的hashCode 方法进行分区 |
注意: | 以下情况,一个类不能作为 key:1.它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于Object.hashCode() 实现。2.它是任意类的数组。 |
图示:
基于不同的 key
,流中的数据将被分配到不同的分区中去,如图 5-8 所示;这样一来,所
有具有相同的 key
的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot
中进行处理了。在内部,是通过计算key
的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key
如果是 POJO
的话,必须要重写 hashCode()
方法。
源码:
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
Preconditions.checkNotNull(key);
return new KeyedStream<>(this, clean(key));
}
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(keyType);
return new KeyedStream<>(this, clean(key), keyType);
}
这里的参数是一个KeySelector
,然后通过实现的getKey
方法返回指定的Key
值来分区。需要注意的是KeyBy
的结果不在是DataStream
,而是将DataStream
转换为KeyedStream
。KeyedStream
可以认为是“分区流”
或者“键控流”
,它是对 DataStream
按照key
的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key
的类型。
实例:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. keyBy操作
KeyedStream<Event, String> keyedStream = eventDataStreamSource.keyBy(event -> event.user);
keyedStream.print();
// 5. 执行程序
env.execute();
}
KeyedStream
也继承自 DataStream
,所以基于它的操作也都归属于 DataStream API
。但它跟之前的转换操作得到的SingleOutputStreamOperator
不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream
是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum
,reduce
);而且它可以将当前算子任务的状态(state
)也按照 key
进行划分、限定为仅对当前key
有效。
2.2 简单聚合操作
KeyedStream
提供了很多种简单的聚合操作,比如求和,求最大值等,主要有以下几种:
聚合操作名称 | 简介 |
---|---|
sum() | 在输入流上,对指定的字段做叠加求和的操作。 |
min() | 在输入流上,对指定的字段求最小值。 |
max() | 在输入流上,对指定的字段求最大值。 |
minBy() | 与 min() 类似,在输入流上针对指定字段求最小值。不同的是,min() 只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy() 则会返回包含字段最小值的整条数据。 |
maxBy() | 与 max() 类似,在输入流上针对指定字段求最大值。两者区别与min() /minBy() 完全一致。 |
这些简单聚合函数的参数很简单,不需要自定义函数,只需要说明聚合指定的字段就可以,指定字段的方式有两种,指定位置
和指定名称
对于元祖类型,如果指定字段名,需要这样写字段名f0,f1
,类似于这种stream.keyBy(r -> r.f0).max("f1").print();
对于POJO类,只能通过通过字段名称来指定,不能通过位置来指定,类似这种stream.keyBy(e -> e.user).max("timestamp")
public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
return aggregate(
new ComparableAggregator<>(
positionToMaxBy,
getType(),
AggregationFunction.AggregationType.MAXBY,
first,
getExecutionConfig()));
}
简单聚合算子返回的是SingleOutputStreamOperator
,从KeyStream
又转换成了常规的DataStream
,所以可以理解为KeyBy
和聚合是成对出现的,先分区后聚合,得到的依然是一个DataStream
。经过简单聚合之后的数据流,元素的类型是保持不变的。
一个聚合算子,会为每一个Key
保存一个聚合的值,在Flink中这称为状态(state)
,每当有一个新的数据输入,算子就会更新保存聚合结果,并发送一个更新后的聚合值的事件到下游算子。
对于无界流来说,这个状态是永远不会被清除的,所以使用聚合算子应该只用在有限个Key的数据流上。
实例:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. keyBy操作
SingleOutputStreamOperator<Event> timestamp1 = eventDataStreamSource.keyBy(event -> event.user).max("timestamp");
SingleOutputStreamOperator<Event> timestamp2 = eventDataStreamSource.keyBy(event -> event.user).maxBy("timestamp");
// 这里的returns要指定返回类型 因为类型擦出不知道Collector返回的泛型是什么类型
timestamp1.print("max:");
timestamp2.print("maxBy:");
// 5. 执行程序
env.execute();
}
注意这里的Max和MaxBy Max只返回指定的字段取最大值,MaxBy返回指定字段的最大的整条记录。
2.3 Reduce(归约聚合)
简单聚合是对一些特定需求的实现,呢么reduce
算子就是一个一般化的聚合统计操作,reduce
是对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值再做一个聚合计算
与简单聚合类似,reduce
也是将KeyStream
转换为DataStream
,不会改变流的元素数据,所以输入输出都是一样的
标题 | 内容 |
---|---|
描述: | 对按给定键位置分组的分组数据流应用减少转换。 ReduceFunction 将根据键值接收输入值。只有具有相同键的输入值才会进入相同的 reducer 。 |
参数: | 将为具有相同键的输入值的每个元素调用的 ReduceFunction 。 |
返回值: | 转换后的数据流 |
总结: | 对已有数据进行归约处理,把每一个新输入的数据和当前已经归约的值在做一次聚合运算 |
与简单聚合类似,reduce
操作也是将KeyedStream
转换为DataStream
,reduce
不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
源码
:
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
ReduceTransformation<T, KEY> reduce =
new ReduceTransformation<>(
"Keyed Reduce",
environment.getParallelism(),
transformation,
clean(reducer),
keySelector,
getKeyType());
getExecutionEnvironment().addOperator(reduce);
return new SingleOutputStreamOperator<>(getExecutionEnvironment(), reduce);
}
调用KeyedStream
的reduce
方法时,需要传入一个参数,实现ReduceFunction
接口的reduce
方法,接口源码如下:
@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T var1, T var2) throws Exception;
}
处理过程类似下图,这个方法接收两个参数,经过转换处理之后输出同一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后在将合并的结果看做一个数据,在跟后面的数据合并,最终简化成唯一的一个数据。
ReduceFunction
内部会维护一个初始值为空的累加器,累加器的类型和输入数据的类型一致,当第一条数据到来时,累加器更新为第一条数据的值,当新数据到来时,新元素就和累加器进行累加操作,然后将更新后的累加器的值向下游输出。
实例
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. 统计访问频率
SingleOutputStreamOperator<Tuple2<String, Long>> clickDataStreamSource = eventDataStreamSource.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event event) throws Exception {
return Tuple2.of(event.user, 1L);
}
}).keyBy(data -> data.f0).reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}
});
// 5. 找到最大的频率
SingleOutputStreamOperator<Tuple2<String, Long>> maxClickStreamSource = clickDataStreamSource.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
return t1.f1 > t2.f1 ? t1 : t2;
}
});
// 6. 输出数据
maxClickStreamSource.print();
// 7. 执行程序
env.execute();
}
3. 用户自定义函数UDF
大多数操作都需要用户自定义 function,至于什么是自定义函数?我们可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是谓的用户自定义函数(user-defined function,UDF)
3.1 Function Classes(函数类)
Flink暴露了所有的UDF函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。所以最简单的方式就是自定义一个函数类,实现对应的接口即可。
示例
:
class MyMapFunction implements MapFunction<String, String > {
public String map(String value) { return value.startsWith("https") ? value.concat("_https") : value.concat("_http"); }
}
data.map(new MyMapFunction());
更丰富一点的方式,也可以参加一点构造函数
class MyMapFunction implements MapFunction<String, String> {
private final String defaultStr;
public MyMapFunction(String defaultStr) {
this.defaultStr = defaultStr;
}
public String map(String value) { return value.startsWith(defaultStr) ? value.concat("_https") : value.concat("_http"); }
}
data.map(new MyMapFunction("https"));
3.2 Lambda(匿名函数)
Java8就已经支持Lambda表达式了,Flink的所有算子都支持Lambda表达式来编码,但是当Lambda表达式使用Java泛型的时候,需要显式声明类型信息。
示例
:
// 匿名类
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
// Lambda碰到泛型擦除的时候,需要指定returns
data.map(event -> event.user);
如果碰到flatmap
则必须指定returns
,map
操作如果是简单类型就可以不用指定,但是碰到复杂数据类型或者POJO就需要指定returns
3.3 Rich Function Classes(富函数类)
富函数类也是DataStreamAPI提供的一个函数类接口,所有的Flink函数类都有Rich版本。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。
@Public
public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
private static final long serialVersionUID = 1L;
public RichMapFunction() {
}
public abstract OUT map(IN var1) throws Exception;
}
富函数类会比常规函数类提供更多的功能,
- 富函数类可以获取运行环境的上下文getRuntimeContext 和setRuntimeContext。
- 富函数类拥有生命周期方法,提供了open、close方法
对于富函数类有生命周期的概念。典型的生命周期方法有:
- open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当
一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调
用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的
工作,都适合在 open()方法中完成。。 - close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一
些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,
实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。
示例
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. map操作
SingleOutputStreamOperator<String> userSingleOutputStream = eventDataStreamSource.map(new RichMapFunction<Event, String>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("这是 open方法, 索引为 " + getRuntimeContext().getIndexOfThisSubtask() + "的任务开始了");
}
@Override
public void close() throws Exception {
super.close();
System.out.println("这是 close, 索引为 " + getRuntimeContext().getIndexOfThisSubtask() + "的任务结束了");
}
@Override
public String map(Event event) throws Exception {
return event.user;
}
});
// 5. 打印
userSingleOutputStream.print();
// 6. 执行程序
env.execute();
}
打印结果
:
这是 open方法, 索引为 0的任务开始了
ming
xiaohu
xiaohu
gala
ming
xiaohu
这是 close, 索引为 0的任务结束了
4. 物理分区
分区
操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步处理。
之前使用的keyBy
是一种按照键的哈希值来重新分区的操作,只不过这种分区操作只能保证把数据按Key
分开,至于分得均不均匀,每个key的数据具体分到哪一个区,这些都是无从控制的,所以KeyBy
是一种逻辑分区
操作。
KeyBy
是一种软分区
,Flink还有一种物理分区
,是真正控制分区策略,精准地调整数据,告诉每个数据到底去哪。在Flink任务过程中,当我们设置多个处理任务并设置了不同的并行度,当数据执行的上下游任务并行度发生变化时,系统会自动地将数据均匀的发往下游的所有并行任务,保证各个分区的负载均衡。
但是有些时候需要我们手动控制数据分区分配策略。比如当数据发生数据倾斜的时候,系统无法调整,就需要我们进行干预重新进行负载均衡,将数据流较为平均的发送到下游任务中去。Flink为提供了多种操作接口帮助我们实现数据流的手动重分区。这种操作叫做物理分区操作
物理分区和keyBy
的一大区别在于keyBy得到的是一个KeyedStream
,而物理分区结果之后还是DataStream
,并且流中元素数据类型保持不变。分区算子并不对数据进行转换处理,只是定义了数据的传输方式。
常见的物理分区策略有有随机分配(shuffle)
、轮询分配(Round-Robin)
、重缩放(Rescale)
和广播(Broadcast)
,
4.1 随机分区(shuffle)
标题 | 描述 |
---|---|
简介: | 随机分区就是洗牌,将数据随机地分到下游算子的并行任务中去,随机分区服从均匀分布,会把数据流中的数据随机打乱,均匀地传递到下游任务分区 |
特点 : | 完全随机,均分分布; 经过随机分区之后,得到的依然是一个 DataStream。 |
调用方法: | DataStream.shuffle() 方法 |
图解
:
示例代码
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
list.add(new Event("xiaohu2","www.baidu8.com",5500L));
list.add(new Event("xiaohu3","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. map操作
eventDataStreamSource.shuffle().print("shuffle").setParallelism(4);
// 5. 执行程序
env.execute();
}
多执行几次,结果看起来是没有规律的,并且也不是均匀处理相同个数的,其实数据量多起来的话,结果就近似于每个下游算子处理的任务数相同了。
4.2 轮询分配(Round-Robin)
标题 | 描述 |
---|---|
简介: | 轮询分区是常见的一种重分区方式,可以看做是发牌 ,按照先后顺序将数据一次分发。 |
特点 : | 按照顺序,均分分布; 经过轮询分区之后,得到的依然是一个 DataStream。 |
调用方法: | DataStream.rebalance() 方法 ,rebalance使用的是 Round-Robin 负载均衡算法 |
注:Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。
图解
:
示例代码
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",0L));
list.add(new Event("xiaohu","www.baidu2.com",1L));
list.add(new Event("xiaohu","www.baidu5.com",2L));
list.add(new Event("gala","www.baidu6.com",3L));
list.add(new Event("ming","www.baidu7.com",4L));
list.add(new Event("xiaohu","www.baidu8.com",5L));
list.add(new Event("xiaohu2","www.baidu8.com",6L));
list.add(new Event("xiaohu3","www.baidu8.com",7L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. map操作
eventDataStreamSource.rebalance().print("shuffle").setParallelism(4);
// 5. 执行程序
env.execute();
}
打印结果
:
shuffle:2> Event{user='gala', url='www.baidu6.com', timestamp=1970-01-01 08:00:00.003}
shuffle:3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:00.0}
shuffle:2> Event{user='xiaohu3', url='www.baidu8.com', timestamp=1970-01-01 08:00:00.007}
shuffle:4> Event{user='xiaohu', url='www.baidu2.com', timestamp=1970-01-01 08:00:00.001}
shuffle:1> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:00.002}
shuffle:3> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:00.004}
shuffle:4> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:00.005}
shuffle:1> Event{user='xiaohu2', url='www.baidu8.com', timestamp=1970-01-01 08:00:00.006}
看着好像不是按照顺序,其实按照时间排序之后再看,顺序是3412
4.3 重缩放分区(rescale)
标题 | 描述 |
---|---|
简介: | 重缩放分区和轮询分区很相似,重缩放也是使用的Round-Robin算法轮询,只不过是将数据轮询发送到下游算子并行任务的一部分中,也就是类似于只会在自己的小团体中进行轮询。 |
特点 : | 团体内部,轮询分区 |
调用方法: | DataStream.rescale() 方法 ,rescale使用的是 Round-Robin 负载均衡算法 |
图解
:
这里说一下当下游任务的数量是上游任务数量的整数倍时,rescale的效率会更高一些。rescale和rebalance的区别在于:
rebalance
是所有分区数据的重新平衡,当TaskManger
数据量较多时,这种跨节点的网络传输必然影响效率,如果配置的task slot
数量合适,rescale
方式进行局部重缩放,就可以让数据只在当前TaskManger
的多个slot
之间重新分配,从而避免网络传输带来的损耗。- 底层区别是
rescale
和rebalance
在于任务之间的连接机制不同,rebalance
会针对所有上游任务和所有下游任务之间建立通信通道,呈现笛卡尔积的关系,而rescale
仅仅针对每一个任务以及通过某种方式得到的分组内的任务之间建立通信,节省很多资源。
示例代码
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",0L));
list.add(new Event("xiaohu","www.baidu2.com",1L));
list.add(new Event("xiaohu","www.baidu5.com",2L));
list.add(new Event("gala","www.baidu6.com",3L));
list.add(new Event("ming","www.baidu7.com",4L));
list.add(new Event("xiaohu","www.baidu8.com",5L));
list.add(new Event("xiaohu2","www.baidu8.com",6L));
list.add(new Event("xiaohu3","www.baidu8.com",7L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 4. map操作
eventDataStreamSource.rescale().print("shuffle").setParallelism(4);
// 5. 执行程序
env.execute();
}
这里的打印结果其实是1234,为什么? 因为我的数据输入使用的fromCollection 本身就是只有一个任务,所以下游任务的4个 都是一组,如果换成addSource效果会好一些。
示例:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 读取数据
DataStreamSource<Integer> integerDataStreamSource = env.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for (int i = 0; i< 10 ;i ++) {
// 将奇数发送到索引为 1 的并行子任务
// 将偶数发送到索引为 0 的并行子任务
if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
ctx.collect(i + 1);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2);
// 3. map操作
integerDataStreamSource.rescale().print("shuffle").setParallelism(4);
// 4. 执行程序
env.execute();
}
输出结果
:
// 手动排序后的
shuffle:3> 1
shuffle:4> 3
shuffle:3> 5
shuffle:4> 7
shuffle:3> 9
shuffle:1> 2
shuffle:2> 4
shuffle:1> 6
shuffle:2> 8
shuffle:1> 10
可以看出来,1和2是一组,3和4是一组
4.4 广播(Broadcast)
标题 | 描述 |
---|---|
简介: | 广播方式不算重分区,使用广播方式后,数据会在不同的分区都保留一份,可能进行重复处理。 |
特点 : | 所有分区都保留一份 |
调用方法: | DataStream.broadcast() 方法 |
示例代码
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("ming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("ming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
// 3. map操作
eventDataStreamSource.broadcast().print("shuffle").setParallelism(2);
// 4. 执行程序
env.execute();
}
这里设置了2个并行度,所以最后会有16条数据输出。
4.5 全局分区(global)
全局分区是一种特殊分区,这种分区非常极端,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。等于强行让下游任务并行度变为1,所以使用这个需谨慎。使用方式
eventDataStreamSource.global().print("shuffle").setParallelism(2);
4.6 自定义分区(Custom)
当Flink提供的分区策略不能满足我们的要求时,还可以通过使用partitionCustom()方法来自定义分区策略
public <K> DataStream<T> partitionCustom(
Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
return setConnectionType(
new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
}
参数说明:
partitioner
:自定义分区器(Partitioner)对象keySelector
:对DataStream
进行分区的KeySelector
。应用分区器的字段,指定方式与KeyBy
指定key
基本一样,可以通过字段名
指定也可以通过字段位置索引
来指定。
示例
:
public static void main(String[] args) throws Exception {
// 1. 直接调用getExecutionEnvironment 方法,底层源码可以自由判断是本地执行环境还是集群的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 从集合中读取数据
ArrayList<Event> list = new ArrayList<>();
list.add(new Event("xiaoming","www.baidu1.com",1200L));
list.add(new Event("xiaohu","www.baidu2.com",1200L));
list.add(new Event("xiaohu","www.baidu5.com",1267L));
list.add(new Event("gala","www.baidu6.com",1200L));
list.add(new Event("xiaoming","www.baidu7.com",4200L));
list.add(new Event("xiaohu","www.baidu8.com",5500L));
// 3. 读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(list, BasicTypeInfo.of(Event.class));
eventDataStreamSource.partitionCustom(new Partitioner<Event>() {
@Override
public int partition(Event event, int numPartitions) {
if (event != null && event.getUser().contains("xiaohu")) {
return 0;
} else {
return 1;
}
}
}, new KeySelector<Event, Event>() {
@Override
public Event getKey(Event event) throws Exception {
if (event.getUser().contains("x")) {
return event;
} else {
return null;
}
}
}).print().setParallelism(2);
// 4. 执行程序
env.execute();
}
结果就是只有 0 输出xiaohu