【入门Flink】- 07Flink DataStream API【万字篇】

news2024/10/3 8:22:38

DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对DataStream的各种转换。

代码基本上都由以下几部分构成:

image-20231104230137211

执行环境(Execution Environment)

1)创建执行环境StreamExecutionEnvironment

StreamExecutionEnvironment 类的对象,这是所有Flink程序的基础。调用这个类的静态方法,具体有以下三种

(1)getExecutionEnvironment √

最简单的方式,就是直接调用 getExecutionEnvironment 方法。这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()

(2)createLocalEnvironment

返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

(3)createRemoteEnvironment

返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
                            "host", // JobManager 主机名
                            1234, // JobManager 进程端口号
                            "path/to/jarFile.jar" // 提交给 JobManager 的JAR包
);

在获取到程序执行环境后,还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

2)执行模式(Execution Mode)

从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API 执行模式包括:流执行模式、批执行模式和自动模式

  • 流执行模式(Streaming)

    DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 Streaming 执行模式

  • 批执行模式(Batch)

    专门用于批处理的执行模式。

  • 自动模式(AutoMatic)

    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加 execution.runtime-mode 参数,指定值为BATCH。

(2)通过代码配置

image-20231104231309108

实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

3)触发程序执行

Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”“懒执行”

同步执行 or 异步执行(了解)

execute() or executeAsync()

env.execute(); // 同步
env.executeAsync(); // 异步

exexute总结

  1. 默认 env.execute() 触发一个Flink job
    • 一个main方法可以调用多个execute,但是没有意义,指定一个就会阻塞住(同步)
  2. env.executeAsync(),异步,不阻塞
    • 一个main方法里 executeAsync()个数 = 生成Flink job数

源算子(Source)

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source 是整个处理程序的输入端。

在 Flink1.12 以前,旧的添加 source 的方式,是调用执行环境的addSource()方法:

DataStream<String> stream = env.addSource(...);

方法传入的参数是一个“源函数”(source function),需要实现SourceFunction 接口。

从 Flink1.12 开始,主要使用流批统一的新 Source 架构fromSource()方法):

DataStreamSource<String> stream = env.fromSource();

Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也实现了对应的Source,通常情况下足以应对实际需求。

1)从集合中读取数据

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .fromElements(1, 2, 3, 4, 5)  // 直接填写元素
//                .fromCollection(Arrays.asList(1, 2, 3, 4, 5))  // 从集合读取元素
                .print();

        env.execute();

2)从文件读取数据

通常情况,会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

读取文件,需要添加文件连接器依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),
                // 文件路径,相对路径不行就用绝对路径
                Path.fromLocalFile(new File("D:\\workspace\\IdeaProjects\\second-java\\day5-flink\\src\\main\\resources\\input\\words.txt"))

        ).build();
        env.
                fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource")
                .print();

        env.execute();

说明

  • 参数可以是目录,也可以是文件;还可以从 HDFS 目录下读取,使用路径hdfs://…;
  • 路径可以是相对路径,也可以是绝对路径;
  • 相对路径是从系统属性 user.dir 获取路径:idea 下是project 的根目录,standalone模式下是集群节点根目录。

3)从 Socket 读取数据

读取 socket 文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStreamSource<String> lineStream = env.socketTextStream("localhost", 7777);

4)从 Kafka 读取数据

Flink 官方提供了连接工具 flink-connector-kafka , 直接实现了一个消费者FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。

引入Kafka 连接器的依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

代码如下:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("124.222.253.33:9092") // ip:port
                .setTopics("topic_1") // topic
                .setGroupId("group1") // 消费者组
                // latest 将偏移初始化为最新偏移的OffsetInitializer
                // earliest 偏移初始化为最早可用偏移的OffsetInitializer
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema()).build(); // 仅Value反序列化

        env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();
        env.execute();

5)从数据生成器读取数据

Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

需要导入依赖:

  		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>

代码如下:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                // 数据转换生成函数
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                // 从0开始递增至这个数
                Long.MAX_VALUE,
                // 数据生产效率,每秒10个
                RateLimiterStrategy.perSecond(10), Types.STRING

        );
        env
                .fromSource(dataGeneratorSource,
                        WatermarkStrategy.noWatermarks(), "dataGenerator")
                .print();

        env.execute();

Flink 支持的数据类型

1)Flink 的类型系统

Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

2)Flink 支持的数据类型

对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部,Flink 对支持不同的类型进行了划分,这些类型可以在 Types 工具类中找到:

(1)基本类型

所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和BigInteger。

(2)数组类型

包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

(3)复合数据类型

  • Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是Java API 的一部分。最多25 个字段,也就是从 Tuple0~Tuple25,不支持空字段。
  • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
  • POJO:Flink 自定义的类似于 Java bean 模式的类。

Flink 对 POJO 类型的要求如下:

  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是公有(public)的
  • 所有属性的类型都是可以序列化的

(4)辅助类型

Option、Either、List、Map 等。

(5)泛型类型(GENERIC)

Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的

3)类型提示(Type Hints)

有时,需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

或者,使用TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。仍通过.returns()方法,明确地指定转换之后的DataStream 里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

转换算子(Transformation)

开始之前,准备 WaterSensor 作为数据模型。

字段分别代表如下含义:

字段名数据类型说明
idString水位传感器类型
tsLong传感器记录时间戳
vcInteger水位记录
public class WaterSensor {

    public String id;
    public Long ts;
    public Integer vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Integer getVc() {
        return vc;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
}

数据源读入数据之后,就可以使用各种转换算子,将一个或多个DataStream转换为新的 DataStream。

转换算子一般有三种写法:

  1. 匿名类
  2. lambda表达式
  3. 实现函数接口

1)基本转换算子(map/ filter/ flatMap)

(1)映射(map)

“一一映射”,消费一个元素就产出一个元素。

需求:提取 WaterSensor 中的id 字段

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1L, 1),
                new WaterSensor("sensor_2", 2L, 2)
        );

        stream.map(WaterSensor::getId)
                .print();

        env.execute();
    }

(2)过滤(filter)

需求:将数据流中传感器 id 为 sensor_1 的数据过滤出来

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1L, 1),
                new WaterSensor("sensor_2", 2L, 2)
        );

        stream.filter(el -> "sensor_1".equals(el.getId()))
                .print();

        env.execute();

(3)扁平映射(flatMap)

flatMap 一进多出

需求:如果输入的数据是 sensor_1,只打印 vc;如果输入的数据是sensor_2,既打印 ts 又打印 vc。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1L, 1),
                new WaterSensor("sensor_2", 2L, 2)
        );

        stream.flatMap(
                new FlatMapFunction<WaterSensor, String>() {
                    @Override
                    public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                        if ("sensor_1".equals(value.getId())) {
                            out.collect(value.getVc().toString());
                        } else if ("sensor_2".equals(value.getId())) {
                            out.collect(value.getTs().toString());
                            out.collect(value.getVc().toString());
                        }
                    }
                }
        ).print();

        env.execute();

输出结果:

image-20231105170456761

map如何控制一进一出:

​ 使用return

flatmap怎么控制一进多出

​ 通过Collector输出,调用几次就输出几条(向下游输送数据)

2) 聚合算子(Aggregation)

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce 中的reduce操作。

(1 )按键分区(keyBy)

在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。

id 作为 key 做一个分区操作,代码实现如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1L, 1),
                new WaterSensor("sensor_1", 2L, 2),
                new WaterSensor("sensor_2", 2L, 2),
                new WaterSensor("sensor_3", 3L, 3)
        );

        KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);
        keyedStream.print();
        env.execute();

运行结果如下:

image-20231105174334517

keyBy:

  1. 返回的是 KeyedStream ,键控流
  2. 不是转换算子,只是对数据进行重分区,不能设置并行度

keyBy分组 和 分区 的关系:

  1. keyBy对数据分组(相同key的数据被分为一组),同时保证 相同的key的数据 在同一个分区
  2. 分区):一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)

KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。

(2)简单聚合(sum/min/max/minBy/maxBy)

有了按键分区的数据流 KeyedStream,就可以基于它进行聚合操作了。

Flink内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。

  • min():在输入流上,对指定的字段求最小值。

  • max():在输入流上,对指定的字段求最大值。

  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。

  • maxBy():同上

聚合方法调用时,也需要传入参数,聚合指定的字段。指定字段的方式有两种:指定位置,和指定名称。【指定位置索引,适用于 Tuple类型,POJP不行】

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);

      DataStreamSource<WaterSensor> stream = env.fromElements(
              new WaterSensor("sensor_1", 1L, 1),
              new WaterSensor("sensor_1", 2L, 2),
              new WaterSensor("sensor_2", 2L, 2),
              new WaterSensor("sensor_3", 3L, 3)
      );

      KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);
      keyedStream
              .maxBy("vc") // 指定字段名称
              .print();
      env.execute();

keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。

一个聚合算子,会为每一个 key 保存一个聚合的值,在Flink 中把它叫作“状态”(state)

每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的。

使用聚合算子,应该只用在含有有限个 key 的数据流上。

(3)归约聚合(reduce)

案例:使用 reduce 实现 max 和 maxBy 的功能。

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1L, 1),
                new WaterSensor("sensor_1", 2L, 2),
                new WaterSensor("sensor_2", 2L, 2),
                new WaterSensor("sensor_3", 3L, 3)
        );

        KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);
        keyedStream
                .reduce(new ReduceFunction<WaterSensor>() {
                    // 同组元素规约处理
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {

                        System.out.println("value1: " + value1);
                        System.out.println("value2: " + value2);
                        int maxVc = Math.max(value1.getVc(), value2.getVc());
                        if (value1.getVc() > value2.getVc()) {
                            value1.setVc(maxVc);
                            return value1;
                        } else {
                            value2.setVc(maxVc);
                            return value2;
                        }
                    }
                })
                .print();
        env.execute();

reduce 输入类型 = 输出类型,类型不变

每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出

reduce方法中的两个参数

​ value1:之前计算结果,有状态

value2:现在来的数据

reduce 算子也应该作用在一个有限 key 的流上。

用户自定义函数(UDF)

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类。

1)函数类

函数类可以传参数更加灵活

public class FilterIdFunction implements FilterFunction<WaterSensor> {
    
    private final String id;

    public FilterIdFunction(String id) {
        this.id = id;
    }

    @Override
    public boolean filter(WaterSensor value) throws Exception {
        return id.equals(value.getId());
    }
}

2)富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的Flink 函数类都有其 Rich版本

富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  • open()方法,是 Rich Function 的初始化方法,每个子任务启动时,调用一次。
  • close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
    • 如果是Flink程序异常挂掉,不会调用close
    • 正常调用cancel命令,可以close

代码实现:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env
                .fromElements(1, 2, 3, 4)
                .map(new RichMapFunction<Integer, Integer>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        System.out.println(" 索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
                    }

                    @Override
                    public Integer map(Integer integer)throws Exception {
                        return integer + 1;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        System.out.println(" 索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
                    }
                })
                .print();
        env.execute();

物理分区算子(Physical Partitioning)

常见的物理分区策略有:随机分配(Random)轮询分配(Round-Robin)重缩放(Rescale)广播(Broadcast)

1)随机分区(shuffle)

将数据随机地分配到下游算子的并行任务中去。可以打乱数据。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);
        stream.shuffle().print();
        env.execute();

2)轮询分区(Round-Robin)

雨露均沾,下游算子一个一个来(所有算子)

stream.rebalance()

3)重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用Round-Robin 算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。(部分算子)

stream.rescale()

4)广播(broadcast)

将输入数据复制并发送到下游算子的所有并行任务中去。

stream.broadcast()

5)全局分区(global)

一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1。

stream.global()

6)自定义分区(Custom)

Flink 内置所有分区策略都不能满足用户的需求时,可以通过使用partitionCustom()方法来自定义分区策略。

(1)自定义分区器

public class MyPartitioner implements Partitioner<String> {

    // numPartitions: 子任务数量(分区数量)
    @Override
    public int partition(String key, int numPartitions) {
        return Integer.parseInt(key) % numPartitions;
    }
}

(2)使用自定义分区

public class PartitionCustomDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> socketDS = env.socketTextStream("124.222.253.33", 7777);
        DataStream<String> myDS = socketDS
                .partitionCustom(
                        new MyPartitioner(),
                        value -> value);
        myDS.print();
        env.execute();
    }
}

八种分区器

Flink提供了8种分区器,7种内置,1种自定义

image-20231106001644153

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

1)简单实现

使用filter筛选多次,将原始数据流stream复制多份,然后对每一份分别做筛选;不够高效。

2)侧输出流(process)

process算子

调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个**“输出标签”(OutputTag)**,指定了侧输出流的 id 和类型。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
		// 定义输出标签
        OutputTag<WaterSensor> s1 = new OutputTag<WaterSensor>("s1", Types.POJO(WaterSensor.class)) {
        };
        OutputTag<WaterSensor> s2 = new OutputTag<WaterSensor>("s2", Types.POJO(WaterSensor.class)) {
        };

        // 返回的都是主流
        SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
                if ("s1".equals(value.getId())) {
                    ctx.output(s1, value);
                } else if ("s2".equals(value.getId())) {
                    ctx.output(s2, value);
                } else {
                    // 主流
                    out.collect(value);
                }
            }
        });

        // 打印主流
        ds1.print("主流数据");
        // 通过主流获取侧边流
        SideOutputDataStream<WaterSensor> sideOutput1 = ds1.getSideOutput(s1);
        SideOutputDataStream<WaterSensor> sideOutput2 = ds1.getSideOutput(s2);
        sideOutput1.printToErr("测流s1");
        sideOutput2.printToErr("测流s2");

        env.execute();

合流

1)联合(Union)

联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。(可以将多条流联合在一起)

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);
        DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");
        // ds3 类型不一致,不能联合
        ds1.union(ds2).print();

        env.execute();

image-20231105224515470

2)连接(Connect)

(1)简单使用

连接一次只能连接2条流,流的数据类型可以不一样

代码实现:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);

        DataStreamSource<String> source2 = env.fromElements("a", "b", "c");
        ConnectedStreams<Integer, String> connect = source1.connect(source2);
        /*
          connect:
              1、一次只能连接 2 条流
              2、流的数据类型可以不一样
              3、连接后可以调用 map、flatmap、process 来处理,但是各处理各的
         */
        connect.map(new CoMapFunction<Integer, String, String>() {
                    @Override
                    public String map1(Integer value) throws Exception {
                        return "来源于数字流:" + value.toString();
                    }

                    @Override
                    public String map2(String value) throws Exception {
                        return "来源于字母流:" + value;
                    }
                })
                .print();

        env.execute();

(2)CoProcessFunction

需求:连接两条流,输出能根据 id 匹配上的数据(类似inner join 效果)

注意:connectedStreams.keyBy(keySelector1, keySelector2);

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(2);
        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        // 定义HashMap , 缓存来过的数据,key=id,value=list<数据>
        Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
        Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
        // 将合流元素,按key分到同一分区才能得到如下结果 ***
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(v -> v.f0, v1 -> v1.f0);
        connectKey.process(
                        new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                            @Override
                            public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                                Integer id = value.f0;

                                if (!s1Cache.containsKey(id)) {
                                    List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                                    s1Values.add(value);
                                    s1Cache.put(id, s1Values);
                                } else {
                                    s1Cache.get(id).add(value);
                                }

                                if (s2Cache.containsKey(id)) {
                                    for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                        out.collect("s1:" + value + "<--------->s2:" + s2Element);
                                    }
                                }

                            }

                            @Override
                            public void processElement2
                                    (Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.
                                            Context ctx, Collector<String> out) throws Exception {
                                Integer id = value.f0;
                                if (!s2Cache.containsKey(id)) {
                                    List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                                    s2Values.add(value);
                                    s2Cache.put(id, s2Values);
                                } else {
                                    s2Cache.get(id).add(value);
                                }

                                if (s1Cache.containsKey(id)) {
                                    for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                        out.collect("s1:" + s1Element + "<--------->s2:" + value);
                                    }
                                }

                            }
                        }
                ).

                print();

        env.execute();

输出算子(Sink)

1)连接到外部系统

Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

Flink1.12 开始,同样重构了 Sink 架构,使用 .sinkTo()方法实现。

2)输出到文件

FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

代码实现:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每个目录中,都有并行度个数的文件在写入
        env.setParallelism(2);

        // 【必须开启】 checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000,
                CheckpointingMode.EXACTLY_ONCE);

        // 数据生成器
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource,
                WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("d:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("li-").withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new
                        DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略: 1 分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new
                                        MemorySize(1024 * 1024))
                                .build()
                )
                .build();

        dataGen.sinkTo(fieSink);

        env.execute();

3)输出到 Kafka

添加 Kafka 连接器依赖。

代码实现:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是【精准一次,必须开启】 checkpoint
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33", 7777);

        /*
          Kafka Sink:
           注意:如果要使用 【精准一次】 写入 Kafka,需要满足以下条件,缺一不可
          1、开启 checkpoint
          2、设置事务前缀
          3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的 15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口
                .setBootstrapServers("124.222.253.33:9092")
                // 指定序列化器:指定 Topic 名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到 kafka 的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("li-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15 分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        sensorDS.sinkTo(kafkaSink);
        env.execute();

自定义序列化器,实现带 key 的 record:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33", 7777);
        
        /*
         如果要指定写入 kafka 的 key,可以自定义序列化器:
          1、实现 一个接口,重写 序列化 方法
          2、指定 key,转成 字节数组
          3、指定 value,转成 字节数组
          4、返回一个 ProducerRecord 对象,把 key、value 放进去
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("124.222.253.33:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("li-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        sensorDS.sinkTo(kafkaSink);
        env.execute();

4)输出到 MySQL(JDBC)

(1)添加 MySQL 驱动

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>

(2)jdbc连接器

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
        </dependency>

官网案例:

public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }

        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
            写入 mysql
            1、只能用老的 sink 写法: addSink
            2、JDBCSink 的 4 个参数:
                第一个参数: 执行的 sql,一般就是 insert into
                第二个参数: 预编译 sql, 对占位符填充值
                第三个参数: 执行选项 ---》 攒批、重试
                第四个参数: 连接选项 ---》 url、用户名、密码
         */
        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000) // 批次的大小:条数
                                .withBatchIntervalMs(200) // 批次的时间
                                .withMaxRetries(5) // 重试次数
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF8")
                                .withUsername("root")
                                .withPassword("root")
                                .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                                .build()
                ));

        env.execute();
    }
}

5)自定义Sink输出

实现RichSinkDunction 抽象类,自定义逻辑比较麻烦,不建议。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1179911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【启扬方案】基于RK3568核心板的激光打标机应用解决方案

激光打标机是一种利用激光技术进行标记和刻字的设备&#xff0c;作为激光技术应用的一个细分领域&#xff0c;是最早引入工业市场的一类激光装备&#xff0c;它采用激光束在工件表面进行刻印、打标&#xff0c;常用于工业生产中的物料标识、产品追溯、防伪标记等应用&#xff0…

centos7安装mysql-阿里云服务器

1.背景 2.安装 2.1.下载安装包 wget https://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm2.2.安装mysql rpm -ivh mysql57-community-release-el7-8.noarch.rpm 3.安装mysql服务 3.1.进入目录 首先进入cd /etc/yum.repos.d/目录 cd /etc/yum.repos.d/ 3.…

Netty 高性能原因之一 采用了高性能的NIO 模式

java IO简介 I/O 全称Input/Output&#xff0c;即输入/输出&#xff0c;通常指数据在内部存储器和外部存储器或其他周边设备之间的输入/输出。 涉及 I/O 的操作&#xff0c;不仅仅局限于硬件设备的读写&#xff0c;还要网络数据的传输。无论是从磁盘中读写文件&#xff0c;还…

【广州华锐互动】VR综合布线虚拟实验教学系统

随着科技的不断发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术已经逐渐渗透到各个领域&#xff0c;为人们的生活和工作带来了前所未有的便利。在建筑行业中&#xff0c;VR技术的应用也日益广泛&#xff0c;尤其是在综合布线方面。 广州华锐互动开发的VR综合布线虚拟实…

百度上线“文心一言”付费版本,AI聊天机器人市场竞争加剧

原创 | 文 BFT机器人 百度不愧是我国AI技术领域的先行者&#xff0c;每年致力于人工智能领域取得技术产品的突破和创新。据爆料称&#xff0c;百度的文心一言有突破了新境界&#xff0c;开创了文心大模型4.0会员版本。从线上的to C产品到试水商业化&#xff0c;百度都是争先走…

Python的requests库爬取商城优惠券

首先&#xff0c;我们需要了解要抓取的网页的结构和数据格式。在这个例子中&#xff0c;我们使用Python的requests库来发送HTTP请求&#xff0c;并使用BeautifulSoup库来解析HTML内容。 import requests from bs4 import BeautifulSoup然后&#xff0c;我们需要使用requests库的…

LeetCode | 160. 相交链表

LeetCode | 160. 相交链表 O链接 我们这里有两个问题&#xff0c;一是判断是否相交&#xff0c;二是找交点 思路一&#xff1a; 暴力求解 A链表所有节点依次取B链表找一遍&#xff08;时间复杂度是O(N^2)&#xff09; struct ListNode *getIntersectionNode(struct ListNod…

QT not in executable format:file truncated

今天在调研串口打印机的时候出现的&#xff0c;串口打印机有sdk&#xff0c;自己qt的编辑器用的 MinGW 64&#xff0c;编译出现次错误 出现这个错误&#xff0c;主要是sdk和编译器的版本位数不一致。 修改方法&#xff1a;把MinGW64 改为MinGW32&#xff0c;不过这个根据使用的…

为什么说制造企业需要部署MES管理系统

在数字化浪潮席卷的今天&#xff0c;每个企业都期望通过新技术、新模式来优化自身的运营。这其中&#xff0c;MES管理系统成为了不少企业的首选。那么&#xff0c;为何企业需要部署MES管理系统&#xff1f;又该如何搭建MES管理系统呢&#xff1f; 一、企业缘何钟情于MES系统&am…

LeetCode-20-有效的括号

1.我的暴力解法 class Solution {public boolean isValid(String s) {Stack<Character> stknew Stack<Character>();int i0;//奇数直接不可能是匹配的if(s.length()%2!0)return false;for (;i<s.length();i){if(s.charAt(i)(){stk.push(();}else if(s.charAt(i…

NVM安装node后提示没有对应npm包(即:无法将“npm”项识别为 cmdlet、函数、脚本文件)

背景 windows11 node版本降低到v12.22.12后&#xff0c;执行&#xff1a;nvm -v npm -v npm : 无法将“npm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写&#xff0c;如果 包括路径&#xff0c;请确保路径正确&#xff0c;然后再试一次。 所在位置 …

【bug-maven】(一)java: 错误: 不支持发行版本 5 (二):java: 错误: 无效的源发行版:15

【bug-maven】&#xff08;一&#xff09;java: 错误: 不支持发行版本 5 &#xff08;二&#xff09;&#xff1a;java: 错误: 无效的源发行版&#xff1a;15 &#xff08;一&#xff09;java: 错误: 不支持发行版本 5 报错截图&#xff1a; 出错原因&#xff1a; 打开Projec…

GoLong的学习之路(番外)如何使用依赖注入工具:wire

我为什么要直接写番外呢&#xff1f;其原因很简单。项目中会使用&#xff0c;其实在这里大家就可以写一些项目来了。 依赖注入的工具本质思想其实都大差不差。无非控制反转和依赖注入。 文章目录 控制反转为什么需要依赖注入工具 wire的概念提供者&#xff08;provider&#x…

易云维®工厂能耗管理平台系统方案,保证运营质量,推动广东制造企业节能减排

我国《关于完整准确全面贯彻新发展理念推进碳达峰碳中和工作的实施意见》出台&#xff0c;提出了推进碳达峰碳中和工作的总体目标。到2025年&#xff0c;广东具备条件的地区、行业和企业率先实现碳达峰&#xff0c;为全省实现碳达峰、碳中和奠定坚实基础&#xff1b;2030年前实…

51单片机+SIM800C(GSM模块)实现短信发送功能

一、前言 本项目利用51单片机和SIM800C GSM模块实现短信发送功能。短信作为一种广泛应用的通信方式&#xff0c;在许多领域具有重要的作用&#xff0c;如物联网、安防系统、远程监控等。通过将51单片机与SIM800C GSM模块相结合&#xff0c;可以实现在各种应用场景下的短信通信…

美国光量子计算解决方案公司QCI正式开启量子计算商业化道路!

​&#xff08;图片来源&#xff1a;网络&#xff09; Quantum Computing Inc&#xff08;QCI&#xff09;是一家率先实现上市的全栈式光量子计算解决方案公司&#xff0c;近日&#xff0c;美国量子计算公司QCI&#xff08;纳斯达克代码: QUBT&#xff09;宣布&#xff0c;其在…

Xilinx DDR3 MIG系列——内存基本概念及原理

本节目录 一、内存简介 (1)内存基本存储原理 (2)内存频率 (3)DDR数据预取技术(Prefetch) (4)DDR3工作流程 (5)DDR3控制器的特点 二、内存基本参数 (1)物理Bank (2)逻辑Bank (3)内存芯片容量 (4)行激活命令—tRCD (5)列选通—CL (6)写入延迟—tDQSS (7)行预充电有效周期—tRP (8…

同样是BGA扇出,为什么别人设计出来的性能就是比你好!

高速先生成员--黄刚 高速先生经常会说一句话&#xff0c;那就是对于信号质量的优化是无极限的&#xff0c;这里说的优化&#xff0c;其实说的就是PCB的设计优化。首先肯定的是&#xff0c;不同的设计工程师去做同样一块PCB板的设计&#xff0c;做出来的肯定都不会完全一样。那不…

DVWA靶场SQL注入

本次注入的是DVWA靶场的SQL injection 1.判断是字符型注入还是数字型注入&#xff0c;构造SQL语句 1 and 12 由此可以判断出为字符型注入 2.考虑闭合方式&#xff0c;先随便丢一个单引号试试看看报错提示 You have an error in your SQL syntax; check the manual that cor…

C语言C位出道心法(二):结构体

C语言C位出道心法(一):基础语法 C语言C位出道心法(二):结构体 一:C语言结构体认知 忙着去耍帅,后期补充完整...........