一、说明
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸.
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())
时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。
二、思路
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
2.我们传递给window函数的对象叫窗口分配器.
三、数据准备
准备一个WaterSensor类方便演示
package com.lyh.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
}
四、代码
package com.lyh.flink07;
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
public class Window_s {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("hadoop100",9999)
.map(line -> {
String[] data = line.split(",");
return new WaterSensor(
data[0],
Long.valueOf(data[1]),
Integer.valueOf(data[2])
);
})
.keyBy(WaterSensor::getId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {
@Override
public void process(String key,
Context ctx,
Iterable<WaterSensor> elements,
Collector<String> out) throws Exception {
List<WaterSensor> list = toList(elements);
long starttime = ctx.window().getStart();
long endtime = ctx.window().getEnd();
out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);
}
}).print();
env.execute();
}
private static <T>List<T> toList(Iterable<T> it) {
List<T> list = new ArrayList<>();
for (T t : it) {
list.add(t);
}
return list;
}
}
五、结果
在hadoop100 服务器
输入nc -lk 999
消费结果: