Flink系列-10、Flink DataStream的Transformation

news2024/11/26 12:40:21

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

大数据系列文章目录

官方网址:https://flink.apache.org/

学习资料:https://flink-learning.org.cn/
在这里插入图片描述

目录

  • 官网所有的Transformation操作
  • KeyBy
  • Reduce
  • Aggregations
  • Union
  • Connect
  • Split和select

官网所有的Transformation操作

DataStream包括一系列的Transformation操作:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html

KeyBy

在这里插入图片描述
按照指定的key来进行分流,类似于SQL中的groupBy。

示例
自定义数据源, 进行单词的计数

开发步骤

  • 获取流处理运行环境
  • 设置并行度
  • 获取数据源
  • 转换操作
    (1) 以空白进行分割
    (2) 给每个单词计数1
    (3) 根据单词分组
  • 求和
  • 打印到控制台
  • 执行任务
package batch.transformation;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lwh
 * @date 2023/4/25
 * @description 演示keyBy方法 也使用了aggregate方法的sum求和
 **/
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置全局并行度为1
        env.setParallelism(1);
        // Get source
        DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );
        // 分组加聚合计算, 类似SQL的group by 后加聚合函数求每个组的数据
        // 有一点要注意的是, SQL中是对分组后的每个组的全量数据做聚合计算, 是批计算
        // 在流计算内,是来一条计算一条,也就是每个组的数据,挨个进行计算,求和累加,所以结果中最后一个打印的数据才是最终的求和结果
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(0).sum(1);

        // 如果不分组的话, sum的结果是 1+2+3+3+2+3 = 14 分组后是 篮球 6  足球 8
        sum.print();
        env.execute();

    }
}

在这里插入图片描述

Reduce

在这里插入图片描述
可以对一个datastream或者一个group来进行聚合计算,最终聚合成一个元素

数据源

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

示例
读取本地文件,根据IP地址统计计数

步骤

  • 获取ExecutionEnvironment运行环境
  • 使用fromCollection构建数据源
  • 使用reduce执行聚合操作
  • 打印测试
package batch.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * @author lwh
 * @date 2023/4/25 
 * @description 
 **/
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> logSource = env.readTextFile("data/input/apache.log");

        // 提取IP, 后面都跟上1(作为元组返回)
        MapOperator<String, Tuple2<String, Integer>> ipWithOne = logSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String ip = value.split(" ")[0];
                return Tuple2.of(ip, 1);
            }
        });

        // 分组 + reduce聚合
        UnsortedGrouping<Tuple2<String, Integer>> grouped = ipWithOne.groupBy(0);

        // reduce 聚合
        ReduceOperator<Tuple2<String, Integer>> result = grouped.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        });

        result.print();
    }
}

在这里插入图片描述

Aggregations

按照内置的方式来进行聚合。例如:SUM/MIN/MAX…

示例
请将以下元组数据,使用aggregate操作进行单词统计

("java" , 1) , ("java", 1) ,("scala" , 1)

步骤

  • 获取StreamExecutionEnvironment运行环境
  • 使用fromCollection构建数据源
  • 使用groupBy按照单词进行分组
  • 使用aggregate对每个分组进行SUM统计
  • 打印测试
package batch.transformation;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @author lwh
 * @date 2023/4/25
 * @description 使用内置的方式进行聚合数据 , aggregate只能将数据作用到元组上,例如:sum\Max\min
 **/
public class StreamAggregateDemo {

    public static void main(String[] args) throws Exception {
        //1:获取ExecutionEnvironment运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2: 使用`fromCollection`构建数据源
        DataStreamSource<Tuple2<String, Integer>> tupleDataStream = env.fromCollection(
                Arrays.asList(Tuple2.of("java", 1), Tuple2.of("java", 1), Tuple2.of("scala", 1)));

        //3. 使用groupBy按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyByStream = tupleDataStream.keyBy(0);

        //4. 使用aggregate对每个分组进行SUM统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = keyByStream.sum( 1);

        resultDataStream.printToErr();

        env.execute();

    }
}

在这里插入图片描述

Union

在这里插入图片描述
将两个DataStream取并集,不会去重。
两个DS的泛型需要一致

示例
将以下数据进行取并集操作
数据集1

"hadoop", "hive", "flume"

数据集2

"hadoop", "hive", "spark"

步骤

  • 构建流处理运行环境
  • 使用fromCollection创建两个数据源
  • 使用union将两个数据源关联在一起
  • 打印测试
package batch.transformation;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

/**
 * @author lwh
 * @date 2023/4/25 
 * @description 合并多个流,类型必须要一致
 **/
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source1 = env.fromElements("hadoop", "hive", "flume");
        DataSource<String> source2 = env.fromElements("yarn", "hive", "spark");

        source1.union(source2).print();
        /*
        Union算子 会进行合并, 不会进行重复判断
        Union算子 必须进行 同类型元素的合并, 哪怕是顶级类Object也不行, 必须是实体类(撇除继承关系)的类型一致才可以
         */
    }
}

在这里插入图片描述

Connect

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立, 作为对比,Union后是真的变成一个流了。

示例
读取两个不同类型的数据源,使用connect进行合并打印。
在这里插入图片描述
开发步骤

  • 创建流式处理环境
  • 添加两个自定义数据源
  • 使用connect合并两个数据流,创建ConnectedStreams对象
  • 遍历ConnectedStreams对象,转换为DataStream
  • 打印输出,设置并行度为1
  • 执行任务
package batch.transformation;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * @author lwh
 * @date 2023/4/25
 * @description connect连接2个流,流类型可以不一致
 **/
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source1 = env.addSource(new IncreaseByOneSource());
        DataStreamSource<Integer> source2 = env.addSource(new IncreaseByOneSource());

        ConnectedStreams<Integer, Integer> connectedStreams = source1.connect(source2);

        /*
            针对connect后的Stream, 所有方法都会传入特殊的处理, 比如:
            map方法需要实现CoMapFunction 而非我们前面用的MapFunction
            这一类特殊的处理, 比如CoMapFunction, 内就有两个map方法
            1个处理连接的第一个Stream
            另一个处理连接的另一个Stream
         */
        // CoMapFunction的3个泛型: 泛型1: 第一个Stream的输入类型 泛型2: 第二个Stream的输入类型 泛型3: 返回的类型(要求两个map方法的返回值类型一致)
        SingleOutputStreamOperator<String> map = connectedStreams.map(new CoMapFunction<Integer, Integer, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "map1:" + value;
            }

            // map1和map2的返回值类型需要一致, 因为类的第三个泛型约束着
            @Override
            public String map2(Integer value) throws Exception {
                return "map2:" + value;
            }
        });
        map.print("》》》》》》");
        System.out.println();
        env.execute();
    }

    static class IncreaseByOneSource implements SourceFunction<Integer> {
        private boolean isRun = true;       // Flag for run
        private final Random random = new Random();

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (isRun) {
                ctx.collect(random.nextInt(999));
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            this.isRun = false;
        }

    }
}

在这里插入图片描述

Split和select

在这里插入图片描述
Split就是将一个DataStream分成两个或者多个DataStream
Select就是获取分流后对应的数据
简单认为就是, Split会给数据打上标记
然后通过Select, 选择标记来划分出不同的Stream
效果类似KeyBy分流,但是比KeyBy更自由些,可以自由打标记并进行分流。

在这里插入图片描述

示例
加载本地集合(1,2,3,4,5,6), 使用split进行数据分流,分为奇数和偶数. 并打印奇数结果

开发步骤

  • 创建流处理环境
  • 设置并行度
  • 加载本地集合
  • 数据分流,分为奇数和偶数
  • 获取分流后的数据
  • 打印数据
  • 执行任务
package batch.transformation;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author lwh
 * @date 2023/4/25
 * @description 使用split分流打标记,select在根据标记获取分流后的数据
 **/
public class SplitSelectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6);

        // 通过Split + Select的组合, 区分出偶数和奇数
        // SplitStream被标记为弃用了, 但是源码中没有相关解释, 可能以后的新版本会有对应的替代对象, 目前先继续使用
        SplitStream<Integer> splitedStream = source.split(new OutputSelector<Integer>() {

            // 可以看出返回值是Iterable<String> 也就是泛型是String的可迭代集合
            // 意思就是将字符串标记放入可迭代集合中返回即可
            @Override
            public Iterable<String> select(Integer value) {
                ArrayList<String> flagList = new ArrayList<>();
                if (value % 2 == 0) {
                    flagList.add("even");        // 偶数标记为even
                } else {
                    flagList.add("odd");        // 奇数标记为odd
                    flagList.add("jishu");      // 标记可以打多个, 比如奇数就打了两个标记
                }
                return flagList;    // 可以看出, 返回的是flag, 和value没关系
                // 这个方法就是纯用来打标记的, 对value不做处理
            }
        });

        // 偶数DataStream
        DataStream<Integer> evenStream = splitedStream.select("even");
        // 通过Select取得奇数DataStream
        DataStream<Integer> oddStream = splitedStream.select("odd");
        // 通过奇数的另一个标记取得奇数内容
        DataStream<Integer> odd2Stream = splitedStream.select("jishu");
        // 如果select的内容不存在
        DataStream<Integer> hahaha = splitedStream.select("hahaha");

        evenStream.print("even>>>");

        oddStream.print("odd>>>");
        odd2Stream.print("odd2>>>");

        // hahaha标记不存在, 所以无法得到内容, 这个DataStream是空的
        hahaha.print("hahaha>>>");

        env.execute();

    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/459619.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探析Android中的四类性能优化

作者&#xff1a;Yj家的孺子牛 流畅性优化 主线程模型 了解 Android 的流畅性优化之前&#xff0c;我们需要先了解Android的线程结构。在 Android 中&#xff0c;有一个主线程模型&#xff0c;其中所有的绘制以及交互都是在主线程中进行的&#xff0c;所以&#xff0c;当我们…

【LaTex】Elsevier投稿系统到底何时整顿?‘expl3.sty‘ aborted!

前言 两年前&#xff0c;我在投稿Elsevier旗下的Knoeldeg-based systems时就被这个投稿系统整得是头昏脑胀&#xff0c;直接肝爆。首先&#xff0c;第一次提交手稿时可以接受PDF&#xff0c;很方便。然而&#xff0c;后面大修时提交可编辑的源文件时给我狠狠的打脸了。记得当时…

快速入门量化交易

本文首发自「慕课网」&#xff0c;想了解更多IT干货内容&#xff0c;程序员圈内热闻&#xff0c;欢迎关注"慕课网"&#xff01; 原作者&#xff1a;袁霄|慕课网讲师 近来“量化交易”这个词听得越来越频繁&#xff0c;多数人对量化交易的第一印象是“高大上的技术”…

堆的原理解析

看这篇文章需要对比较器有一定的了解&#xff0c;可以看我的这篇文章&#xff1a; 认识比较器_鱼跃鹰飞的博客-CSDN博客 堆的实际存储方式是数组&#xff0c;但是脑海中应该把他想象成一种树的结构 依次加入下标0-8的9个数&#xff08;添加过程中会不断的和父节点大小进行比…

舰船交流电网绝缘监测及故障定位的研究及产品选型

摘要&#xff1a;交流电网和电气设备的绝缘状况直接影响舰船电力系统安全&#xff0c;其绝缘电阻的下降是一个不可避免的过程&#xff0c;成为了电网安全的严重隐患。电气设备绝缘材料的劣化过程是不可逆的&#xff0c;对舰船交流电网进行绝缘在线监测及快速定位绝缘故障支路&a…

浅谈:JVM垃圾回收

一、四种类加载器(双亲委托/全盘委托机制) 1.启动类加载器: 加载 Java 核心类库,无法被 Java 程序直接引用。 2.扩展类加载器: 加载 Java 的扩展库。Java 虚拟机的实现会提供一个扩展库目录。该类加载器在此目录里面查找并加载 Java 类。 3.系统类加载器: 它根据 Java 应用的类…

seleniumUI自动化登录失败案例重新尝试WhileTrue

一个用户每次登录失败&#xff0c;失败N次&#xff0c;无法进入下一url时&#xff0c;怎样会重新尝试N次重新登录呢 &#xff1f; 我们可以使用wihile true判断&#xff0c;并使用currenturl判断&#xff0c;下面就介绍以下个人的方法 currenturlEGTconfigFile.driver.curren…

Opencv识别车牌

Opencv识别车牌 #encoding:utf8 import cv2 import numpy as np Min_Area 50 #定位车牌 def color_position(img,output_path): colors [#([26,43,46], [34,255,255]), # 黄色 ([100,43,46], [124,255,255]), # 蓝色 ([35, 43, 46], [77, 255, 255]) # 绿色 ] hsv cv2.cvtCo…

推荐 7 个超牛的 Spring Cloud 实战项目

个 把一个大型的单个应用程序和服务拆分为数个甚至数十个的支持微服务&#xff0c;这就是微服务架构的架构概念&#xff0c;通过将功能分解到各个离散的服务中以实现对解决方案的解耦。 关于微服务相关的学习资料不多&#xff0c;而 GitHub 上的开源项目可以作为你微服务之旅…

STM32平衡小车 mpu6050学习

MPU6050简介 MPU6050是一款性价比很高的陀螺仪,可以读取X Y Z 三轴角度,X Y Z 三轴加速度,还有内置的温度传感器,在姿态解析方面应用非常广泛。 二、硬件连接 由于采用IIC通信,最基本的只需要采用四根线就可以了。分别VCC,GND,SCL,SDA连接到单片机 SCL-----PB6 SDA---…

23种设计模式之观察者模式(黑马程序员)

观察者模式 一、概述二、结构三、实现四、总结在最后 一、概述 观察者模式又被称为发布-订阅模式(Publish/Subscribe)模式&#xff0c;它定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时&#xff0c;会通知所有…

java 键值对详解及java键值对代码

在 Java中&#xff0c;对象可以理解为一个列表。这个列表里面的每个元素都是一个“键”&#xff0c;而每个“键”都是一个值。 键值对的概念&#xff0c;并不是在 Java中第一次出现&#xff0c;在 java 1.x中就已经有了。那时候它的意思是在一个命名空间中建立两个名字相同的对…

利用三维CNN对阿尔茨海默病进行多模态研究

文章目录 Is a PET All You Need? A Multi-modal Study for Alzheimer’s Disease Using 3D CNNs摘要方法实验结果讨论结论 Is a PET All You Need? A Multi-modal Study for Alzheimer’s Disease Using 3D CNNs 摘要 提出了一个系统评估多模态dnn的框架重新评估基于FDG-P…

Android-实现一个登录页面(kotlin)

准备工作 首先&#xff0c;确保你已经安装了 Android Studio。如果还没有安装&#xff0c;请访问 Android Studio 官网 下载并安装。 前提条件 - 安装并配置好 Android Studio Android Studio Electric Eel | 2022.1.1 Patch 2 Build #AI-221.6008.13.2211.9619390, built …

如何使用命令行添加配置码云仓库SSH秘钥-git仓库也一样

使用命令行添加配置码云仓库SSH秘钥 为什么要如何使用命令行添加配置码云仓库SSH秘钥&#xff1f;生成密钥你可以按如下命令来生成 sshkey:可以参考下图执行指令 添加密钥登录你的码云&#xff0c;鼠标移入头像&#xff0c;设置。点击 SSH公钥&#xff0c;打开配置页面&#x…

新型数字智慧城市综合趋势解决方案(ppt可编辑)

本资料来源公开网络&#xff0c;仅供个人学习&#xff0c;请勿商用&#xff0c;如有侵权请联系删除 新型智慧城市解决方案总体架构 新型智慧城市顶层规划&#xff08;咨询&#xff09;服务概述 服务定义&#xff1a;提供面向城市及其产业的智慧化咨询服务&#xff0c;涵盖需求…

linux——进程的概念与状态

大家好&#xff0c;我是旗帜僵尸。今天我将带领大家学习进程的概念。 本篇文章将继续收录于我的linux专栏中&#xff0c;若想查看关于linux其它知识的文章也可以点击右方链接。旗帜僵尸——linux 文章目录 一、进程概念冯诺依曼体系结构OS&#xff08;操作系统Operator System&…

突破传统监测模式:业务状态监控HM的新思路

作者&#xff1a;京东保险 管顺利 一、传统监控系统的盲区&#xff0c;如何打造业务状态监控。 在系统架构设计中非常重要的一环是要做数据监控和数据最终一致性&#xff0c;关于一致性的补偿&#xff0c;已经由算法部的大佬总结过就不在赘述。这里主要讲如何去补偿&#xff…

电子阅读器市场角力,AI成为关键变量

配图来自Canva可画 近年来&#xff0c;随着国家“书香型社会”建设政策的出台&#xff0c;公众的阅读需求正在逐年增加&#xff0c;各类读书产品和读书活动&#xff0c;也如同雨后春笋般涌现&#xff0c;人们的阅读体验日益得到丰富。比如&#xff0c;昨天世界读书日举行的“不…

Photoshop在启动时出现读取计算机特定首选项时出错,或者提示暂存盘已满导致打不开该如何处理

上午还能用&#xff0c;下午打开Photoshop时就报此错误 点击确定后&#xff0c;出现下图错误 首先&#xff0c;先试试删除设置文件。在长按shiftctrlalt的情况下用鼠标右键点击Photoshop图标&#xff0c;点击打开&#xff08;此间别松手&#xff09;&#xff0c;就会出现下图&a…