Spark核心名词解释与编程

news2025/4/8 16:22:59

Spark核心概念

名词解释

1)ClusterManager:在Standalone(上述安装的模式,也就是依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager(国内spark主要基于yarn集群运行,欧美主要基于mesos来运行)。

2)Application:Spark的应用程序,包含一个Driver program和若干Executor。

3)SparkConf:负责存储配置信息。作用相当于hadoop中的Configuration。

4)SparkContext:Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor。

5)Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。

6)Driver:运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念,是spark编程的入口,作用相当于mr中的Job)。

7)Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。

8)RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。

9)RDD是弹性式分布式数据集,理解从3个方面去说:弹性、数据集、分布式。是Spark的第一代的编程模型。

10)DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Taskset放到TaskScheduler中。DAGScheduler就是Spark的大脑,中枢神经。

11)TaskScheduler:将任务(Task)分发给Executor执行。

12)Stage:一个Spark作业一般包含一到多个Stage。

13)Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。task的个数由rdd的partition分区决定,spark是一个分布式计算程序,所以一个大的计算任务,就会被拆分成多个小的部分,同时进行计算。一个partition对应一个task任务。

14)Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

15)Actions:操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

Spark官网组件说明

官网组件说明如图-18所示:

图-18 Spark组件通信架构图

Spark应用程序作为集群上的独立进程集运行,由主程序(称为驱动程序)中的SparkContext对象协调。

具体来说,要在集群上运行,SparkContext可以连接到几种类型的集群管理器(Spark自己的独立集群管理器、Mesos或YARN),这些管理器可以跨应用程序分配资源。一旦连接,Spark将获取集群中节点上的执行器,这些执行器是为应用程序运行计算和存储数据的进程。接下来,它将应用程序代码(由传递给SparkContext的JAR或Python文件定义)发送给执行器。最后,SparkContext将任务发送给执行器以运行。

Spark编程体验

项目依赖管理

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
</dependencies>

<build>
    <finalName>chapter1.WordCount</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

项目编码

spark入门程序wordcount:

package com.fesco.bigdata.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * scala版本的wordcount
 */
object ScalaWordCountApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${ScalaWordCountApp.getClass.getSimpleName}")
.setMaster("local[*]")
val sc = new SparkContext(conf)
//加载数据
val file: RDD[String] = sc.textFile("file:/E:/data/spark/hello.txt")

   //按照分隔符进行切分
val words:RDD[String] = lines.flatMap(line => line.split("\\s+"))

//每个单词记为1次
val pairs:RDD[(String, Int)] = words.map(word => (word, 1))

//聚合数据
val ret:RDD[(String, Int)] = pairs.reduceByKey(myReduceFunc)
//export data to external system
ret.foreach(println)

}
sc.stop()
}
def myReduceFunc(v1: Int, v2: Int): Int = {
v1 + v2
}
}

Master URL说明

首先在编程过程中,至少需要给spark程序传递一个参数master-url,通过sparkConf.setMaster来完成。改参数,代表的是spark作业的执行方式,或者指定的spark程序的cluster-manager的类型。

表-1 模式选择

master

含义

local

程序在本地运行,同时为本地程序提供一个线程来处理

local[M]

程序在本地运行,同时为本地程序分配M个工作线程

来处理

local[*]

程序在本地运行,同时为本地程序分配机器可用的CPU core的个数工作线程来处理

local[M, N]

程序在本地运行,同时为本地程序分配M个工作线程来处理,如果提交程序失败,会进行最多N次的重试

spark://ip:port

基于standalone的模式运行,提交撑到ip对应的master上运行

spark://ip1:port1,ip2:port2

基于standalone的ha模式运行,提交撑到ip对应的master上运行

yarn/启动脚本中的deploy-mode配置为cluster

基于yarn模式的cluster方式运行,SparkContext的创建在NodeManager上面,在yarn集群中

yarn/启动脚本中的deploy-mode配置为client

基于yarn模式的client方式运行,SparkContext的创建在提交程序的那台机器上面,不在yarn集群中

spark程序的其他提交方式

加载hdfs中的文件:

object RemoteSparkWordCountOps {

def main(args: Array[String]): Unit = {
    //创建程序入口
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //设置日志级别
    sc.setLogLevel("WARN")
    //加载数据
    val file = sc.textFile("hdfs://hadoop101:8020//wordcount//words.txt")
    //切分
    val spliFile: RDD[String] = file.flatMap(_.split(" "))
    //每个单词记为1次
    val wordAndOne: RDD[(String, Int)] = spliFile.map((_, 1))
    //聚合
    val wordAndCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //打印输出
    wordAndCount.foreach(println)
    //释放资源
    sc.stop()
}

}

提交spark程序到集群中

首先需要将spark-core模块进行打包,其次上传到集群中,才可以进行提交作业到spark或者yarn集群中运行。

1)Client:

bin/spark-submit \

--class chapter1.WordCount \

--master spark://hadoop101:7077 \

/root/word.jar \

hdfs://hadoop101:8020/wordcount/words.txt

2)Cluster:

bin/spark-submit \

--class chapter1.WordCount \

--master spark://hadoop101:7077 \

/root/word.jar \

hdfs://hadoop101:8020/wordcount/words.txt \

hdfs://hadoop101:8020/wordcount/output1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1631866.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32定时器的OC比较和PWM

系列文章目录 STM32单片机系列专栏 C语言术语和结构总结专栏 文章目录 1. 输出比较(OC) 2. PWM 3. PWM的输出 3.1 高级定时器 3.2 通用定时器 4. PWM的输出结构 5. 代码示例 5.1 PWM.c 5.2 PWM.h 5.3 main.c 这篇文章解释了TIM定时器的内部时钟和外部时钟的使用&a…

AI图书推荐:ChatGPT写论文的流程与策略

论文一直是任何学术学位的顶峰。它展示了学生在研究领域的兴趣和专业知识。撰写论文也是一个学习经验&#xff0c;为学术工作以及专业研究角色做好准备。但是&#xff0c;论文工作总是艰苦的&#xff0c;通常是充满乐趣和创造性的&#xff0c;但有时也是乏味和无聊的。生成式人…

【kettle005】kettle访问Oracle数据库并处理数据至execl文件(已更新)

1.一直以来想写下基于kettle的系列文章&#xff0c;作为较火的数据ETL工具&#xff0c;也是日常项目开发中常用的一款工具&#xff0c;最近刚好挤时间梳理、总结下这块儿的知识体系。 2.熟悉、梳理、总结下Oracle数据库相关知识体系 3.欢迎批评指正&#xff0c;跪谢一键三连&am…

IT廉连看——UniApp——样式绑定

IT廉连看——UniApp——样式绑定 一、样式绑定 两种添加样式的方法&#xff1a; 1、第一种写法 写一个class属性&#xff0c;然后将css样式写在style中。 2、第二种写法 直接把style写在class后面 添加一些效果&#xff1a;字体大小 查看效果 证明这样添加样式是没有问题的…

数字签名学习

1 基本概念 数字签名是一种加密技术&#xff0c;用于验证信息来源的身份和数据的完整性。 就是对一个东西签上自己的名&#xff1b;收到的人可以验证这东西是你发的&#xff1b;这里是用数字的方式&#xff1b; 对字符串也可以签名&#xff0c;签名以后&#xff0c;还是一个…

OpenHarmony 项目实战:智能体重秤

一、简介 本 demo 基于 OpenHarmony3.1Beta 版本开发&#xff0c;该样例能够接入数字管家应用&#xff0c;通过数字管家应用监测体重秤上报数据&#xff0c;获得当前测量到的体重&#xff0c;身高&#xff0c;并在应用端形成一段时间内记录的体重值&#xff0c;以折线图的形式…

deepflow grafana plugin 编译问题解决

修改tsconfig.js 增加"noImplicitAny": false&#xff0c;解决代码类型没有指定&#xff0c;显示Any 错误 To solve the error, explicitly set the parameters type to any, use a more specific type or set noImplicitAny to false in tsconfig.json. https://b…

白话机器学习1:分类问题中的评价指标

机器学习中的评价指标非常多&#xff0c;它们用来衡量模型的性能和预测能力。不同类型的机器学习任务可能需要不同的评价指标。以下是一些常见的评价指标&#xff0c;按照不同类型的机器学习任务分类&#xff1a; 对于分类问题&#xff1a; 准确率&#xff08;Accuracy&#…

机器学习:基于Sklearn、XGBoost框架,使用XGBClassifier、支持向量分类器和决策树分类器预测乳腺癌是良性还是恶性

前言 系列专栏&#xff1a;机器学习&#xff1a;高级应用与实践【项目实战100】【2024】✨︎ 在本专栏中不仅包含一些适合初学者的最新机器学习项目&#xff0c;每个项目都处理一组不同的问题&#xff0c;包括监督和无监督学习、分类、回归和聚类&#xff0c;而且涉及创建深度学…

【Camera KMD ISP SubSystem笔记】CAM SYNC与DRQ②

DRQ的作用&#xff1a; DRQ负责调度管理pipeline里的node处理逻辑(通过node之间的dependency依赖机制) 利用多线程并行处理Pipeline中并行的node&#xff0c;加快处理速度 DRQ运转流程&#xff1a; DRQ先告诉node fill dependency&#xff0c; 此时seq id 为0…

BetterDisplay Pro for Mac:显示器校准软件

BetterDisplay Pro for Mac是一款出色的显示器校准软件&#xff0c;旨在提升你的视觉体验。它提供了准确的显示器参数调整&#xff0c;包括亮度、对比度、色温和色域等&#xff0c;让你的显示器呈现更真实、清晰、细腻的图像。此外&#xff0c;软件还提供多种预设模式和自定义选…

【PyTorch 实战3:YOLOv5检测模型】10min揭秘 YOLOv5 检测网络架构、工作原理以及pytorch代码实现(附代码实现!)

YOLOv5简介 YOLOv5&#xff08;You Only Look Once, Version 5&#xff09;是一种先进的目标检测模型&#xff0c;是YOLO系列的最新版本&#xff0c;由Ultralytics公司开发。该模型利用深度学习技术&#xff0c;能够在图像或视频中实时准确地检测出多个对象的位置及其类别&…

pycharm 安装“通义灵码“并测试

过程&#xff1a;“File>setting>Plugins” 提示&#xff1a; 翻译之后&#xff1a; 点击"接受"之后&#xff0c;提示一下图片&#xff0c;点击ok 安装完成&#xff1a; 安装完"通义灵码"之后&#xff0c;需要登陆&#xff0c;登陆后测试 参考…

Springboot + MySQL + html 实现文件的上传、存储、下载、删除

实现步骤及效果呈现如下&#xff1a; 1.创建数据库表&#xff1a; 表名&#xff1a;file_test 存储后的数据&#xff1a; 2.创建数据库表对应映射的实体类&#xff1a; import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.annotation.Table…

进程的概念(2)

进程优先级 1.什么的优先级 概念&#xff1a;指定进程获取某种资源&#xff08;CPU&#xff09;的先后顺序 本质&#xff1a;优先级的本质是优先级数字的大小&#xff0c;Linux中优先级数字越小&#xff0c;优先级越高 task_struct 进程控制快-> struct -> 内部字段 -&g…

MT3608B 航天民芯代理 1.2Mhz 24V输入 升压转换器

深圳市润泽芯电子有限公司为航天民芯一级代理商 技术支持欢迎试样~Tel&#xff1a;18028786817 简述 MT3608B是恒定频率的6针SOT23电流模式升压转换器&#xff0c;用于小型、低功耗应用。MT3608B开关频率为1.2MHz&#xff0c;允许使用微小、低电平成本电容器和电感器高度不…

UE5 GAS开发P40 周期性效果,持续治疗

Periodic Gameplay Effects周期性的游戏效果 它们在一段时间内以固定的间隔重复应用到目标上。这种效果通常用于表示持续性伤害、治疗或其他影响&#xff0c;例如中毒、灼烧或回复效果。 修改GE_CrystalHeal,在Period改为每0.1秒执行一次 假如同时有三个持续时间在进行,那么这…

万兆以太网MAC设计(11)完整UDP协议栈仿真

文章目录 前言一、模块接口二、IP模块与ARP模块之间的联系三、整体协议栈仿真总结&#xff1a; 前言 目前除了巨帧处理逻辑之外&#xff0c;所有的准备工作都已经结束了&#xff0c;先进行整体的功能验证。 一、模块接口 所有模块接口皆采用AXIS数据流的形式&#xff0c;其中…

机器学习:基于Sklearn框架,使用逻辑回归对由心脏病引发的死亡进行预测分析

前言 系列专栏&#xff1a;机器学习&#xff1a;高级应用与实践【项目实战100】【2024】✨︎ 在本专栏中不仅包含一些适合初学者的最新机器学习项目&#xff0c;每个项目都处理一组不同的问题&#xff0c;包括监督和无监督学习、分类、回归和聚类&#xff0c;而且涉及创建深度学…

小程序AI智能名片商城系统直连:打造用户与企业无缝对接的新时代!

在高度不确定性的商业环境中&#xff0c;企业如何快速响应市场变化&#xff0c;实现与用户的零距离接触&#xff1f;答案就是——小程序AI智能名片商城系统直连&#xff01;这一创新工具不仅为企业打开了与用户直接连接的大门&#xff0c;更为企业提供了持续收集用户反馈、快速…