【基础】Flink -- DataStream API

news2024/11/19 8:48:50

Flink -- DataStream API

  • 执行环境 Execution Environment
    • 创建执行环境
    • 设置执行模式
    • 触发程序执行
  • 源算子 Source
    • 从集合中读取数据
    • 从文件读取数据
    • 从 Socket 读取数据
    • 自定义数据源
  • 转换算子 Transformation
    • 基本转换算子
      • 映射 map
      • 过滤 filter
      • 扁平映射 flatMap
    • 聚合算子 Aggregation
      • 按键分区 keyBy
      • 简单聚合
      • 规约聚合 reduce
      • 物理分区 Physical Partitioning
        • 随机分区 Random
        • 轮询分区 Round-Robin
        • 重缩放分区 Rescale
        • 广播 Broadcast
        • 全局分区 Global
        • 自定义分区 Custom
  • 输出算子 Sink
    • 输出到文件
    • 输出到 Redis
    • 输出到数据库 MySQL
    • 输出到 ElasticSearch
    • 自定义输出算子 Sink

DataStream 本质上就是 Flink 中用于表示集合的类,其用法类似于 Java 集合,通过 API 定义出一系列的操作进行数据处理。

一个 Flink 程序,实际上就是对 DataStream 的各种转换,代码一般由以下几部分构成:

  • 获取执行环境(execution environment)

  • 读取数据源(source)

  • 定义转换操作(transformations)

  • 定义计算结果的输出位置(sink)

  • 触发程序执行(execute)

执行环境 Execution Environment

创建执行环境

因为 Flink 可以在各种环境中运行,因此在提交作业执行计算时,首先要获取 Flink 的执行环境,从而建立起程序与 Flink 框架的关系。

执行环境的创建,需要调用 StreamExecutionEnvironment 类的静态方法:

方法概述
getExecutionEnvironment自动判断当前程序的运行方式,并返回对应的运行环境,开发中最常用
createLocalEnvironment返回一个本地执行环境。在调用时可以传入一个参数指定并行度;若不传入则默认并行度为本地 CPU 核心数
createRemoteEnvironment返回集群执行环境。在调用时需要依次传入 JobManager 的主机名、端口号以及要执行 jar 包的路径

设置执行模式

基于执行环境,我们可以设置不同的执行模式让 Flink 程序在流处理与批处理之间进行切换。调用 StreamExecutionEnvironment 类的setRuntimeMode()方法,传入对应的参数即可完成设置。

Flink 存在以下 3 种执行模式:

执行模式概述
RuntimeExecutionMode.STREAMING流处理模式,用于需要持续实时处理的无界流数据,程序默认使用该模式
RuntimeExecutionMode.BATCH批处理模式,用于不会持续计算的有界数据
RuntimeExecutionMode.AUTOMATIC自动模式,该模式下程序将根据输入数据是否有界来自动选择执行模式

触发程序执行

通过 StreamExecutionEnvironment 类的execute()方法,来触发程序执行。该方法将一直等待作业完成,并返回一个执行结果。

源算子 Source

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。数据的输入来源称为数据源,读取数据的算子则称为源算子。Flink 代码中添加源算子的方法是调用执行环境的addSource()方法。

Flink 的源可以有多种方式获取,下面介绍几种获取元数据的方式。

本文中所用到的实例对象 Score 如下:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Score {

    String className;

    String name;

    int score;

}

从集合中读取数据

该方法是最简单的读取数据的方式,直接在 Java 中创建一个集合,调用执行环境的fromCollection()方法即可。

public class FromCollectionDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 模式选择
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. 加载数据源
        ArrayList<Score> scores = new ArrayList<>();
        scores.add(new Score("一班", "zzz", 89));
        scores.add(new Score("二班", "qqq", 92));
        scores.add(new Score("三班", "fff", 97));
        DataStreamSource<Score> source = environment.fromCollection(scores);
        // 3. 数据展示
        source.print();
        // 4. 执行程序
        environment.execute();
    }
}

此外,也可以不构建集合,直接列举元素,并调用fromElements()方法即可。

        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz", 76),
                new Score("二班", "qqq", 94),
                new Score("三班", "fff", 99)
        );

从文件读取数据

实际开发应用中,一般不会通过代码将数据写在代码中。通常需要从文件中读取数据进行解析和处理,如读取日志文件。调用执行环境的readTextFile()方法即可读取文件,方法中需要传入文件的相对路径或绝对路径。

public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. 加载数据源
        DataStreamSource<String> source = environment.readTextFile("D:/work/my_project/FlinkDemo/src/main/resources/test.txt");
        // 3. 数据展示
        source.print();
        // 4. 执行程序
        environment.execute();
    }
}

从 Socket 读取数据

从集合和文件中获取的数据都是有界数据,而在流处理的场景中,数据一般是无界的。我们可以简单的通过 Socket 的方式进行无界数据的获取测试。

测试代码的远程 Socket 采用阿里云服务器,开放端口 8080 作为 Socket 文本流端口。

nc -l 8080

运行程序,从 Socket 中读取无界数据。

public class SocketTextStreamDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        String address = "47.92.146.85";
        int port = 8080;
        DataStreamSource<String> source = environment.socketTextStream(address, port);
        // 3. 数据输出
        source.print();
        // 4. 执行数据
        environment.execute();
    }
}

在 Socket 端输入数据,可以看到数据被读取进程序。

在这里插入图片描述

自定义数据源

在日常开发中,我们可以自定义数据源以获取来自各种数据库以及中间件的数据。自定义数据源需要编写自定义数据源类并继承SourceFunction接口,实现接口中的run()以及cancel()方法。

public class MySource implements SourceFunction<String> {

    /**
     * 实现数据的获取逻辑并通过 sourceContext 进行转发
     * @param sourceContext source 函数用于发出数据的接口
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (true) {
            sourceContext.collect(String.valueOf(new Random().nextInt(100)));
            Thread.sleep(1000);
        }
    }

    /**
     * 取消数据源,用于终止循环获取数据的逻辑
     */
    @Override
    public void cancel() {

    }
}

在使用自定义的数据源时,只需要调用执行环境的addSource()方法,将自定义的数据源对象传入即可。

public class MySourceDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.enableCheckpointing(5000);
        // 2. addSource 配置自定义数据源
        DataStreamSource<String> source = environment.addSource(new MySource());
        // 3. 数据输出
        source.print();
        // 4. 执行程序
        environment.execute();
    }
}

转换算子 Transformation

在使用源算子将数据读取到程序之后,我们便可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。Flink 程序的核心就是各种转换操作,它规定了数据处理转换的逻辑。

基本转换算子

映射 map

map 是一一映射的转换算子,即消费一个元素便产出一个元素。

map 算子的使用只需要调用 DataStream 对象的map()方法即可,方法需要传入的参数是 MapFunction 接口的实现类。map()方法的返回值仍然为 DataStream,不过泛型可能改变。

下列代码从 Socket 中读取数据,并根据输入数据将 1 转换为 ”男“,将 2 转换为 ”女“.

public class MapDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        String address = "47.92.146.85";
        int port = 8080;
        DataStreamSource<String> source = environment.socketTextStream(address, port);
        // 3. 定义数据转换规则
        SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                if ("1".equals(s)) {
                    return "男";
                } else if ("2".equals(s)) {
                    return "女";
                } else {
                    return "输入有误!";
                }
            }
        });
        // 4. 数据输出
        outputStreamOperator.print();
        // 5. 执行数据
        environment.execute();
    }
}

代码测试:

在这里插入图片描述

过滤 filter

filter 操作实际上是对一个数据流按规定的方式进行过滤,通过一个布尔表达式设置一个过滤条件,对流内的每一个因素进行判断。若返回 true 则元素正常通过;若返回 false 则元素被过滤掉。

filter 算子的使用只需要调用 DataStream 对象的filter()方法即可,方法需要传入 FilterFunction 接口的实现类。

下列代码从 Socket 中读取数据,并过滤掉所有值小于等于 100 的数据。

public class FilterDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        String address = "47.92.146.85";
        int port = 8080;
        DataStreamSource<String> source = environment.socketTextStream(address, port);
        // 3. 定义数据转换规则
        SingleOutputStreamOperator<String> outputStreamOperator = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                if (Integer.parseInt(s) > 100) {
                    return true;
                }
                return false;
            }
        });
        // 4. 数据输出
        outputStreamOperator.print();
        // 5. 执行数据
        environment.execute();
    }
}

代码测试:

在这里插入图片描述

扁平映射 flatMap

flatMap 扁平映射可以将数据流中的数据拆分成多个个体处理,即消费一个元素,可以获得 0 个、1 个或者多个数据。

flatMap 算子的使用只需要调用 DataStream 对象的flapMap()方法即可,方法需要传入 FlatMapFunction 接口的实现类。

下列方法实现了将输入的数据按照空格进行划分,获得多个数据。

public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        String address = "47.92.146.85";
        int port = 8080;
        DataStreamSource<String> source = environment.socketTextStream(address, port);
        // 3. 定义数据转换规则
        SingleOutputStreamOperator<String> outputStreamOperator = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] strings = s.split(" ");
                for (String string:strings) {
                    collector.collect(string);
                }
            }
        });
        // 4. 数据输出
        outputStreamOperator.print();
        // 5. 执行数据
        environment.execute();
    }
}

测试代码:

在这里插入图片描述

聚合算子 Aggregation

聚合算子,顾名思义,就是将一系列的数据按照某种规则进行统计和整合,从而提炼出更有用的信息的算子。

按键分区 keyBy

在 Flink 中,DataStream 对象没有直接进行聚合的 API,因为我们需要对海量的数据进行分区,然后并行处理数据以提高效率。因此,若要对数据进行聚合,首先需要对数据进行分区,keyBy 就是用来做分区处理的。

keyBy 可以通过指定一个 key 作为分区的依据,将一条数据流从逻辑上划分为不同的分区 partitions。在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写hashCode()方法。

keyBy 的使用需要调用 DataSource 的keyBy()方法,传入的参数为 KeySelector 接口的实现类。

需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为
KeyedStream,即”分区流“或”键控流“。

下列代码通过 Score 类的 className 属性对数据流进行分区。

public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. 加载数据源
        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz", 76),
                new Score("一班", "zzz1", 77),
                new Score("二班", "qqq", 94),
                new Score("二班", "qqq1", 76),
                new Score("三班", "fff", 99),
                new Score("三班", "fff1", 81)
                );
        // 3. 数据分区
        KeyedStream<Score, String> keyedStream = source.keyBy(new KeySelector<Score, String>() {
            @Override
            public String getKey(Score score) throws Exception {
                return score.getClassName();
            }
        });
        // 4. 打印数据
        keyedStream.print();
        // 5. 执行程序
        environment.execute();
    }
}

测试代码:可以看到,班级相同的对象在最前面所对应的分区号也相同,即进入一个分区进行处理。

在这里插入图片描述

简单聚合

有了分区流 KeyedStream 之后,我们就可以根据它进行数据的聚合操作了。Flink 内置实现了一些简单的聚合 API:

  • sum():对指定字段做叠加求和;

  • min():对指定字段求最小值;

  • max():对指定字段求最大值;

  • minBy():对指定字段求最小值并保留含最小字段的整条数据;

  • maxBy():对指定字段求最大值并保留含最大子段的整条数据;

测试代码:(注意打印数据时每条聚合算子单独使用,否则结果不容易观察)

public class SimpleAggregationDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. 加载数据源
        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz01", 76),
                new Score("一班", "zzz02", 89),
                new Score("一班", "zzz03", 84),
                new Score("二班", "qqq01", 94),
                new Score("二班", "qqq02", 74),
                new Score("二班", "qqq03", 64),
                new Score("三班", "fff01", 84),
                new Score("三班", "fff02", 94),
                new Score("三班", "fff03", 96)
                );
        // 3. 数据分区
        KeyedStream<Score, String> keyedStream = source.keyBy(new KeySelector<Score, String>() {
            @Override
            public String getKey(Score score) throws Exception {
                return score.getClassName();
            }
        });
        // 4. 打印数据
        keyedStream.max("score").print();
        keyedStream.min("score").print();
        keyedStream.sum("score").print();
        keyedStream.maxBy("score").print();
        keyedStream.minBy("score").print();
        // 5. 执行程序
        environment.execute();
    }
}

规约聚合 reduce

reduce 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。该操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元
素数据类型,所以输出类型和输入类型是一样的。

调用 KeyedStream 的reduce()方法时,需要传入一个参数,实现 ReduceFunction 接口。

下述代码利用 reduce 算子以及 Tuple 数据结构,同时计算出了 max 与 sum 的值

public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. 加载数据源
        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz01", 76),
                new Score("一班", "zzz02", 89),
                new Score("一班", "zzz03", 84),
                new Score("二班", "qqq01", 94),
                new Score("二班", "qqq02", 74),
                new Score("二班", "qqq03", 64),
                new Score("三班", "fff01", 84),
                new Score("三班", "fff02", 94),
                new Score("三班", "fff03", 96)
        );
        // 3. 数据分区
        source.map(new MapFunction<Score, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(Score score) throws Exception {
                return new Tuple3<String, Integer, Integer>(score.getClassName(), score.getScore(), score.getScore());
            }
        }).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, Integer, Integer> tuple3) throws Exception {
                return tuple3.f0;
            }
        }).reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> t1,
                                                           Tuple3<String, Integer, Integer> t2) throws Exception {
                return Tuple3.of(t1.f0, t1.f1 > t2.f1 ? t1.f1 : t2.f1, t1.f2 + t2.f2);
            }
        })
        // 4. 打印数据
                .print();
        // 5. 执行程序
        environment.execute();
    }
}

物理分区 Physical Partitioning

分区操作即将数据进行重新分布,传递到不同的分区中进行下一步的操作。比如之前介绍过的 keyBy 按键分区,就是通过指定 key 的哈希值对数据进行分区的。对于 keyBy 而言,无法确定数据到底分到哪个区,也不会考虑数据的分区是否均匀,它是一种逻辑分区(logical partitioning)。

若我们想精确的对数据进行分区,即真正的控制分区策略,那我们就需要物理分区策略(physical partitioning)。物理分区策略就类似消息队列和 Nginx 做的那样,常见的物理分区策略包括随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)、以及广播(Broadcast)。

随机分区 Random

通过调用 DataStream 对象的shuffle()方法,即可对数据进行“洗牌”,将数据随机分配到下游算子的并行任务当中。经过随机分区后,我们得到的仍然是一个 DataStream 对象。

下列代码将任务流从一个分区拓展为 4 个分区,并进行随机分区。

public class ShuffleDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        DataStreamSource<String> source = environment.addSource(new MySource());
        // 3. 打印输出
        source.shuffle().print("shuffle").setParallelism(4);
        // 4. 执行程序
        environment.execute();
    }
}

测试结果如下,可以看到,数据被随机分配到不同的分区执行打印的操作。

在这里插入图片描述

轮询分区 Round-Robin

轮询分区按照顺序依次将数据分发到不同的分区,通过调用 DataStream 对象的rebalance()方法即可实现数据的轮询分区。

下列代码将任务流从一个分区拓展为 4 个分区,并进行轮询分区。

public class RebalanceDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        DataStreamSource<String> source = environment.addSource(new MySource());
        // 3. 打印输出
        source.rebalance().print("rebalance").setParallelism(4);
        // 4. 执行程序
        environment.execute();
    }
}

代码输出结果如下,可以看到,数据按照分区编号 2-3-4-1 的顺序被依次分发到对应的分区。

在这里插入图片描述

重缩放分区 Rescale

重缩放分区与轮询分区类似,只不过两者的作用范围不同。轮询分区针对所有的上游任务和下游任务进行重新分区;而重缩放仅对部分上游任务和下游任务之间进行重新分区,节省更多资源。

当下游任务的数量是上游任务数量的整数倍时,rescale 的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。

通过调用 DataStream 对象的rescale()方法可以实现数据的重缩放分区。

下列代码采用并行数据源的富函数,将奇数发送到索引为 1 的并行子任务;将偶数发送到索引为 0 的并行子任务。随后将任务流从 2 个分区拓展为 4 个分区,并进行重缩放分区。

public class RescaleDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        DataStreamSource<Integer> source = environment.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                for (int i = 0; i < 8; i++) {
                    if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        sourceContext.collect(i + 1);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(2);
        // 3. 打印输出
        source.rescale().print("rescale").setParallelism(4);
        // 4. 执行程序
        environment.execute();
    }
}

执行结果如下,可以观察到,奇数数据在重分区时,被轮流分发到 3 和 4 两个子分区(对应索引为 1 的原子任务);而偶数数据在重分区时,被轮流分发到 1 和 2 两个子分区(对应索引为 0 的原子任务)。因此,重缩放分区实际上就是局部上的轮询分区。

在这里插入图片描述

广播 Broadcast

广播即将数据重新分发到所有的下游子任务当中,数据将存在在每一个子分区。通过调用 DataStream 对象的broadcast()方法,可以实现数据的广播。注意,该方法可能会导致数据的重复处理。

下列代码将任务流从一个分区拓展为 4 个分区,并进行数据广播分区。

public class BroadcastDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zqf", 100));
        // 3. 打印输出
        source.broadcast().print("broadcast").setParallelism(4);
        // 4. 执行程序
        environment.execute();
    }
}

代码结果如下,可以看到,数据被广播到 4 个子分区当中。

在这里插入图片描述

全局分区 Global

全局分区是一种特殊的分区方式。通过调用 DataStream 对象的global()方法,可以强行将下游任务的并行度变为 1,因此使用该操作需要特别谨慎,可能会对程序造成较大压力。

下列代码将任务流从 2 个分区通过全局分区的方式修改为 1 个分区。

public class GlobalDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(2);
        // 2. 加载数据源
        DataStreamSource<String> source = environment.addSource(new MySource());
        // 3. 打印输出
        source.global().print("global");
        // 4. 执行程序
        environment.execute();
    }
}

代码结果如下。

在这里插入图片描述

自定义分区 Custom

若 Flink 提供的分区策略均不能满足我们的需求,此时可以通过partitionCustom()方法来自定义分区策略,该方法在调用时需要传入两个参数:

  • 第一个参数为自定义分区器对象;

  • 第二个参数为应用自定义分区器的字段;

下列代码演示了如何通过数值的奇偶性进行分区。

public class CustomDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        DataStreamSource<String> source = environment.addSource(new MySource());
        // 3. 打印输出
        source.partitionCustom(new Partitioner<Integer>() {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % 2;
            }
        }, new KeySelector<String, Integer>() {
            @Override
            public Integer getKey(String s) throws Exception {
                return Integer.valueOf(s);
            }
        }).print().setParallelism(2);
        // 4. 执行程序
        environment.execute();
    }
}

代码结果如下,可以看到,所有奇数被分配到分区 2;所有偶数被分配到分区 1。

在这里插入图片描述

输出算子 Sink

Flink 作为数据处理框架,最终还需要把处理的结果写入外部系统。这个过程主要通过 Sink 算子实现,我们可以使用 Flink 提供的 Sink 算子,也可以自定义 Sink 算子。

之前的代码中我们一直使用的print()方法就是一种 Sink 算子,它表示将数据流写入标准控制台打印输出。

输出到文件

Flink 专门提供了一个流式文件系统的连接器 StreamingFileSink,其为流处理和批处理提供了统一的 Sink,可以将分区文件写入 Flink 支持的文件系统。

StreamingFileSink 支持行编码和批量编码两种方式,可以直接调用静态方法构建:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)

  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径
(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

下列代码创建了一个简单的文件 Sink,并制定了文件写入的滚动策略,滚动策略即我们开启新文件记录数据的标准,下列代码设置的滚动策略为:

  • 至少包含 15 分钟的数据;

  • 最近 5 分钟内没有收到新的数据;

  • 文件大小已经达到 1 GB;

public class SinkToFileDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz01", 76),
                new Score("二班", "qqq03", 64),
                new Score("三班", "fff01", 84)
        );
        // 3. 定义数据转换规则
        SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<Score, String>() {
            @Override
            public String map(Score score) throws Exception {
                return score.toString();
            }
        });
        // 4. 数据输出
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path("D:/work/my_project/FlinkDemo/src/main/resources"),
                new SimpleStringEncoder<>("utf-8")
        ).withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build()
        ).build();
        outputStreamOperator.addSink(fileSink);
        // 5. 执行数据
        environment.execute();
    }
}

输出到 Redis

Bahir 项目为我们提供了 Flink 和 Redis 的连接器,首先我们导入连接器的依赖

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

连接器为我们提供了一个 RedisSink,创建对象时需要输入两个参数:

  • 第一个参数是 Jedis 连接池的配置信息;

  • 第二个参数需要传入 Redis 映射类的接口,需要我们写一个类继承 RedisMapper 接口,并实现相应的方法,定义数据转换成 Redis 数据格式的逻辑;

Redis 的映射类代码如下,此处我们保存的数据类型是 hash,表名设置为 score;每条数据的 name 字段作为 key,score 字段作为 value。

public class MyRedisMapper implements RedisMapper<Score> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "score");
    }

    @Override
    public String getKeyFromData(Score score) {
        return score.getName();
    }

    @Override
    public String getValueFromData(Score score) {
        return Integer.toString(score.getScore());
    }
}

完整的实现数据 sink 到 redis 的代码如下。

public class SinkToRedisDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz01", 76),
                new Score("二班", "qqq03", 64),
                new Score("三班", "fff01", 84)
        );
        // 3. 创建一个 redis 的连接
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("47.92.146.85")
                .setPort(6379)
                .build();
        // 4. 数据输出到 redis
        source.addSink(new RedisSink<Score>(jedisPoolConfig, new MyRedisMapper()));
        // 5. 执行数据
        environment.execute();
    }
}

运行代码,然后使用 redis-cli 连接 redis,查看结果。

root@2d1c2701081c:/data# redis-cli
127.0.0.1:6379> hgetall score
1) "fff01"
2) "84"
3) "zzz01"
4) "76"
5) "qqq03"
6) "64"
127.0.0.1:6379> 

输出到数据库 MySQL

本小节将介绍如何将数据 sink 到 MySQL 数据库进行持久化存储。

首先我们需要引入相应的依赖,如下所示。

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.0.0-1.16</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

根据要存储的信息在数据库中创建相应的表结构。

在这里插入图片描述

编写代码,对数据库的连接、sql 语句进行配置,完整代码如下。

public class SinkToMySQLDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        DataStreamSource<Score> source = environment.fromElements(
                new Score("一班", "zzz01", 76),
                new Score("二班", "qqq03", 64),
                new Score("三班", "fff01", 84)
        );
        // 3. 数据输出
        source.addSink(JdbcSink.sink(
                "INSERT INTO score (class_name, name, score) VALUES (?, ?, ?)",
                (statement, score) -> {
                    statement.setString(1, score.getClassName());
                    statement.setString(2, score.getName());
                    statement.setInt(3, score.getScore());
                },
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://47.92.146.85:3306/SysManage?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true&rewriteBatchedStatements=true")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("XXXXXXXXX")
                        .build()
        ));
        // 5. 执行程序
        environment.execute();
    }
}

运行程序,连接数据库进行数据查看。

在这里插入图片描述

输出到 ElasticSearch

学习完 ES 相关知识后补充…

自定义输出算子 Sink

自定义输出算子需要我们编写自定义 Sink 类,并继承 RichSinkFunction 类,重写该类的invoke()方法和finish()方法,定义数据 sink 的处理逻辑。

下列自定义的 sink 算子将数据按照标准格式进行打印。

public class MySink extends RichSinkFunction<String> {

    /**
     * 每条读取到的记录都会调用该方法
     * @param value 获取到的值
     * @param context 可用于获取有关输入记录的附加数据的上下文
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        super.invoke(value, context);
        System.out.println("{data=" + value + "}");
    }

    /**
     * 任务完成后调用该方法
     */
    @Override
    public void finish() throws Exception {
        System.out.println("mission complete~");
    }
}

在使用自定义的 sink 算子时,只需要将实例化的对象传入addSink()方法即可。

public class MySinkDemo {
    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. socketTextStream 配置数据源
        String address = "47.92.146.85";
        int port = 8080;
        DataStreamSource<String> source = environment.socketTextStream(address, port);
        // 3. 定义数据转换规则
        SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s;
            }
        });
        // 4. 数据输出
        outputStreamOperator.addSink(new MySink());
        // 5. 执行数据
        environment.execute();
    }
}

代码测试结果如下。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/149080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WPS配置mathtype

笔者电脑软件版本&#xff1a;WPS Office mathtype_7.4.8.0 请注意WPS适配的mathtype版本。 下载地址 mathtype_7.4.8.0&#xff1a;https://wwl.lanzoum.com/iuJDz0guffuh wps.vba.exe&#xff1a;https://handong1201.lanzouw.com/iX5GZtn70pe&#xff08;mathtype右侧选…

查找算法之线性查找

目录 线性查找 算法实现 算法实现 python C 复杂度分析 优点与缺点 线性查找 线性查找&#xff08;Linear Search&#xff09;是一种最基础的查找方法&#xff0c;其从数据结构的一端开始&#xff0c;依次访问每个元素&#xff0c;直到另一端后停止。 算法实现 线性查…

【凸优化】Gradient Descent and Newton Descent【梯度下降法和牛顿下降法】(含Python代码绘制等高线图)

文章目录Gradient Descent and Newton Descent一、下降法【Descent】二、梯度下降法【Gradient Descent】三、牛顿下降法【Newton Descent】四、示例Example五、ReferenceGradient Descent and Newton Descent 一、下降法【Descent】 首先介绍什么是下降法【Descent Methods】…

【Git】下载安装学习记录

【下载安装】 1.官网下载64位的安装包 2.双击安装&#xff0c;设置安装路径&#xff0c;一路next即可 使用学习指南&#xff1a;一、设置用户签名二、操作本地库的命令git initgit statusgit add 文件名git rm --cached 文件名git commit -m "注释" 文件名git refl…

在PyCharm中配置Anaconda环境

目录 1.创建项目 2.设置解释器 ​​​​​​ 3.常见问题 1.创建项目 2.设置解释器 3.常见问题 1.%matplotlib inline报错&#xff1a;Python关于%matplotlib inline 2.from d2l import torch as d2l报错&#xff1a; 报错原因&#xff0c;没有导入d2l这个包 如何导入呢&…

JDK8下载安装与配置环境变量(linux)

一、前言 基于Linux平台的MySQL安装文件有三个版本&#xff0c;分别是RPM软件、GenericBinaries软件包、源码包&#xff0c;具体介绍如下&#xff1a; ①RPM软件包是一种Linux平台下的安装文件&#xff0c;通过相关命令可以很方便地安装与卸载。该软件包分为两个&#xff1a;…

SVF Saber的实现

SVF Saber1.基本原理2.API类型定义3.Memory Leak Checker3.1.示例3.2.初始化3.3.程序切片3.4.路径约束求解3.5.报告错误4.总结5.参考文献Saber是一个静态漏洞检测器&#xff0c;最初集成到open64中&#xff0c;现已集成到SVF中&#xff0c;主要检测内存泄漏&#xff0c;DoubleF…

GAN的损失函数

1.GAN 在训练过程中&#xff0c;生成器和判别器的目标是相矛盾的&#xff0c;并且这种矛盾可以体现在判别器的判断准确性上。生成器的目标是生成尽量真实的数据&#xff0c;最好能够以假乱真、让判别器判断不出来&#xff0c;因此生成器的学习目标是让判别器上的判断准确性越来…

PCL点云处理之快速点特征直方图(FPFH)描述符(八十六)

PCL点云处理之快速点特征直方图(FPFH)描述符(八十六) 前言一、快速点特征直方图理论二、FPFH和PFH的区别二、实验过程1.代码2输入法线的NAN值检查用 OpenMP 加速 FPFH前言 对于具有 n 个点的给定点云 P,点特征直方图(见点特征直方图(PFH)描述符)的理论计算复杂度为 O (nk…

VueRouter路由的使用(上)

文章目录VueRouter路由的使用p21路由的原理_hash改变historyP22 认识vue-router路由的基本使用流程默认路径router-link路由懒加载路由的其他属性动态路由的基本匹配NotFound路由嵌套编程式导航VueRouter路由的使用 p21 路由的原理_hash改变 <!DOCTYPE html> <html …

JavaScript 运算符

文章目录JavaScript 运算符JavaScript 算术运算符JavaScript 赋值运算符用于字符串的 运算符对字符串和数字进行加法运算JavaScript 运算符 运算符 用于赋值。 运算符 用于加值。 运算符 用于给 JavaScript 变量赋值。 算术运算符 用于把值加起来。 实例 指定变量值&am…

Java基础学习笔记(十)—— 异常

异常1 异常概述2 异常处理方式2.1 JVM默认处理异常的方式2.2 throws方式处理异常2.3 throw抛出异常2.4 try-catch方式处理异常2.5 Throwable成员方法2.6 异常的练习3 自定义异常1 异常概述 异常&#xff1a;就是程序出现了不正常的情况。程序在执行的过程中&#xff0c;出现的…

Vivado综合设置之-gated_clock_conversion

本文验证-gated_clock_conversion设置为on或off时&#xff0c;给Schematic带来的差异。 -gated_clock_conversion设置为on时&#xff0c;用于移除门控时钟&#xff0c;门控时钟是由门电路而非专用时钟模块&#xff08;例如MMCM或PLL&#xff09;生成的时钟。 门控时钟会对设计…

Java-集合(3)

Vector集合类 1.Vector底层保存数据的也是一个对象数组&#xff1a;protected Object[] elementDate; 2.Vector是线程同步的&#xff0c;也就是线程安全Vactor的操作方法都带有synchronized修饰。以此可以进行安全线程保障&#xff0c;所以在开发中如果确认只有一个线程操作集…

dubbo学习笔记3(小d课堂)

dubbo高级特性 启动依赖检查 我们现在直接来启动我们的消费者&#xff1a; 它会报错。 我们 再去直接运行我们的消费者就不会报错。 我们也可以不在代码中去配置&#xff1a; 实际工作中比较建议使用这种方式。 dubbo.reference.check是配置所有的reference里的service都是f…

C语言:大小端

大小端 对于整型来说&#xff0c;内存中存放的是补码&#xff0c;补码是按大小端模式进行存储&#xff1a; 大端存储数据低位存在内存高地址&#xff0c;高位存在内存低地址小端存储数据低位存在内存低地址&#xff0c;高位存在内存高地址 记忆方法 小小小&#xff1a;数据 小…

前端组件库自定义主题切换探索-01

探索原因背景 首先自然是项目有需求&#xff0c;这是必须去做的原因 其次&#xff0c;是我们项目没有直接使用市面上现成的基于element-ui或者ant-design的第三方UI框架&#xff0c;比如avue&#xff0c;而是有着自己的UI组件库 第三&#xff0c;我们的组件库基于ant-design-v…

Win10下干净卸载VMware15.5

一、说明 虚拟机属于服务软件&#xff0c;在Windows10下卸载实属不易。下面请看我所总结的卸载文档。 二、如何彻底卸载VMware虚拟机 需要删除的部分很多&#xff0c;包括&#xff1a; 三、删除步骤一&#xff1a;需要禁用VM虚拟机服务 首先&#xff0c;因为VM的软件属于底层…

【自学Python】Python转义字符

文章来源嗨客网&#xff08;www.haicoder.net&#xff09; Python转义字符 Python转义字符教程 在编程中有一些字符是打不出来的&#xff0c;比如换行&#xff0c;提示音&#xff0c;下一个制表位等等&#xff0c;于是程序语言的开发者就设计了转义序列&#xff08;escape se…

4 机器学习之决策树

学习笔记自&#xff0c;慕课网 《Python3 入门人工智能》 https://https://coding.imooc.com/lesson/418.html#mid32776 决策树、异常检测、主成分分析 常用的分类方法&#xff1a; 逻辑回归的思路&#xff1a; 决策树的思路&#xff1a; 1. 决策树 1.1 ID3决策树&#x…