Spark

news2025/1/14 18:16:47

1 Spark作业提交流程

在这里插入图片描述

2 Spark提交作业参数

  1)在提交任务时的几个重要参数

  executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个

  num-executors —— 启动executors的数量,默认为2

  executor-memory —— executor内存大小,默认1G

  driver-cores —— driver使用内核数,默认为1

  driver-memory —— driver内存大小,默认512M

 2)边给一个提交任务的样式

spark-submit \
  --master local[5]  \
  --driver-cores 2   \
  --driver-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --executor-memory 8g \
  --class PackageName.ClassName XXXX.jar \
  --name "Spark Job Name" \
  InputPath      \
  OutputPath

3 RDD五大属性

在这里插入图片描述

4 算子

4.1 Transformation

1)单Value

(1)map
(2)mapPartitions
(3)mapPartitionsWithIndex
(4)flatMap
(5)glom
(6)groupBy
(7)filter
(8)sample
(9)distinct
(10)coalesce
(11)repartition
(12)sortBy
(13)pipe

2)双vlaue

(1)intersection
(2)union
(3)subtract
(4)zip

3)Key-Value

(1)partitionBy
(2)reduceByKey
(3)groupByKey
(4)aggregateByKey
(5)foldByKey
(6)combineByKey
(7)sortByKey
(8)mapValues
(9)join
(10)cogroup

4.2 Action

(1)reduce
(2)collect
(3)count
(4)first
(5)take
(6)takeOrdered
(7)aggregate
(8)fold
(9)countByKey
(10)save
(11)foreach
(12)foreachPartition

4.3 map和mapPartitions区别

  1)map:每次处理一条数据

  2)mapPartitions:每次处理一个分区数据

4.4 Repartition和Coalesce区别

  1)关系:

  两者都是用来改变RDD的partition数量的

  repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)

  2)区别:

  repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle

  一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce

4.5 reduceByKey与groupByKey的区别

  reduceByKey:具有预聚合操作

  groupByKey:没有预聚合

  在不影响业务逻辑的前提下,优先采用reduceByKey。

4.6 reduceByKey、foldByKey、aggregateByKey、combineByKey区别

ReduceByKey没有初始值分区内和分区间逻辑相同
foldByKey有初始值分区内和分区间逻辑相同
aggregateByKey有初始值分区内和分区间逻辑可以不同
combineByKey初始值可以变化结构分区内和分区间逻辑不同

5 Spark任务的划分

  (1)Application:初始化一个SparkContext即生成一个Application;

  (2)Job:一个Action算子就会生成一个Job;

  (3)Stage:Stage等于宽依赖的个数加1;

  (4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

6 Cache

  spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。调用 cache()和 persist()方法即可。

  cache()和 persist()的区别在于, cache()是 persist()的一种简化方式, cache()的底层就是调用 persist()的无参版本persist(MEMORY_ONLY), 将数据持久化到内存中。如果需要从内存中清除缓存, 可以使用 unpersist()方法。 RDD 持久化是可以手动选择不同的策略的。 在调用 persist()时传入对应的 StorageLevel 即可。

  DataFrame的cache默认采用 MEMORY_AND_DISK

  RDD 的cache默认方式采用MEMORY_ONLY

  缓存:(1)dataFrame.cache

     (2)sparkSession.catalog.cacheTable(“tableName”)

  释放缓存:(1)dataFrame.unpersist

       (2)sparkSession.catalog.uncacheTable(“tableName”)

7 Cache和CheckPoint

7.1 CheckPoint

  Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法, 设置一个容错的文件系统的目录, 比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD 所处的 job 运行结束之后, 会启动一个单独的 job, 来将checkpoint 过的 RDD 数据写入之前设置的文件系统, 进行高可用、 容错的类持久化操作。

  检查点机制是我们在 spark streaming 中用来保障容错性的主要机制, 它可以使 spark streaming 阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。 具体来说基于以下两个目的服务:

  1. 控制发生失败时需要重算的状态数。 Spark streaming 可以通过转化图的谱系图来重算状态, 检查点机制则可以控制需要在转化图中回溯多远。

  2. 提供驱动器程序容错。 如果流计算应用中的驱动器程序崩溃了, 你可以重启驱动器程序并让驱动器程序从检查点恢复, 这样 spark streaming 就可以读取之前运行的程序处理数据的进度, 并从那里继续。

7.2 Cache和CheckPoint区别

  1)Cache只是将数据保存在 BlockManager 中, 但是 RDD 的lineage(血缘关系, 依赖关系)是不变的。但是 checkpoint 执行完之后, rdd 已经没有之前所谓的依赖 rdd 了, 而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。

  2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

  3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

8 累加器

在这里插入图片描述

9 广播变量

在这里插入图片描述

10 RDD、DataFrame、DataSet三者的转换

在这里插入图片描述

11 Spark Streaming消费Kafka数据

11.1 Spark Streaming第一次运行不丢失数据

  kafka参数 auto.offset.reset 设置成`earliest ``从最初始偏移量开始消费数据

11.2 Spark Streaming精准一次消费Kafka

  手动维护偏移量,处理完业务数据后,再进行提交偏移量操作

  极端情况下,如在提交偏移量时断网或停电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事物保证精准一次消费

11.3 Spark Streaming控制每秒消费数据的速度

  通过spark.streaming.kafka.maxRatePerPartition参数来设置Spark Streaming从kafka分区每秒拉取的条数

11.4 Spark Streaming背压机制

  把spark.streaming.backpressure.enabled 参数设置为ture,开启背压机制后Spark Streaming会根据延迟动态去kafka消费数据,上限由spark.streaming.kafka.maxRatePerPartition参数控制,所以两个参数一般会一起使用。

11.5 Spark Streaming消费Kafka数据默认分区个数

  Spark Streaming默认分区个数与所对接的kafka topic分区个数一致,Spark Streaming里一般不会使用repartition算子增大分区,因为repartition会进行shuffle增加耗时。

11.6 SparkStreaming有哪几种方式消费Kafka中的数据?它们之间的区别是什么?

一、基于Receiver的方式

  这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。

  然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

二、基于Direct的方式

  这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

优点如下:

  简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

  高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

  一次且仅一次的事务机制

三、对比:

  基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

  基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

  在实际生产环境中大都用Direct方式

12 Spark Streaming优雅关闭

  把spark.streaming.stopGracefullyOnShutdown参数设置成ture,Spark会在JVM关闭时正常关闭StreamingContext,而不是立马关闭。

13 Spark性能调优

  Spark性能调优

14 为什么区分宽窄依赖

   对于窄依赖:

    窄依赖的多个分区可以并行计算

    窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

  对于宽依赖:

    划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

  DAG 划分为 Stage 的算法:

    核心算法: 回溯算法

    从后往前回溯/反向解析, 遇到窄依赖加入本 Stage, 遇见宽依赖进行 Stage 切分。Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推, 首先会为最后一个 RDD 创建一个 Stage, 然后继续倒推, 如果发现对某个 RDD 是宽依赖, 那么就会将宽依赖的那个 RDD 创建一个新的 Stage, 那个 RDD 就是新的 Stage的最后一个 RDD。 然后依次类推, 继续倒推, 根据窄依赖或者宽依赖进行 Stage的划分, 直到所有的 RDD 全部遍历完成为止。

15 Spark 主备切换机制原理

  Master 实际上可以配置两个, Spark 原生的 standalone 模式是支持 Master主备切换的。 当 Active Master 节点挂掉以后, 我们可以将 Standby Master 切换为 Active Master。

  Spark Master 主备切换可以基于两种机制, 一种是基于文件系统的, 一种是基于 ZooKeeper 的。

  基于文件系统的主备切换机制, 需要在 Active Master 挂掉之后手动切换到Standby Master 上;

  而基于 Zookeeper 的主备切换机制, 可以实现自动切换 Master。

注: Master 切换需要注意 2 点:

  1、 在 Master 切换的过程中, 所有的已经在运行的程序皆正常运行! 因为 SparkApplication 在运行前就已经通过Cluster Manager 获得了计算资源, 所以在运行时 Job 本身的 调度和处理和 Master 是没有任何关系。

  2、 在 Master 的切换过程中唯一的影响是不能提交新的 Job: 一方面不能够提交新的应用程序给集群, 因为只有 Active Master 才能接受新的程序的提交请求; 另外一方面, 已经运行的程序中也不能够因 Action 操作触发新的 Job 的提交请求。

Tips: Spark Master 使用 Zookeeper 进行 HA, 有哪些源数据保存到Zookeeper 里面?

  spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在zookeeper 中保存的位置, 包括 Worker, Driver 和 Application 以及Executors。 standby 节点要从 zk 中, 获得元数据信息, 恢复集群运行状态,才能对外继续提供服务, 作业提交资源申请等, 在恢复前是不能接受请求的。

16 如何保证数据不丢失?

  flume 那边采用的 channel 是将数据落地到磁盘中, 保证数据源端安全性;

  sparkStreaming 通过拉模式整合的时候, 使用了 FlumeUtils 这样一个类,该类是需要依赖一个额外的 jar 包( spark-streaming-flume_2.10)

  要想保证数据不丢失, 数据的准确性, 可以在构建 StreamingConext 的时候, 利用 StreamingContext.getOrCreate( checkpoint, creatingFunc:() => StreamingContext) 来创建一个 StreamingContext

   传入的第一个参数是 checkpoint 的存放目录, 第二参数是生成StreamingContext 对象的用户自定义函数。

   如果 checkpoint 的存放目录存在, 则从这个目录中生成 StreamingContext 对象; 如果不存在, 才会调用第二个函数来生成新的 StreamingContext 对象。 在 creatingFunc函数中, 除了生成一个新的 StreamingContext 操作, 还需要完成各种操作,然后调用ssc.checkpoint(checkpointDirectory)来初始化checkpoint 功能, 最后再返回StreamingContext 对象。这样, 在StreamingContext.getOrCreate 之后, 就可以直接调用 start()函数来启动( 或者是从中断点继续运行) 流式应用了。如果有其他在启动或继续运行都要做的工作, 可以在 start()调用前执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/79227.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【OpenCV学习】第9课:形态学操作的应用-提取水平线丶垂直线

仅自学做笔记用,后续有错误会更改 理论 图像在进行形态学操作的时候, 可以通过自定义的结构元素实现结构元素对输入图像的一些对象敏感丶对另外一些对象不敏感, 这样就会让敏感的对象改变而不敏感的对象保留输出。 通过使用两个最基本的形态学操作 - 膨…

华为云服务-运维篇-弹性负载均衡

文章目录一、什么是负载均衡二、我们为什么需要负载均衡1、生活中需要它的类似场景2、生活场景中协调者(负载均衡)作用3、协调者(负载均衡)引入后的变化三、华为云平台-如何做负载均衡弹性负载均衡-ELB四、总结一、什么是负载均衡 负载均衡构建在原有网…

【数据挖掘】薪酬分段对应工作经验/学历画柱状图【招聘网站的职位招聘数据预处理】

文章目录一.需求背景1.1 需求分析二.数据处理(对给定职位,汇总薪酬分段对应工作经验要求数据,画柱状图;)2.1 事前准备2,1 处理开始三.数据处理(对给定职位,汇总薪酬分段对应学历要求数据,画柱状图;)四.附源…

吉林大学 超星慕课 高级语言程序设计 实验08 结构化程序设计(2022级)

本人能力有限,发出只为帮助有需要的人。 建议同学们自己写完后再进行讨论。 其中的代码均没能在oj上进行测试,因此可能有误,请谅解。 除此以外部分题目设计深度优先搜索,因此可以分别用递归和堆栈实现,堆栈方法为了…

JavaScript进阶教程——异步编程、封装Ajax

异步编程 什么是同步与异步: 同步:一件事没做完,只能等待,完成之后再去做另一件事 异步: 两件事可以同时进行 前端开发中最常见的两种异步情况: ajax: 向后台请求数据计时器: setInterval se…

Python学习基础笔记四十一——sys模块

sys模块是与Python解释器交互的一个接口。 sys.argv 命令行参数List,第一个元素是程序本身路径 sys.exit(n) 退出程序,正常退出时exit(0),错误退出sys.exit(1) sys.version 获取Python解释程序的版本信息 sys.path 返…

ARM Cortex M3处理器概述

Cortex-M3概述 2004年ARM发布作为新型Corex处理器内核系列首款的Cortex-M3处理器。 STM32系列基于专为高性能、低成本、低功耗的嵌入式应用专门设计的ARM Cortex-M内核。 STM32命名规则 STMF103xx系统结构 1.使用高性能的ARM Cortex-M3 32位RISC内核 2.工作频率为72MHZ 3.内…

shell脚本监控文件夹文件实现自动上传数据到hive表

sh createtb.sh “tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2” 数据库名:observation (脚本里写死了) 表名:tablename 指定名:field1,field2,field3,field4,field5,field6,f…

分别使用Alpine、Docker制作jdk镜像

目录 制作 jdk 1.0 镜像 ——Docker 1.创建文件夹上传jdk的安装包,和在同级目录下编写Dockerfile文件 2.编写 Dockerfile 文件 3.执行Dockerfile文件,初次依赖镜像的时候会下载相应镜像 优化制作jdk镜像(缩小内存大小)——使用alpine …

【致敬世界杯】球迷(我)和足球的故事

目录 一、第一次接触足球 二、回味无穷的2018世界杯 三、致敬世界杯 3.1 源代码 3.2 思路 3.3 关于图片 一、第一次接触足球 踢足球是一项优秀的运动,它可以锻炼身体,增强团队合作精神,并为人们带来快乐和满足感。回忆起小学时候第一次…

OpenCV和RTSP的综合研究

一、RTSP是什么?用来干什么? RTSP(Real Time Streaming Protocol),RFC2326,实时流传输协议,是TCP/IP协议体系中的一个应用层协议,由哥伦比亚大学、网景和RealNetworks公司提交的IET…

四旋翼无人机学习第14节--PCB Editor简单绘制封装-1

文章目录1 前言1.1 网络获取1.2 封装软件生成1.3 立创商城封装转化1 前言 在之前的博客中,我们绘制了封装所需的焊盘,有了焊盘我们就可以绘制封装啦。当然封装的获取有很多途径,下面我来总结一下。 1.1 网络获取 (有需要的可以下载哦&…

华为eNSP模拟器配置MSTP多实例生成树

传统的stp、rstp有其必然的缺陷 1.统一局域网内所有的vlan共享一个生成树,无法在vlan间实现数据流量的负载均衡。 2.链路利用率低,被阻塞的冗余链路不承载任何流量,造成了带宽的浪费,还可能造成部分vlan报文无法转发。MSTP在它们…

计算机毕业设计springboot+vue基本微信小程序的学习资料共享小程序

项目介绍 前台为用户使用的,包括下面一些功能: ① 资料发布:用户可以将想要共享的资料发布到小程序,供他人购买。 ②搜索 :分为按名称搜索和分类搜索,用户可选择其中一种方式,检索自己所需要的资料。 ③ 查看资料详情:用户可以…

学委必备小工具——筛选未提交人数【python小工具】

问题描述 作为一个学委,通常的任务就是收取班级作业,然后向老师报告当前未交人员的名单 JS版本:实现以一个表格数据查询另一个表格【JS】 之前我已经尝试通过用JS实现了,本质上差别其实也不是很大,只是对于JS来说&…

Java基础之《netty(11)—netty模型》

一、简单说明 1、工作原理示意图 netty主要基于主从Reactors多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。 2、说明 (1)BossGroup线程维护selector,只关注Accept事件。 (2)当接收到…

[附源码]Node.js计算机毕业设计出版社样书申请管理系统Express

项目运行 环境配置: Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术: Express框架 Node.js Vue 等等组成,B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境:最好是Nodejs最新版,我…

第十一章 特征选择与稀疏学习

11.1 子集搜索与评价 我们将属性称为特征,对当前学习任务有用的属性称为相关特征、没什么用的属性称为无关特征。还有一类特征称为冗余特征,它们所包含的信息能从其他特征中推演出来,冗余特征在很多时候不起作用,去除它们会减轻学…

redis之如何应对并发访问问题

写在前面 本文一起看下Redis的并发访问控制。 1:单线程的Redis为什么会有并发问题 我们知道,Redis是单线程的,为什么还是会有并发问题呢?没错,如果是单命令操作的话肯定没有并发问题,但考虑事务的场景&a…

nodejs+vue人事管理系统30n9o

开发语言:nodejs 框架:Express 数据库:mysql 数据库工具:Navicat11 开发软件:VS code 浏览器:谷歌浏览器 目录 1 绪论 1 1.1课题背景 1 1.2课题研究现状 1 1.3初步设计方法与实施方案 2 1.4本文研究内容 …