Flink04: Flink核心API之DataSet

news2024/11/26 16:48:41

DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。

  • DataSource是程序的数据源输入。
  • Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap、filter等操作。
  • DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

 一、DataSet API之DataSource

         针对DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。

  • 基于集合。fromCollection(Collection),主要是为了方便测试使用。它的用法和DataStreamAPI中的用法一样,我们已经用过很多次了。
  • 基于文件。readTextFile(path),读取hdfs中的数据文件。这个前面我们也使用过了。

二、DataSet API之Transformation

 

1. mapPartition

mapPartition算子和spark中的用法一样,mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。

scala代码:

package com.imooc.scala.batch

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchMapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //生成数据源数据
    val text: DataSet[String] = env.fromCollection(Array("hello you", "hello me"))

    //每次处理一个分区数据
    text.mapPartition(it => {
      val res: ListBuffer[String] = ListBuffer[String]()

      it.foreach(line => {
        val words: Array[String] = line.split(" ")
        for(word <- words){
          res.append(word)
        }
      })
      res
      //关闭数据库连接
    }).print()
    //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
    //env.execute("BatchMapPartitionScala")
  }
}

Java代码:

package com.imooc.java.batch;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Iterator;

public class BatchMapPartitionJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 生成数据源数据
        DataSource<String> text = env.fromCollection(Arrays.asList("hello you", "hello me"));
        //每次处理一个分区的数据
        text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
                //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
                Iterator<String> it = iterable.iterator();
                while(it.hasNext()){
                    String line = it.next();
                    String[] words = line.split(" ");
                    for (String word: words){
                        collector.collect(word);
                    }
                }
            }
        }).print();
        
    }
}

 2. join : 内连接,可以连接两份数据集

 scala代码

package com.imooc.java.batch

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //初始化第一份数据 Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")))

    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
    //对两份数据进行join操作
    //注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
    //where:指定左边数据集中参与比较的元素角标,equalTo指定右边数据集中参与比较的元素角标
    text1.join(text2).where(0).equalTo(0) {
      (first, second) => (first._1, first._2, second._2)
    }.print()
  }
}

3. cross : 获取两个数据集的笛卡尔积

scala代码

package com.imooc.scala.batch

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchCrossScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //初始化第一份数据
    val text1 = env.fromCollection(Array(1, 2))
    //初始化第二份数据
    val text2 = env.fromCollection(Array(3, 4))

    //执行cross操作
    text1.cross(text2).print()
  }
}

4. union:返回两个数据集的总和,数据类型需要一致

和DataStreamAPI中的union操作功能一样

5. first-n :获取集合中的前N个元素

6. groupBy :分组

7. sortGroup:分组内排序

8. 实例:获取分组排序后每组的前N个元素

 scala代码

package com.imooc.scala.batch

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchFirstNScala {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    val data: ListBuffer[(Int, String)] = ListBuffer[Tuple2[Int, String]]()

    data.append((2,"zs"))
    data.append((4,"ls"))
    data.append((3,"ww"))
    data.append((1,"aw"))
    data.append((1,"xw"))
    data.append((1,"mw"))

    import org.apache.flink.api.scala._
    val text: DataSet[(Int, String)] = env.fromCollection(data)

    //获取前三条数据
//    text.first(3).print()

    //根据数据中的第一列进行分组,获取每组的前2个元素
//    text.groupBy(0).first(2).print()

    //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
    //分组排序取TopN
    text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }
}

 Java代码

package com.imooc.java.batch;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

public class BatchFirstNJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<Integer,String>(2,"zs"));
        data.add(new Tuple2<Integer,String>(4,"ls"));
        data.add(new Tuple2<Integer,String>(3,"ww"));
        data.add(new Tuple2<Integer,String>(1,"aw"));
        data.add(new Tuple2<Integer,String>(1,"xw"));
        data.add(new Tuple2<Integer,String>(1,"mw"));
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        //获取前3条数据,按照数据插入的顺序
//        text.first(3).print();

//        text.groupBy(0).first(2).print();
        //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
        text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }
}

三、DataSet API之DataSink

Flink针对DataSet提供了一些已经实现好的数据目的地
其中最常见的是向HDFS中写入数据

(1)writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
(2)writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法
(3)还有一个是print:打印每个元素的toString()方法的值,这个print是测试的时候使用的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/364911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

攻击者查看邮件就被溯源到家?

本文通过分享实际攻防演练中真实案例&#xff0c;防守方在未暴露任何敏感信息的情况下&#xff0c;仅通过邮件往来最终溯源到攻击方相关真实信息。 作为攻击溯源技术的引子&#xff0c;供各位从业和爱好者交流学习。 场景描述 攻击者伪造邮件&#xff0c;称其申请防守方靶标系…

洛谷P5736 【深基7.例2】质数筛 C语言/C++

【深基7.例2】质数筛 题目描述 输入 nnn 个不大于 10510^5105 的正整数。要求全部储存在数组中&#xff0c;去除掉不是质数的数字&#xff0c;依次输出剩余的质数。 输入格式 第一行输入一个正整数 nnn&#xff0c;表示整数个数。 第二行输入 nnn 个正整数 aia_iai​&…

基于 esp-idf SDK ,如何在 .cpp 工程中加入.c 的文件调用?

把外部 .c 文件放到 .cpp 工程下的 main 文件夹然后在 .cpp 工程下声明 .c 文件下的 hello_main 函数同时在 cpp 工程的 CmakeLists.txt 文件下加上 .c 文件最后在 .cpp 工程下调用 hello_main 函数即可 可基于 esp-idf/examples/storage/nvs_rw_value_cxx 例程来测试 &#x…

mysql 8.0.32安装 windows server 超详细

官网下载mysql包&#xff0c;官网地址(中文版)&#xff1a; http://mysql.p2hp.com/cloud/index.html 我是下载的这个(第一个) 内容解压后是这样的&#xff0c;其实windows版本无需安装&#xff0c;只需要配置后启动即可 同时&#xff0c;建议下载下这个Visual Studio&#xf…

Nebula测试

LDBC benchmark 这是官方文档 https://ldbcouncil.org/ldbc_snb_docs/ldbc-snb-specification.pdf 主要有几点 Scale Factors 是生成数据的一个大小&#xff0c;For both workloads, the SF1 data set is 1 GiB, the SF100 is 100 GiB, and the SF10 000 data set is 10 000 G…

关于CSS的简单知识

CSS是什么首先&#xff0c;在之前的html仅仅是写了一个框架&#xff0c;页面并不工整&#xff0c;美观。而CSS正是解决了这一问题。HTML仅仅只是表示页面的结构和内容&#xff0c;而CSS描述的是页面的样式&#xff08;包括大小/位置/字体/颜色/背景等&#xff09;基本语言规范选…

[element plus] 对话框组件再封装使用 - vue

学习关键语句: 饿了么组件dialog组件使用 dialog组件二次封装 vue3中封住的组件使用update触发更新 vue3中封装组件使用v-model:属性值来传值 写在前面 这是我遇到的一个页面需求 , 其中一个对话框的内容是很常用的 , 所以我将它封装出来才写的一篇文章 现在给出如下需求: 封…

Git(分布式版本控制系统)

提到git了&#xff0c;我们先来说一下什么是git? 1、通俗一点&#xff0c;就是一个人工版本控制器 通过人工的复制行为来保存项目的不同阶段的内容&#xff0c;添加适当的一些描述文字加以区分 繁琐、容易出错 产生大量重复数据 2、什么是版本控制&#xff1f; 版本控制是指对…

JVM16命令行

2. JVM 监控及诊断工具-命令行篇 2.1. 概述 简单命令行工具 在我们刚接触 java 学习的时候&#xff0c;大家肯定最先了解的两个命令就是 javac&#xff0c;java&#xff0c;那么除此之外&#xff0c;还有没有其他的命令可以供我们使用呢&#xff1f; 我们进入到安装 jdk 的…

JAVA并发编程面试题合集

1.在Java中守护线程和本地线程的区别&#xff1f; Java中的线程分为两种&#xff1a;守护线程&#xff08;Daemon&#xff09;和用户线程&#xff08;User&#xff09;任何线程都可以设置为守护线程和用户线程&#xff0c;通过方法Thread.setDaemon(boolean)&#xff1b;true表…

框架开发有哪些优势?Java主流框架

什么是框架“框架&#xff08;Framework&#xff09;"一词最早出现在建筑领域&#xff0c;指的是在建造房屋前期构建的建筑骨架。在编程领域&#xff0c;框架就是应用程序的骨架&#xff0c;开发人员可以在这个骨架上加入自己的东西&#xff0c;搭建出符合自己需求的应用系…

mac电脑数据恢复?真正实用的方法(2023最新)

使用电脑的用户都知道&#xff0c;被删除的文件一般都会经过回收站&#xff0c;想要恢复它直接点击“还原”就可以恢复到原始位置。mac电脑同理也是这样&#xff0c;但是“回收站”在mac电脑显示为“废纸篓”。 如果电脑回收站&#xff0c;或者是废纸篓里面的数据被清空了&…

Nginx第一讲

目录 一、Nginx01 1.1 Nginx简介 1.1.1 Nginx介绍 1.1.2 Nginx的应用 1.1.3 关于代理 1.1.4 负载均衡 1.1.5 动静分离 1.2 安装Nginx 1.2.1 安装依赖环境 1.2.2 安装nginx 1.2.3 nginx配置文件(nginx.conf) 1.2.4 反向代理实例1 1.2.5 安装tomcat 1.2.6 反向代理…

插画教育培训机构最新排名

学原画插画在哪里学比较好&#xff0c;最新插画培训班排名&#xff0c;给大家梳理了国内最新5家专业的插画师培训班排名&#xff0c;各有优势和特色&#xff0c;给大家借鉴&#xff01; 一&#xff1a;国内插画培训机构排名 1、轻微课&#xff08;五颗星&#xff09; 主打课程有…

flutter 微信通讯录

Flutter 仿制微信通讯录效果&#xff0c;致效果如下&#xff1a; 有几个技术细节&#xff1a; 总体可滑动&#xff0c;少于屏幕长度也可滑动对于数据的处理。昵称 拼音首字母排序&#xff0c;右侧字母导航&#xff0c;点击/滑动&#xff1b;移动到指定位置当点击/滑动 右侧移动…

大数据实操项目分享:餐饮智能推荐服务在线实习项目

项目背景&#xff1a;在“互联网"背景下&#xff0c;餐饮企业的经营方式发生了很大的变革&#xff1a;团购和020拓宽了销售 渠道&#xff0c;电子点餐、店内WIFI等信息技术提升了服务水平&#xff0c;大数据、私人定制更好地满足了细分市场的需求等。但是与此同时&#xf…

天!转转MySQL机房迁移半小时结束战斗?

文章目录1 背景2 迁移方案选择2.1 方案一&#xff1a;扩容主从切换2.2 方案二&#xff1a;级联切换2.3 方案对比3 如何又快又稳完成MySQL机房迁移3.1 提前搭建级联3.2 停服3.3 批量操作自动化&#xff0c;关键步骤解耦3.4 集群分级3.5 切换前、后置检查3.6 灰度切换验证4 写在最…

rk3288-android8.1-以太网ethernet和蓝牙Bluetooth

遇到一个现象,以太网和蓝牙打不开 经过不断分析和查找发现问题在.config中 CONFIG_MOTORCOMM_PHYy 会导致以太网的eth0注册不成功(现在是双网口,还有个USB网卡) 改成# CONFIG_MOTORCOMM_PHY is not set 后以太网可以正常 # CONFIG_RTC_DRV_RK808 is not set 会导致蓝牙打不…

【分类评价指标】如何评估多(二)分类算法的性能:Acc、Precision、Recall、F1等

【分类评价指标】如何评估多&#xff08;二&#xff09;分类算法的性能&#xff1a;Acc、Precision、Recall、F1等 文章目录【分类评价指标】如何评估多&#xff08;二&#xff09;分类算法的性能&#xff1a;Acc、Precision、Recall、F1等1. 前言2. 二分类任务2.1 混淆矩阵2.2…

工控攻击,黑客组织GhostSec 称入侵以色列55 家Berghof PLC

“巴以冲突”在网络上依然硝烟弥漫。当地时间9月12日消息&#xff0c; 一个名为GhostSec的黑客组织声称入侵了以色列55台Berghof可编程逻辑控制器&#xff08;PLC&#xff09;。该网络攻击行为被视为“解放巴勒斯坦”运动的组成部分。 以色列工业网络安全公司OTORIO对此次事件…