Flink 基础知识
TumblingEventTimeWindows 滚动开窗
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements. Windows cannot overlap.
 *
 * <p>For example, in order to window into windows of 1 minute:
 *
 * <pre>{@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *   keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 * }</pre>
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Window length (milliseconds when created through the {@code of} factories). */
    private final long size;

    /** User-supplied shift applied to every window start. */
    private final long globalOffset;

    /**
     * Extra shift produced by {@link #windowStagger}. It is computed lazily on the first call to
     * {@link #assignWindows} and then reused for every later element of this assigner instance.
     */
    private Long staggerOffset = null;

    /** Policy that produces {@link #staggerOffset}. */
    private final WindowStagger windowStagger;

    /**
     * Creates a tumbling event-time assigner.
     *
     * @param size window length; must be positive
     * @param offset shift applied to window starts; must satisfy {@code -size < offset < size}
     * @param windowStagger policy producing an additional stagger offset at runtime
     * @throws IllegalArgumentException if {@code size} or {@code offset} is out of range
     */
    protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
        // Check the bounds explicitly instead of using Math.abs(offset) >= size:
        // Math.abs(Long.MIN_VALUE) overflows to a negative number, which would let that
        // out-of-range offset pass the check.
        if (size <= 0 || offset <= -size || offset >= size) {
            throw new IllegalArgumentException(
                    "TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
        }
        this.size = size;
        this.globalOffset = offset;
        this.windowStagger = windowStagger;
    }

    /**
     * Assigns the element to exactly one window: the window of length {@code size} that contains
     * {@code timestamp`, aligned by the global offset plus the stagger offset.
     *
     * @throws RuntimeException if the element carries the "no timestamp" marker
     *     ({@code Long.MIN_VALUE})
     */
    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        if (timestamp > Long.MIN_VALUE) {
            if (staggerOffset == null) {
                // Derived once from the processing time at which the first element is seen.
                staggerOffset =
                        windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
            }
            long start =
                    TimeWindow.getWindowStartWithOffset(
                            timestamp, (globalOffset + staggerOffset) % size, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException(
                    "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                            + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                            + "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    /** Returns an {@link EventTimeTrigger}, which fires when the watermark passes a window's end. */
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingEventTimeWindows(" + size + ")";
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements
     * to time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements
     * to time windows based on the element timestamp and offset.
     *
     * <p>Hourly windows starting at minute 15: to window a stream by hour but have each window
     * begin at the 15th minute of the hour, use {@code of(Time.hours(1), Time.minutes(15))}.
     * The windows then start at 0:15:00, 1:15:00, 2:15:00, etc.
     *
     * <p>Daily windows in a non-UTC time zone: suppose you are in a time zone other than
     * UTC±00:00, such as China (UTC+08:00). To get one-day windows that begin at 00:00:00 local
     * time, use {@code of(Time.days(1), Time.hours(-8))}. The offset is {@code Time.hours(-8)}
     * because local midnight in UTC+08:00 occurs 8 hours before midnight UTC.
     *
     * @param size The size of the generated windows.
     * @param offset The offset which window start would be shifted by.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(
                size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements
     * to time windows based on the element timestamp, offset and a staggering offset, depending on
     * the staggering policy.
     *
     * @param size The size of the generated windows.
     * @param offset The globalOffset which window start would be shifted by.
     * @param windowStagger The utility that produces staggering offset in runtime.
     * @return The time policy.
     */
    @PublicEvolving
    public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
        return new TumblingEventTimeWindows(
                size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Always {@code true}: windows are assigned by element (event) timestamp. */
    @Override
    public boolean isEventTime() {
        return true;
    }
}
SlidingEventTimeWindows 滑动开窗
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
 * elements. Windows can possibly overlap.
 *
 * <p>For example, in order to window into windows of 1 minute, every 10 seconds:
 *
 * <pre>{@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *   keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
 * }</pre>
 */
@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Window length (milliseconds when created through the {@code of} factories). */
    private final long size;

    /** Distance between the starts of two consecutive windows. */
    private final long slide;

    /** Shift applied to every window start. */
    private final long offset;

    /**
     * Creates a sliding event-time assigner.
     *
     * @param size window length; must be positive
     * @param slide distance between consecutive window starts; must be positive
     * @param offset shift applied to window starts; must satisfy {@code -slide < offset < slide}
     * @throws IllegalArgumentException if any parameter is out of range
     */
    protected SlidingEventTimeWindows(long size, long slide, long offset) {
        // Check the bounds explicitly instead of using Math.abs(offset) >= slide:
        // Math.abs(Long.MIN_VALUE) overflows to a negative number, which would let that
        // out-of-range offset pass the check.
        if (size <= 0 || slide <= 0 || offset <= -slide || offset >= slide) {
            throw new IllegalArgumentException(
                    "SlidingEventTimeWindows parameters must satisfy "
                            + "abs(offset) < slide and size > 0");
        }
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    /**
     * Assigns the element to every window of length {@code size} (starting on a
     * {@code slide}-aligned, offset boundary) that contains {@code timestamp}.
     *
     * @throws RuntimeException if the element carries the "no timestamp" marker
     *     ({@code Long.MIN_VALUE})
     */
    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            // Start of the latest window containing the timestamp; walk backwards one slide at a
            // time while the window [start, start + size) still contains the timestamp.
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart; start > timestamp - size; start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException(
                    "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                            + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                            + "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    /** Returns the window length. */
    public long getSize() {
        return size;
    }

    /** Returns the distance between consecutive window starts. */
    public long getSlide() {
        return slide;
    }

    /** Returns an {@link EventTimeTrigger}, which fires when the watermark passes a window's end. */
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
    }

    /**
     * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns elements to
     * sliding time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @param slide The slide interval of the generated windows.
     * @return The time policy.
     */
    public static SlidingEventTimeWindows of(Time size, Time slide) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }

    /**
     * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns elements to
     * time windows based on the element timestamp and offset.
     *
     * <p>Hourly windows starting at minute 15: to get one-hour windows every hour, each
     * beginning at the 15th minute of the hour, use
     * {@code of(Time.hours(1), Time.hours(1), Time.minutes(15))}. The windows then start at
     * 0:15:00, 1:15:00, 2:15:00, etc.
     *
     * <p>Daily windows in a non-UTC time zone: suppose you are in a time zone other than
     * UTC±00:00, such as China (UTC+08:00). To get one-day windows, once per day, that begin at
     * 00:00:00 local time, use {@code of(Time.days(1), Time.days(1), Time.hours(-8))}. The offset
     * is {@code Time.hours(-8)} because local midnight in UTC+08:00 occurs 8 hours before midnight
     * UTC.
     *
     * <p>The offset must satisfy {@code abs(offset) < slide}.
     *
     * @param size The size of the generated windows.
     * @param slide The slide interval of the generated windows.
     * @param offset The offset which window start would be shifted by.
     * @return The time policy.
     */
    public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingEventTimeWindows(
                size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Always {@code true}: windows are assigned by element (event) timestamp. */
    @Override
    public boolean isEventTime() {
        return true;
    }
}
assignTimestampsAndWatermarks 分配时间戳与水印（Watermark）
/**
 * Assigns timestamps to the elements in the data stream and generates watermarks to signal
 * event time progress. The given [[WatermarkStrategy]] is used to create a [[TimestampAssigner]]
 * and a [[org.apache.flink.api.common.eventtime.WatermarkGenerator]].
 *
 * For each event in the data stream, the [[TimestampAssigner#extractTimestamp(Object, long)]]
 * method is called to assign an event timestamp.
 *
 * For each event in the data stream, the
 * [[WatermarkGenerator#onEvent(Object, long, WatermarkOutput)]] method will be called.
 *
 * Periodically (defined by the [[ExecutionConfig#getAutoWatermarkInterval()]]), the
 * [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method will be called.
 *
 * Common watermark generation patterns can be found as static methods in the
 * [[org.apache.flink.api.common.eventtime.WatermarkStrategy]] class.
 *
 * @param watermarkStrategy strategy that supplies the timestamp assigner and the watermark
 *                          generator
 * @return the resulting stream, wrapped as a Scala [[DataStream]]
 */
def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T] = {
  // NOTE(review): `clean` is defined outside this snippet; presumably it runs Flink's closure
  // cleaner so the user strategy can be serialized — confirm.
  val cleanedStrategy = clean(watermarkStrategy)
  asScalaStream(stream.assignTimestampsAndWatermarks(cleanedStrategy))
}
allowedLateness 允许迟到数据（应对分布式架构中的乱序）
在分布式架构中，数据可能乱序到达：按水印窗口本该关闭时，部分属于该窗口的数据还没有到。allowedLateness() 让窗口在此之后再保留一段时间才真正关闭。水印越过窗口结束时间时，窗口先触发一次计算。之后在允许的延迟时间内，每到达一条迟到数据，窗口都会再次触发计算，从而把迟到的数据也处理进来。
/**
 * Sets the allowed lateness to a user-specified value.
 * If not explicitly set, the allowed lateness is `0L`.
 * Setting the allowed lateness is only valid for event-time windows.
 * If a value different from 0 is provided with a processing-time
 * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
 * then an exception is thrown.
 *
 * @param lateness how long after the end of a window late elements are still accepted
 * @return this windowed stream, to allow call chaining
 */
@PublicEvolving
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
  // Configure the wrapped Java stream in place; the Scala wrapper itself is returned.
  javaStream.allowedLateness(lateness)
  this
}
sideOutputLateData 侧输出流
设置 allowedLateness() 之后，仍可能有数据在允许的延迟时间过去之后才到达，即水印已超过"窗口结束时间 + 允许的延迟"。这些数据不会再参与窗口计算。sideOutputLateData() 可以把这类迟到数据放入侧输出流，而不是直接丢弃。
/**
 * Routes elements that arrive too late for their window into a side output.
 *
 * An element counts as late once the watermark has moved past the end of its window plus the
 * allowed lateness configured via [[allowedLateness(Time)]]. To consume those elements, call
 * [[DataStream.getSideOutput()]] with the same [[OutputTag]] on the [[DataStream]] produced by
 * the windowed operation.
 *
 * @param outputTag tag identifying the side output that receives the late elements
 * @return this windowed stream, to allow call chaining
 */
@PublicEvolving
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
  val underlying = javaStream
  underlying.sideOutputLateData(outputTag)
  this
}
Flink 实操
(1)
package nj.zb.kb23.api.windows
import java.time.Duration
import nj.zb.kb23.source.SensorReading
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WindowEventTimeTest {

  /** Socket host used when no host argument is supplied. */
  private val DefaultHost: String = "192.168.91.11"

  /** Socket port used when no port argument is supplied. */
  private val DefaultPort: Int = 7777

  /**
   * Demo job for event-time windows.
   *
   * It reads `id,timestamp,temperature` lines from a socket and assigns event time with a
   * 3-second bounded-out-of-orderness watermark. Each key is evaluated in 15-second tumbling
   * event-time windows with 1 minute of allowed lateness. Records arriving after that go to a
   * side output printed with the `late value:` prefix.
   *
   * @param args optional socket host (`args(0)`) and port (`args(1)`); when absent the job
   *             connects to `192.168.91.11:7777`
   */
  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(_.trim.toInt).getOrElse(DefaultPort)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream(host, port)

    // Report and skip malformed lines instead of throwing and failing the whole job.
    val dataStream: DataStream[SensorReading] = inputStream.flatMap { line =>
      val parsed: Option[SensorReading] = parseSensorReading(line)
      if (parsed.isEmpty) System.err.println(s"Skipping malformed sensor record: '$line'")
      parsed.toList
    }

    // Use event time as the time semantics; tolerate up to 3 s of out-of-order data.
    // The explicit type argument keeps T from being inferred as Nothing.
    val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(3))
        .withTimestampAssigner(
          new SerializableTimestampAssigner[SensorReading] {
            override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = {
              // Event-time field. The `* 1000` implies `timestamp` is in seconds and converts
              // it to milliseconds.
              element.timestamp * 1000
            }
          }
        )
    )

    // Tag for records that arrive after the allowed lateness has expired.
    val lateTag = new OutputTag[SensorReading]("lastdata")

    val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2
      .keyBy(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tumbling window
      // .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(3))) // sliding window
      .allowedLateness(Time.minutes(1)) // accept late data for up to 1 more minute
      .sideOutputLateData(lateTag) // anything later than that goes to the side output

    // Alternative: incremental aggregation keeping the minimum temperature per window.
    // val resultStream: DataStream[SensorReading] = windowStream.reduce(
    //   (acc, next) => SensorReading(acc.id, acc.timestamp, acc.temperature.min(next.temperature))
    // )
    val resultStream: DataStream[SensorReading] =
      windowStream.process(new MyEventProcessWindowFunction)

    resultStream.print("result:")
    resultStream.getSideOutput(lateTag).print("late value:") // or .addSink(...)

    env.execute("windowEventTime")
  }

  /**
   * Parses a line of the form `id,timestamp,temperature` into a [[SensorReading]].
   *
   * Fields may be surrounded by whitespace. Fields beyond the third are ignored.
   *
   * @return the reading, or `None` if the line has fewer than three fields or a numeric field
   *         cannot be parsed
   */
  private def parseSensorReading(line: String): Option[SensorReading] = {
    import scala.util.Try
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp, _*) => Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _                       => None
    }
  }
}
/**
 * For each key and window, emits the reading with the lowest temperature in that window.
 * If several readings tie, the first one encountered is kept.
 *
 * The window bounds are also printed to stdout for debugging.
 */
class MyEventProcessWindowFunction extends ProcessWindowFunction[SensorReading,SensorReading,String,TimeWindow]{
  override def process(
      key: String,
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[SensorReading]): Unit = {
    val window: TimeWindow = context.window
    // Explicit tuple: `println(a, b)` relies on deprecated argument auto-tupling.
    println((window.getStart, window.getEnd))
    // reduceOption avoids a failure should the window ever be handed over empty.
    elements
      .reduceOption((lowest, next) => if (next.temperature < lowest.temperature) next else lowest)
      .foreach(reading => out.collect(reading))
  }
}
(2)
package nj.zb.kb23.api.windows
import java.time.Duration
import nj.zb.kb23.source.SensorReading
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WindowEventTimeTest {

  /** Socket host used when no host argument is supplied. */
  private val DefaultHost: String = "192.168.91.11"

  /** Socket port used when no port argument is supplied. */
  private val DefaultPort: Int = 7777

  /**
   * Demo job: per-window minimum temperature on event time.
   *
   * It reads `id,timestamp,temperature` lines from a socket and assigns event time with a
   * 3-second bounded-out-of-orderness watermark. For each key it emits
   * `(id, min temperature, window start, window end)` from 15-second tumbling event-time
   * windows with 1 minute of allowed lateness. Records arriving after that go to a side output
   * printed with the `late value:` prefix.
   *
   * @param args optional socket host (`args(0)`) and port (`args(1)`); when absent the job
   *             connects to `192.168.91.11:7777`
   */
  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(_.trim.toInt).getOrElse(DefaultPort)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream(host, port)

    // Report and skip malformed lines instead of throwing and failing the whole job.
    val dataStream: DataStream[SensorReading] = inputStream.flatMap { line =>
      val parsed: Option[SensorReading] = parseSensorReading(line)
      if (parsed.isEmpty) System.err.println(s"Skipping malformed sensor record: '$line'")
      parsed.toList
    }

    // Use event time as the time semantics; tolerate up to 3 s of out-of-order data.
    // The explicit type argument keeps T from being inferred as Nothing.
    val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(3))
        .withTimestampAssigner(
          new SerializableTimestampAssigner[SensorReading] {
            override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = {
              // Event-time field. The `* 1000` implies `timestamp` is in seconds and converts
              // it to milliseconds.
              element.timestamp * 1000
            }
          }
        )
    )

    // Tag for records that arrive after the allowed lateness has expired.
    val lateTag = new OutputTag[SensorReading]("lastdata")

    val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2
      .keyBy(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tumbling window
      // .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(3))) // sliding window
      .allowedLateness(Time.minutes(1)) // accept late data for up to 1 more minute
      .sideOutputLateData(lateTag) // anything later than that goes to the side output

    val resultStream: DataStream[(String, Double, Long, Long)] =
      windowStream.process(new MyEventProcessWindowFunction)

    resultStream.print("result:")
    resultStream.getSideOutput(lateTag).print("late value:") // or .addSink(...)

    env.execute("windowEventTime")
  }

  /**
   * Parses a line of the form `id,timestamp,temperature` into a [[SensorReading]].
   *
   * Fields may be surrounded by whitespace. Fields beyond the third are ignored.
   *
   * @return the reading, or `None` if the line has fewer than three fields or a numeric field
   *         cannot be parsed
   */
  private def parseSensorReading(line: String): Option[SensorReading] = {
    import scala.util.Try
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp, _*) => Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _                       => None
    }
  }
}
/**
 * For each key and window, emits `(key, minimum temperature, window start, window end)`.
 * The window bounds are taken from [[TimeWindow.getStart]] and [[TimeWindow.getEnd]].
 */
class MyEventProcessWindowFunction extends ProcessWindowFunction[SensorReading,(String,Double,Long,Long),String,TimeWindow]{
  override def process(
      key: String,
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[(String, Double, Long, Long)]): Unit = {
    val window: TimeWindow = context.window
    // Reduce over the actual readings rather than seeding with a sentinel like 100.0. A sentinel
    // caps the result: a window whose readings are all above 100 would wrongly report 100.0.
    elements.map(_.temperature).reduceOption(_ min _).foreach { minTemp =>
      out.collect((key, minTemp, window.getStart, window.getEnd))
    }
  }
}