大数据-玩转数据-Flink-Transform

news2025/4/6 12:50:24

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_map {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);
        s_num.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer values) throws Exception {
                return values*values;
            }
        }).print();

    }
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_filter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();
        elements.filter(value -> value % 2 != 0).print();
    }
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.concurrent.ExecutionException;

public class flatMap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);
        dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
                collector.collect(integer*integer);
                collector.collect(integer*integer*integer);
            }
        }).print();
    }
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .sum(1)
                .print();

        env.execute("execute");
    }
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyByReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L))
                .keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {
                        return new Tuple2<>(values1.f0,values1.f1+values2.f1);
                    }
                })
                .print();
        env.execute();
    }
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Process_s {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

package com.lyh.flink05;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;

public class connect_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> data2 = env.fromElements("a", "b", "c");
        ConnectedStreams<Integer, String> data3 = data1.connect(data2);
        data3.getFirstInput().print("data1");
        data3.getSecondInput().print("data2");
        env.execute();
    }
}

4.2、union(合并)

package com.lyh.flink05;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class union_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> data2 = env.fromElements(555,666);
        DataStreamSource<Integer> data3 = env.fromElements(999);
        DataStream<Integer> data = data1.union(data2).union(data3);
        data.print();
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/848632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

研究人员发现特斯拉汽车能被越狱,可免费解锁付费功能

Bleeping Computer 网站披露&#xff0c;柏林工业大学&#xff08;Technical University of Berlin&#xff09;的研究人员开发出一种新技术&#xff0c;可以破解特斯拉近期推出所有车型上使用的基于 AMD 的信息娱乐系统&#xff0c;并使其运行包括付费项目在内的任何软件。 实…

22、springboot的Profile(通过yml配置文件配置 profile,快速切换项目的开发环境)

springboot的Profile ★ 何谓Profile 应用所在的运行环境发生切换时&#xff0c;配置文件常常就需要随之修改。Profile&#xff1a;——就是一组配置文件及组件的集合。可以整个应用在不同的profile之间切换&#xff08;设置活动profile&#xff09;&#xff0c;整个应用都将使…

QGIS二次开发三:显示Shapefile

Shapefile 为 OGR 所支持的最重要的数据格式之一&#xff0c;自然可以被 QGIS 加载。那么该如何显示Shapefile呢&#xff1f; 一、先上代码 #include <qgsapplication.h> #include <qgsproviderregistry.h> #include <qgsmapcanvas.h> #include <qgsvec…

HDFS中snapshot快照机制

HDFS中snapshot快照机制 介绍作用功能实现相关命令和操作相关命令 介绍 snapshot是数据存储的某一时刻的状态记录&#xff0c;备份&#xff08;backup&#xff09;则是数据存储的某一个时刻的副本HDFS snapshot快照是整个文件系统或某个目录在某个时刻的镜像&#xff0c;该镜像…

黑马机器学习day1

1.sklearn数据集 sklearn.datasets datasets.load_*() 获取小规模的数据集 datasets.fetch_*(data_homeNone) 获取大规模数据集 函数的第一个参数是data_home,标识数据集下载目录&#xff0c;默认/scikit_learn_data/ 1.1sklearn小数据集 sklearn.da…

无涯教程-Perl - fcntl函数

描述 该函数是系统fcntl()函数的Perl版本。使用FILEHANDLE上的SCALAR执行FUNCTION指定的功能。 SCALAR包含函数要使用的值,或者是任何返回信息的位置。 语法 以下是此函数的简单语法- fcntl FILEHANDLE, FUNCTION, SCALAR返回值 该函数返回0,但如果fcntl()的返回值为0,则返…

Vue2:组件高级(下)

Vue2&#xff1a;组件高级&#xff08;下&#xff09; Date: May 25, 2023 Sum: 自定义指令、插槽、商品列表、动态组件 目标&#xff1a; 自定义指令 基础概念&#xff1a; 概念&#xff1a; 内置指令&#xff1a;vue 官方提供了 v-for、v-model、v-if 等常用的内置指令。…

整数规划——第七章 分支定界算法

整数规划——第七章 分支定界算法 目前大部分整数规划商业软件如CPLEX&#xff0c;Gurobi和BARON等都是基于分枝定界算法框架的。 7.1 最优性条件和界 考虑下列一般线性整数规划问题&#xff1a; (IP) min ⁡ c T x , s . t . A x ≤ b , x ∈ Z n (7.1) \text{(IP)}\quad…

部署-打包并提交代码到Git服务器

前期准备工作 vue.config.js const { defineConfig } require(vue/cli-service) module.exports defineConfig({// 项目出Bug,点击错误可以跳到对应的位置&#xff0c;实际生成环境是不需要的 默认为trueproductionSourceMap: false,lintOnSave: false,publicPath: process.…

720度全视角!VR直播遇上法院庭审,你体验过吗?

在元宇宙发展背景之下&#xff0c;VR直播技术同样驶入了发展的快车道&#xff0c;以沉浸式、交互式体验为特点的VR技术&#xff0c;将原本就受欢迎的直播变得更加立体、真实、可靠&#xff0c;观众只需要一部手机就可以实现全方位、无死角地观看直播现场&#xff0c;体验宛如身…

小白带你学习linux的Redis基础(三十二)

目录 前言 一、概述 1、NoSQL 2、Redis 二、安装 1、基础配置 2、编译安装 3、RPM安装网络源安装 三、目录结构 1、rpm -ql redis 2、/etc/redis.conf 四、命令解析 1、redis-server 2、redis-cli 2、redis-check-rdb 3、redis-check-aof 五、redis登录更改 …

ES面试题

前言 1、面试突击正确的学习姿势 老师在给你讲面试突击的时候&#xff0c;是有课件的&#xff0c;而且是有准备的。你在面试的时候&#xff0c;是没有笔记课件的&#xff0c;而且问题是由面试官提问的&#xff0c;具有一定的随机性面试突击课程的目标不是听懂&#xff0c;而是…

文件上传漏洞(webshell)

一、防护 1、防护 1、判断文件后缀&#xff0c;为图片的话才让上传成功。 2、解析文件内容&#xff08;文件幻数&#xff09;判断文件头和文件尾部是否一致 幻数 常见的 3、隐藏按钮&#xff08;带上code唯一值&#xff09; 4、二次渲染&#xff08;类似拿着你的图片&#xff…

cannot import name ‘container_abcs‘ from ‘torch._six‘

进行模型训练的时候&#xff0c;报错&#xff1a; 原因是&#xff1a;torch1.8版本之后container_abcs被移除了&#xff0c;所以我们需要修改timm库中调用container_abcs的调用方式&#xff1a; timm库中调用container_abcs的文件是“~\Conda\envs\pytorch\Lib\site-packages\…

Java后台生成ECharts图片,并以Base64字符串返回

前言 通过echarts的jar包&#xff0c;Java后台生成一张图片&#xff0c;并把图片插入到word中。关于word插图片的代码在下一章。 需要用到的工具PhantomJS,Echarts-convert.js,jquery.js,echarts.js。 1.PhantomJS 介绍 PhantomJS是一个不需要浏览器的富客户端。 官方介绍&…

十年后的web渗透(网络安全)前景如何?你想知道的都在这里

前言 web渗透是网络安全大行业里入门板块&#xff0c;就像十年前的软件&#xff0c;前景非常被看好&#xff0c;薪资也很诱人。与软件测试和前端开发只需掌握一定的编程能力不同的是&#xff0c;渗透需要掌握的知识内容较多&#xff0c;花费的时间较长&#xff0c;渗透测试掌握…

值得苦练的100道Python经典练手题,(附详细答案)

嗨喽大家好卷子又来了&#xff0c;100道Python经典练手题奉上 花了一周的时间&#xff0c;整理了100道Python的练习题&#xff0c;如果你是一位初学者&#xff0c;那么这一份练习题将会给你带来极大的帮助&#xff0c;如果你能够完全独立的完成这份练习题&#xff0c;你已经入…

C/C++学习笔记 Vantage Point Trees的C++实现

下面代码是VP 树的 C 实现&#xff0c;递归search()函数决定是搜索左孩子、右孩子还是两个孩子。为了有效地维护结果列表&#xff0c;我们使用优先级队列。 // A VP-Tree implementation, by Steve Hanov. (steve.hanovgmail.com) // Released to the Public Domain // Based o…

【C++】开源:tinyxml2解析库配置使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍tinyxml2解析库配置使用。 无专精则不能成&#xff0c;无涉猎则不能通。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;…

路由器中DMZ、UPnP、Port Forwarding等功能介绍与使用

目录 一、DMZ功能1.1 概念1.2 验证测试方式 二、UPnP功能2.1 概念2.2 验证测试方法 三、Port Forwarding功能3.1 概念3.2 验证测试方法3.3 NAT相关 一、DMZ功能 1.1 概念 **DMZ&#xff08;Demilitarized Zone&#xff09;**是指位于防火墙内部网络&#xff08;LAN&#xff0…