reduceByKey 和 groupByKey 的分析与区别

news2024/11/17 17:51:09

reduceByKey

在这里插入图片描述

源码

在这里插入图片描述

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

实例演示

package spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark15_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("a", 3), ("b", 4)
        ))

        // reduceByKey : 相同的key的数据进行value数据的聚合操作
        // scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合
        // 【1,2,3】
        // 【3,3】
        // 【6】
        // reduceByKey中如果key的数据只有一个,是不会参与运算的。
        val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {
            println(s"x = ${x}, y = ${y}")
            x + y
        } )

        reduceRDD.collect().foreach(println)




        sc.stop()

    }
}

总结

reduceByKey是一个RDD的转换操作,用于按键对RDD中的值进行聚合。它将具有相同键的值使用指定的聚合函数进行合并,返回一个新的RDD,其中每个键对应一个聚合结果。

reduceByKey的语法如下:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

其中,func是一个接受两个相同类型的值并返回一个合并结果的聚合函数。它被应用于具有相同键的所有值。K表示键的类型,V表示值的类型。

使用reduceByKey的示例:

val rdd = sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val reducedRDD = rdd.reduceByKey((x, y) => x + y)
reducedRDD.foreach(println)

输出结果为:

(a, 4)
(b, 6)

在上述示例中,初始RDD rdd 包含了键值对的序列。通过应用reduceByKey操作,使用加法函数将具有相同键的值进行合并。最终得到的reducedRDD 包含了每个键对应的聚合结果。

需要注意的是,reduceByKey操作是按照键进行本地聚合,并且可以利用Spark的并行处理能力。它在进行全局聚合之前先在各个分区内进行本地聚合,从而减少了数据传输和Shuffle操作的开销。聚合函数应该满足结合律,以确保最终结果的准确性。

此外,还有一些其他的聚合操作可以用于键值对的RDD,如groupByKeyaggregateByKeyfoldByKey,每个操作都有不同的特点和适用场景。根据具体的需求,选择合适的聚合操作可以提高代码的效率和性能。

groupByKey

在这里插入图片描述

源码

在这里插入图片描述

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   * The ordering of elements within each group is not guaranteed, and may even differ
   * each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with into `numPartitions` partitions. The ordering of elements within
   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
   */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

实例演示

package spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark16_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("a", 3), ("b", 4)
        ))

        // groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
        //              元组中的第一个元素就是key,
        //              元组中的第二个元素就是相同key的value的集合
        val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

        groupRDD.collect().foreach(println)

        val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)




        sc.stop()

    }
}

总结

groupByKey是一个RDD的转换操作,用于按键对RDD中的值进行分组。它将具有相同键的所有值放置在同一个分区,并将它们作为一个键值对序列返回,每个键对应于一个由该键的所有值组成的迭代器。

groupByKey的语法如下:

def groupByKey(): RDD[(K, Iterable[V])]

其中,K表示键的类型,V表示值的类型。

使用groupByKey的示例:

val rdd = sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val groupedRDD = rdd.groupByKey()
groupedRDD.foreach(println)

输出结果为:

(a, CompactBuffer(1, 3))
(b, CompactBuffer(2, 4))

在上述示例中,初始RDD rdd 包含了键值对的序列。通过应用groupByKey操作,将具有相同键的值进行分组,并返回一个新的RDD groupedRDDgroupedRDD中的每个键对应一个迭代器,包含了该键的所有值。

需要注意的是,groupByKey操作会导致Shuffle操作,因为它需要将具有相同键的值放置在同一个分区中。在处理大规模数据时,Shuffle操作可能成为性能瓶颈,因此需要谨慎使用。如果只是需要对具有相同键的值进行聚合操作,通常更推荐使用reduceByKey操作,因为它可以在分区内进行本地聚合,减少Shuffle操作的开销。

此外,groupByKey返回的值是一个键值对的RDD,其中每个键对应于一个迭代器。在对迭代器中的值进行处理时,需要谨慎处理内存使用,特别是当每个键的值很大时。可以使用mapValues等方法进一步处理每个键的值。

reduceByKey 和 groupByKey 的区别

当涉及到键值对RDD的聚合操作时,reduceByKeygroupByKey之间的区别如下所示:

  1. 数据传输和Shuffle操作的开销:

    • reduceByKey:在进行聚合操作之前,reduceByKey会在各个分区内先进行本地聚合,然后再进行全局聚合。它会将具有相同键的值进行合并,并通过并行化地在每个分区内进行本地聚合,从而减少数据传输和Shuffle操作的开销。
    • groupByKeygroupByKey会将具有相同键的所有值都放置在同一个分区中,并将它们作为迭代器返回。这意味着所有具有相同键的数据都会经过Shuffle操作,不进行本地聚合。这可能导致大量的数据传输和高昂的Shuffle开销。
  2. 内存使用和性能:

    • reduceByKey:由于reduceByKey在每个分区内进行本地聚合,因此它可以更有效地利用内存,尤其是当数据集很大时。它通过将每个键的值进行迭代并应用聚合函数,产生一个最终的结果,因此输出的RDD的每个键都对应一个聚合后的值。这可以减少内存占用并提高性能,尤其在处理大规模数据时。
    • groupByKeygroupByKey将具有相同键的所有值放入内存中的迭代器中,并将它们作为一个键值对序列返回。由于它不进行本地聚合,因此可能会占用大量内存。此外,如果每个键的值非常大,将所有值放入内存中可能导致OutOfMemoryError。此外,由于groupByKey返回的值是迭代器,因此需要遍历整个迭代器才能访问每个键的所有值。

3. 结果类型:

  • reduceByKeyreduceByKey的输出是一个具有唯一键和聚合值的键值对RDD,其中每个键对应于一个聚合结果。
  • groupByKeygroupByKey的输出是一个键值对RDD,其中每个键对应于一个由该键的所有值组成的迭代器。因此,对于具有相同键的所有值,可能需要进一步操作才能得到聚合结果。

综上所述,如果目标是对键值对进行聚合操作,并且数据集很大,通常建议使用reduceByKey。它可以在分区内进行本地聚合,减少Shuffle操作的开销和内存使用。而groupByKey适用于需要获取具有相同键的所有值的场景,但要注意它可能占用大量内存并导致性能问题。

4.从 shuffle 的角度:

  • reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

5.从功能的角度:

  • reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/700322.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

保姆级教程:带你体验华为云测试计划CodeArts TestPlan

华为云测试计划(CodeArts TestPlan)是面向软件开发者提供的一站式云端测试平台,覆盖测试管理、接口测试,融入DevOps敏捷测试理念,帮助您高效管理测试活动,保障产品高质量交付。 登录华为云账号&#xff1a…

使用JMeter安装RabbitMQ测试插件的步骤

整体流程如下:先下载AMQP插件源码,可以通过antivy在本地编译成jar包,再将jar包导入JMeter目录下,重启JMeter生效。 Apache Ant 是一个基于 Java 的构建工具。Ant 可用于自动化构建和部署 Java 应用程序,使开发人员更轻…

【2023年江西省研究生数学建模竞赛】题目一 蒸汽发生器倒U型管内液体流动 建模方案及参考文献

代码与结果如下:完整文档见文末 完整思路”请点击这里“到原文章获取 题目: PACTEL压水堆整体测试设备在2009年建造,用于带有垂直倒U型管蒸汽发生器的压水堆热液压相关的安全性研究,参见图1。 PACTEL压水堆设施包括一个反应堆压力容器模型…

Redis如何统计一个亿的keys?

前言 不知你大规模的用过Redis吗?还是仅仅作为缓存的工具了?在Redis中使用最多的就是集合了,举个例子,如下场景: 签到系统中,一天对应一系列的用户签到记录。 电商系统中,一个商品对应一系列的…

ARM-SWI 和未定义指令异常中断处理程序的返回(七)

文章目录 处理流程示例代码实现SWI未定义指令 附录源码 处理流程 SWI 和未定义指令异常中断是由当前执行的指令自身产生的,当 SWI 和未定义指令异常中断产生时,程序计数器的 PC 的值还未更新,它指向当前指令后面第 2 条指令(对于…

Git的常见操作

Git版本控制 开发难题 在实际开发中我们会遇到一些问题,电脑蓝屏,代码丢了,懊悔不? 时间长了,文件找不到了。懊悔不?手欠,之前代码运行好好的,非要去优化下。结果还 不如以前&am…

京东天猫数据查询与分析:2023年厨电细分市场数据分析

随着消费者对生活品质的追求持续提高,我国厨房电器产品的需求也日趋多样化,市场中厨房电器的品类越来越多,我国厨房电器的市场规模也不断扩大。 根据鲸参谋电商数据显示,2023年1月至4月,天猫平台上厨房电器的销量为670…

搭建个人hMailServer 邮件服务实现远程发送邮件

文章目录 1. 安装hMailServer2. 设置hMailServer3. 客户端安装添加账号4. 测试发送邮件5. 安装cpolar6. 创建公网地址7. 测试远程发送邮件8. 固定连接公网地址9. 测试固定远程地址发送邮件 转载自cpolar极点云文章:搭建个人hMailServer 邮件服务实现远程发送邮件 hM…

ChatGPT微调系列一:总述 微调 的基本流程

文章目录 前言一、啥叫微调二、为啥要微调三、不是所有模型都可以微调的四、总述微调的基本流程,以及涉及的主要函数,参数1. 安装2. 准备训练数据3. openai.api_key os.getenv() 进行一个说明4. 通过API 调用模型 常用函数5. 微调模型 常用函数6. OpenA…

Maven 使用详细教程

目录 Maven 介绍 Maven 安装 1、安装JDK 2、下载Maven安装文件 3、配置环境变量 4、检测安装成功 Maven 标准工程结构 Maven 版本要素 Maven仓库 1、本地仓库: 2、中央仓库 3、其他远程仓库 创建Maven工程 使用命令方式创建Maven工程 Eclipse中创建…

智能大棚自动控制系统 实现传统农业精细化管理

新型农业经营主体管理系统是指为了适应农村经济发展需求,提高农业生产组织化、规模化、现代化程度,促进农业产业结构调整和农村产业转型升级,推动农村经济社会持续健康发展而建立的一套管理体系。 该系统主要包括农产品生产、种植、养…

JavaWeb学习路线(8)——登录

一、基本登录功能 (一)需求: 根据账号与密码判别用户是否可以登录 (二)实现步骤 Controller接收传递的JSON格式数据,使用RequestBody实体类进行接收,调用Service具体处理。Service创建登录接…

循环购应运而生,让老百姓敢于消费、有钱消费、愿意消费

​小编介绍:10年专注商业模式设计及软件开发,擅长企业生态商业模式,商业零售会员增长裂变模式策划、商业闭环模式设计及方案落地;扶持10余个电商平台做到营收过千万,数百个平台达到百万会员,欢迎咨询。 无论…

7 植物背景分离、RGB、HSV特征提取案例(matlab程序)

学习目标:背景分离和RGB等特征提取 1.简述 叶片RGB图像背景精确分离的方法,包括以下图像背景分离方法:S1:选取叶片,所得到的原始图像;S2:采用MATLAB 2016R软件将RGB图像转化为HSV图像,以饱和度0.190.21为界限,将小于界限的图像明度调整为0,并转化为灰度图;S3:用edg…

Redis高并发分布式锁

文章目录 高并发场景秒杀抢购超卖Bug高并发场景秒杀抢购Demo测试结果 JVM级别锁使用nginx对本地服务进行负载均衡 Redis实现分布式锁Redis分布式锁实现DemoRedis分布式锁有关问题 分布式锁性能的提升减少锁的粒度使用异步处理 高并发场景秒杀抢购超卖Bug 在今天的数字化世界中&…

供应商索赔(金税数据)导入并创建凭证(ALV长篇备忘三)

情境/背景:供应商三包索赔款项源起QMS质量系统,联动金税系统完成发票开具,最终在SAP系统中创建完成财务凭证。该流程为手工操作,费时费力且效率低下容易出错。 目标/任务:把QMS供应商三包索赔业务搬上线,同SAP FI顾问梳理功能说明书&#xf…

2023-06-29:redis中什么是热点Key?该如何解决?

2023-06-29:redis中什么是热点Key?该如何解决? 答案2023-06-29: 在Redis中,经常被访问的key被称为热点key。 产生原因和危害 原因 热点key问题产生的原因可以归纳为以下两种情况: 用户对于某些数据的…

安卓弹出popup之XPopup

弹窗自己写的话。虽然很简单。但不够丝滑。如果要优雅点的。又要添加动画。但是。。。如果用上了XPopup,动画别人帮你写。爽不爽?丝滑不丝滑。。? 丝滑第一步。先引入依赖 implementation com.github.li-xiaojun:XPopup:2.9.19如果没有这些…

git版本回退操作

本文 git 相关命令: git reset:回退版本,可指定某一次提交的版本。git reset [--soft | --mixed | --hard] commitId。git revert:撤销某个提交,做反向操作,生成新的commitId,原有提交记录保留…

基于java+swing+mysql图书管理系统V7.0

基于javaswingmysql图书管理系统V7.0 一、系统介绍二、功能展示1.项目骨架2.数据库表3.项目内容4.主界面5.登陆6、借阅管理7、修改读者信息8、图书验收9、新书订购 四、其它1.其他系统实现五.获取源码 一、系统介绍 项目类型:Java SE项目(awtswing&…