Flink基础
1、系统时间与时间时间
系统时间(处理时间)
在Sparksreaming的任务计算时,使用的是系统时间。
假设所用窗口为滚动窗口,大小为5分钟。那么每五分钟,都会对接收的数据进行提交任务.
但是,这里有个要注意的点,有个概念叫时间轴对齐。若我们在12:12开始接收数据,按道理我们会在12:17进行提交任务。事实上我们会在12:20进行提交任务,因为会进行时间轴对齐,将一天按照五分钟进行划分,会对应到12:20。在此时提交任务,后面每个五分钟提交任务,都会对应到我们所划分的时间轴。
事件时间
flink支持带有事件时间的窗口(Window)操作
事件时间区别于系统时间,如下举例:
flink处理实时数据,对数据进行逐条处理。设定事件时间为5分钟,12:00开始接收数据,接收的第一条数据时间为12:01,接收的第二条数据为12:02。假设从此时起没有收到数据,那么将不会进行提交任务。**到了12:06,接收到了第三条数据。第三条数据的接收时间自12:00起,已经超过了五分钟,**那么此时便会进行任务提交。
2、wordcount简单案例的实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Demo01StreamWordCount {
public static void main(String[] args) throws Exception {
// 1、构建Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// 2、通过Socket模拟无界流环境,方便FLink处理
// 虚拟机启动:nc -lk 8888
// 从Source构建第一个DataStream
// TODO C:\Windows\System32\drivers\etc\hosts文件中配置了master与IP地址的映射,所以这里可以使用master
DataStream<String> lineDS = env.socketTextStream("master", 8888);
// 统计每个单词的数量
// 第一步:将每行数据的每个单词切出来并进行扁平化处理
DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
/**
*FlatMapFunction<String, String>: 表示输入、输出数据的类型
* @param line DS中的一条数据
* @param out 通过collect方法将数据发送到下游
* @throws Exception
*/
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(",")) {
// 将每个单词发送到下游
out.collect(word);
}
}
});
// 第二步:将每个单词变成 KV格式,V置为1;返回的数据是一个二元组Tuple2
DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
/**
* 第三步:按每一个单词进行分组; 无法再使用其父类DataStream进行定义(无法向上转型)
* KeyedStream<T, K> 是 DataStream<T> 的一个特殊化版本,它添加了与键控操作相关的特定方法(如 reduce、aggregate、window 等)。
* 由于 KeyedStream 提供了额外的功能和方法,它不能简单地被视为 DataStream 的一个简单实例,
* 因为它实现了额外的接口(如 KeyedOperations<T, K>)并可能覆盖了某些方法的行为以支持键控操作。
*/
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
// 对Key进行分组
return tuple2.f0;
}
});
// 第四步:对1进行聚合sum,下标是从0开始的
DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);
// 3、打印结果:将DS中的内容Sink到控制台
wordCntDS.print();
// 执行任务
env.execute();
}
}
3、设置任务执行的并行度
本机为8核,可并行16的线程
手动改变任务的并行度,若不设置则会显示1-16,设置后只会显示1-2
env.setParallelism(2);
setBufferTimeout():设置输出缓冲区刷新的最大时间频率(毫秒)。
env.setBufferTimeout(200);
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Demo01StreamWordCount {
public static void main(String[] args) throws Exception {
// 1、构建Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// 手动改变任务的并行度,默认并行度为最大,
env.setParallelism(2);
// setBufferTimeout():设置输出缓冲区刷新的最大时间频率(毫秒)。
env.setBufferTimeout(200);
// 2、通过Socket模拟无界流环境,方便FLink处理
// 虚拟机启动:nc -lk 8888
// 从Source构建第一个DataStream
DataStream<String> lineDS = env.socketTextStream("master", 8888);
System.out.println("lineDS并行度:" + lineDS.getParallelism());
// 统计每个单词的数量
// 第一步:将每行数据的每个单词切出来并进行扁平化处理
DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
/**
*
* @param line DS中的一条数据
* @param out 通过collect方法将数据发送到下游
* @throws Exception
*/
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(",")) {
// 将每个单词发送到下游
out.collect(word);
}
}
});
System.out.println("wordsDS并行度:" + wordsDS.getParallelism());
// 第二步:将每个单词变成 KV格式,V置为1
DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
System.out.println("wordKVDS并行度:" + wordKVDS.getParallelism());
// 第三步:按每一个单词进行分组
// keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
// 传递数据的规则:hash取余(线程总数,默认CPU的总线程数)原理
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
});
System.out.println("keyedDS并行度:" + keyedDS.getParallelism());
// 第四步:对1进行聚合sum
DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);
System.out.println("wordCntDS并行度:" + wordCntDS.getParallelism());
// 3、打印结果:将DS中的内容Sink到控制台
keyedDS.print();
env.execute();
}
}
4、设置批/流处理方式,使用Lambda表达式,使用自定类实现接口中抽象的方法
package com.shujia.flink.core;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Demo02BatchWordCount {
public static void main(String[] args) throws Exception {
// 1、构建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Flink程序的处理方式:默认是流处理
/**
* BATCH:批处理,只能处理有界流,底层是MR模型,可以进行预聚合
* STREAMING:流处理,可以处理无界流,也可以处理有界流,底层是持续流模型,数据一条一条处理
* AUTOMATIC:自动判断,当所有的Source都是有界流则使用BATCH模式,当Source中有一个是无界流则会使用STREAMING模式
*/
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 2、获得第一个DS
// 通过readTextFile可以基于文件构建有界流
DataStream<String> wordsFileDS = env.readTextFile("flink/data/words.txt");
// 3、DS之间的转换
// 统计每个单词的数量
// 第一步:将每行数据的每个单词切出来并进行扁平化处理
// Flink处理逻辑传入的方式
// new XXXFunction 使用匿名内部类
// DataStream<String> wordsDS = wordsFileDS.flatMap(new FlatMapFunction<String, String>() {
// /**
// * @param line DS中的一条数据
// * @param out 通过collect方法将数据发送到下游
// * @throws Exception
// * Type parameters:
// * FlatMapFunction<T, O>
// * <T> – Type of the input elements. <O> – Type of the returned elements.
// */
// @Override
// public void flatMap(String line, Collector<String> out) throws Exception {
// for (String word : line.split(",")) {
// // 将每个单词发送到下游
// out.collect(word);
// }
// }
// });
/**
* 使用Lambda表达式
* 使用时得清楚FlatMapFunction中所要实现的抽象方法flatMap的两个参数的含义
* ()->{}
* 通过 -> 分隔,左边是函数的参数,右边是函数实现的具体逻辑
* 并且需要给出 flatMap函数的输出类型,Types.STRING
* line: 输入数据类型, out: 输出数据类型
*/
DataStream<String> wordsDS = wordsFileDS.flatMap((line, out) -> {
for (String word : line.split(",")) {
out.collect(word);
}
}, Types.STRING);
//TODO 使用自定类实现接口中抽象的方法,一般不使用这种方法
wordsFileDS.flatMap(new MyFunction()).print();
// 第二步:将每个单词变成 KV格式,V置为1
// DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
// @Override
// public Tuple2<String, Integer> map(String word) throws Exception {
// return Tuple2.of(word, 1);
// }
// });
// TODO 此处需要给出 map函数的输出类型,Types.TUPLE(Types.STRING, Types.INT),是一个二元组
DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
/**
* 第三步:按每一个单词进行分组
* keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
* 传递数据的规则:hash取余(线程总数,默认CPU的总线程数,本机为16)原理
*/
// KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
// @Override
// public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
// return tuple2.f0;
// }
// });
// TODO 此处的Types.STRING 并不是直接表示某个方法的输出类型,而是用来指定 keyBy 方法中键(key)的类型。这里可以省略!
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(kv -> kv.f0, Types.STRING);
// 第四步:对1进行聚合sum,无需指定返回值类型
DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);
// 4、最终结果的处理(保存/输出打印)
wordCntDS.print();
env.execute();
}
}
class MyFunction implements FlatMapFunction<String,String>{
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(",")) {
// 将每个单词发送到下游
out.collect(word);
}
}
}
5、source
1、从本地集合source中读取数据
package com.shujia.flink.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class Demo01ListSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 本地集合Source
ArrayList<String> arrList = new ArrayList<>();
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
//TODO 有界流,fromCollection
DataStream<String> listDS = env.fromCollection(arrList);
listDS.print();
env.execute();
}
}
2、新版本从本地文件中读取数据,有界流和无界流两种方式
package com.shujia.flink.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.File;
import java.time.Duration;
public class Demo02FileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 历史版本读文件的方式,有界流
DataStream<String> oldFileDS = env.readTextFile("flink/data/words.txt");
// oldFileDS.print();
//TODO 读取案例一: 新版本加载文件的方式:FileSource,默认是有界流
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(
new TextLineInputFormat()
, new Path("flink/data/words.txt")
)
.build();
//TODO 从Source加载数据构建DS,使用自带source类,使用 fromSource
DataStream<String> fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
fileSourceDS.print();
//TODO 读取案例二: 将读取文件变成无界流
FileSource<String> fileSource2 = FileSource
.forRecordStreamFormat(
new TextLineInputFormat()
, new Path("flink/data/words")
)
//TODO 使成为无界流读取一个文件夹中的数据,类似Flume中的spool dir,可以监控一个目录下文件的变化
// Duration.ofSeconds(5) 以5秒为间隔持续监控
.monitorContinuously(Duration.ofSeconds(5))
.build();
DataStream<String> fileSourceDS2 = env.fromSource(fileSource2,WatermarkStrategy.noWatermarks(),"fileSource2");
fileSourceDS2.print();
env.execute();
}
}
3、自定义source类,区分有界流与无界流
- 只有在Source启动时会执行一次
run方法如果会结束,则Source会得到一个有界流
run方法如果不会结束,则Source会得到一个无界流
package com.shujia.flink.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class Demo03MySource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 使用自定义source类,通过addSource对其进行添加
DataStream<String> mySourceDS = env.addSource(new MySource());
mySourceDS.print();
env.execute();
}
}
class MySource implements SourceFunction<String>{
/**
* 只有在Source启动时会执行一次
* run方法如果会结束,则Source会得到一个有界流
* run方法如果不会结束,则Source会得到一个无界流
* 下面的例子Source会得到一个无界流
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
System.out.println("run方法启动了");
// ctx 可以通过collect方法向下游发送数据
long cnt = 0L;
while(true){
ctx.collect(cnt+"");
cnt ++;
// 休眠一会
Thread.sleep(1000);
}
}
// Source结束时会执行
@Override
public void cancel() {
System.out.println("Source结束了");
}
}
4、自定义source类,读取MySQL中的数据,并进行处理
package com.shujia.flink.source;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class Demo04MyMySQLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Students> studentDS = env.addSource(new MyMySQLSource());
// 统计班级人数
DataStream<Tuple2<String, Integer>> clazzCntDS = studentDS
.map(stu -> Tuple2.of(stu.clazz, 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t2 -> t2.f0)
.sum(1);
clazzCntDS.print();
// 统计性别人数
DataStream<Tuple2<String, Integer>> genderCntDS = studentDS
.map(stu -> Tuple2.of(stu.gender, 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t2 -> t2.f0)
.sum(1);
genderCntDS.print();
env.execute();
}
}
// TODO 自定义source类从MySQL中读取数据
class MyMySQLSource implements SourceFunction<Students> {
@Override
public void run(SourceContext<Students> ctx) throws Exception {
//TODO run方法只会执行一次创建下列对象的操作
// 建立连接
Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata_30", "root", "123456");
// 创建Statement
Statement st = conn.createStatement();
// 执行查询
ResultSet rs = st.executeQuery("select * from students2");
// 遍历rs提取每一条数据
while (rs.next()) {
long id = rs.getLong("id");
String name = rs.getString("name");
int age = rs.getInt("age");
String gender = rs.getString("gender");
String clazz = rs.getString("clazz");
Students stu = new Students(id, name, age, gender, clazz);
ctx.collect(stu);
/**
* 16> (文科四班,1)
* 15> (女,1)
* 15> (女,2)
* 2> (男,1)
* 7> (文科六班,1)
* 15> (女,3)
* 2> (男,2)
* 17> (理科六班,1)
* 17> (理科六班,2)
* 13> (理科五班,1)
* 20> (理科二班,1)
* 13> (理科四班,1)
*/
}
rs.close();
st.close();
conn.close();
}
@Override
public void cancel() {
}
}
// TODO 创建一个类,用于存储从MySQL中取出的数据
class Students {
Long id;
String name;
Integer age;
String gender;
String clazz;
public Students(Long id, String name, Integer age, String gender, String clazz) {
this.id = id;
this.name = name;
this.age = age;
this.gender = gender;
this.clazz = clazz;
}
}
6、sink
1、构建FileSink,监控一个端口中的数据并将其写入到本地文件夹中
package com.shujia.flink.sink;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class Demo01FileSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
// 构建FileSink
FileSink<String> fileSink = FileSink
.<String>forRowFormat(new Path("flink/data/fileSink"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
// 这个设置定义了滚动的时间间隔。
.withRolloverInterval(Duration.ofSeconds(10))
// 这个设置定义了一个不活动间隔。
.withInactivityInterval(Duration.ofSeconds(10))
// 这个设置定义了单个日志文件可以增长到的最大大小。在这个例子中,每个日志文件在被滚动之前可以增长到最多1MB。
.withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build();
lineDS.sinkTo(fileSink);
env.execute();
}
}
2、自定义sink类
package com.shujia.flink.sink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
public class Demo02MySink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> arrList = new ArrayList<>();
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
arrList.add("flink");
DataStreamSource<String> ds = env.fromCollection(arrList);
ds.addSink(new MySinkFunction());
env.execute();
/**
* 进入了invoke方法
* flink
* 进入了invoke方法
* flink
* 进入了invoke方法
* flink
* 进入了invoke方法
* flink
*/
}
}
class MySinkFunction implements SinkFunction<String>{
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println("进入了invoke方法");
// invoke 每一条数据会执行一次
// 最终数据需要sink到哪里,就对value进行处理即可
System.out.println(value);
}
}