Flink提供了一些流API,其中包括WindowedStream、DataStream、KeyedStream和AllWindowStream。
🍊WindowedStream是一种特殊的流,其中数据已按时间或数据元素的键进行分组,并且每个分组的数据都在窗口中按时间划分。这意味着,如果你有一个WindowedStream,你可以对每个窗口执行转换,例如聚合或统计。
🍊DataStream是Flink中最基本的流类型,表示一个无界的、有序的数据流。它可以是任何类型的数据,例如数值、字符串或复杂的对象。
🍊KeyedStream是一种特殊的DataStream,其中数据已按照一个键(通常是一个数值或字符串)进行分组。这意味着你可以对每个键执行转换,例如聚合或计数。
🍊AllWindowStream是一种特殊的WindowedStream,其中数据流被分成固定大小的所有窗口。这意味着你可以对整个数据流执行转换,而无需将数据分组。
如下图所示,WindowedStream、DataStream、KeyedStream、AllWindowStream之间的转换
~下面使用代码做一些简单的转换示例,希望能对你有所帮助
如,你可以使用keyBy()函数将DataStream转换为KeyedStream。( DataStream -> KeyedStream)
DataStream<String> dataStream = ...;
//DataStream -> KeyedStream
KeyedStream<String, String> keyedStream = dataStream.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
要将KeyedStream转换为WindowedStream,你可以使用window()函数。例如,以下代码将每个数据元素的键的流分成5秒的滑动窗口( KeyedStream-> WindowedStream):
KeyedStream<String, String> keyedStream = ...;
//KeyedStream-> WindowedStream
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(SlidingTimeWindows.of(Time.seconds(5)));
还可以使用windowAll()函数将DataStream转换为AllWindowStream(DataStream-> AllWindowStream)。例如,以下代码将数据流分成10秒的滑动窗口:
DataStream<String> dataStream = ...;
//DataStream-> AllWindowStream
AllWindowStream<String, TimeWindow> allWindowStream = dataStream.windowAll(SlidingTimeWindows.of(Time.seconds(10)));
你可以使用以下代码将WindowedStream转换为DataStream(WindowedStream-> DataStream):
WindowedStream<T> windowedStream = ...;
DataStream<T> dataStream = windowedStream.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)));
你可以使用reduce函数将KeyedStream转换为DataStream(KeyedStream-> DataStream)。例如,假设你有一个整数类型的KeyedStream,并希望将其转换为所有键的和的DataStream,你可以使用以下代码:
KeyedStream<Integer, String> keyedStream = ...;
DataStream<Integer> sumStream = keyedStream.reduce(new ReduceFunction<Integer>() {
public Integer reduce(Integer value1, Integer value2) {
return value1 + value2;
}
});
你可以使用以下代码将DataStream转换为WindowedStream(DataStream-> WindowedStream)。这段代码将DataStream转换为带有滑动窗口的KeyedStream,然后使用window函数将其转换为WindowedStream,最后使用WindowFunction将WindowedStream中的数据进行转换。
DataStream<T> dataStream = ...;
WindowedStream<T, K, TimeWindow> windowedStream = dataStream.keyBy(new KeySelector<T, K>() {
public K getKey(T value) {
// Return the key for the value
}
}).window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)))
.apply(new WindowFunction<T, T, K, TimeWindow>() {
public void apply(K key, TimeWindow window, Iterable<T> values, Collector<T> out) {
for (T value : values) {
out.collect(value);
}
}
});