文章目录
- Flink四大基石
- 一、Flink的四大基石
- 1. Checkpoint
- 2. State
- 3. Time
- 4. Window
- 二、案例
- 1.需求
- 2.代码实现
- 3.运行,查看结果
- 4.增加需求2的实现
- 5.重启程序,查看结果
Flink四大基石
一、Flink的四大基石
Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。
1. Checkpoint
这是Flink最重要的一个特性。
Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。
Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。
Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。
https://zhuanlan.zhihu.com/p/53482103
2. State
提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。
3. Time
除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。
4. Window
另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。
二、案例
官网API:https://nightlies.apache.org/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
1.需求
基于时间的滚动和滑动窗口
nc -lk 9999
有如下的数据表示:
信号灯的编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5
需求1:每10秒钟统计一次,最近10秒内,各个路口通过红绿灯的汽车的数量,也就是基于时间的滚动窗口
需求2:每10秒钟统计一次,最近20秒内,各个路口通过红绿灯的汽车的数量,也就是基于时间的滑动窗口
2.代码实现
package cn.edu.hgu.bigdata20.flink.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* description:flink的Window演示
* author 王
* date 2023/04/20
*/
public class FlinkWindowDemo {
public static void main(String[] args) throws Exception {
// 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.source
DataStream<String> socketDS = env.socketTextStream("hadoop001", 9999);
// 3. Transformation
//将9,3转为CartInfo(9,3)
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口有数据)
SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum("count");
//4.Sink
result.print();
//5.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
3.运行,查看结果
输入数据:
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5
4.增加需求2的实现
package cn.edu.hgu.bigdata20.flink.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* description:flink的Window演示
* author 王
* date 2023/04/20
*/
public class FlinkWindowDemo {
public static void main(String[] args) throws Exception {
// 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.source
DataStream<String> socketDS = env.socketTextStream("hadoop001", 9999);
// 3. Transformation
//将9,3转为CartInfo(9,3)
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 需求1:每10秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum("count");
// 需求2:每10秒钟统计一次,最近20秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId)
.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
.sum("count");
//4.Sink
//result.print();
result1.print();
//5.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
5.重启程序,查看结果
输入数据:
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5
9,3
9,2
9,7
4,9
2,6
1,5
结果: