Flink基本转换算子

news2025/1/24 2:24:37

文章目录

    • 1.映射(map)
    • 2.过滤(filter)
    • 3.扁平映射(flatMap)
    • 4.按键分区(keyBy)
    • 5. 简单聚合(sum,min,max等)
    • 6.归约聚合(reduce)
    • 7.自定义函数

💎💎💎💎💎

更多资源链接,欢迎访问作者gitee仓库:https://gitee.com/fanggaolei/learning-notes-warehouse/tree/master

image-20221116164847350

1.映射(map)

package com.fang.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素中读取数据
        //3.从元素读取数据
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Marry", "./home", 1000L),
                new Event("Bob", "./home", 1000L));
        //1.使用自定义类,实现mapFunction接口
        SingleOutputStreamOperator<String> map = stream3.map(new MyMapper());

        //2.使用匿名了实现MapFunction接口
        SingleOutputStreamOperator<String> map1 = stream3.map(new MapFunction<Event, String>() {
            public String map(Event value) throws Exception{
                return value.user;
            }
        });
        
        //3.传入lambda表达式
        SingleOutputStreamOperator<String> map2 = stream3.map(data -> data.user);


        map.print();
        map1.print();
        map2.print();
        env.execute();
    }
    public static class MyMapper implements MapFunction<Event,String>{

        @Override
        public String map(Event event) throws Exception {
            return event.user;
        }
    }
}

image-20221215125247502

2.过滤(filter)

package com.fang.chapter05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformFilterTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素中读取数据
        //3.从元素读取数据
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Marry", "./home", 1000L),
                new Event("Bob", "./home", 1000L));


        //1.传入一个实现了FilterFunction的类的对象
        SingleOutputStreamOperator<Event> result1 = stream3.filter(new MyFilterTest());

        //2.入一个匿名类
        SingleOutputStreamOperator<Event> mary = stream3.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Marry");
            }
        });

        //3.传入Lambda表达式
        stream3.filter(data->data.user.equals("Marry")).print();

        result1.print();
        mary.print();
        env.execute();
    }
    public static class MyFilterTest implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Marry");
        }
    }
}

image-20221215125602332

3.扁平映射(flatMap)

package com.fang.chapter05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransFlatmapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1.实现FlatMapFunction接口
        stream.flatMap(new MyFlatMap()).print("1");


        //2.传入一个lambda表达式
        stream.flatMap((Event value, Collector<String> out) -> {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }).returns(new TypeHint<String>() {}).print("2");
        
        env.execute();


    }

    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception
        {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }
    }

}

image-20221215125822572

4.按键分区(keyBy)

keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

基于不同的 key,流中的数据将被分配到不同的分区中去,这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransKeyByTest {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = 
        StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(1);
             DataStreamSource<Event> stream = env.fromElements(
             new Event("Mary", "./home", 1000L),
             new Event("Bob", "./cart", 2000L)
         );
         
         // 使用 Lambda 表达式进行分组
         KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);
         
         // 使用匿名类实现 KeySelector
         KeyedStream<Event, String> keyedStream1 = stream.keyBy(new 
            KeySelector<Event, String>() {
                 @Override
                 public String getKey(Event e) throws Exception {
                 return e.user;
            }
         });
         env.execute();
     }
}

5. 简单聚合(sum,min,max等)

⚫ sum():在输入流上,对指定的字段做叠加求和的操作。

⚫ min():在输入流上,对指定的字段求最小值。

⚫ max():在输入流上,对指定的字段求最大值。

⚫ minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。

⚫ maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransTupleAggreationTest {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = 
        StreamExecutionEnvironment.getExecutionEnvironment();
        
         env.setParallelism(1);
         DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
             Tuple2.of("a", 1),
             Tuple2.of("a", 3),
             Tuple2.of("b", 3),
             Tuple2.of("b", 4)
         );
         
         stream.keyBy(r -> r.f0).sum(1).print();
         stream.keyBy(r -> r.f0).sum("f1").print();
         
         stream.keyBy(r -> r.f0).max(1).print();
         stream.keyBy(r -> r.f0).max("f1").print();
         
         stream.keyBy(r -> r.f0).min(1).print();
         stream.keyBy(r -> r.f0).min("f1").print();
         
         stream.keyBy(r -> r.f0).maxBy(1).print();
         stream.keyBy(r -> r.f0).maxBy("f1").print();
         
         stream.keyBy(r -> r.f0).minBy(1).print();
         stream.keyBy(r -> r.f0).minBy("f1").print();
         
         env.execute();
     }
}

6.归约聚合(reduce)

  如果说简单聚合是对一些特定统计需求的实现,那么 reduce 算子就是一个一般化的聚合统计操作了。从大名鼎鼎的 MapReduce 开始,我们对 reduce 操作就不陌生:它可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

package com.fang.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素读取数据
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Marry", "./home", 1000L),
                new Event("Marry", "./home", 1200L),
                new Event("Bob", "./prod?id=1", 1000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3200L)
        );


        //1.统计每个用户的访问频次
        SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream3.map(new MapFunction<Event, Tuple2<String, Long>>() {

            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                return Tuple2.of(value.user, 1L);
            }
        }).keyBy(data -> data.f0).reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        });

        //2.选取当前最活跃的用户
        SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                return value1.f1 > value2.f1 ? value1 : value2;
            }
        });

         clicksByUser.print("1");
         result.print("2");
         env.execute();
    }
}

image-20221215131152255

7.自定义函数

1.函数类(Function Classes)

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransFunctionUDFTest {
 public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
     
     env.setParallelism(1);
  
     DataStreamSource<Event> clicks = env.fromElements(
         new Event("Mary", "./home", 1000L),
         new Event("Bob", "./cart", 2000L)
     );
     
     DataStream<Event> stream = clicks.filter(new FlinkFilter());
     stream.print();
     env.execute();
   }
  public static class FlinkFilter implements FilterFunction<Event> {
   
     public boolean filter(Event value) throws Exception {
         return value.url.contains("home");
     }
  }
}

2.匿名函数(Lambda)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransFunctionLambdaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //map 函数使用 Lambda 表达式,返回简单类型,不需要进行类型声明
        DataStream<String> stream1 = clicks.map(event -> event.url);
        stream1.print();

        env.execute();
    }
}

image-20221215131431053

3.富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

Rich Function 有生命周期的概念。典型的生命周期方法有:

⚫ open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。。

⚫ close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

package com.fang.chapter05;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);

        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L)
        );

        clicks.map(new MyRichMapper()).print();
        env.execute();

    }

    //实现一个自定义的复函数类
    public static class MyRichMapper extends RichMapFunction<Event,Integer>{

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open声明周期被调用"+getRuntimeContext().getIndexOfThisSubtask()+"号任务启动");
        }

        @Override
        public Integer map(Event value) throws Exception {
            return value.url.length();
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close声明周期被调用"+getRuntimeContext().getIndexOfThisSubtask()+"号任务结束");
        }
    }

}

image-20221117100708249

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/91448.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

m基于LDPC+QPSK通信链路误码率matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 LDPC ( Low-density Parity-check&#xff0c;低密度奇偶校验&#xff09;码是由 Gallager 在1963 年提出的一类具有稀疏校验矩阵的线性分组码 (linear block codes)&#xff0c;然而在接下来的 …

第五届“强网”拟态防御国际精英挑战赛在南京举行——开辟网络安全新赛道 引领网络弹性新优势

12月15日&#xff0c;第五届“强网”拟态防御国际精英挑战赛在南京紫金山实验室隆重开幕&#xff0c;来自国内外60支顶尖战队将通过云上和线下相结合的方式展开72小时的高强度对抗。大赛组委会主席、紫金山实验室首席科学家、中国工程院邬江兴院士指出&#xff0c;本届挑战赛瞄…

jvm内存模型+类加载机制+垃圾手机器

1、类加载器分类 1、引导类加载器&#xff0c;负责加载支撑Jre/lib目录下的核心类库 2、扩展类加载器&#xff1a;负责加载Jre/lib目录下的ext扩展类jar包 3、应用程序类加载器&#xff1a;负责加载classpath下的类包 4、自定义类加载器&#xff1a;负责加载用户自定义路径下的…

值得思索的:ArrayList和线性表,你确定错过这次机会

线性表&#xff1a; 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结 构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列... 线性表在逻辑上是线性结构&#xff0c;也就说是连续的一条…

Go项目实战:01-聊天室+map竞争需要上锁

实现一个聊天室&#xff08;群&#xff09;&#xff1a; 功能分析&#xff1a; 1、上线下线2、聊天&#xff1a;其他人和自己都可以看到聊天消息3、查询当前的聊天室用户所有人的名字4、可以修改自己的名字5、超时潜水踢出机制 技术点分析&#xff1a; 1、socket tcp编程2、…

Adam算法及python实现

文章目录算法介绍代码实现结果展示参考算法介绍 Adam算法的发展经历了&#xff1a;SGD->SGDM->SGDNA->AdaGrad->AdaDelta->Adam->Adamax的过程。它是神经网络优化中的常用算法&#xff0c;在收敛速度上比较快&#xff0c;比SGD对收敛速度的纠结上有了很大的…

单商户商城系统功能拆解46—应用中心—足迹气泡

单商户商城系统&#xff0c;也称为B2C自营电商模式单店商城系统。可以快速帮助个人、机构和企业搭建自己的私域交易线上商城。 单商户商城系统完美契合私域流量变现闭环交易使用。通常拥有丰富的营销玩法&#xff0c;例如拼团&#xff0c;秒杀&#xff0c;砍价&#xff0c;包邮…

基于微信小程序的课程分享平台-计算机毕业设计

项目介绍 随着社会的发展&#xff0c;社会的方方面面都在利用信息化时代的优势。互联网的优势和普及使得各种系统的开发成为必需。 本文以实际运用为开发背景&#xff0c;运用软件工程原理和开发方法&#xff0c;它主要是采用java语言技术和mysql数据库来完成对系统的设计。整…

[附源码]Node.js计算机毕业设计高校就业管理信息系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

学生竞赛网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 模块划分&#xff1a;通知类型、通知信息、学院信息、学生信息、学科信息、竞赛信息、报名信 息、成果上传、评分排名 管…

YOLOv5小目标切图检测

当我们在检测较大分辨率的图片时&#xff0c;对小目标的检测效果一直是较差的&#xff0c;所以就有了下面几种方法&#xff1a; 将图片压缩成大尺寸进行训练&#xff08; 想法&#xff1a;没显存&#xff0c;搞不来&#xff09;添加小检测头&#xff08;想法&#xff1a;P5模型…

【爬虫实战项目】Python爬虫批量下载相亲网站数据并保存本地(附源码)

前言 今天给大家介绍的是Python爬虫批量下载相亲网站图片数据&#xff0c;在这里给需要的小伙伴们代码&#xff0c;并且给出一点小心得。 首先是爬取之前应该尽可能伪装成浏览器而不被识别出来是爬虫&#xff0c;基本的是加请求头&#xff0c;但是这样的纯文本数据爬取的人会…

数据结构---树和二叉树

树和二叉树定义二叉树二叉树的物理结构链式存储数组二叉树应用查找维持相对顺序二叉树的遍历深度优先遍历前序遍历中序遍历后序遍历二叉树广度优先遍历层序遍历定义 有且仅有一个特定的称为根的节点。当n>1时&#xff0c;其余节点可分为m&#xff08;m>0&#xff09;个互…

数据结构与算法——Java实现栈、逆波兰计算器(整数加减乘除)

目录 一、栈 1.1 基本介绍 1.2 栈的思路分析 1.3 栈的代码实现 二、栈实现综合计算器 2.1 思路分析 2.2 代码实现&#xff08;中缀表达式实现&#xff09; 三、栈的前缀&#xff08;波兰&#xff09;、中缀、后缀&#xff08;逆波兰&#xff09;表达式 3.1 表达式的介绍…

访问pcie总线地址内容

调用代码如下&#xff1a; uint32_t value;void * addr;printk("------1--------\n");addr0x2730000;struct resource *res;char const *name dev_name(&pdev->dev);printk("dev_name%s\n", name);res request_mem_region(addr, 16, "name1&…

【语音之家公开课】SRD: A Dataset and Benchmark Perspective

本次语音之家公开课邀请到陈果果进行分享Speech Recognition Development: A Dataset and Benchmark Perspective。 公开课简介 主题&#xff1a;Speech Recognition Development: A Dataset and Benchmark Perspective 时间&#xff1a;12月15日&#xff08;周四&#xff09…

web网页设计期末课程大作业:美食餐饮文化主题网站设计——HTML+CSS+JavaScript美食餐厅网站设计与实现 11页面

&#x1f468;‍&#x1f393;静态网站的编写主要是用HTML DIVCSS JS等来完成页面的排版设计&#x1f469;‍&#x1f393;,常用的网页设计软件有Dreamweaver、EditPlus、HBuilderX、VScode 、Webstorm、Animate等等&#xff0c;用的最多的还是DW&#xff0c;当然不同软件写出的…

C# IO及文件管理

一 System.IO ① System.IO名字空间&#xff1b; ② 提供了许多用于&#xff1b; ③ 文件和数据流进行读写操作的类&#xff1b; 二 流的分类 1 Stream类 按存取位置分&#xff1a;FileStream,MemeryStream,BufferedStream; 2 读写类 BinaryReader和BinaryWriter; TextRe…

从 0 到 1 搞一个 Compose Desktop 版本的玩天气之打包

从 0 到 1 搞一个 Compose Desktop 版本的玩天气之打包 大家好&#xff0c;前两篇文章大概介绍了下上手 Compose Desktop 和自定义绘制时遇到的一些问题&#xff0c;项目的最终实现效果如下&#xff1a; 视频代码写好了&#xff0c;该弄的动画也弄了&#xff0c;该请求的网络数…

【数据结构】八大排序算法详解

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《数据结构》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; 排序前言一…