大数据Spark面试题2023

news2024/7/6 19:11:00

文章目录

    • Spark核心——RDD
      • 概念
      • 特点
      • 创建方式
      • RDD的分区依赖关系
    • Spark的shuffle介绍
    • Spark的 Partitioner 分区器都有哪些?
    • Spark中的算子都有哪些
    • RDD工作流📌
    • Spark运行模式(资源调度框架的使用,了解)📌
    • 讲一下Spark 的运行架构
    • 一个spark程序的执行流程
    • spark的stage是如何划分的
    • Spark的 RDD容错机制。
    • checkpoint 检查点机制?
    • Spark为什么快,Spark SQL 一定比 Hive 快吗
    • RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么
    • RDD 持久化原理?
    • checkpoint 和持久化机制的区别?
    • RDD懒加载是什么意思
    • spark 解决了 hadoop 的哪些问题(spark VS MR)?
    • spark有哪几种join
    • hadoop 和 spark 的相同点和不同点?
    • 如何将spark-sql的Row转成Java对象?

Spark核心——RDD

概念

RDD就是 Resillient Distributed Dataset,即弹性分布式数据集
RDD在抽象上来讲是一种抽象的分布式的数据集合、简单的理解成一种数据结构它是被分区的,每个分区分布在集群中的不同的节点上。从而可以让数据进行并行的计算

它是Spark 框架上的通用货币。所有算子都是基于 RDD 来执行的,不同的场景会有不同的 rdd 实现类,但是都可以进行互相转换。RDD执行过程中会形成 DAG图,然后形成 lineage 保证容错性等。

从物理的角度来看 RDD 存储的是 blocknode 之间的映射。
从逻辑的角度来看是一个 hdfs 文件,在抽象上是一种元素集合集合。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD 中的数据可以被并行操作(分布式数据集)

特点

它主要特点就是弹性和容错性。

  • 弹性RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘
  • 容错性RDD可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者都是透明的。

创建方式

  • 接受一个已经存在的集合,进行并行计算
  • 引用HDFS或者HIVE 表来创建

RDD的分区依赖关系

分区之间的依赖关系——宽、窄依赖;

区分方法:

  • 区别宽窄依赖的核心点是 子RDDpartition与父RDDpartition是否是1对多的关系
  • 存在shuffle就是宽依赖,否则就是窄依赖

窄依赖:Narrow Dependency
RDD和子RDD是一对一的依赖关系,如mapfilter

宽依赖:Shuffle Dependency
本质就是shuffle。如reduceByKeygroupyByKey,父RDD一个分区数据给了子RDD的多个分区

Spark的shuffle介绍

Shuffle描述的是一个过程,表现多对多的依赖关系,是MapReduce两个阶段的纽带,是对数据重新分区的过程,将经过mapTask后,key值相同的数据重新划分到同一个partition中。

Shuffle实现分为HashShuffleManagerSortShuffleManager,也可以自定义。

shuffle简介:在 DAG 阶段以shuffle为界,划分 stage,
上游 stage做 map task,每个maptask将计算结果数据分成多份,每一份对应到下游stage 的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;
下游stage 做reduce task,每个reduce task通过网络拉取上游 stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。

Shuffle版本也随着spark不断进步和优化:
从2.0开始,把 Sort Based ShuffleTungsten-Sort全部统一到 Sort Based Shuffle中,Hash Based Shuffle退出历史舞台。
目前spark2.1,直接把SortBased Shuffle的writer分为三种:BypassMergeSortShuffleWriter,SortShuffleWriterUnsafeShuffle Writer

  1. BypassMergeSortShuffle Writer: Hash Shuffle 中的HashShuffle Writer 实现基本一致,唯一的区别在于,map端的多个输出文件会被汇总为一个文件。所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access某个partition 的所有数据。

  2. SortShuffleWriter:会对分区内进行排序或者全局排序。
    处理步骤:使用 PartitionedAppendOnlyMap或者 PartitionedPairBuffer 在内存中进行排序,排序的K是(partitionld, hash (key))这样一个元组。如果超过内存limit,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionld 时排厅,如宋 partona相同,再根据hash (key)进行比较排序。如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行merge sort,实现全局排序。
    最终读取的时候,从整个全局merge后的读取迭代器中读取的数据,就是按照partitionld 从小到大排序的数据,误取过在中使用丹仅州刀区力权,并且记录每个分区文件的起始写入位置,把这些位置数据写入索引文件中。

  3. UnsafeShuffleWriter:优化部分是在shuffle write进行序列化写入过程中,直接对二进制进行排序,减少了内存消耗,最终只是 partition 级别的排序。
    但是这种需要一定条件:对单条记录、shuffle数量有限制,而且不能带有聚合函数。排序实现:利用一个LongArray存储分区 ID、pageNumber、offset in page,并对这个数组排序。每次插入一条 record 到 page 中,就把 partionld + pageNumber + offset in page,作可以迭代器 PackedRecordPointer
    为一个元素插入到 LongArray中。要想反向获得 record,
    定义的数据结构就是[24 bit partition number][13 bit memory page number][27 bit offset inpage]然后到根据该指针可以拿到真实的record。

Spark的 Partitioner 分区器都有哪些?

Partitionershuffle过程中key重分区时的策略,即计算key决定k-v属于哪个分区,Transformation是宽依赖的算子时,父RDD和子RDD之间会进行shuffle操作,shuffle涉及到网络开销,由于父RDD和子RDD中的partition是多对多关系,所以容易造成partition中数据分配不均匀,导致数据的倾斜。

从概念上讲,分区器(Partitioner)定义了如何分布数据,决定一个RDD可以分成多少个分区,每个分区的数据量有多大,从而决定了每个Task将处理哪些数据。

一般来说分区器是针对key-value值的RDD,并通过对key的运算来划分分区。非key-value形式的RDD无法根据数据特征来进行分区,也就没有设置分区器,此时Spark会把数据均匀的分配到执行节点上。

1、HashPatitioner分区:对于给定的key值,计算其 hashcode除以分区数取余,最后这个值就是分区的id。可能会出现数据不均匀,因为同一 key值都在同一分区。

2、RangerPartitoner分区:将一定范围的数映射到某一分区内。

3、自定义分区器

第一步:确定边界,从所有的RDD当中随机抽取样本并进行排序,依次来确定分区的rangerBounds(边界)(这里因为RDD如果很大的话,没法按照计算总量,所以需要用到蓄水池抽样)。
第二步:计算key值在rangerBounds所处的范围,得出分区id。

额外:蓄水池抽样算法,不知总量的随机抽样。找个 leetcode敲下理解。


leetcode382.Linked List Random Node(算法随机采样)
Init : a reservoir with the size: k   //k is the sample number
fori=k+1 to N
	M=random(1, i);
	if(M<k)
	SWAP the Mth value and ith value
end for

Spark中的算子都有哪些

总的来说,spark分为三大类算子:

  • Transformation 转换算子:一个RDD 转换生成一个新的 RDD 的操作。 一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,不触发提交作业,需要等到有 Action 操作的时候才会真正触发运算。通过谱系图(lineage)这个DAG图进行操作,进行恢复。
函数作用
map()(常用)将函数应用于RDD中每个元素,将返回值构成新的RDD
flatMap()(常用)将函数引用于RDD中每个元素,将返回的迭代器的所有内容取出重新构成新的RDD
filter()(常用)filter()的参数为布尔函数,返回满足该布尔函数的元素构成新的RDD
distinct()去重
sample(withReplacement,[seed])对RDD采样,以及是否替换
union()生成一个包含两个RDD中所有元素的RDD,不去重。类似并集
intersection()将两个RDD共同的元素构成新的RDD,去重。类似交集
substract()在左边RDD中移除右边RDD中的内容,类似左连接
cartesian()与另一个RDD笛卡尔积
  • Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业; Action 算子会触发 Spark 提交作业(Job),对RDD 进行实际计算,并将求得的结果返回到驱动程序中或者写入到外部存储系统中。
函数作用
collect()(常用)返回RDD中全部元素
count()(常用)返回RDD中元素个数
countByValue()(常用)返回各元素在RDD中出现的次数,返回类型为元组的集合
take(num)(常用)返回RDD中num个元素
top(num)返回RDD中最前面的num个元素
takeOrder(num)(ordering)从RDD中按照提供的顺序返回最前面的num个元素
takeSample(withReplacement,num,[seed])从RDD中返回任意些元素
reduce(func)(常用)并行整合RDD中所有数据,类似sum
fold(zero)(func)和reduce()一样,但是需要提供初始值
aggregate(zeroValue)(seq0p, comb0p)和reduce()相似,但是通常返回不同类型的函数
foreach(func)(常用)遍历RDD中每个元素使用传入的函数
  • Controller 控制操作:Spark中控制算子也是懒执行的,当我们每次调用行动操作时,都会重算RDD的所有依赖,如果多次行动操作使用同一个RDD,就会导致大量的重复运算。为避免这种现象,可以对数据进行持久化,也就是存储该RDD,保存在各自的分区中。

控制算子有三种,cache,persist(RDD 持久化原理)checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition
cachepersist都用于将一个RDD进行缓存,这样在以后使用的过程中就不需要重复计算 。cache 默认缓存级别是MEMORY_ONLY
checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

级别使用的空间CPU时间是否在内存是否在磁盘备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分如果数据在内存放不下,溢写到磁盘上
MEMORY_AND_DISK_SER部分部分如果数据在内存放不下,溢写到磁盘上,在内存中放序列化后的数据
DISK_ONLY

RDD工作流📌

👉输入:定义RDD,在 Spark 程序运行中,数据从外部数据空间(例如, HDFSScala 集合或数据)输入到 Spark,数据就进入了 Spark 运行时数据空间,会转化为 Spark 数据块,形成RDD

👉运行:在 Spark 数据输入形成 RDD 后,进行相应的操作,通过行动(Action)算子,触发 Spark 提交作业,提交作业。如果数据需要复用,可以通过 Cache 算子,将数据缓存到内存。

👉输出:程序运行结束数据会输出 Spark 运行时空间,存储到分布式存储中(如saveAsTextFile 输出到 HDFS)或Scala数据或集合中( collect 输出到 Scala 集合,count 返回 Scala Int 型数据)。

Spark运行模式(资源调度框架的使用,了解)📌

  1. Local模式: 是用单机的多线程或者多进程模拟Spark分布式计算,通常用来 单机调试,验证开发出来的应用程序逻辑上有没有问题。

该方式下,在程序执行过程中,只会生成一个SparkSubmit进程。这个SparkSubmit进程又当爹、又当妈,既是客户提交任务的Client进程、又是Spark的driver程序、还充当着Spark执行Task的Executor角色。其中N代表可以使用N个线程,每个线程拥有一个core。如果不指定N,则默认是1个线程(该线程有1个core)。如果是local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.

  1. 分布式部署模式:

👉Standalone模式: 独立模式,自带完整的模式 , 该方式适用master和worker进程来模拟集群形式,不需要启动hadoop。在架构上和 MapReduce1比较,具有一致性,都是由Master、worker组成(只是名称不一样),资源抽象为粗粒式的slot,多少slot多少task。

👉Spark on YARN:因为现在企业用到 hadoop是基于YARN 的,为了融合Spark,该方式是使用YARN来做集群管理,为Spark应用分配资源 , 进行统一资源管理。有两种方式, YARN-client(用于交互,client当中运行sparkContext进程进行任务分发监控),YARN-cluster 任务的分发和监控放在 MRAPPmaster当中。

👉 Spark on mesosYARNmesos都是统一资源管理和调度系统。mesos支持粗粒式和细粒式调度,前者节省了资源调度时间的开销,后者是不存在资源的浪费,但是资源调度延迟较大。

讲一下Spark 的运行架构

👉Driver:这是监督 Spark 作业或程序端到端执行的主程序。 它与集群的资源管理器进行资源的协商,并将程序编排成尽可能小的数据本地并行编程单元。

👉Executors:在任何 Spark 任务中,可以有一个或多个 executor,即执行由 drive 委派的较小任务的进程。 executor 处理数据,最好是本地节点的,并将结果存储在内存和 / 或磁盘中。

👉MasterApache Spark 已经在主 / 从架构中实现,因此 master 指的是执行驱动程序的集群节点。

👉Slave(已经改名为 Worker):在分布式集群模式下,slave 是指运行执行程序的节点,因此在群集中可以有多个从机(而且大部分情况都是这样)。

👉Job:这是对任何一组数据执行的操作的集合。 典型的 word count job 涉及从任意来源读取文本文件,然后分离 (splitting) 并聚合 (aggregating) 这些字。

👉DAGSpark 引擎中的任何 Spark 工作都由 DAG 的操作代表。 DAG 按顺序表示 Spark 操作的逻辑执行。 在发生故障的情况下由 DAG 重新计算可能的血统(lineage)。

👉Task:一个 job 可以拆分成更小的单位,以被称为 task 的孤立任务进行操作。 每个 task executor 在一个数据分区上执行。

👉StagesSpark 作业可以按逻辑划分为多个 stage,每个stage代表一组具有相同的洗牌(shuffle)依赖关系的任务,即发生数据洗牌 (shuffle) 的任务。 在洗牌映射(shuffle map)阶段,任务结果被输入到下一个阶段;在结果阶段,task 计算 action,开始对 Spark job 的赋值,例如take()foreach()collect()

(* 注:shuffle 是划分 DAGstage 的标识, 同时影响 Spark 执行速度的关键步骤。RDDTransformation 函数中, 又分为窄依赖 (narrow dependency) 和宽依赖 (wide dependency) 的操作. 窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作. 宽依赖会发生shuffle操作. 窄依赖是子RDD的各个分片 (partition) 不依赖于其他分片, 能够独立计算得到结果, 宽依赖指子 RDD 的各个分片会依赖于父 RDD 的多个分片, 所以会造成父RDD的各个分片在集群中重新分片)

一个 Spark job 怎么执行的?

  • Spark job 可以包含一系列对一组数据执行的操作。

不管 Spark job 的大小,Spark job 都需要 SparkContext 来执行。 在之前使用 REPL 的例子中,人们会注意到使用了一个名为 sc 的环境变量,这就是在 REPL 环境中如何访问一个 SparkContext。

👉 SparkContext 创建一个由不同 transformation 组成的 operator graph,一旦在某个 transformation 之上执行 action,这个 group 会被提交给 DAGScheduler。 根据 RDD 的性质或由窄变换(narrow transformation)或宽变换(wide transformation)(需要 shuffle 操作的变换)产生的结果。DAGScheduler 会创建 stage。
👉DAGScheduler 以这样的方式拆分 DAG:每个 stage 都由在通用洗牌边界(common shuffle boundaries,实在不知道怎么翻译合适)下的相同洗牌依赖(shuffle dependency)组成。 此外,stage 可以是洗牌映射(shuffle map)阶段,在这种情况下,其任务结果将是另一个阶段的输入;也可以是结果阶段(result stage),在这种情况下,其任务直接计算启动作业的 action,例如 count()。
👉 然后 stage 被 DAGScheduler 作为任务集(TaskSets)提交给 TaskScheduler。 TaskScheduler 通过集群管理器(YARN,Mesos 和 Spark standalone)调度 TaskSet 并监视其执行情况。 如果任何任务失败,则重新运行,最后将结果发送到 DAGScheduler。 如果结果输出文件丢失,那么 DAGScheduler 将重新提交这些阶段给 TaskScheduler 以重新运行。
👉然后在满足资源和数据局部性约束的指定执行器(executor)(slave(worker)节点上运行的 JVM)上安排 task。 每个 executor 还可以分配多个 task。

一个spark程序的执行流程

1、启动:用户程序启动SparkContext,是程序的总入口,初始化过程中启动DAGScheduler作业调度和 TaskScheduler任务调度。
2、生成作业:DAGScheduler:根据shuffleDependency将作业划分为不同的stage,根据 RDD之间的依赖关系,宽依赖和窄依赖,划分原则就是遇见窄依赖就放进当前stage,遇到宽依赖则断开。(相当于shuffle是前后的stage分界线)每一个stage里面都会划分一个taskset,也就是数据集,而DAGSchedule的下一个任务就是将这个TaskSet传给TaskSchedule(在最后一个 stage划分结束,就会触发作业的提交)。
3、提交任务集: TaskScheduler:分配 Task到哪一个executor上去执行,SchedulerBackend配合TaskScheduler 完成具体任务的资源分配。
4、任务执行:Executor:实际任务的运行最终都 Execter 类来执行,对每个任务创建一个TaskRunner类,交给线程池去实现。


  1. spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGSchedulerTaskScheduler
  2. TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。
  3. Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。
  4. Executor 启动后,会自己反向注册到 TaskScheduler 中。 所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
  5. 每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。
  6. DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。
  7. TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
  8. Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

spark的stage是如何划分的

stage的划分依据就是看是否产生了shuflle(即宽依赖),遇到一个shuffle操作就划分为前后两个stage.

Spark的 RDD容错机制。

两个方法:利用“血缘(Lineage)容错”和检查点(checkpoint)机制。

  1. “血缘”容错:利用依赖关系进行数据恢复,在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。
    在窄依赖中,在子RDD 的分区丢失、重算父RDD分区时,父RDD相应分区计算所得到的数据都是子RDD分区的数据(一对一),并不存在冗余计算。
    而宽依赖需要父RDD的所有分区都存在,重算花销大。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的(其他子分区也会获得父亲重新计算的数据),会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点。
  2. 检查点机制:在宽依赖当中,如果 Lineage过长,重算开销会很大,通过设定检查点,在依赖关系中,对关系中间预算的结果进行数据冗余条份,所以数据恢复时,就可以l从检查点开始进行重新计算Lineage,减少开销。

checkpoint 检查点机制?

应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。

原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint 首先会调用 SparkContext setCheckPointDIR() 方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

  1. 控制发生失败时需要重算的状态数。Spark streaming 可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
  2. 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样 spark streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。

Spark为什么快,Spark SQL 一定比 Hive 快吗

Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

  1. 消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 persist 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。
  2. 消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。
  3. JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。

RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么

reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。

groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列 (Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。

所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。

RDD 持久化原理?

spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。

调用 cache() 和 persist() 方法即可。cache() 和 persist() 的区别在于,cache() 是 persist() 的一种简化方式,cache() 的底层就是调用 persist() 的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用 unpersist() 方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist() 时传入对应的 StorageLevel 即可。

checkpoint 和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系) 是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低

RDD懒加载是什么意思

Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Acion 操作的时候才会真正触发运算,这也就是懒加载.

spark 解决了 hadoop 的哪些问题(spark VS MR)?

  1. MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;

    Spark:Spark 采用 RDD 计算模型,简单容易上手。

  2. MR:只提供 map 和 reduce 两个操作,表达能力欠缺;

    Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;

  3. MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;

    Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;

  4. MR:中间结果存放在 hdfs 中;

    Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;

  5. MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;

    Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。

  6. MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;

    Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。

spark有哪几种join

Spark 中和 join 相关的算子有这几个joinfullOuterJoinleftOuterJoinrightOuterJoin

  • join
    join函数会输出两个RDD中key相同的所有项,并将它们的value联结起来,它联结的key要求在两个表中都存在,类似于SQL中的INNER JOIN。但它不满足交换律,a.join(b)与b.join(a)的结果不完全相同,值插入的顺序与调用关系有关。

  • leftOuterJoin
    leftOuterJoin会保留对象的所有key,而用None填充在参数RDD other中缺失的值,因此调用顺序会使结果完全不同。如下面展示的结果,

  • rightOuterJoin
    rightOuterJoin与leftOuterJoin基本一致,区别在于它的结果保留的是参数other这个RDD中所有的key。

  • fullOuterJoin
    fullOuterJoin会保留两个RDD中所有的key,因此所有的值列都有可能出现缺失的情况,所有的值列都会转为Some对象。

hadoop 和 spark 的相同点和不同点?

Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;

Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。

但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的轻局昂下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。

Hadoop/MapReduce 和 Spark 最适合的都是做离线型的数据分析,但 Hadoop 特别适合是单次分析的数据量 “很大” 的情景,而 Spark 则适用于数据量不是很大的情景。

  1. 一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会 “很大”,因此可以优先考虑使用 Spark。
  2. 业务通常认为 Spark 更适用于机器学习之类的 “迭代式” 应用,80GB 的压缩数据(解压后超过 200GB),10 个节点的集群规模,跑类似 “sum+group-by” 的应用,MapReduce 花了 5 分钟,而 spark 只需要 2 分钟。

如何将spark-sql的Row转成Java对象?

Dataset转POJO

  1. 将查询出的结果转为RDD
  2. 将RDD创建为DataFrame,并传入schema参数
  3. 调用as方法,将Dataset转为相应的POJO Dataset
  4. 调用collectAsList()方法
SparkSession spark = CloudUtils.getSparkSession();

// 查询原始数据
Dataset<Row> student = spark.sql("select * from `event`.`student`");
// 生成schema
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("major", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);

// 转换查询结果为POJO List
List<Student> students = spark.createDataFrame(student.toJavaRDD(), schema)
        .as(Encoders.bean(Student.class))
        .collectAsList();
System.out.println(students);


Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码

// 查出数据并转为json集合
List<String> jsonList = spark.sql("select * from `event`.`user`")
        .toJSON()
        .collectAsList();
// 将json转为pojo,这里使用的是FastJSON        
List<User> users = jsonList.stream()
        .map(jsonString -> JSON.parseObject(jsonString, User.class))
        .collect(Collectors.toList());
System.out.println(users);

POJO转Dataset

// 获取users列表
  List<User> users = createUsers();
  // 使用createDataFrame转为dataset
  Dataset<Row> ds = spark.createDataFrame(users, User.class);
  // 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索
  String[] columns = ds.columns();
  String[] newColumns = Arrays.stream(columns)
          .map(column -> camelToUnderline(column))
          .toArray(String[]::new);
  // 转为新的df(重命名后的)
  ds.toDF(newColumns);
  ds.show();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/28627.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

常用的框架技术-08 ElasticSearch分布式、高扩展、高实时的搜索与数据分析引擎

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录1.ElasticSearch 概述1.1 ElasticSearch介绍1.2 全文搜索引擎1.3 lucene介绍1.4 倒排索引1.5 elasticsearch、solr对比2.ElasticSearch安装2.1 下载软件2.2 windows环…

【web渗透思路】框架敏感信息泄露(特点、目录、配置)

目录 一、挖掘思路 1、方法&#xff1a; 二、框架之信息泄露 1、Webpack 1.1、简述 1.2、.js.map文件泄露 1.3、源码审计 2、Spring boot 1.1、简述 1.2、利用 1.3、框架识别 &#xff08;基本分析方法都是一样&#xff0c;这里就举2个框架关于信息泄露方面的&#x…

Mybatis分页功能

1. 功能分析 如图所示分页功能&#xff0c;包括上一页、下一页、中间显示的当前页前后页码、全部页码以及跳转到XX页。手写的话实现起来很难&#xff0c;Mybatis给我们提供了插件&#xff0c;所提供的方法&#xff0c;直接包含了上述分页的相关数据。 2. 分页插件的使用及其相关…

虚拟环境下把python代码打包成exe(小白教程)

本教程适用于小白&#xff0c;本人也是小白&#xff0c;不妥之处还请包涵。 1、系统环境下安装 virtualenv 可以理解为 直接打开 系统的cmd安装 pip32 install virtualenv我之所以用pip32因为我电脑上装了两个版本的python 一个是32位一个是64位&#xff0c;如果你电脑上只有一…

为什么选择快速应用开发

如今&#xff0c;企业想要持续蓬勃发展&#xff0c;就需要具备快速满足客户期望的能力。无论是十几年历史的重要市场占有者推出新的APP&#xff0c;还是在疫情期间从线下转向线上电商营销&#xff0c;企业都需要主动适应市场。随着为客户提供新的服务方式&#xff0c;员工也需要…

如何轻松部署快解析 + WAMP

快解析是由北京金万维公司自主研发的域名解析工具&#xff0c;服务器端简单&#xff0c;通过快速部署就能实现在任何地域、任何时间、任何网络环境下快速访问到局域网内搭建的各类办公系统和各种应用。以发布网站服务为例&#xff0c;给大家演示下如何通过快解析实现外网访问WA…

一文带你看透短信验证码

短信验证码应用于我们生活、工作的方方面面&#xff0c;比如注册登录账号、支付订单、修改密码等等。验证码短信主要出于安全的考虑&#xff0c;防止应用/网站被恶意注册&#xff0c;恶意攻击&#xff0c;对于网站、APP而言&#xff0c;大量的无效注册&#xff0c;重复注册&…

Java8中的Stream流

定义 什么是Stream流&#xff0c;Java doc中是这样写的 A sequence of elements supporting sequential and parallel aggregate operations 翻译一下就是一个支持顺序和并行聚合操作的元素序列。 可以把它理解成一个迭代器&#xff0c;但是只能遍历一次&#xff0c;就像是流水…

Nodejs核心模块之Events

核心模块之Events 通过EventEmitter类实现事件统一管理 events与EventEmitter node.js是基于事件驱动的异步操作架构&#xff0c;内置events模块events模块提供了EventEmitter类node.js中很多内置核心模块集成EventEmitter EventEmitter常见Api on 添加实现被触发时调用的…

学生静态HTML个人博客主页【Web大学生网页作业成品】HTML+CSS+JavaScript

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

【没用的小知识又增加了--CCS】

1.CCS中导入工程时提示overlaps the location of another project问题 ​ ​ 工作区要选择最外面的文件夹 ​ 2. error #131: expected a "{" error: #130: expected a "{"_kuyoungest的博客-CSDN博客如果该提示定位到文件开头的语句&#xff0c;则应在…

【Spring(四)】Spring基于注解的配置方式

有关Spring的所有文章都收录于我的专栏&#xff1a;&#x1f449;Spring&#x1f448; 目录 一、前言 二、基于注解需要的依赖 三、通过注解来配置Bean 四、注解配置Bean再补充 五、基于注解的自动装配 六、泛型依赖注入 相关文章 【Spring&#xff08;一&#xff09;】如何获取…

企业知识管理难?选对系统可解决90%的问题

编者按&#xff1a;知识管理是企业加强竞争优势和核心竞争力的保证。本文分析了企业知识管理中遇到的困难&#xff0c;并进一步提出了解决方案——天翎KMS群晖云盘一体机。 关键词&#xff1a;在线预览&#xff0c;在线编辑&#xff0c;权限管理&#xff0c;水印设置&#xff…

macOS Ventura13.0.1解决office缺少“宋体”等问题。安装微软雅黑、宋体等字体。

最近在弄项目验收文档&#xff0c;文档格式要求宋体&#xff0c;用微软的Word打开文件保存时经常提示&#xff0c;系统不存在宋体字体&#xff0c;查了下是是Mac系统本身不存在该字体导致的&#xff0c;下载该字体&#xff0c;然后通过字体册安装就行。 我打包成压缩包了具体有…

【易错小点记录】坑人的for循环与逻辑或

目录 1.题目 1.1.以下for循环的执行次数是&#xff08; &#xff09; 1.1.1.题目分析 1.1.2.题目答案 1.2.下列main()函数执行后的结果为&#xff08;&#xff09; 1.2.题目分析 1.3.题目答案 2.题目 2.1.下面程序输出是什么&#xff1f;&#xff08; &#xff09; 2.…

非线性海洋捕食者算法(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

树表的查找

二叉排序树 二叉排序树&#xff08;BST&#xff09;又称二叉搜索树&#xff0c;其满足以下性质&#xff1a; &#xff08;1&#xff09;若根节点的左子树非空&#xff0c;则左子树上的所有节点关键字均小于根节点的关键字。 &#xff08;2&#xff09;若根节点的右子树非空&a…

补充(二)古典密码两张思维导图速通

目录 目录 古典密码思维导图 古典密码分析思维导图 唯密文分析古典密码 单表代替密码 棋盘密码 曾公密码 置换密码的代表&#xff1a;斯巴达人的密码棒 古典密码思维导图 古典密码分析思维导图 唯密文分析古典密码 最困难的分析条件通常需要用到英文字母的频率分析和反…

【微服务】SpringCloud中Ribbon集成Eureka实现负载均衡

&#x1f496; Spring家族及微服务系列文章 ✨【微服务】SpringCloud轮询拉取注册表及服务发现源码解析 ✨【微服务】SpringCloud微服务续约源码解析 ✨【微服务】SpringCloud微服务注册源码解析 ✨【微服务】Nacos2.x服务发现&#xff1f;RPC调用&#xff1f;重试机制&#xf…

Maven打Jar包,启动报NoClassDefFoundError错误

今天准备将游戏服务器的压测机器人打包分发给其他人来运行对服务器进行压力测试。打成的jar包发现运行报错了。找了半天才找到最终原因。下面是原因和一些分析的情况。 原因 java -jar .\robot.jar发现错误如下 看到这个错误就知道jvm找不到对应的类。但是为什么找不到对应的…