Flink--API 之Transformation-转换算子的使用解析

news2025/1/15 22:34:00

目录

一、常用转换算子详解

(一)map 算子

(二)flatMap 算子

(三)filter 算子

(四)keyBy 算子

元组类型

POJO

(五)reduce 算子

二、合并与连接操作

(一)union 算子

(二)connect 算子

三、侧输出流(Side Outputs)

四、总结


        在大数据处理领域,Apache Flink 凭借其强大的流处理和批处理能力备受青睐。而转换算子作为 Flink 编程模型中的关键部分,能够对数据进行灵活多样的处理操作,满足各种复杂业务场景需求。本文将深入介绍 Flink 中常见的转换算子,包括 map、flatMap、filter、keyBy、reduce 等,并结合详细代码示例讲解其使用方法,同时探讨 union、connect 等合并连接操作以及侧输出流等特性,帮助读者全面掌握 Flink 转换算子的精髓。

一、常用转换算子详解

(一)map 算子

功能概述
        map 算子主要用于对输入流中的每个元素进行一对一的转换操作,基于用户自定义的映射逻辑将输入元素转换为新的输出元素。

代码示例
假设我们有一份访问日志数据,格式如下:

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

        我们要将其转换为一个 LogBean 对象,包含访问 ip、用户 userId、访问时间戳 timestamp、访问方法 method、访问路径 path 等字段。

假如需要用到日期工具类,可以导入lang3包:

<dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
</dependency>

以下是代码实现:


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;


public class MapDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class LogBean{
        String ip;      // 访问ip
        int userId;     // 用户id
        long timestamp; // 访问时间戳
        String method;  // 访问方法
        String path;    // 访问路径
    }

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");


        //3. transformation-数据处理转换
        // 此处也可以将数据放入到tuple中,tuple可以支持到tuple25
        DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split(" ");
                String ip = arr[0];
                int userId = Integer.valueOf(arr[1]);
                String createTime = arr[2];
                // 如何将一个时间字符串变为时间戳
                // 17/05/2015:10:05:30
                /*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = simpleDateFormat.parse(createTime);
                long timeStamp = date.getTime();*/
                // 要想使用这个common.lang3 下的工具类,需要导入包
                Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");
                long timeStamp = date.getTime();

                String method = arr[3];
                String path = arr[4];

                LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);
                return logBean;
            }
        });
        //4. sink-数据输出
        mapStream.print();


        //5. execute-执行
        env.execute();
    }
}

        在上述代码中,通过 map 函数的自定义逻辑,将每行日志字符串按空格拆分后,进行相应字段提取与时间戳转换,最终封装成 LogBean 对象输出。

第二个版本:

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;


@Data
@AllArgsConstructor
class LogBean{
    private String ip;      // 访问ip
    private int userId;     // 用户id
    private long timestamp; // 访问时间戳
    private String method;  // 访问方法
    private String path;    // 访问路径
}
public class Demo04 {

    // 将数据转换为javaBean
    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");

        //3. transformation-数据处理转换
        SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split("\\s+");

                //时间戳转换  17/05/2015:10:06:53
                String time = arr[2];
                SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = format.parse(time);
                long timeStamp = date.getTime();
                return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);
            }
        });

        //4. sink-数据输出
        map.print();

        //5. execute-执行
        env.execute();
    }
}

(二)flatMap 算子

功能概述
        flatMap 算子可以将输入流中的每个元素转换为零个、一个或多个输出元素。它适用于需要对输入元素进行展开、拆分等操作的场景。

代码示例
        假设有数据格式为“张三,苹果手机,联想电脑,华为平板”这样的文本文件,我们要将其转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等形式。

flatmap.log文件如:

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        //2. source-加载数据
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");
        //3. transformation-数据处理转换
        DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                //张三,苹果手机,联想电脑,华为平板
                String[] arr = line.split(",");
                String name = arr[0];
                for (int i = 1; i < arr.length; i++) {
                    String goods = arr[i];
                    collector.collect(name+"有"+goods);
                }
            }
        });
        //4. sink-数据输出
        flatMapStream.print();

        //5. execute-执行
        env.execute();
    }
}

        这里在 flatMap 函数内部,按逗号拆分每行数据,遍历拆分后的数组(除第一个元素作为名称外),通过 collector 将新组合的字符串收集输出。

(三)filter 算子

功能概述
        filter 算子依据用户定义的过滤条件,对输入流元素进行筛选,满足条件的元素继续向下游传递,不满足的则被过滤掉。

代码示例
        读取map算子中的访问日志数据,过滤出访问 IP 是 83.149.9.216 的访问日志,代码实现如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Date;

public class FilterDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class LogBean{
        String ip;      // 访问ip
        int userId;     // 用户id
        long timestamp; // 访问时间戳
        String method;  // 访问方法
        String path;    // 访问路径
    }

    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 加载数据
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");
        // 数据处理转换
        DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                String ip = line.split(" ")[0];
                return ip.equals("83.149.9.216");
            }
        });
        // 数据输出
        filterStream.print();
        // 执行
        env.execute();
    }
}

        在 filter 函数中,通过拆分每行日志获取 IP 地址,并与目标 IP 进行比较决定是否保留该条日志数据。

(四)keyBy 算子

功能概述
        keyBy 算子在流处理中用于对数据按照指定的键进行分组,类似于 SQL 中的 group by,后续可基于分组进行聚合等操作,支持对元组类型 POJO 类型数据按不同方式指定分组键。

流处理中没有groupBy,而是keyBy

KeySelector对象可以支持元组类型,也可以支持POJO[Entry、JavaBean]

元组类型

单个字段keyBy

//用字段位置(已经被废弃)
wordAndOne.keyBy(0)

//用字段表达式
wordAndOne.keyBy(v -> v.f0)

多个字段keyBy

//用字段位置
wordAndOne.keyBy(0, 1);

//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});

类似于sql中的group by

select sex,count(1) from student group by sex;

group by 后面也可以跟多个字段进行分组,同样 keyBy 也支持使用多个列进行分组 

POJO

public class PeopleCount {
    private String province;
    private String city;
    private Integer counts;
    public PeopleCount() {
    }
    //省略其他代码。。。
}

单个字段keyBy

source.keyBy(a -> a.getProvince());

多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});

代码示例
        假设有数据表示不同球类的数量,格式为 Tuple2(球类名称,数量),如 Tuple2.of("篮球", 1) 等,需求是统计篮球、足球各自的总数量。代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByDemo {

    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 加载数据
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );
        // 数据处理转换
        /*KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        });*/
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(tuple -> tuple.f0);
        keyedStream.sum(1).print();
        // 执行
        env.execute();
    }
}

这里通过 lambda 表达式指定 Tuple2 的第一个元素(球类名称)作为分组键,对数据分组后使用 sum 聚合统计每种球类的数量总和。

pojo演示

package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 14:32:52
 **/
public class Demo07 {

    @Data
    @AllArgsConstructor
    static class Ball{
        private String ballName;
        private int num;
    }

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        //3. transformation-数据处理转换
        //4. sink-数据输出

        // 以下演示数据是pojo
        DataStreamSource<Ball> ballSource = env.fromElements(
                new Ball("篮球", 1),
                new Ball("篮球", 2),
                new Ball("篮球", 3),
                new Ball("足球", 3),
                new Ball("足球", 2),
                new Ball("足球", 3)
        );
        ballSource.keyBy(ball -> ball.getBallName()).print();
        ballSource.keyBy(new KeySelector<Ball, String>() {
            @Override
            public String getKey(Ball ball) throws Exception {
                return ball.getBallName();
            }
        });

        //5. execute-执行
        env.execute();
    }
}

(五)reduce 算子

功能概述
        reduce 算子可对一个数据集或一个分组进行聚合计算,将多个元素逐步合并为一个最终元素,常用于求和、求最值等场景,sum 底层其实也是基于 reduce 实现。

代码示例
        读取访问日志,统计每个 IP 地址的访问 PV 数量,代码如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import java.util.Date;


public class ReduceDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class LogBean{
        String ip;      // 访问ip
        int userId;     // 用户id
        long timestamp; // 访问时间戳
        String method;  // 访问方法
        String path;    // 访问路径
    }

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");


        //3. transformation-数据处理转换
        // 此处也可以将数据放入到tuple中,tuple可以支持到tuple25
        DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split(" ");
                String ip = arr[0];
                int userId = Integer.valueOf(arr[1]);
                String createTime = arr[2];
                // 如何将一个时间字符串变为时间戳
                // 17/05/2015:10:05:30
                /*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = simpleDateFormat.parse(createTime);
                long timeStamp = date.getTime();*/
                // 要想使用这个common.lang3 下的工具类,需要导入包
                Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");
                long timeStamp = date.getTime();

                String method = arr[3];
                String path = arr[4];

                LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);
                return logBean;
            }
        });

        DataStream<Tuple2<String, Integer>> mapStream2 = mapStream.map(new MapFunction<LogBean, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(LogBean logBean) throws Exception {
                return Tuple2.of(logBean.getIp(), 1);
            }
        });

        //4. sink-数据输出

        KeyedStream<Tuple2<String,Integer>, String> keyByStream = mapStream2.keyBy(tuple -> tuple.f0);

        // sum的底层是 reduce
       // keyByStream.sum(1).print();
        //  [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]
        keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                // t1 => ("10.0.0.1",10)
                // t2 => ("10.0.0.1",1)
                return Tuple2.of(t1.f0,  t1.f1 + t2.f1);
            }
        }).print();



        //5. execute-执行
        env.execute();
    }
}

        先将日志数据处理成包含 IP 和计数 1 的 Tuple2 格式,按 IP 分组后,在 reduce 函数中对相同 IP 的计数进行累加,得到每个 IP 的访问 PV 数。

二、合并与连接操作

(一)union 算子

功能概述
        union 算子能够合并多个同类型的流,将多个 DataStream 合并成一个 DataStream,但注意合并时流的类型必须一致,且不会对数据去重

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionConnectDemo {

    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 加载数据
        DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");
        DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");
        // 合并流
        DataStream<String> unionStream = stream1.union(stream2);
        unionStream.print();
        // 执行
        env.execute();
    }
}

上述代码将两个包含字符串元素的流进行合并输出。

(二)connect 算子

功能概述
        connect 算子可连接 2 个不同类型的流,连接后形成 ConnectedStreams,内部两个流保持各自的数据和形式独立,之后需通过自定义处理逻辑(如 CoMapFunction 等)处理后再输出,且处理后的数据类型需相同。

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class UnionConnectDemo {

    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 加载数据
        DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");
        DataStreamSource<Long> stream3 = env.fromSequence(1, 10);
        // 连接流
        ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);
        // 处理流
        DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return value;
            }
            @Override
            public String map2(Long value) throws Exception {
                return Long.toString(value);
            }
        });
        // 输出
        mapStream.print();
        // 执行
        env.execute();
    }
}

        这里连接了一个字符串流和一个长整型序列流,通过 CoMapFunction 分别将字符串按原样、长整型转换为字符串后合并输出。

三、侧输出流(Side Outputs)

功能概述
        侧输出流可根据自定义规则对输入流数据进行分流,将满足不同条件的数据输出到不同的“分支”流中,方便后续针对性处理。

代码示例
以下示例对流中的数据按照奇数和偶数进行分流并获取分流后的数据:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {
    public static void main(String[] args) throws Exception {
        // 1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 侧道输出流
        // 从0到100生成一个Long类型的数据流作为示例数据
        DataStreamSource<Long> streamSource = env.fromSequence(0, 100);
        // 定义两个标签,用于区分偶数和奇数的侧输出流
        OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));
        OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));

        // 2. source-加载数据
        // 使用ProcessFunction来处理每个元素,决定将其输出到哪个侧输出流或者主输出流
        SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {
                // value代表每一个数据,判断是否为偶数
                if (value % 2 == 0) {
                    // 如果是偶数,将其输出到偶数的侧输出流中
                    ctx.output(tag_even, value);
                } else {
                    // 如果是奇数,将其输出到奇数的侧输出流中
                    ctx.output(tag_odd, value);
                }
            }
        });

        // 3. 获取奇数的所有数据,从主输出流中获取对应标签(tag_odd)的侧输出流数据
        DataStream<Long> sideOutput = process.getSideOutput(tag_odd);
        sideOutput.print("奇数:");

        // 获取所有偶数数据,同样从主输出流中获取对应标签(tag_even)的侧输出流数据
        DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);
        sideOutput2.print("偶数:");

        // 4. sink-数据输出(这里通过打印展示了侧输出流的数据,实际应用中可对接其他下游操作)

        // 5. 执行任务
        env.execute();
    }
}

在上述代码中:

  • 首先,我们创建了 StreamExecutionEnvironment 来准备 Flink 的执行环境,并设置了运行模式为自动(AUTOMATIC)。
  • 接着,通过 fromSequence 方法生成了一个从 0 到 100 的 Long 类型的数据流作为示例的输入数据。
  • 然后,定义了两个 OutputTag,分别用于标记偶数和奇数的侧输出流,并且指定了对应的类型信息(这里都是 Long 类型)。
  • 在 process 函数中,针对每个输入的元素(value),通过判断其是否能被 2 整除来决定将其输出到对应的侧输出流中。如果是偶数,就通过 ctx.output(tag_even, value) 将其发送到偶数侧输出流;如果是奇数,则通过 ctx.output(tag_odd, value) 发送到奇数侧输出流。
  • 最后,通过 getSideOutput 方法分别获取奇数和偶数侧输出流的数据,并进行打印输出,以此展示了侧输出流的分流及获取数据的完整流程。实际应用场景中,这些侧输出流的数据可以进一步对接不同的业务逻辑进行相应处理,比如奇数流进行一种聚合计算,偶数流进行另一种统计分析等。

        这样,利用侧输出流的特性,我们可以很灵活地根据自定义条件对数据进行分流处理,满足多样化的数据处理需求。

四、总结

        本文围绕 Apache Flink 转换算子展开,旨在助力读者洞悉其核心要点与多样应用,以灵活处理复杂业务数据。

  1. 常用转换算子
    • map:基于自定义逻辑,对输入元素逐一变换,如剖析日志字符串、提取关键信息并转换格式,封装为定制对象输出,契合精细化处理需求。
    • flatMap:将输入元素按需拆分为零个及以上输出元素,凭借拆分、重组操作,挖掘数据深层价值,适配展开、细化数据场景。
    • filter:依设定条件甄别筛选,精准把控数据流向,剔除不符元素,保障下游数据贴合业务关注点。
    • keyBy:类比 SQL 分组,依指定键归拢数据,为聚合奠基,以不同方式适配多元数据类型,实现分类统计。
    • reduce:聚焦数据集或分组,渐进聚合,可求和、求最值等,sum 操作底层亦仰仗于此,高效整合数据得出汇总结果。
  2. 合并与连接操作
    • union:整合同类型多流为一,操作简便,唯需留意类型一致,虽不除重但拓宽数据维度。
    • connect:桥接不同类型流,借自定义逻辑协同处理,统一输出类型,达成跨流数据交互融合。
  3. 侧输出流特性:基于自定义规则巧妙分流,借OutputTag标记、ProcessFunction判定,将数据导向不同 “分支”,按需对接各异业务逻辑,于复杂场景中尽显灵活应变优势,全方位满足数据处理多元化诉求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250783.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工业公辅车间数智化节能头部企业,蘑菇物联选择 TDengine 升级 AI 云智控

小T导读&#xff1a;在工业节能和智能化转型的浪潮中&#xff0c;蘑菇物联凭借其自研的灵知 AI 大模型走在行业前沿&#xff0c;为高能耗设备和公辅能源车间提供先进的 AI 解决方案。此次采访聚焦于蘑菇物联与 TDengine 的合作项目&#xff0c;通过 AI 云智控平台的建设&#x…

TensorFlow实战:黄文坚版Python代码详解

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;本书由黄文坚撰写&#xff0c;深入介绍了TensorFlow的使用方法。TensorFlow是谷歌开发的开源库&#xff0c;用于数值计算和机器学习&#xff0c;特别是在深度学习方面。书中通过丰富的实例和详细解释&#xff0c…

Java项目中加缓存

Java项目中加缓存 1.更新频率低&#xff1b;但读写频率高的数据很适合加缓存&#xff1b; 2.可以加缓存的地方很多&#xff1a;浏览器的缓存&#xff1b;CDN的缓存&#xff1b;服务器的缓存&#xff1b; 本地内存&#xff1b;分布式远端缓存&#xff1b; 加缓存的时候不要…

Elasticearch索引mapping写入、查看、修改

作者&#xff1a;京东物流 陈晓娟 一、ES Elasticsearch是一个流行的开源搜索引擎&#xff0c;它可以将大量数据快速存储和检索。Elasticsearch还提供了强大的实时分析和聚合查询功能&#xff0c;数据模式更加灵活。它不需要预先定义固定的数据结构&#xff0c;可以随时添加或修…

PyMOL操作手册

PyMOL 操作手册 The man will be silent, the woman will be tears. – itwangyang ​ 翻译整理&#xff1a;itwangyanng 2024 年 11月 29 日 目录 初识 PyMOL… 5 0.1 安装 PyMOL… 5 0.1.1 Windows 系统开源版 PyMOL 的安装… 5 0.1.2 教育版 PyMOL 的下载安装……

RabbitMQ原理架构解析:消息传递的核心机制

文章目录 一、RabbitMQ简介1.1、概述1.2、特性 二、RabbitMQ原理架构三、RabbitMQ应用场景3.1、简单模式3.2、工作模式3.3、发布订阅3.4、路由模式3.5 主题订阅模式 四、同类中间件对比五、RabbitMQ部署5.1、单机部署5.2、集群部署&#xff08;镜像模式&#xff09;5.3、K8s部署…

《白帽子讲Web安全》15-16章

《白帽子讲Web安全》15-16章 《白帽子讲Web安全》15章15、Web Server配置安全15.1、Apache安全15.2、Nginx安全15.3、jBoss远程命令执行15.4、Tomcat远程命令执行15.5、HTTP Parameter Pollution15.6、小结 第四篇 互联网公司运营安全《白帽子讲Web安全》16章16、互联网业务安全…

使用R语言进行美国失业率时空分析(包括绘图)

今天写一篇利用R语言&#xff0c;针对面板数据的简单分析与绘图。让我们直接开始把。 一、数据准备 这次的示例数据非常简单&#xff0c;只有一个shp格式的美国区县矢量数据&#xff0c;我们在QGIS中打开数据查看一下它的属性表。事实上我们需要的数据都在属性表的字段中。 二…

计算机毕业设计Python+LSTM天气预测系统 AI大模型问答 vue.js 可视化大屏 机器学习 深度学习 Hadoop Spark

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

YOLOv8实战无人机视角目标检测

本文采用YOLOv8作为核心算法框架&#xff0c;结合PyQt5构建用户界面&#xff0c;使用Python3进行开发。YOLOv8以其高效的实时检测能力&#xff0c;在多个目标检测任务中展现出卓越性能。本研究针对无人机目标数据集进行训练和优化&#xff0c;该数据集包含丰富的无人机目标图像…

uniapp App端在renderjs层渲染echarts获取不到service层id的问题

报错信息&#xff1a;Cannot read properties of undefined (reading id) at app-view.js 这样的写法App端有时在renderjs视图层获取不到server逻辑层的数据 server层 renderjs层 解决方法&#xff1a;需要把数据(id)通过server层向renderjs层传值 server层 renderjs层

【数据湖仓】-- 阿里云 EMR 和 AWS EMR 工具对比

目录 1. 基础设施和集成生态 阿里云 EMR: AWS EMR: 2. 性能与可扩展性 阿里云 EMR: AWS EMR: 3. 成本对比 阿里云 EMR: AWS EMR: 4. 易用性和用户体验 阿里云 EMR: AWS EMR: 5. 总结对比 阿里云 EMR(Elastic MapReduce)和 AWS EMR(Amazon Elas…

Ubuntu在NVME硬盘使用Systemback安装记录

问题 使用Systemback重装系统找不到NVME硬盘。 0.使用Systemback制作iso后&#xff0c;制作启动盘 1.插入启动盘进入live mode模式 2.安装gparted sudo apt-get update sudo apt-get install gparted3.使用gparted对待分区硬盘进行分区 gparted按照你希望的分区方式分区即…

BUUCTF—Reverse—GXYCTF2019-luck_guy(9)

下载附件&#xff0c;照例扔入Exeinfo PE查看信息 可执行文件&#xff0c;IDA 64位直接干 进main函数&#xff0c;F5反编译&#xff0c;看主要处理函数&#xff0c;跳转进去 查看&#xff0c;点进patch_me(v4)看看是怎么回事 这里已经相当清楚&#xff0c;逻辑就是如果你输入的…

Kubernetes KubeVirt 让容器和虚拟机一起工作

在不讨论容器与虚拟机的优缺点的情况下&#xff0c;每个虚拟机或都包含其完整操作系统的实例&#xff0c;并且可以像独立服务器一样运行。相比之下&#xff0c;在容器化环境中&#xff0c;多个容器共享一个操作系统实例&#xff0c;而且绝大多数都是类Linux操作系统。 并非所有…

新型大语言模型的预训练与后训练范式,谷歌的Gemma 2语言模型

前言&#xff1a;大型语言模型&#xff08;LLMs&#xff09;的发展历程可以说是非常长&#xff0c;从早期的GPT模型一路走到了今天这些复杂的、公开权重的大型语言模型。最初&#xff0c;LLM的训练过程只关注预训练&#xff0c;但后来逐步扩展到了包括预训练和后训练在内的完整…

【解决安全扫描漏洞】---- 检测到目标站点存在 JavaScript 框架库漏洞

1. 漏洞结果 JavaScript 框架或库是一组能轻松生成跨浏览器兼容的 JavaScript 代码的工具和函数。如果网站使用了存在漏洞的 JavaScript 框架或库&#xff0c;攻击者就可以利用此漏洞来劫持用户浏览器&#xff0c;进行挂马、XSS、Cookie劫持等攻击。 1.1 漏洞扫描截图 1.2 具体…

python爬虫案例精讲:爬取豆瓣电影Top250信息

前言 在这篇博客中&#xff0c;我们将学习如何使用Python爬取豆瓣电影Top250的数据。我们将使用requests库来发送HTTP请求&#xff0c;BeautifulSoup库来解析HTML页面&#xff0c;并将数据存储到CSV文件中。这个爬虫将自动获取豆瓣电影Top250页面的信息&#xff0c;包括电影名…

Node.js的url模块与querystring模块

新书速览|Vue.jsNode.js全栈开发实战-CSDN博客 《Vue.jsNode.js全栈开发实战&#xff08;第2版&#xff09;&#xff08;Web前端技术丛书&#xff09;》(王金柱)【摘要 书评 试读】- 京东图书 (jd.com) 4.3.1 http模块——创建HTTP服务器、客户端 要使用http模块&#xff0…

Vue0-生命周期-03

生命周期 生命周期指定就是一个对象从创建到销毁的整个过程。 Vue也是有的 完整的Vue周期包含8个阶段。 Vue官方生命周期流程图&#xff1a; 那这有什么用呢&#xff1f;我们可以在指定阶段做特殊的事件。 这些方法伴随生命周期的进行自动执行。 <!DOCTYPE html> <…