大数据-玩转数据-Flink-Transform(上)

news2025/1/11 0:38:17

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_map {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);
        s_num.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer values) throws Exception {
                return values*values;
            }
        }).print();

    }
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_filter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();
        elements.filter(value -> value % 2 != 0).print();
    }
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.concurrent.ExecutionException;

public class flatMap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);
        dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
                collector.collect(integer*integer);
                collector.collect(integer*integer*integer);
            }
        }).print();
    }
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .sum(1)
                .print();

        env.execute("execute");
    }
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyByReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L))
                .keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {
                        return new Tuple2<>(values1.f0,values1.f1+values2.f1);
                    }
                })
                .print();
        env.execute();
    }
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Process_s {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

四、下节合流算子、用户自定义函数、分区算子

待续。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/845142.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Spring Boot】Thymeleaf模板引擎 — Thymeleaf的高级用法

Thymeleaf的高级用法 主要介绍Thymeleaf的内联、内置对象、内置变量等高级用法。 1.内联 虽然通过Thymeleaf中的标签属性已经几乎满足了开发中的所有需求&#xff0c;但是有些情况下需要在CSS或JS中访问后台返回的数据。所以Thymeleaf提供了th:inline"text/javascript/…

java+springboot+mysql日程管理系统

项目介绍&#xff1a; 使用javaspringbootmysql开发的日程管理系统&#xff0c;系统包含超级管理员、管理员、用户角色&#xff0c;功能如下&#xff1a; 超级管理员&#xff1a;管理员管理&#xff1b;用户管理&#xff1b;反馈管理&#xff1b;系统公告&#xff1b;个人信息…

解析器模式(C++)

定义 给定一个语言&#xff0c;定义它的文法的一种表示&#xff0c;并定义一种解释器&#xff0c;这个解释器使用该表示来解释语言中的句子。 应用场景 在软件构建过程中&#xff0c;如果某一特定领域的问题比较复杂&#xff0c;类似的结构不断重复出现&#xff0c;如果使用…

JavaWeb三大组件——Filter

目录 过滤器作用 注册Filter 父pom文件 pom文件 注解方式 TokenFilter LogFilter Logout 启动类 使用FilterRegistrationBean注册 结果 过滤器作用 过滤器用途主要包括&#xff1a;对用户请求进行统一认证、对用户的访问请求进行记录和审核、对用户发送的数据进行过…

UE4/5 GAS技能系统入门2 - AttributeSet

在GAS系统中对属性进行修改需要用到GE&#xff08;Gameplay Effect&#xff09;&#xff0c;而这又涉及到AttributeSet这样的概念。 AttributeSet用于描述角色的属性集合&#xff0c;如攻击力、血量、防御力等&#xff0c;与GAS系统整合度较高&#xff0c;本文就来讲一讲Attri…

【1++的数据结构】之二叉搜索树

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的数据结构】 文章目录 一&#xff0c;什么是二叉搜索树二&#xff0c;二叉搜索树的操作及其实现2.1 插入操作及其实现2.2 查找操作及其实现2.3 删除操作及其实现 三&#xff0c;构造及其析构四…

docker版jxTMS使用指南:使用jxTMS采集数据之一

本文讲解了如何jxTMS的数据采集与处理框架并介绍了如何用来采集数据&#xff0c;整个系列的文章请查看&#xff1a;docker版jxTMS使用指南&#xff1a;4.4版升级内容 docker版本的使用&#xff0c;请查看&#xff1a;docker版jxTMS使用指南 4.0版jxTMS的说明&#xff0c;请查…

【搭建PyTorch神经网络进行气温预测】

import numpy as np import pandas as pd import matplotlib.pyplot as plt import torch import torch.optim as optim import warnings warnings.filterwarnings("ignore") %matplotlib inlinefeatures pd.read_csv(temps.csv)#看看数据长什么样子 features.head…

图像 分割 - Fast-SCNN: Fast Semantic Segmentation Network (arXiv 2019)

Fast-SCNN: Fast Semantic Segmentation Network - 快速语义分割网络&#xff08;arXiv 2019&#xff09; 摘要1. 引言2. 相关工作2.1 语义分割的基础2.2 DCNN的效率2.3 辅助任务预训练 3. 提议的Fast-SCNN3.1 动机3.2 网络架构3.2.1 学习下采样3.2.2 全局特征提取器3.2.3 特征…

侯捷 C++面向对象编程笔记——10 继承与虚函数

10 继承与虚函数 10.1 Inheritance 继承 语法&#xff1a;:public base_class_name public 只是一种继承的方式&#xff0c;还有protect&#xff0c;private 子类会拥有自己的以及父类的数据 10.1.1 继承下的构造和析构 与复合下的构造和析构相似 构造是由内而外 Container …

试图将更改推送到 GitHub,但是远程仓库已经包含了您本地没有的工作(可能是其他人提交的修改)

这通常是由于其他人或其他仓库推送到了相同的分支上&#xff0c;导致您的本地仓库和远程仓库之间存在冲突。 错误信息&#xff1a; To github.com:8upersaiyan/CKmuduo.git ! [rejected] main -> main (fetch first) error: failed to push some refs to github.com:8upers…

Webots与MATLAB联合仿真环境配置

1. 版本 系统&#xff1a;Win10 matlab版本&#xff1a;2023a webots版本&#xff1a;R2020b 2.安装 MATLAB MinGW-w64 C/C Compiler 在使用matlab写控制器之前&#xff0c;需要给matlab安装 MATLAB MinGW-w64 C/C Compiler&#xff0c;因为需要matlab与c进行交互。 下载地址…

K8S系列文章之 Kind 部署K8S的 服务发布

安装kind 下载 https://github.com/kubernetes-sigs/kind/releases/download/0.17.0/kind-linux-amd64 执行以下命令&#xff1a; mv kind-linux-amd64 /usr/local/bin/kind chmod 777 /usr/local/bin/kind 之前需要先在本地主机安装好docker yum -y install yum-utils d…

门面模式(C++)

定义 为子系统中的一组接口提供一个一致(稳定) 的界面&#xff0c;Facade模式定义了一个高层接口&#xff0c;这个接口使得这一子系统更加容易使用(复用)。 应用场景 上述A方案的问题在于组件的客户和组件中各种复杂的子系统有了过多的耦合&#xff0c;随着外部客户程序和各子…

无锚框原理 TOOD:Task-aligned One-stage Object Detection

无锚框原理 TOOD&#xff1a;Task-aligned One-stage Object Detection 一 摘要二 引言TOOD设计 三 具体设计Task-aligned Head任务对齐的预测器 TAP预测对齐 TAL 任务对齐学习Task-aligned Sample Assignment多任务损失 一 摘要 一阶段目标检测通常通过优化两个子任务来实现&…

计算机毕业设计题目大全(论文+源码)_kaic

图书信息管理系统的设计与实现(论文源码)_kaic 基于Spring Boot的学院宿舍管理系统的设计与实现(论文源码)_kaic 在线考试系统设计与实现(论文源码)_kaic 基于javaee的就业管理系统设计与实现(论文源码)_kaic 基于VUE和SpringBoot的微信小程序商城的设计与实现(论文源码)_kaic …

iOS - 开发者账号续订会员资格更换订阅的账号

文章目录 前言开发环境续订会员资格转让账户持有人验证身份1. 实名认证2. 联系信息 更换订阅的账号最后 前言 公司有一个开发者账号快到期了需要续订会员资格&#xff0c;刚注册时是用我自己的个人账号完成的订阅购买。现在想来有点不妥&#xff0c;于是尝试更换用于订阅的账号…

STM32 CubeMX USB_MSC(存储设备U盘)

STM32 CubeMX STM32 CubeMX USB_MSC(存储设备U盘&#xff09; STM32 CubeMX前言 《使用内部Flash》——U盘一、STM32 CubeMX 设置USB时钟设置USB使能UBS功能选择FATFS功能 二、代码部分修改代码"usbd_storage_if.c"修改代码"user_diskio.c"main函数初始化插…

CVE-2022-23134(Zabbix setup 访问控制登录绕过)

目录 一、题目 二、进入题目 一、题目 靶标介绍&#xff1a; Zabbix Sia Zabbix是拉脱维亚Zabbix SIA&#xff08;Zabbix Sia&#xff09;公司的一套开源的监控系统。该系统支持网络监控、服务器监控、云监控和应用监控等。 Zabbix 存在安全漏洞&#xff0c;该漏洞源于在初始…

【VSCode】报错:出现段错误解决办法 (Segmentation fault)

VScode报错&#xff1a;Segmentation fault (core dumped)的解决办法 解决Program received signal SIGSEGV, Segmentation fault.的辛酸 Linux环境下段错误的产生原因及调试方法小结 Linux下的段错误Segmentationfault产生的原因及调试方法经典.pdf 解决办法&#xff1a;