Spark---RDD(Key-Value类型转换算子)

news2024/9/25 7:17:01

文章目录

  • 1.RDD Key-Value类型
      • 1.1 partitionBy
      • 1.2 reduceByKey
      • 1.3 groupByKey
          • reduceByKey和groupByKey的区别
          • 分区间和分区内
      • 1.4 aggregateByKey
          • 获取相同key的value的平均值
      • 1.5 foldByKey
      • 1.6 combineByKey
      • 1.7 sortByKey
      • 1.8 join
      • 1.9 leftOuterJoin
      • 1.10 cogroup

1.RDD Key-Value类型

Key-Value类型的算子即对键值对进行操作。

1.1 partitionBy

将数据按照指定的 Partitioner(分区器) 重新进行分区。Spark 默认的分区器为HashPartitioner,Spark除了默认的分区器外,常见的分区器还有:RangePartitioner、Custom Partitioner、SinglePartitioner等。

函数定义:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

	//使用HashPartitioner分区器并设置分区个数为2
    val data1: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
    data1.partitionBy(new HashPartitioner(2));
    data1.collect().foreach(println)

在这里插入图片描述

1.2 reduceByKey

可以将数据按照相同的 Key 对 Value 进行聚合

函数定义:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

	//将数据按照相同的key对value进行聚合
    val data1: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))
    val data2: RDD[(String, Int)] = data1.reduceByKey((x: Int, y: Int) => {
      x + y
    })
    data2.collect().foreach(println)

在这里插入图片描述

1.3 groupByKey

将数据源的数据根据 key 对 value 进行分组

函数定义:
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

    val dataRDD1 = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))
    val data1 = dataRDD1.groupByKey()
    //指定分区个数为2
    val data2 = dataRDD1.groupByKey(2)
    //指定分区器和分区个数
    val data3 = dataRDD1.groupByKey(new HashPartitioner(2))
    
    data1.collect().foreach(println)
    println("-------------------->")
    data2.collect().foreach(println)
    println("-------------------->")
    data3.collect().foreach(println)

在这里插入图片描述

reduceByKey和groupByKey的区别

从功能的角度来看:reduceByKey包含了分组和聚合功能,而groupByKey只包含了分组功能。
从shuffle的角度来看:为了避免占用过多的内存空间,reduceByKey和groupByKey在执行的过程中,都会执行shuffle操作,将数据打散写入到磁盘的临时文件中,而reduceByKey在进行shuffle前会对数据进行预聚合的操作,致使shuffle的效率得到的提升,因为减少了落盘的数据量。但是groupByKey在shuffle前不会进行预聚合操作。所以,reduceByKey在进行分组的时候,效率相对groupByKey来说较高。

reduceByKey:
在这里插入图片描述

groupByKey:

在这里插入图片描述

分区间和分区内

分区间: 顾名思义,分区间就是指的多个分区之间的操作。如reduceByKey在shuffle操作后将不同分区的数据传输在同一个分区中进行聚合。
分区内: 分区内字面意思指的是单个分区内之间的操作。如reduceByKey的预聚合功能就是在分区内完成

1.4 aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算,如reduceByKey中分区间和分区内都是聚合操作,而使用aggregateByKey可以设置分区间和分区内执行不同的操作。

函数定义:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

    //取出每个分区内相同 key 的最大值然后分区间相加
    // aggregateByKey 算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    // 2. 第二个参数列表中含有两个参数
    // 2.1 第一个参数表示分区内的计算规则
    // 2.2 第二个参数表示分区间的计算规则
    val data1 = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)),2)
    val data2 = data1.aggregateByKey(0)(
      (x,y)=>{Math.max(x,y)},
      (x,y)=>{x+y}
    )
    data2.collect().foreach(println)**

在这里插入图片描述
注意:最终的结果会受到设置的初始值的影响,返回结果的值的类型和初始值保持一致。

获取相同key的value的平均值
    val data1:RDD[(String,Int)] = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4),("b",5),("a",6)),2)
    //设置初始值,初始值为一个元组,元组第一个元素表示value,第二个表示出现次数,初始默认都为0
    val data2:RDD[(String,(Int,Int))] = data1.aggregateByKey((0,0))(
      (t, v)=> {
        (t._1 + v, t._2 + 1)
      } ,//分区内计算
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }//分区间计算
    )
       //和除以次数求出平均值
    val data3 = data2.mapValues({
      case (sum, count) => sum / count
    })
    data3.collect().foreach(println)

在这里插入图片描述

1.5 foldByKey

当分区内和分区间的计算规则相同的时候,aggregateByKey 就可以简化为 foldByKey

函数定义:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("a",3)))
    val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
    dataRDD2.collect().foreach(println)

在这里插入图片描述

1.6 combineByKey

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

函数定义:

def combineByKey[C](
createCombiner: V => C,//对数据进行转换
mergeValue: (C, V) => C, //分区内合并
mergeCombiners: (C, C) => C): RDD[(K, C)] //分区间合并

//将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个 key 的平均值
val rddSource: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
val combinRdd: RDD[(String, (Int, Int))] = rddSource.combineByKey(
      ((x:Int)=>{
        (x,1)
      }),//对每个value进行转换,转换后为(value,1),第一个元素为值,第二个元素为出现的次数
      ((t1:(Int,Int),v)=>{
        (t1._1+v,t1._2+1)
      }),//分区内合并
            ((t1,t2)=>{
        (t1._1+t2._1,t1._2+t2._2)
      })//分区间合并
)
    //mapValues算子是在key保持不变的时候对value进行操作
    val mapRdd: RDD[(String, Int)] = combinRdd.mapValues({
      case ((sum: Int, count: Int)) => sum / count
    })

    mapRdd.collect().foreach(println)

在这里插入图片描述
由此看出,combineByKey和aggreateByKey的不同之处在于,combineByKey可以不设置初始值,只需要对第一个元素进行转换,转换到合适的计算格式即可。

1.7 sortByKey

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

函数定义:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

//升序排序
    val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("c",3)))
    val sortRdd: RDD[(String, Int)] = dataRDD1.sortByKey()

    sortRdd.collect().foreach(print)

在这里插入图片描述
sortByKey默认为升序排序,如果想要降序排序,只需要将sortByKey第一个参数修改为false即可。
在这里插入图片描述

1.8 join

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

函数定义:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

	//join操作相当于数据库中的内连接,在连接的时候自动去除两边的悬浮元组
    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5), (3, 6)))

    rdd0.join(rdd1).collect().foreach(print)
    //修改rdd1,使其少了key=3的这个元素
        val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5)))

    rdd0.join(rdd1).collect().foreach(print)

在这里插入图片描述
在这里插入图片描述

1.9 leftOuterJoin

类似于 SQL 语句的左外连接

函数定义:

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5),(3, 6)))

    val rddRes = rdd0.leftOuterJoin(rdd1)
    rddRes.collect().foreach(print)

在这里插入图片描述

1.10 cogroup

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,即先对

函数定义:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b"),(3,"c")))
    val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5)))

    val rddRes = rdd0.cogroup(rdd1)
    rddRes.collect().foreach(print)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380308.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

通过代理连接sftp

通过nginx代理连接sftp 1.问题描述2.代码实现3.nginx配置3.1 创建sftp.stream文件3.2 修改nginx配置 4.重启nginx生效 1.问题描述 问题是这样的。我们现在需要在微服务所在内网的A机器连接到外网的sftp,但是网络又不能直接到达。然后A机器到B机器是通过的&#xff…

设计模式之策略模式【行为型模式】

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档> 学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某…

新手入门:软件在测试过程中可能出现哪些问题?走,去看看~

对于很多测试新手来说,想要把自己的测试技术练得更精进,扎实自己的理论知识是必不可少的一门功课。下面,我们就一起来复习一下,那些让我们一知半解或者记不全的理论知识吧。 01 什么是软件测试? 最老套,但…

不要再搞混标准化与归一化啦,数据标准化与数据归一化的区别!!

数据标准化与归一化 1. 数据的标准化(Standardization):2. 数据的归一化(Normalization):总结(数据标准化和数据归一化的不同之处和相同之处) 1. 数据的标准化(Standardi…

【数据结构】二叉树问题总结

目录 1.二叉树前序遍历,中序遍历和后序的实现 2.层序遍历 3.求二叉树中的节点个数 4.求二叉树中的叶子节点个数 5.求二叉树的高度 6.求二叉树第k层节点个数 7.二叉树查找值为x的节点 8.单值二叉树 9.二叉树最大深度 10.翻转二叉树 11. 检查两颗树是否相同…

【Linux实用篇】Linux常用命令(2)

目录 1.3 拷贝移动命令 1.3.1 cp 1.3.2 mv 1.4 打包压缩命令 1.5 文本编辑命令 1.5.1 vi&vim介绍 1.5.2 vim安装 1.5.3 vim使用 1.6 查找命令 1.6.1 find 1.6.2 grep 1.3 拷贝移动命令 1.3.1 cp 作用: 用于复制文件或目录 语法: cp [-r] source dest ​ 说明: …

数据分析概述2(详细介绍机器学习

目录 1.名词解释:1.1算法和模型1.2参数和超参数 2.基础算法:3.高级算法:4.数据准备5.常用python包小结: 1.名词解释: 1.1算法和模型 算法:用于训练模型的方法,分为有监督学习、无监督学习、半…

Centos7安装K8S

Centos7安装K8S 安装过程中没有出现的错误可以往下 根据以前一些博主写的博客,在小阳翻了不下几十篇博客之后,我果断是放弃了,于是找到了官网地址,然后也有坑 1. 关闭防火墙 systemctl stop firewalld systemctl disable firew…

算法通关村番外篇-LeetCode编程从0到1系列四

大家好我是苏麟 , 今天带来算法通关村番外篇-LeetCode编程从0到1系列四 . 矩阵 1672. 最富有客户的资产总量 描述 : 给你一个 m x n 的整数网格 accounts ,其中 accounts[i][j] 是第 i​​​​​ 位客户在第 j 家银行托管的资产数量。返回最富有客户所拥有的 资产…

【UEFI基础】EDK网络框架(IP4)

IP4 IP4协议说明 IP全称Internet Protocol,它属于网络层,对其下各种类型的数据链路层进行了包装,这样网络层可以跨越不同的数据链路,即使是在不同的数据链路上也能实现两端节点之间的数据包传输。 IP层的主要作用就是“实现终端…

java应用CPU过高查找原因

用top查到占用cpu最高的进程pid 根据进程ID找到占用CPU高的线程 ps -mp 60355 -o THREAD,tid | sort -r 用 printf "%x \n" 将tid换为十六进制:xid printf "%x \n" 6036 根据16进制格式的线程ID查找线程堆栈信息 jstack 60355 |grep ebcb -A…

Fiddler怎么抓请求做接口

第一步:安装fiddler 可以在官网下载最新版本 VIP小伙伴可以在课前准备下载,如果已经安装,请忽略 打开fiddler开始抓取测试对象的请求,以教管系统登录为例 打开fiddler,准备开始抓取,清空会话,开启抓取状态,因为fiddler默认抓取http协议,所以这里不需要设置什么。 打开系统登…

Windows项目部署流程

一、部署前需要的工作 ①配置环境变量:将所需的软件和工具的安装路径添加到系统的环境变量中,以便在命令行中可以直接使用。 ②部署项目文件:将项目的文件和代码复制到服务器上的指定目录中,例如Web服务器的网站根目录。 ③配置…

idea使用谷歌翻译 有道翻译 百度翻译

中文版:文件——设置——工具——Translation,然后选择需要配置的翻译 英文版:File——settings——Tools——Translation,然后选择需要配置的翻译 谷歌翻译大家可以度娘直接搜索Google_translation_win 有道和百度翻译得去注册申…

什么是有机搜索引擎优化以及如何入门

什么是有机搜索引擎优化? 有机搜索引擎优化,简称 SEO,是指从搜索引擎的无偿搜索结果中增加网站流量的做法。 未付费搜索结果是获得的列表,而不是付费的。 这样做的目的是让您的网页在与您业务相关的未付费搜索结果中排名靠前。…

Spring自带分布式锁你用过吗?

环境:SpringBoot2.7.12 本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。 1. 简介 Spring Integration 是一个框架,用于构建事件驱动的应用程序。在 Spring Integration 中,LockRegistry 是一个接口,用于管理…

无需任何三方库,在 Next.js 项目在线预览 PDF 文件

前言: 之前在使用Vue和其它框架的时候,预览 PDF 都是使用的 PDFObject 这个库,步骤是:下载依赖,然后手动封装一个 PDF 预览组件,这个组件接收本地或在线的pdf地址,然后在页面中使用组件的车时候…

黑马程序员 Docker笔记

本篇学习笔记文档对应B站视频: 暂时无法在飞书文档外展示此内容 同学们,在前两天我们学习了Linux操作系统的常见命令以及如何在Linux上部署一个单体项目。大家想一想自己最大的感受是什么? 我相信,除了个别天赋异禀的同学以外&a…

学习笔记-mysql基础(DDL,DML,DQL)

一.DDL DDL,Data Definition Language,数据库定义语言,该语言包括以下内容: 对数据库的常用操作对表结构的常用操作修改表结构 1.对数据库的常用操作 -- 查看所有的数据库 show databases -- 创建数据库 create database [if not exists] test [charsetutf8] -- 切换 选择 …

如何判断 vite 的运行环境是开发模式还是生产模式 production? development?

如何判断 vite 的运行环境是开发模式还是生产模式 production? development? vite 有两种获取当前运行环境模式的方法: 官方说明: 完整说明地址: https://cn.vitejs.dev/guide/env-and-mode.html#node-env-and-modes…