Flink -- DataStream API
- 执行环境 Execution Environment
- 创建执行环境
- 设置执行模式
- 触发程序执行
- 源算子 Source
- 从集合中读取数据
- 从文件读取数据
- 从 Socket 读取数据
- 自定义数据源
- 转换算子 Transformation
- 基本转换算子
- 映射 map
- 过滤 filter
- 扁平映射 flatMap
- 聚合算子 Aggregation
- 按键分区 keyBy
- 简单聚合
- 规约聚合 reduce
- 物理分区 Physical Partitioning
- 随机分区 Random
- 轮询分区 Round-Robin
- 重缩放分区 Rescale
- 广播 Broadcast
- 全局分区 Global
- 自定义分区 Custom
- 输出算子 Sink
- 输出到文件
- 输出到 Redis
- 输出到数据库 MySQL
- 输出到 ElasticSearch
- 自定义输出算子 Sink
DataStream 本质上就是 Flink 中用于表示集合的类,其用法类似于 Java 集合,通过 API 定义出一系列的操作进行数据处理。
一个 Flink 程序,实际上就是对 DataStream 的各种转换,代码一般由以下几部分构成:
-
获取执行环境(execution environment)
-
读取数据源(source)
-
定义转换操作(transformations)
-
定义计算结果的输出位置(sink)
-
触发程序执行(execute)
执行环境 Execution Environment
创建执行环境
因为 Flink 可以在各种环境中运行,因此在提交作业执行计算时,首先要获取 Flink 的执行环境,从而建立起程序与 Flink 框架的关系。
执行环境的创建,需要调用 StreamExecutionEnvironment 类的静态方法:
方法 | 概述 |
---|---|
getExecutionEnvironment | 自动判断当前程序的运行方式,并返回对应的运行环境,开发中最常用 |
createLocalEnvironment | 返回一个本地执行环境。在调用时可以传入一个参数指定并行度;若不传入则默认并行度为本地 CPU 核心数 |
createRemoteEnvironment | 返回集群执行环境。在调用时需要依次传入 JobManager 的主机名、端口号以及要执行 jar 包的路径 |
设置执行模式
基于执行环境,我们可以设置不同的执行模式让 Flink 程序在流处理与批处理之间进行切换。调用 StreamExecutionEnvironment 类的setRuntimeMode()
方法,传入对应的参数即可完成设置。
Flink 存在以下 3 种执行模式:
执行模式 | 概述 |
---|---|
RuntimeExecutionMode.STREAMING | 流处理模式,用于需要持续实时处理的无界流数据,程序默认使用该模式 |
RuntimeExecutionMode.BATCH | 批处理模式,用于不会持续计算的有界数据 |
RuntimeExecutionMode.AUTOMATIC | 自动模式,该模式下程序将根据输入数据是否有界来自动选择执行模式 |
触发程序执行
通过 StreamExecutionEnvironment 类的execute()
方法,来触发程序执行。该方法将一直等待作业完成,并返回一个执行结果。
源算子 Source
Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。数据的输入来源称为数据源,读取数据的算子则称为源算子。Flink 代码中添加源算子的方法是调用执行环境的addSource()
方法。
Flink 的源可以有多种方式获取,下面介绍几种获取元数据的方式。
本文中所用到的实例对象 Score 如下:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Score {
String className;
String name;
int score;
}
从集合中读取数据
该方法是最简单的读取数据的方式,直接在 Java 中创建一个集合,调用执行环境的fromCollection()
方法即可。
public class FromCollectionDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 模式选择
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. 加载数据源
ArrayList<Score> scores = new ArrayList<>();
scores.add(new Score("一班", "zzz", 89));
scores.add(new Score("二班", "qqq", 92));
scores.add(new Score("三班", "fff", 97));
DataStreamSource<Score> source = environment.fromCollection(scores);
// 3. 数据展示
source.print();
// 4. 执行程序
environment.execute();
}
}
此外,也可以不构建集合,直接列举元素,并调用fromElements()
方法即可。
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz", 76),
new Score("二班", "qqq", 94),
new Score("三班", "fff", 99)
);
从文件读取数据
实际开发应用中,一般不会通过代码将数据写在代码中。通常需要从文件中读取数据进行解析和处理,如读取日志文件。调用执行环境的readTextFile()
方法即可读取文件,方法中需要传入文件的相对路径或绝对路径。
public class ReadTextFileDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. 加载数据源
DataStreamSource<String> source = environment.readTextFile("D:/work/my_project/FlinkDemo/src/main/resources/test.txt");
// 3. 数据展示
source.print();
// 4. 执行程序
environment.execute();
}
}
从 Socket 读取数据
从集合和文件中获取的数据都是有界数据,而在流处理的场景中,数据一般是无界的。我们可以简单的通过 Socket 的方式进行无界数据的获取测试。
测试代码的远程 Socket 采用阿里云服务器,开放端口 8080 作为 Socket 文本流端口。
nc -l 8080
运行程序,从 Socket 中读取无界数据。
public class SocketTextStreamDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
String address = "47.92.146.85";
int port = 8080;
DataStreamSource<String> source = environment.socketTextStream(address, port);
// 3. 数据输出
source.print();
// 4. 执行数据
environment.execute();
}
}
在 Socket 端输入数据,可以看到数据被读取进程序。
自定义数据源
在日常开发中,我们可以自定义数据源以获取来自各种数据库以及中间件的数据。自定义数据源需要编写自定义数据源类并继承SourceFunction
接口,实现接口中的run()
以及cancel()
方法。
public class MySource implements SourceFunction<String> {
/**
* 实现数据的获取逻辑并通过 sourceContext 进行转发
* @param sourceContext source 函数用于发出数据的接口
*/
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
while (true) {
sourceContext.collect(String.valueOf(new Random().nextInt(100)));
Thread.sleep(1000);
}
}
/**
* 取消数据源,用于终止循环获取数据的逻辑
*/
@Override
public void cancel() {
}
}
在使用自定义的数据源时,只需要调用执行环境的addSource()
方法,将自定义的数据源对象传入即可。
public class MySourceDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.enableCheckpointing(5000);
// 2. addSource 配置自定义数据源
DataStreamSource<String> source = environment.addSource(new MySource());
// 3. 数据输出
source.print();
// 4. 执行程序
environment.execute();
}
}
转换算子 Transformation
在使用源算子将数据读取到程序之后,我们便可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。Flink 程序的核心就是各种转换操作,它规定了数据处理转换的逻辑。
基本转换算子
映射 map
map 是一一映射的转换算子,即消费一个元素便产出一个元素。
map 算子的使用只需要调用 DataStream 对象的map()
方法即可,方法需要传入的参数是 MapFunction 接口的实现类。map()
方法的返回值仍然为 DataStream,不过泛型可能改变。
下列代码从 Socket 中读取数据,并根据输入数据将 1 转换为 ”男“,将 2 转换为 ”女“.
public class MapDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
String address = "47.92.146.85";
int port = 8080;
DataStreamSource<String> source = environment.socketTextStream(address, port);
// 3. 定义数据转换规则
SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
if ("1".equals(s)) {
return "男";
} else if ("2".equals(s)) {
return "女";
} else {
return "输入有误!";
}
}
});
// 4. 数据输出
outputStreamOperator.print();
// 5. 执行数据
environment.execute();
}
}
代码测试:
过滤 filter
filter 操作实际上是对一个数据流按规定的方式进行过滤,通过一个布尔表达式设置一个过滤条件,对流内的每一个因素进行判断。若返回 true 则元素正常通过;若返回 false 则元素被过滤掉。
filter 算子的使用只需要调用 DataStream 对象的filter()
方法即可,方法需要传入 FilterFunction 接口的实现类。
下列代码从 Socket 中读取数据,并过滤掉所有值小于等于 100 的数据。
public class FilterDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
String address = "47.92.146.85";
int port = 8080;
DataStreamSource<String> source = environment.socketTextStream(address, port);
// 3. 定义数据转换规则
SingleOutputStreamOperator<String> outputStreamOperator = source.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
if (Integer.parseInt(s) > 100) {
return true;
}
return false;
}
});
// 4. 数据输出
outputStreamOperator.print();
// 5. 执行数据
environment.execute();
}
}
代码测试:
扁平映射 flatMap
flatMap 扁平映射可以将数据流中的数据拆分成多个个体处理,即消费一个元素,可以获得 0 个、1 个或者多个数据。
flatMap 算子的使用只需要调用 DataStream 对象的flapMap()
方法即可,方法需要传入 FlatMapFunction 接口的实现类。
下列方法实现了将输入的数据按照空格进行划分,获得多个数据。
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
String address = "47.92.146.85";
int port = 8080;
DataStreamSource<String> source = environment.socketTextStream(address, port);
// 3. 定义数据转换规则
SingleOutputStreamOperator<String> outputStreamOperator = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] strings = s.split(" ");
for (String string:strings) {
collector.collect(string);
}
}
});
// 4. 数据输出
outputStreamOperator.print();
// 5. 执行数据
environment.execute();
}
}
测试代码:
聚合算子 Aggregation
聚合算子,顾名思义,就是将一系列的数据按照某种规则进行统计和整合,从而提炼出更有用的信息的算子。
按键分区 keyBy
在 Flink 中,DataStream 对象没有直接进行聚合的 API,因为我们需要对海量的数据进行分区,然后并行处理数据以提高效率。因此,若要对数据进行聚合,首先需要对数据进行分区,keyBy 就是用来做分区处理的。
keyBy 可以通过指定一个 key 作为分区的依据,将一条数据流从逻辑上划分为不同的分区 partitions。在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写hashCode()
方法。
keyBy 的使用需要调用 DataSource 的keyBy()
方法,传入的参数为 KeySelector 接口的实现类。
需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为
KeyedStream,即”分区流“或”键控流“。
下列代码通过 Score 类的 className 属性对数据流进行分区。
public class KeyByDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. 加载数据源
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz", 76),
new Score("一班", "zzz1", 77),
new Score("二班", "qqq", 94),
new Score("二班", "qqq1", 76),
new Score("三班", "fff", 99),
new Score("三班", "fff1", 81)
);
// 3. 数据分区
KeyedStream<Score, String> keyedStream = source.keyBy(new KeySelector<Score, String>() {
@Override
public String getKey(Score score) throws Exception {
return score.getClassName();
}
});
// 4. 打印数据
keyedStream.print();
// 5. 执行程序
environment.execute();
}
}
测试代码:可以看到,班级相同的对象在最前面所对应的分区号也相同,即进入一个分区进行处理。
简单聚合
有了分区流 KeyedStream 之后,我们就可以根据它进行数据的聚合操作了。Flink 内置实现了一些简单的聚合 API:
-
sum()
:对指定字段做叠加求和; -
min()
:对指定字段求最小值; -
max()
:对指定字段求最大值; -
minBy()
:对指定字段求最小值并保留含最小字段的整条数据; -
maxBy()
:对指定字段求最大值并保留含最大子段的整条数据;
测试代码:(注意打印数据时每条聚合算子单独使用,否则结果不容易观察)
public class SimpleAggregationDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. 加载数据源
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz01", 76),
new Score("一班", "zzz02", 89),
new Score("一班", "zzz03", 84),
new Score("二班", "qqq01", 94),
new Score("二班", "qqq02", 74),
new Score("二班", "qqq03", 64),
new Score("三班", "fff01", 84),
new Score("三班", "fff02", 94),
new Score("三班", "fff03", 96)
);
// 3. 数据分区
KeyedStream<Score, String> keyedStream = source.keyBy(new KeySelector<Score, String>() {
@Override
public String getKey(Score score) throws Exception {
return score.getClassName();
}
});
// 4. 打印数据
keyedStream.max("score").print();
keyedStream.min("score").print();
keyedStream.sum("score").print();
keyedStream.maxBy("score").print();
keyedStream.minBy("score").print();
// 5. 执行程序
environment.execute();
}
}
规约聚合 reduce
reduce 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。该操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元
素数据类型,所以输出类型和输入类型是一样的。
调用 KeyedStream 的reduce()
方法时,需要传入一个参数,实现 ReduceFunction 接口。
下述代码利用 reduce 算子以及 Tuple 数据结构,同时计算出了 max 与 sum 的值
public class ReduceDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. 加载数据源
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz01", 76),
new Score("一班", "zzz02", 89),
new Score("一班", "zzz03", 84),
new Score("二班", "qqq01", 94),
new Score("二班", "qqq02", 74),
new Score("二班", "qqq03", 64),
new Score("三班", "fff01", 84),
new Score("三班", "fff02", 94),
new Score("三班", "fff03", 96)
);
// 3. 数据分区
source.map(new MapFunction<Score, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> map(Score score) throws Exception {
return new Tuple3<String, Integer, Integer>(score.getClassName(), score.getScore(), score.getScore());
}
}).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, String>() {
@Override
public String getKey(Tuple3<String, Integer, Integer> tuple3) throws Exception {
return tuple3.f0;
}
}).reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> t1,
Tuple3<String, Integer, Integer> t2) throws Exception {
return Tuple3.of(t1.f0, t1.f1 > t2.f1 ? t1.f1 : t2.f1, t1.f2 + t2.f2);
}
})
// 4. 打印数据
.print();
// 5. 执行程序
environment.execute();
}
}
物理分区 Physical Partitioning
分区操作即将数据进行重新分布,传递到不同的分区中进行下一步的操作。比如之前介绍过的 keyBy 按键分区,就是通过指定 key 的哈希值对数据进行分区的。对于 keyBy 而言,无法确定数据到底分到哪个区,也不会考虑数据的分区是否均匀,它是一种逻辑分区(logical partitioning)。
若我们想精确的对数据进行分区,即真正的控制分区策略,那我们就需要物理分区策略(physical partitioning)。物理分区策略就类似消息队列和 Nginx 做的那样,常见的物理分区策略包括随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)、以及广播(Broadcast)。
随机分区 Random
通过调用 DataStream 对象的shuffle()
方法,即可对数据进行“洗牌”,将数据随机分配到下游算子的并行任务当中。经过随机分区后,我们得到的仍然是一个 DataStream 对象。
下列代码将任务流从一个分区拓展为 4 个分区,并进行随机分区。
public class ShuffleDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
DataStreamSource<String> source = environment.addSource(new MySource());
// 3. 打印输出
source.shuffle().print("shuffle").setParallelism(4);
// 4. 执行程序
environment.execute();
}
}
测试结果如下,可以看到,数据被随机分配到不同的分区执行打印的操作。
轮询分区 Round-Robin
轮询分区按照顺序依次将数据分发到不同的分区,通过调用 DataStream 对象的rebalance()
方法即可实现数据的轮询分区。
下列代码将任务流从一个分区拓展为 4 个分区,并进行轮询分区。
public class RebalanceDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
DataStreamSource<String> source = environment.addSource(new MySource());
// 3. 打印输出
source.rebalance().print("rebalance").setParallelism(4);
// 4. 执行程序
environment.execute();
}
}
代码输出结果如下,可以看到,数据按照分区编号 2-3-4-1 的顺序被依次分发到对应的分区。
重缩放分区 Rescale
重缩放分区与轮询分区类似,只不过两者的作用范围不同。轮询分区针对所有的上游任务和下游任务进行重新分区;而重缩放仅对部分上游任务和下游任务之间进行重新分区,节省更多资源。
当下游任务的数量是上游任务数量的整数倍时,rescale 的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。
通过调用 DataStream 对象的rescale()
方法可以实现数据的重缩放分区。
下列代码采用并行数据源的富函数,将奇数发送到索引为 1 的并行子任务;将偶数发送到索引为 0 的并行子任务。随后将任务流从 2 个分区拓展为 4 个分区,并进行重缩放分区。
public class RescaleDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
DataStreamSource<Integer> source = environment.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
for (int i = 0; i < 8; i++) {
if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
sourceContext.collect(i + 1);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2);
// 3. 打印输出
source.rescale().print("rescale").setParallelism(4);
// 4. 执行程序
environment.execute();
}
}
执行结果如下,可以观察到,奇数数据在重分区时,被轮流分发到 3 和 4 两个子分区(对应索引为 1 的原子任务);而偶数数据在重分区时,被轮流分发到 1 和 2 两个子分区(对应索引为 0 的原子任务)。因此,重缩放分区实际上就是局部上的轮询分区。
广播 Broadcast
广播即将数据重新分发到所有的下游子任务当中,数据将存在在每一个子分区。通过调用 DataStream 对象的broadcast()
方法,可以实现数据的广播。注意,该方法可能会导致数据的重复处理。
下列代码将任务流从一个分区拓展为 4 个分区,并进行数据广播分区。
public class BroadcastDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zqf", 100));
// 3. 打印输出
source.broadcast().print("broadcast").setParallelism(4);
// 4. 执行程序
environment.execute();
}
}
代码结果如下,可以看到,数据被广播到 4 个子分区当中。
全局分区 Global
全局分区是一种特殊的分区方式。通过调用 DataStream 对象的global()
方法,可以强行将下游任务的并行度变为 1,因此使用该操作需要特别谨慎,可能会对程序造成较大压力。
下列代码将任务流从 2 个分区通过全局分区的方式修改为 1 个分区。
public class GlobalDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(2);
// 2. 加载数据源
DataStreamSource<String> source = environment.addSource(new MySource());
// 3. 打印输出
source.global().print("global");
// 4. 执行程序
environment.execute();
}
}
代码结果如下。
自定义分区 Custom
若 Flink 提供的分区策略均不能满足我们的需求,此时可以通过partitionCustom()
方法来自定义分区策略,该方法在调用时需要传入两个参数:
-
第一个参数为自定义分区器对象;
-
第二个参数为应用自定义分区器的字段;
下列代码演示了如何通过数值的奇偶性进行分区。
public class CustomDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
DataStreamSource<String> source = environment.addSource(new MySource());
// 3. 打印输出
source.partitionCustom(new Partitioner<Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key % 2;
}
}, new KeySelector<String, Integer>() {
@Override
public Integer getKey(String s) throws Exception {
return Integer.valueOf(s);
}
}).print().setParallelism(2);
// 4. 执行程序
environment.execute();
}
}
代码结果如下,可以看到,所有奇数被分配到分区 2;所有偶数被分配到分区 1。
输出算子 Sink
Flink 作为数据处理框架,最终还需要把处理的结果写入外部系统。这个过程主要通过 Sink 算子实现,我们可以使用 Flink 提供的 Sink 算子,也可以自定义 Sink 算子。
之前的代码中我们一直使用的print()
方法就是一种 Sink 算子,它表示将数据流写入标准控制台打印输出。
输出到文件
Flink 专门提供了一个流式文件系统的连接器 StreamingFileSink,其为流处理和批处理提供了统一的 Sink,可以将分区文件写入 Flink 支持的文件系统。
StreamingFileSink 支持行编码和批量编码两种方式,可以直接调用静态方法构建:
-
行编码:
StreamingFileSink.forRowFormat(basePath,rowEncoder)
-
批量编码:
StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径
(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。
下列代码创建了一个简单的文件 Sink,并制定了文件写入的滚动策略,滚动策略即我们开启新文件记录数据的标准,下列代码设置的滚动策略为:
-
至少包含 15 分钟的数据;
-
最近 5 分钟内没有收到新的数据;
-
文件大小已经达到 1 GB;
public class SinkToFileDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz01", 76),
new Score("二班", "qqq03", 64),
new Score("三班", "fff01", 84)
);
// 3. 定义数据转换规则
SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<Score, String>() {
@Override
public String map(Score score) throws Exception {
return score.toString();
}
});
// 4. 数据输出
StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
new Path("D:/work/my_project/FlinkDemo/src/main/resources"),
new SimpleStringEncoder<>("utf-8")
).withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build()
).build();
outputStreamOperator.addSink(fileSink);
// 5. 执行数据
environment.execute();
}
}
输出到 Redis
Bahir 项目为我们提供了 Flink 和 Redis 的连接器,首先我们导入连接器的依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
连接器为我们提供了一个 RedisSink,创建对象时需要输入两个参数:
-
第一个参数是 Jedis 连接池的配置信息;
-
第二个参数需要传入 Redis 映射类的接口,需要我们写一个类继承 RedisMapper 接口,并实现相应的方法,定义数据转换成 Redis 数据格式的逻辑;
Redis 的映射类代码如下,此处我们保存的数据类型是 hash,表名设置为 score;每条数据的 name 字段作为 key,score 字段作为 value。
public class MyRedisMapper implements RedisMapper<Score> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "score");
}
@Override
public String getKeyFromData(Score score) {
return score.getName();
}
@Override
public String getValueFromData(Score score) {
return Integer.toString(score.getScore());
}
}
完整的实现数据 sink 到 redis 的代码如下。
public class SinkToRedisDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz01", 76),
new Score("二班", "qqq03", 64),
new Score("三班", "fff01", 84)
);
// 3. 创建一个 redis 的连接
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("47.92.146.85")
.setPort(6379)
.build();
// 4. 数据输出到 redis
source.addSink(new RedisSink<Score>(jedisPoolConfig, new MyRedisMapper()));
// 5. 执行数据
environment.execute();
}
}
运行代码,然后使用 redis-cli 连接 redis,查看结果。
root@2d1c2701081c:/data# redis-cli
127.0.0.1:6379> hgetall score
1) "fff01"
2) "84"
3) "zzz01"
4) "76"
5) "qqq03"
6) "64"
127.0.0.1:6379>
输出到数据库 MySQL
本小节将介绍如何将数据 sink 到 MySQL 数据库进行持久化存储。
首先我们需要引入相应的依赖,如下所示。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.0.0-1.16</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
根据要存储的信息在数据库中创建相应的表结构。
编写代码,对数据库的连接、sql 语句进行配置,完整代码如下。
public class SinkToMySQLDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
DataStreamSource<Score> source = environment.fromElements(
new Score("一班", "zzz01", 76),
new Score("二班", "qqq03", 64),
new Score("三班", "fff01", 84)
);
// 3. 数据输出
source.addSink(JdbcSink.sink(
"INSERT INTO score (class_name, name, score) VALUES (?, ?, ?)",
(statement, score) -> {
statement.setString(1, score.getClassName());
statement.setString(2, score.getName());
statement.setInt(3, score.getScore());
},
JdbcExecutionOptions.builder()
.withBatchIntervalMs(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://47.92.146.85:3306/SysManage?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true&rewriteBatchedStatements=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("XXXXXXXXX")
.build()
));
// 5. 执行程序
environment.execute();
}
}
运行程序,连接数据库进行数据查看。
输出到 ElasticSearch
学习完 ES 相关知识后补充…
自定义输出算子 Sink
自定义输出算子需要我们编写自定义 Sink 类,并继承 RichSinkFunction 类,重写该类的invoke()
方法和finish()
方法,定义数据 sink 的处理逻辑。
下列自定义的 sink 算子将数据按照标准格式进行打印。
public class MySink extends RichSinkFunction<String> {
/**
* 每条读取到的记录都会调用该方法
* @param value 获取到的值
* @param context 可用于获取有关输入记录的附加数据的上下文
*/
@Override
public void invoke(String value, Context context) throws Exception {
super.invoke(value, context);
System.out.println("{data=" + value + "}");
}
/**
* 任务完成后调用该方法
*/
@Override
public void finish() throws Exception {
System.out.println("mission complete~");
}
}
在使用自定义的 sink 算子时,只需要将实例化的对象传入addSink()
方法即可。
public class MySinkDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. socketTextStream 配置数据源
String address = "47.92.146.85";
int port = 8080;
DataStreamSource<String> source = environment.socketTextStream(address, port);
// 3. 定义数据转换规则
SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s;
}
});
// 4. 数据输出
outputStreamOperator.addSink(new MySink());
// 5. 执行数据
environment.execute();
}
}
代码测试结果如下。