1、Apache Flink
是一个实时计算的框架和分布式处理引擎,用于在无边界喝有边界数据流上进行有状态的计算,并且能够在常见的集群上运行,并能以内存速度和任意规模进行计算。
有边界数据流:指的是有开始,也有结束,类似于批处理数据。
无边界数据流:指的是有开始,没有结束,类似于流处理数据。
配置flink的本地模式:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
2、Flink的特性:
Flink是一种流批一体的,就是一套Flink的代码就可以进行流处理,也可以进行批处理。
1、批处理,可以处理历史的数据集
2、流处理,可以处理实时的数据流
3、事件驱动应用,监控事件的服务。
4、支持具有反压功能的持续流模型。反压:指的是处理的数据远高于Flink最大的处理数据量,就会出现反压,此时可能会有延迟。
5、支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
3、Apache Flink的组件栈
1、Deployment层:主要涉及Flink底层的部署模式
2、Runtime层:提供了支持Flink计算的全部核心实现
3、API&Libraries层:
a、API实现了面向无界Stream的流处理和面向Batch的批处理API
b、Libraries在API层之上构建的满足特定应用的实现计算框架
4、
第一个演示案例:使用案例WordCount来举例
1、无界流(流处理统计wc):
public class Demo01WordCount {
public static void main(String[] args) throws Exception{
// //构建Flink的环境
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// //读取数据,使用socket进行模拟实时的数据,nc -lk 8888
// DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
// //打印结果
// lineDS.print();
// //启动Flink
// env.execute();
//需求:使用Flink实时处理WordCount
//构建Flink的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置Flink的并行度
env.setParallelism(2);
//因为在Flink中,在shuffle阶段的时候,数据会在200毫秒或者是达到32kb的时候会将数据大打包,传输给下游
//对于达到的时间是可以设置的
env.setBufferTimeout(0);//设置成0表示的每一毫秒都在传输数据到下游。
//使用socket模拟实时的数据,nc -lk 8888
DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
//首先需要将一行数据变成多行数据:
/**
* 有底层的源码:
* public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper)
*
* public interface FlatMapFunction<T, O> extends Function, Serializable
*
* void flatMap(T value, Collector<O> out) throws Exception;
*
* 需要实现flatmap的方法
*
* public class SingleOutputStreamOperator<T> extends DataStream<T>
* 又源码,可以将SingleOutputStreamOperator转化成DataStream,可以简化代码
*
*/
//将一行的数据变成多行的数据:
DataStream<String> wordDS = lineDS.flatMap( (line, out) -> {
//将接受的数据按照进行分割
String[] word = line.split(",");
//循环的将数据传递给下游
for (String s : word) {
//将切分好的数据传递给下游
out.collect(s);
}
}, Types.STRING); //指定返回的数据类型,否则会报错
//将接受的数据转化成kv的形式,因为使用的是java,并不似scala,所以是没有元组的概念,但是Flink提供了一种Tuple2.of(word, 1),构建kv形式的数据。
/**
* 在使用map的时候,通过观察底层的源码,发现使用的方法和flatmap的使用的方法是基本一致
* 还是要实现方法
* 可以使用的lambda表达式
*/
DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
//按照相同的key机型分组,使用的是KeyBy
KeyedStream<Tuple2<String, Integer>, String> keybyDS = kvDS.keyBy(kv -> kv.f0);
//统计单词的数量,这里的1指的是下标的位置
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = keybyDS.sum(1);
//打印
countDS.print();
//启动Flink
env.execute();
}
}
2、有节流(批处理统wc):
public class Demo02WordCountBatch {
public static void main(String[] args) throws Exception{
//构建一个Flink的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置Flink的并行度
env.setParallelism(2);
//读取数据文件
DataStreamSource<String> lineDS = env.readTextFile("flink/data/words.txt");
/**
* 此时默认的是使用的流处理的模式,展示的是所以的计算的过程,也会将最终的结果展示出来
* 但是Flink可以进行批处理,因为批处理只展示最终的结果,所以在使用Flink做有界流的时候需要将模式换成批处理的模式,
* 这样可以只展示最终的结果
*/
/*
* 设置flink处理模式
* BATCH
* 1、输出最终结果
* 2、底层是MapReduce模型
* 2、批处理模式只能用于处理有界流,不能用于处理无界流
* STREAMING
* 1、输出连续结果
* 2、持续流模型
* 3、可以用于处理有界流和无界流
*
* 流批统一: 同一套api既能用于流处理也能用于批处理
*/
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//将获取的数据展平,进一出多
DataStream<String> wordDS = lineDS.flatMap((line, out) ->
{
String[] split = line.split(",");
//将切分的数据循环的发送给下游
for (String word : split) {
out.collect(word);
}
}, Types.STRING);
//将获得的数据变成kv的形式
SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1),Types.TUPLE(Types.STRING, Types.INT));
//按照设置好的kv形式的数据进行分组
KeyedStream<Tuple2<String, Integer>, String> keyDS = kvDS.keyBy(kv -> kv.f0);
//进行求和
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = keyDS.sum(1);
//打印数据
countDS.print();
//启动Flink
env.execute();
}
}
Flink的处理模式,可以分成两种:
/*
* 设置flink处理模式
* BATCH
* 1、输出最终结果
* 2、底层是MapReduce模型
* 2、批处理模式只能用于处理有界流,不能用于处理无界流
* STREAMING
* 1、输出连续结果
* 2、持续流模型
* 3、可以用于处理有界流和无界流
*
* 流批统一: 同一套api既能用于流处理也能用于批处理
*/
5、Flink的底层原理:
1、Flink与Spark的底层区别:
Spark的底层是MapReduce模型,所以在执行任务的时候就会分成两个阶段,一个是MapTask阶段,还有一个就是ReduceTask阶段,分阶段执行,这样的好处是可以在Map阶段进行预聚合,从而可以减少shuffle阶段传输的数据量,但是因为如此,导致Spark不能做实时任务,因为Spark需要在map阶段进行预聚合,导致延迟怎增加,从而不适合做实时。
对于Flink来说,底层是持续流模型,这个模型会使处于上游和下游的task都同时启动,在执行任务的时候,数据就像流一样进行传输,但是需要注意的是Flink在做批处理的时候,底层也是Mapreduce模型。
总之二者的主要的区别是任务调度的时候不同,就是在执行task任务的方式不同。
6、DataFlows:Source、Transformation、Sink
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。
7、DataStream API Source,读取文件
在使用Flink读取文件的时候需要进行导包:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.15.2</version>
</dependency>
1、基于本地集合的source:
public class Demo02ListSource {
public static void main(String[] args) throws Exception{
//构建flink的运行环境:
/**
* 数据的来源是来自集合
*/
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取数据,数据来源是一个集合
//创建一个集合
ArrayList<Object> list = new ArrayList<>();
list.add("java");
list.add("java");
list.add("java");
list.add("java");
list.add("java");
list.add("java");
DataStreamSource<Object> listDS = env.fromCollection(list);
//打印数据
listDS.print();
//启动flink
env.execute();
}
}
2、基于文件的source:
public class Demo01FileSource {
public static void main(String[] args) throws Exception {
//构建flink的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件数据:
//老版本读取文件:--有界流
DataStreamSource<String> stuDS = env.readTextFile("flink/data/students.txt");
/**
* 新版本读取文件的方式
* 可以有界流读取文件、
* 可以无界流读取文件,增加定时监控参数
*/
FileSource<String> filesource =
FileSource.forRecordStreamFormat(
//读取文件的格式
new TextLineInputFormat(),
//指定文件的路径
new Path("spark/data/word")
)
//定时监控目录读取目录下新的文件。得到一个无界流
.monitorContinuously(Duration.ofSeconds(5))
.build();
//将其转换成dataStream
DataStreamSource<String> wordDS = env.fromSource(filesource, WatermarkStrategy.noWatermarks(), "filesource");
//打印数据:
stuDS.print();
//启动flink
env.execute();
}
}
3、基于网络套接字的source:
public class Demo03SocketSource {
public static void main(String[] args) throws Exception {
/**
* 数据源使用的socket的方式做数据源
* 需要注意的是soucket只支持单线程读取,所以source task只能有一个。
*/
//构建flink的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//启动socket的环境
DataStreamSource<String> sourceDS = env.socketTextStream("master", 8888);
//打印数据
sourceDS.print();
//启动flink
env.execute();
}
}
4、自定的sourc,自定义的source有Apache kafka、Amazon Kesis Stream 、RabbitMQ,也可以是自己定义的source
8、DataStream API sink,写入文件
public class Demo01FileSink {
public static void main(String[] args) throws Exception {
/**
* 需求:统计班级的人数,并且将统计的数据文件存储
* 因为是刚开始学习,并不适应lambda表达式,所以使用的匿名内部类的方式在学习
*/
//首先构建flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
*改进:因为一个并行度就会产生一个文件,为了是所有的数据都写入到一个文件中,
* 将flink的并行度设置成1
*/
env.setParallelism(1);
//将flink的处理模式改成batch的模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//使用readTextFile的方式读取文件
DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");
//将读取的数据文件进行切分统计班级的人数
DataStream<Tuple2<String, Integer>> kvDS = studentDS.map(new MapFunction<String,Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String clazz = value.split(",")[4];
return Tuple2.of(clazz, 1);
}
},Types.TUPLE(Types.STRING,Types.INT));
//根据kv的形式进行分组统计
KeyedStream<Tuple2<String, Integer>, Object> clazzSum = kvDS.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//将构建的kv形式的数据,取出下标索引是1的进行求和
DataStream<Tuple2<String, Integer>> clazzCount = clazzSum.sum(1);
//读取数据:
// clazzCount.print();
//将读取的数据进行保存,但是需要注意的是数据的格式,此时的数据的格式是元组的格式
//所以需要对数据的格式处理
SingleOutputStreamOperator<Object> newClazzCount = clazzCount.map(new MapFunction<Tuple2<String, Integer>, Object>() {
@Override
public Object map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + "\t" + value.f1;
}
});
//将结果保存到文件中:
FileSink<Object> flinkSink = FileSink.forRowFormat(
new Path("flink/data/clazzCount"),//指定文件的存储路径,路径不存在会自动的生成
new SimpleStringEncoder<>("utf-8")//指定数据保存的编码格式
).build();
//使用flink的sink
newClazzCount.sinkTo(flinkSink);
//启动flink
env.execute();
}
}
9、DataStream API Transformation,较为常见的转换算子:
1、Map算子:
public class Demo01Map {
public static void main(String[] args) throws Exception{
/**
* map的处理的方式是每一条数据都会处理一次
*/
//使用Map对数据处理,满足下面的三个需求
//构建flink的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为2
env.setParallelism(2);
//基于文件的source
DataStreamSource<String> lineDS = env.readTextFile("flink/data/students.txt");
//使用类的方式获取学生的年龄
/**
* 由底层的源码可以知道map方法是需要传入的参数是:map(MapFunction<T, R> mapper)
* 由于public interface MapFunction<T, O> extends Function, Serializable
* O map(T value) throws Exception;
* 想要实现该接口就需要创建一个类继承父类并且重写里面的方法
*/
SingleOutputStreamOperator<Integer> ageDS = lineDS.map(new MyMapFunction());
//打印数据
// ageDS.print();
//使用内部类的方式获取学生的班级
SingleOutputStreamOperator<String> clazzDS = lineDS.map(new MapFunction<String, String>() {
//因为是接口,需要重写里面的map方法,因为返回的是班级,是一个string类型的数据
@Override
public String map(String value) throws Exception {
String clazz = value.split(",")[4];
return clazz;
}
});//flink中是不会数据类型推断的,一般是需要指定数据类型,但是这里不需要指定的原因是在开始的时候数据的输出类型已经指定过了。
//打印数据
// clazzDS.print();
//使用lambda的方式获取学生的姓名
//从底层源码可以知道public class SingleOutputStreamOperator<T> extends DataStream<T>
//所以可以将SingleOutputStreamOperator换成DataStream,从而简化代码
DataStream<String> nameDS = lineDS.map(kv -> kv.split(",")[1]);
nameDS.print();
//启动flink
env.execute();
}
}
class MyMapFunction implements MapFunction<String,Integer>{
@Override
public Integer map(String value) throws Exception {
String age = value.split(",")[2];
return Integer.parseInt(age);
}
}
2、FlatMap算子:
public class Demo02FlatMap {
public static void main(String[] args) throws Exception{
/**
* FlatMap是一条数据就会处理一次,并且是进一条数据出多条数据
*/
//构建flink的环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件数据源
DataStreamSource<String> lineDS = env.readTextFile("flink/data/students.txt");
//使用FlatMap的方式
//使用匿名内部类的形式
DataStream<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] split = value.split(",");
//使用循环将数据传输到下游
for (String word : split) {
out.collect(word);
}
}
});
//使用lambda的方式
DataStream<String> wordDS1 = lineDS.flatMap((value, out) -> {
String[] split = value.split(",");
for (String word : split) {
out.collect(word);
}
}, Types.STRING);
wordDS1.print();
env.execute();
}
}
3、Filter算子:
public class Demo03Filter {
public static void main(String[] args) throws Exception{
/**
* 转换算子:filter,作用是过滤
* 由底层的源码:
* public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter)
*
* boolean filter(T value) throws Exception;
*
* 能对filter有一个基本的了解
*/
//构建flink的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件数据源的数据
DataStreamSource<String> lineDS = env.readTextFile("flink/data/students.txt");
//需求:在学生表中过滤性别是男的学生
DataStream<String> genderDS = lineDS.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
//将学生表中的数据按照逗号进行切分,并取出性别那一列
String gender = value.split(",")[3];
//有底层的源码可以知道返回的数据类型是boolean的类型
return "男".equals(gender);
}
});
//使用匿名内部类的方式:
SingleOutputStreamOperator<String> genderDS1 = lineDS.filter(filter -> "男".equals(filter.split(",")[3]));
//genderDS.print();
genderDS1.print();
//执行flink
env.execute();
}
}
4、KeyBy算子:
public class Demo04Keyby {
public static void main(String[] args) throws Exception{
/**
*会将一个相同的key传送到同一个下游算子中。
*/
//构建flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件数据源
DataStream<String> lineDS = env.readTextFile("flink/data/students.txt");
//使用keyBy的方式进行分组,按照班级进行分组:
//首先需要将数据变成kv的形式
SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap((value, out) -> {
String[] split = value.split(",");
for (String word : split) {
//将传输到下游的数据构建成kv形式的数据
out.collect(Tuple2.of(word, 1));
}
}, Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, Object> clazzDS = wordDS.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
wordDS.keyBy(kv->kv.f0).print();
// clazzDS.print();
env.execute();
}
}
5、Reduce算子:
public class Demo05Reduce {
public static void main(String[] args) {
/**
* 在使用reduce的时候必须是在分组之后才能进行的操作
*
*/
//构建flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
//使用keyBy的方式进行分组,按照班级进行分组:
//首先需要将数据变成kv的形式
SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap((value, out) -> {
String[] split = value.split(",");
for (String word : split) {
//将传输到下游的数据构建成kv形式的数据
out.collect(Tuple2.of(word, 1));
}
}, Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, Object> clazzDS = wordDS.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
/**
* public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer)
*
* T reduce(T value1, T value2) throws Exception;
*
*
* 对于reduce来说是一条数据就会执行一次
* value1指的是聚合前的状态,有状态算子
* value2指的是当前计算的数据
*/
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = clazzDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
int count = value1.f1 + value2.f1;
//然后将结果返回出去
return Tuple2.of(value1.f0, count);
}
});
}
}
6、Window算子:
public class Demo06Window {
public static void main(String[] args) throws Exception{
//构建flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
//使用keyBy的方式进行分组,按照班级进行分组:
//首先需要将数据变成kv的形式
SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap((value, out) -> {
String[] split = value.split(",");
for (String word : split) {
//将传输到下游的数据构建成kv形式的数据
out.collect(Tuple2.of(word, 1));
}
}, Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, Object> clazzDS = wordDS.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
/**
* 在使用窗口的时候还是需要先进行分组
* 指的是每一个指定的时间内输出一次结果
*/
WindowedStream<Tuple2<String, Integer>, Object, TimeWindow> window = clazzDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));//每5秒就会输出一次结果。
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = window.sum(1);
countDS.print();
env.execute();
}
}
7、Union算子:
public class Demo07Union {
public static void main(String[] args) throws Exception {
//构建flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> ds1 = env.readTextFile("flink/data/words.txt");
DataStream<String> ds2 = env.readTextFile("flink/data/word");
//将两个读取的数据进行合并
DataStream<String> unionDS = ds1.union(ds2);
unionDS.print();
env.execute();
}
}