[spark] RDD 编程指南(翻译)

news2024/11/16 13:36:32

Overview

从高层次来看,每个 Spark 应用程序都包含一个driver program,该程序运行用户的main方法并在集群上执行各种并行操作。

Spark 提供的主要抽象是 resilient distributed dataset(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统中的文件开始创建的。用户还可以要求 Spark 将 RDD 持久保存在内存中,从而使其能够在并行操作中高效地重用。最后,RDD 会自动从节点故障中恢复。

Spark 中提供的第二抽象是 shared variables ,他可以用在并行操作中。默认情况下,当 Spark 将函数作为一组任务(task)在不同节点上并行运行时,它会将函数中使用的每个变量的副本携带给每个任务。有时,变量需要在任务之间共享或者在driver program和任务之间共享。Spark 支持两种类型的 shared variables,一是 broadcast variables 可用于在所有节点的内存中缓存一个值,二是 accumulators

,他是仅“added”的变量,例如counters和sums。

Resilient Distributed Datasets (RDDs)

Spark围绕 RDD 的概念展开,RDD是可以并行操作的元素的容错集合。有两种方法可以创建RDD:在driver program中并行化现有集合,或者引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。

Parallelized Collections

并行化集合是通过在驱动程序中的现有集合上调用JavaSparkContext的并行化方法创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字1到5的并行化集合:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一旦创建,分布式数据集(distData)就可以并行操作。并行集合的一个重要参数是将数据集划分为的分区数量。 Spark 将为集群的每个分区(partition)运行一个任务(task),任务将分配给节点执行。可以手动指定分区数或使用默认值。

RDD Operations

RDD 支持两种类型的操作:

  • transformations(从现有dataset创建新dataset)。例如,map 是一种transformations,它将每个dataset每个元素传递给函数并返回表示结果的新 RDD
  • actions(在对dataset运行计算后将值返回给driver program)。例如,reduce 是一个使用某个函数聚合 RDD 的所有元素并将最终结果返回给driver program的操作

Spark中的所有transformations都是 lazy 的,因为它们不会立即计算结果。相反,它们只记住应用于某些基本 dataset(例如文件)的transformations。只有当操作需要将结果返回给driver program时,transformations 才会被计算。这样的设计使得Spark能够更高效地运行。例如,我们可以意识到,通过map创建的dataset将在reduce中使用,并且只将reduce的结果返回给driver program,而不是更大的映射dataset。

默认情况下,每次对transform后的RDD运行操作时,都可能会被重新计算。但是,您也可以使用持久(或缓存)方法将RDD持久化在内存中,在这种情况下,Spark将保留集群中的元素,以便在您下次查询时更快地访问它。还支持在磁盘上持久化RDD,或跨多个节点复制。如下图所示,如果不cache/persist 任何内容,那么每次您需要输出时(当您调用诸如“count”之类的操作时),都会从磁盘读取数据并完成操作。您可以在读取后进行缓存,然后所有其他操作都会跳过读取并从缓存的数据开始。

在这里插入图片描述

为了说明 RDD 基础知识,请看下面的简单程序:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行定义了来自外部文件的基本RDD。该数据集没有加载到内存中,也没有以其他方式对其进行操作:行只是指向文件的指针。第二行将lineLengths定义为map transformation的结果。同样,由于 lazy,lineLengths不会立即计算。最后,我们运行 reduce,这是一个操作。此时,Spark将计算分解为在不同机器上运行的任务,每台机器都运行其 map 和本地数据的 reduce,只将其答案返回给driver program。

如果 lineLengths 可能被再次使用,可以增加下面代码

lineLengths.persist(StorageLevel.MEMORY_ONLY());

在reduce之前,这会导致lineLengths在第一次计算后保存在内存中。

Understanding closures

关于Spark,更难的事情之一是理解跨集群执行代码时变量和方法的范围和生命周期。修改超出其范围的变量的RDD操作可能是混淆的常见来源。在下面的示例中,我们将查看使用foreach()增加计数器的代码,但其他操作也可能出现类似的问题。

Example

考虑下面简单的计算,将RDD元素sum。根据是否在同一JVM中执行,它的行为可能会有所不同。一个常见的例子是在本地模式下运行Spark(–master=local[n])与将Spark应用程序部署到集群(例如,通过Spark提交到YARN):

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);
Local vs. cluster modes

上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务都由执行器执行。在执行之前,Spark计算任务的 closures。closures 是执行器在RDD上执行计算(在本例中为foreach())时必须可见的变量和方法。此closures 被序列化并发送给每个执行器。

发送给每个excutor的closure中的变量现在是副本,因此,当在foreach函数中引用counter时,它不再是driver program上的count。driver program的内存中仍然有一个counter,但不再对excutor可见!

在本地模式下,在某些情况下,foreach 函数实际上将在与driver program相同的 JVM 中执行,并且将引用相同的原始counter,并且可能会更新它。

为了确保在这些场景中定义良好的行为,应该使用 Accumulator.。Spark中的 Accumulator专门用于提供一种机制,用于在集群中跨worker node 执行时安全地更新变量.

一般来说,closure——像循环或本地定义的方法这样的构造——不应该被用来改变一些全局状态。Spark不定义或保证对从闭包外部引用的对象的更改行为。执行此操作的一些代码可能在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会按预期运行。如果需要一些全局聚合,请使用Accumulator。

Printing elements of an RDD

另一个常见的习惯用法是尝试使用rdd. foreach(println)或rdd.map(println)打印出RDD的元素。在单台机器上,这将生成预期的输出并打印RDD的所有元素。但是,在集群模式下,excutor 调用的stdout的输出现在写入执行程序的stdout,而不是driver program上的stdout,因此driver program上的stdout不会显示这些!要在驱动程序上打印所有元素,可以使用 collect() 首先将RDD带到 driver program 节点,因此使用:rdd.collect().foreach(println).

Working with Key-Value Pairs

虽然大多数Spark操作适用于包含任何类型对象的RDD,但一些特殊操作仅适用于键值对的RDD。最常见的是分布式“shuffle”操作,例如通过键对元素进行分组或聚合,reduceByKey和sortByKey等。

Shuffle operations

Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在分区之间以不同的方式分组。这通常涉及跨executor和机器复制数据,这使得shuffle成为一项复杂且成本高昂的操作。

为了理解在shuffle过程中会发生什么,我们可以考虑一个例子,这个例子中有一个reduceByKey 操作,它生成一个新的RDD,其中一个键的所有值都被组合成一个tuple,这个tuple就是键和对与该键相关的所有值执行一个reduce函数的结果。挑战在于,单个键的所有值不一定都位于同一分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。

对于大多数操作,Spark不会自动地将数据重新分布到特定的节点或分区以满足特定操作的需要。相反,每个任务通常只处理一个分区内的数据。然而,对于像reduceByKey这样的操作,Spark需要将具有相同键(key)的所有值(value)聚合在一起以进行计算。这意味着,如果这些值分布在不同的分区中,Spark必须执行一个全局的重组操作(all-to-all operation),这个过程被称为shuffle。在shuffle过程中,Spark会执行以下步骤:

  1. 读取所有分区的数据,以找出每个键对应的所有值。
  2. 将具有相同键的值跨分区传输到相同的节点,以便可以对它们进行聚合。
  3. 在每个节点上,对每个键的所有值进行最终的聚合计算,得到每个键的最终结果。

可能导致随机播放的操作包括repartition operations like repartition and coalesce‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

Shuffle是一项昂贵的操作,因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织shuffle的数据,Spark生成一组任务——map 任务来组织数据,以及一组reduce任务来聚合数据。这个术语来自MapReduce,与Spark的map和reduce操作没有直接关系。

从内部来看,单个map任务的结果保存在内存中直到内存放不下。然后,根据目标分区对它们进行排序并写入单个文件。在reduce端,任务读取相关的排序block

某些shuffle操作可能会消耗大量的堆内存,因为它们使用内存中的数据结构在传输数据之前或之后组织数据。具体来说,reduceByKey和aggregateByKey在map端创建这些结构,而’ByKey操作在reduce端生成这些结构。当数据在内存放不下时,spark会将这些数据spill到磁盘,从而导致磁盘IO的额外开销和垃圾回收机制的增加。

Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件将被保留,直到相应的RDD不再使用并被垃圾收集掉。这样做是为了在重新计算lineage时不需要重新创建随机文件。垃圾收集可能只在很长一段时间后发生,如果应用程序保留对这些RDD的引用,或者如果GC不经常启动。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。临时存储目录在配置Spark上下文时由park. local.dir配置参数指定。

RDD Persistence

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)dataset。当您持久化RDD时,每个节点都将其计算的任何分区存储在内存中,并在该dataset(或从该dataset派生的dataset)的其他操作中重用它们。这使得未来的操作更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用RDD上的persist() cache()方法将其标记为持久化。第一次在操作中计算时,它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的RDD都可以使用不同的storage level,来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的Java对象(以节省空间),跨节点复制它。cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。

Spark还会在shuffle操作中自动持久化一些中间数据(例如,reduceByKey),即使用户没有调用persist。这样做是为了避免在shuffle期间节点发生故障时重新计算整个input。如果用户计划重用新生成的RDD,我们仍然建议他们在生成的RDD上调用persist。

Which Storage Level to Choose?

Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下过程来选择一个:

  • 如果您的RDD适合默认存储级别(MEMORY_ONLY),请保持这样。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
  • 如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更加节省空间,但访问速度仍然相当快。(Java和Scala)
  • 不要spill到磁盘,除非计算数据集的函数很重,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
  • 如果您想要快速故障恢复(例如,如果使用Spark处理来自Web应用程序的请求),请使用replicated 的存储级别。所有存储级别都通过重新计算丢失的数据提供完全的容错能力,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。

Removing Data

Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果您想手动删除RDD而不是等待它从缓存中删除,请使用RDD.unpersist()方法。请注意,此方法默认不阻塞。要在释放资源之前阻塞,请在调用此方法时指定blocking=true。

Shared Variables

通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,并且远程机器上的变量的更新不会传播回driver program。跨任务支持通用的读写共享变量将是低效的。然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:broadcast variables and accumulators.

Broadcast Variables

广播变量允许程序员将只读变量缓存在每台机器上,而不是将其副本与task一起发送。例如,它们可以用来以有效的方式为每个节点提供大型输入数据集的副本,减少了数据传输的开销从task粒度下降到节点粒度。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

Spark action通过一组stage执行,由分布式“shuffle”操作分隔。Spark自动广播每个stage内task所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个stage的task需要相同数据或以反序列化形式缓存数据很重要时才有用。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器,可以通过调用value方法访问它的值。下面的代码显示了这一点:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

创建广播变量后,应该在集群上运行的任何函数中使用它而不是值v,这样v就不会多次发送到节点。此外,对象v在广播后不应该被修改,以确保所有节点都获得相同的广播变量值(例如,如果变量稍后传送到新加入的节点)。

要释放广播变量复制到执行器上的资源,请调用.unpersist()。如果广播之后再次使用,它将被重新广播。要永久释放广播变量使用的所有资源,请调用.destroy()。之后广播变量就不能使用了。请注意,这些方法默认情况下不会阻塞。要阻塞直到资源被释放,请在调用它们时指定blocking=true。

Accumulators

Accumulators是仅通过关联和交换运算“added”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建命名或未命名的累加器。如下图所示,修改该累加器的阶段将在Web UI中显示一个命名累加器(在本例中为计数器)。Spark在“任务”表中显示由任务修改的每个累加器的值。

Accumulators in the Spark UI

然后,在集群上运行的task可以使用add方法add到Accumulators。但是,他们无法读取其值。只有driver program可以使用其value方法读取累加器的值。

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

对于仅在action中执行的Accumulators更新,Spark保证每个任务对Accumulators的更新只会应用一次,即重新启动的任务不会更新值。在transformations中,用户应该知道,如果重新执行任务或作业阶段,每个任务的更新可能会应用不止一次。

累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的,则只有当 RDD 作为action的一部分进行计算时,它们的值才会更新。因此,在像 map() 这样的惰性转换中进行累加器更新时,不能保证执行。下面的代码片段演示了这个属性:

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

reference

https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1476813.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多个版本的Python如何不冲突?

转载文章&#xff0c;防止忘记或删除 转载于&#xff1a;电脑中存在多个版本的Python如何不冲突&#xff1f; - 知乎 (zhihu.com) 如何安装多版本的Python并与之共存&#xff1f; 如果你的工作涉及到Python多版本之间开发或测试&#xff0c;那么请收藏本文&#xff0c; 如果你…

RabbitMQ讲解与整合

RabbitMq安装 类型概念 租户 RabbitMQ 中有一个概念叫做多租户&#xff0c;每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器&#xff0c;这些虚拟的消息服务器就是我们所说的虚拟主机&#xff08;virtual host&#xff09;&#xff0c;一般简称为 vhost。 每一个 vhos…

【Java程序设计】【C00331】基于Springboot的驾校预约学习系统(有论文)

基于Springboot的驾校预约学习系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的驾校预约学习系统&#xff0c;本系统有管理员、用户和教练三种角色&#xff1b; 管理员&#xff1a;个人中心、管理员管理、教练…

哈夫曼树的介绍

定义 路径长度&#xff1a;从根结点到该结点所经过的边数。 叶子结点的带权路径长度&#xff1a;叶子结点的权值*路径长度 树的带权路径长度&#xff1a;所有叶子结点的带权路径长度之和 哈夫曼树&#xff1a;带权路径长度最小的树&#xff0c;也称最优二叉树。 构造 反复选…

Scala Intellij编译错误:idea报错xxxx“is already defined as”

今天写scala代码时,Idea报了这样的错误&#xff0c;如下图所示&#xff1a; 一般情况下原因分两种&#xff1a; 第一是我们定义的类或对象重复多次出现&#xff0c;编译器无法确定使用哪个定义。 这通常是由于以下几个原因导致的&#xff1a; 重复定义&#xff1a;在同一个文件…

LNMP架构的源码编译环境下部署Discuz!社区论坛与Wordpress博客

一.编译安装Nginx 1.关闭防火墙 systemctl stop firewalld systemctl disable firewalld setenforce 0 2.安装依赖包 yum -y install pcre-devel zlib-devel gcc gcc-c make 3.创建运行用户 nginx 服务程序默认 以 nobody 身份运行&#xff0c;建议为其创建专门的用户账户&…

飞天使-学以致用-devops知识点4-SpringBoot项目CICD实现

文章目录 代码准备创建jenkins 任务测试推送使用项目里面的jenkinsfile 进行升级操作 代码准备 推送代码到gitlab 代码去叩叮狼教育找 k8s 创建jenkins 任务 创建一个k8s-cicd-demo 流水线任务 将jenkins 里面构建时候的地址还有token&#xff0c; 给到gitlab里面的webhooks…

MySQL 的数据库操作,利用Spring Boot实现MySQL数据库的自动创建

执行 show databases; 命令可以查看当前数据库的所有数据库。 注意在 MySQL 客户端执行 SQL 语句的时候要带上分号 ; 并按下 enter 键&#xff0c;不然 MySQL 会认为你还没有输入完&#xff0c;会换一行继续等待你输入。 OK&#xff0c;像上面截图中的 information_schema、mys…

2024/02/28

绘制思维导图 将今天的模拟面试内容进行整合并上传作业 1、什么是回调函数? 回调函数是一种作为参数传递给其他函数的函数&#xff0c;在 C 语言中&#xff0c;函数指针允许我们将函数作为参数传递给其他函数&#xff0c;从而实现回调函数的功能&#xff0c;例如线程的创建函…

【Vue】插槽-slot

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Vue ⛺️稳中求进&#xff0c;晒太阳 插槽 作用&#xff1a;让组件内部一些 结构 支持 自定义 插槽的分类&#xff1a; 默认插槽。具名插槽。 基础语法 组件内需要定制的结构部分&…

如何利用HubSpot出海营销CRM实现品牌建设与传播的有效管理?

利用HubSpot出海营销CRM优化客户互动和沟通可以通过以下方式实现&#xff1a; 个性化客户管理&#xff1a; 利用HubSpot的客户管理功能&#xff0c;集中管理客户信息&#xff0c;并根据客户的行为、偏好和历史数据等信息进行个性化分类和标记。这样可以更好地了解客户需求&am…

[CSS]文字旁边的竖线以及布局知识

场景&#xff1a;文字前面常见加竖线。 .center-title { 常见内容color: #FFF;font-family: "Source Han Sans CN";font-size: 50px;font-style: normal;font-weight: 700;line-height: normal;position: relative; 要定位left: 16px; 这里是想拉开间距margin-b…

力扣-跳跃游戏

问题 给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 解答 class Solu…

CUMT---图像处理与视觉感知---期末复习重点

文章目录 一、概述 本篇文章会随课程的进行持续更新中&#xff01; 一、概述 1. 图像的概念及分类。  图像是用各种观测系统以不同形式和手段观测客观世界而获得的、可以直接或间接作用于人的视觉系统而产生的视知觉实体。  图像分为模拟图像和数字图像&#xff1a;(1) 模拟图…

开源的 Python 数据分析库Pandas 简介

阅读本文之前请参阅-----如何系统的自学python Pandas 是一个开源的 Python 数据分析库&#xff0c;它提供了高性能、易用的数据结构和数据分析工具。Pandas 特别适合处理表格数据&#xff0c;例如时间序列数据、异构数据等。以下是对 Pandas 的简明扼要的介绍&#xff0c;包括…

基于React, Redux实现的俄罗斯方块游戏及源码

分享一个俄罗斯方块游戏游戏框架使用的是 React Redux&#xff0c;其中再加入了 Immutable&#xff0c;用它的实例来做来Redux的state。&#xff08;有关React和Redux的介绍可以看 安装 npm install运行 npm start浏览自动打开 http://127.0.0.1:8080/ 打包编译 npm run …

Vue源码系列讲解——生命周期篇【七】(模板编译阶段)

目录 1. 前言 2. 模板编译阶段分析 2.1 两种$mount方法对比 2.2 完整版的vm.$mount方法分析 3. 总结 1. 前言 前几篇文章中我们介绍了生命周期的初始化阶段&#xff0c;我们知道&#xff0c;在初始化阶段各项工作做完之后调用了vm.$mount方法&#xff0c;该方法的调用标志…

mongoDB 优化(1)索引

1、创建复合索引&#xff08;多字段&#xff09; db.collection_test1.createIndex({deletedVersion: 1,param: 1,qrYearMonth: 1},{name: "deletedVersion_1_param_1_qrYearMonth_1",background: true} ); 2、新增索引前&#xff1a; 执行查询&#xff1a; mb.r…

第3部分 原理篇2去中心化数字身份标识符(DID)(4)

3.2.3. DID解析 3.2.3.1. DID解析参与方 图3-5 DID 解析过程 本聪老师&#xff1a;我们之前提到过&#xff0c;DID 解析过程是将 DID 转换为对应的 DID 文档。这样做的目的是验证 DID 所代表的主体的身份。那么解析过程会涉及哪些概念呢&#xff1f;我们看图3-&#xff0c;DI…

uniapp 微信小程序使用高德地图Vue3不兼容Vue2问题

1. uniapp 微信小程序使用高德地图Vue3不兼容Vue2问题 1.1. 问题 uniapp Vue3项目引用高德地图报错 import amapPlugin from ‘…/…/…/js_sdk/js_amap/amap-wx.130’; "default" is not exported by "../../../MyProject/Base/Szy/js_sdk/js_amap/amap-wx.1…