初识Spark

news2024/11/17 4:46:10

一、简介

官网:Apache Spark™ - Unified Engine for large-scale data analytics

Apache的顶级项目,用于大规模数据处理的统一分析引擎。

支持语言:Java、Scala、Python和R (源码为Scala)

高级工具:

        1、SparkSQL用于SQL和结构化数据处理        

        2、提供Pandas API 可提供在 Apache Spark 上运行的、与 Pandas 等效的 API,从而填补这Pandas 不会横向扩展到大数据的空白

        3、MLlib用于机器学习

        4、GraphX用于图形处理, 和结构化流 用于增量计算和流处理

二、术语

Application基于Spark构建的用户程序。由集群上的驱动程序和执行程序组成
Application jar包含用户的Spark应用程序的jar。在某些情况下,用户希望创建一个包含其应用程序及其依赖项的“uber jar”。用户的jar永远不应该包含Hadoop或Spark库,但是,这些库将在运行时添加。下·下·下·
Driver program运行应用程序main()函数并创建SparkContext的进程
Cluster manager用于获取集群上资源的外部服务(例如独立管理器、Mesos、YARN)
Deploy mode区分驱动程序进程运行的位置。在“cluster”模式下,框架在集群内部启动驱动程序。在“client”模式下,提交者在集群外启动驱动程序。
Worker node任何可以在集群中运行应用程序代码的节点
Executor为工作节点上的应用程序启动的进程,该进程运行任务并将数据保存在内存或磁盘存储中。每个应用程序都有自己的执行器。
Task将发送到一个执行器的工作单元
Job由响应Spark操作而产生的多个任务组成的并行计算 (例如save、collect);
Stage每个作业被分成称为阶段的较小任务集,这些任务相互依赖(类似于MapReduce中的map和duce阶段);

三、架构

我看下官方的架构图:

 SparkContext 连接到 ClusterManager(可以是Spark自己的独立集群管理器、Mesos或YARN), ClusterManager在应用程序之间分配资源。一旦连接,Spark就会在集群中的WorkerNode上获取Executor,WorkerNode上会为应用程序启动一个可以计算和存储数据的进程,并把应用程序代码发送给Executor。最后,SparkContext将任务发送给Executor运行。

注意:

        1、不同的应用程序之间要想共享数据必须写入外部存储系统

        2、Driver program会一直监听Executor的执行情况

四、开发环境构建

        选择File>New>Project

        

         选择Maven,搜索scala,找到图中选中的模板

        

        选择路径并填写项目名称

        

        设置本地maven

        

        修改pom.xml文件,添加对spark的支持,完整的pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.study</groupId>
  <artifactId>spark</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.10.4</scala.version>
    <spark.version>2.2.0</spark.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>



    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>


    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>


  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

        同步maven

         

五、入门程序WordCount

        1、数据制作

        

        2、代码编写

package org.study

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //可以通过 SparkConf 为 Spark 绝大多数配置设置参数,且这些参数的优先级要高于系统属性
    //注意:一旦 SparkConf 传递给 Spark 后,就无法再对其进行修改,因为Spark不支持运行时修改
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    //Spark 的主要入口点 SparkContext 表示到Spark集群的连接,用于在该集群上创建RDD、累加器、广播变量
    //每个JVM只能有一个 SparkContext 处于活动状态
    val sc = new SparkContext(conf)
    //从HDFS、本地文件系统(在所有节点上都可用)或任何Hadoop支持的文件系统URI读取文本文件,并将其作为字符串的RDD返回。
    val sourceRdd = sc.textFile("file/word_count_data.txt")
    //原始一行数据:js,c,vba,json,xml
    //flatMap将每行数据按照逗号分割,得到每个单词 形成 (单词1) (单词2) (单词1) ... 的格式
    //map将每个单词的次数都赋值成1形成 (单词1,1) (单词2,1) (单词1,次数) ... 的格式
    //reduceByKey将相同单词中的次数进行累加
    val resultRdd = sourceRdd.flatMap(_.split(",")).map(x=>{(x,1)}).reduceByKey(_+_)
    //打印结果
    resultRdd.foreach(println)
    //停止SparkContext
    sc.stop()

  }

}

        3、下载源码

           

        4、本地运行

        

六、运行模式

        1、本地运行

                通过SparkConf的setMaster方法设置成local或者local[n](表示本地起n个核跑任务)

                一般用于本地开发调试程序

        2、Standalone

                Spark自带的任务调度模式(不常用)

        3、Spark on Yarn (常用)

                通过spark-submit 中的 --deploy-mode 指定,默认为client

                a、client模式

                        Driver program 运行在执行spark-submit脚本的机器上,并接收集群上各个Executor的汇报,因此压力较大(本机挂了任务就失败了),但日志都会在本节点打印,适用于调试。

                b、cluster模式

                        Driver program 运行在集群环境中,如果Driver程序挂了还可以利用Yarn的失败重试机制重新运行,且大大降低和Executor通信的网络开销。

七、监控

        默认情况下,每个SparkContext都会在端口4040上启动一个Web UI,该UI显示有关应用程序的有用信息。这包括:

        1、Job、Stage、Task详细信息

        2、RDD大小和内存使用情况摘要

        3、环境信息

        4、可视化的DAG

        如果多个SparkContext在同一主机上运行,它们将绑定到连续的端口 从4040(4041、4042等)

        注意:此信息仅在应用程序期间可用。 若要在事后查看Web UI,请在启动之前将其(spark.eventLog.enabled )设置为true

        启动历史服务器,默认端口为18080

        ./sbin/start-history-server.sh

         

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1892084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA开发必备的插件,实测非常好用

1、Lombok -- 简化Java代码开发 推荐指数&#xff1a; ★★★★★ Lombok&#xff1a;首当其冲的非常推荐的当然是Lombok Lombok能以简单的注解形式来简化Java代码&#xff0c;提高开发人员的开发效率。例如开发中经常需要写的JavaBean&#xff0c;都需要花时间去添加相应的ge…

如何在忘记密码的情况下删除华为ID激活锁

当您手中拥有最新的华为手机时&#xff0c;您会忍不住探索新的可能性&#xff0c;以从您的设备中获得最大价值。您可以下载新的应用程序、Android 启动器等&#xff0c;但这些应用程序中的大多数都会给您的手机带来错误和安全威胁&#xff0c;如果不恢复出厂设置&#xff0c;可…

对标 GPT-4o 的开源实时语音多模态模型:Moshi

是由法国的 AI 实验室 Kyutai 推出的实时语音多模态模型&#xff0c;支持听、说、看&#xff0c;最关键的是你现在就可以在浏览器中使用&#xff0c;如果这个链接延迟高&#xff0c;可以试试这个, 无需输入邮箱&#xff0c;点击 Join queue 即可。 简单体验了下&#xff0c;比…

自学新标日第十七课(已完结)

第十七课 单词 单词假名声调词义洋服ようふく0西服セーター&#xff11;毛衣ノートバソコン&#xff14;笔记本电脑バイク1摩托车お汁粉おしるこ2年糕小豆汤天ぷらてんぷら&#xff10;天麩羅初詣はつもうで&#xff13;新年后首次参拜健康けんこう0健康恋愛れんあい0恋爱相手…

springboot 整合 mybatis-plus

一.前言 1. mybatis-plus是什么 mybatis-plus是一个对mybati框架的拓展框架&#xff0c;它在mybatis框架基础上做了许多的增强&#xff0c;帮助我们快速的进行代码开发。目前企业开发中&#xff0c;使用mybati的项目基本会选择使用mybatis-plus来提升开发效率。 2.官网地址&…

机器学习:预测评估8类指标

机器学习&#xff1a;8类预测评估指标 R方值、平均值绝对误差值MAE、均方误差MSE、均方误差根EMSE、中位数绝对误差MAD、平均绝对百分误差MAPE、可解释方差分EVS、均方根对数误差MLSE。 一、R方值 1、说明&#xff1a; R方值&#xff0c;也称为确定系数或拟合优度&#xff…

3099.力扣每日一题7/3 Java(击败100%)

博客主页&#xff1a;音符犹如代码系列专栏&#xff1a;算法练习关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 目录 思路 解题方法 时间复杂度 空间复杂度 Code 思路 首先要解决如何…

MobileVitv1替换yolov8主干网络

一、原理介绍 MobileViT模型是为移动设备设计的轻量级、通用目的视觉变换器。它融合了卷积神经网络&#xff08;CNN&#xff09;和视觉变换器&#xff08;ViT&#xff09;的优势&#xff0c;旨在在保持高效性能的同时减少模型参数和降低延迟。以下是关于MobileViT模型的主要原理…

2024企业数据资产化及数据资产入表方案梳理

01 数据资产入表&#xff1a;是一个将组织的各类数据资产进行登记、分类、评估和管理的流程。 数据资产包括&#xff1a;客户信息、交易记录、产品数据、财务数据等。 做个比喻吧&#xff1a;数据资产入表就像是给公司的数据资产做“人口普查”—— ①找出公司有哪些数据找…

在uni-app使用vue3使用vuex

在uni-app使用vue3使用vuex 1.在项目目录中新建一个store目录&#xff0c;并且新建一个index.js文件 import { createStore } from vuex;export default createStore({//数据&#xff0c;相当于datastate: {count:1,list: [{name: 测试1, value: test1},{name: 测试2, value: …

【DataSophon】DataSophon1.2.1服务组件开启 kerberos

目录 一、DataSophon是什么 1.1 DataSophon概述 1.2 架构概览 1.3 设计思想 二、集成组件 三、环境准备 四、安装kerberos服务 4.1 Zookeeper 4.2 HDFS 4.3 HBase 4.4 YARN 4.5 hive 【DataSophon】大数据管理平台DataSophon-1.2.1安装部署详细流程-CSDN博客 【Da…

Qt中udp指令,大小端,帧头帧尾实际示例

前言 虽然QT中&#xff0c;udp发送和接收&#xff0c;其实非常简单&#xff0c;但是实际工作中&#xff0c;其实涉及到帧头帧尾&#xff0c;字节对齐&#xff0c;以及大小端序的问题。比如网络中&#xff0c;正规的一般都是大端序&#xff0c;而不是小端序&#xff0c;大多数的…

2024鲲鹏昇腾创新大赛集训营Ascend C算子学习笔记

异构计算架构&#xff08;CANN&#xff09; 对标英伟达的CUDA CuDNN的核心软件层&#xff0c;向上支持多种AI框架&#xff0c;向下服务AI处理器&#xff0c;发挥承上启下的关键作用&#xff0c;是提升昇腾AI处理器计算效率的关键平台。主要包括有各种引擎、编译器、执行器、算…

mac磁盘工具如何合并分区 macos 磁盘工具 无法抹除 磁盘管理软件哪个使用率最高

一、什么是NTFS格式分区 NTFS格式分区是微软公司开发的诸多文件系统中的一种。NTFS格式分区是一种文件系统&#xff0c;磁盘只有在安装了文件系统后才能被正常使用&#xff0c;文件系统的格式有非常多&#xff0c;常见的有FAT 32和NTFS。 作为常见文件系统&#xff0c;NTFS格式…

Conmi的正确答案——ESP32-C3开启安全下载模式

IDF版本&#xff1a;4.4.7 注意事项&#xff1a;一旦烧录“安全下载模式”&#xff0c;模组将无法被读取或清理&#xff0c;只能通过eclipse原项目烧录程序进行重新烧录&#xff0c;无法再烧录其他固件。 20240703110201——追加解法&#xff0c;暂时无法解安全下载模式 &…

Python数据分析-股票数据分析(GARCH模型)

一、研究背景 随着金融市场的不断发展和全球经济的日益复杂&#xff0c;市场波动性和风险管理成为投资者和金融机构关注的焦点。波动率是衡量市场风险的重要指标&#xff0c;准确预测和评估波动率对于资产定价、风险控制和投资决策具有重要意义。在金融时间序列分析中&#xf…

上海会议论坛可以邀请哪些媒体?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 上海会议论坛可以邀请的媒体包括电视媒体、报纸媒体、网络媒体、视频媒体以及专业的媒体邀约机构。具体介绍如下&#xff1a; 电视媒体&#xff1a;上海的第一财经频道和东方财经频道等&…

《python程序语言设计》2018版第5章第51题利用turtle画18x18的格子

05.51.01version 先从第一一个格子来做 turtle.right(45) turtle.circle(18, steps4) turtle.hideturtle() turtle.done()这个代码很简单的现实出格子的样式。 现在的问题是循环的话。首先角度45度怎么处理 随着45度一次一次迭代。他是应该转4590呢还是4545呢&#xff1f;&…

【Mac】Boxy SVG for Mac(矢量图编辑器)及同类型软件介绍

软件介绍 Boxy SVG 是一款功能强大的矢量图形编辑器&#xff0c;专门为 macOS 平台设计开发。它主要用于创建和编辑 SVG&#xff08;可缩放矢量图形&#xff09;文件&#xff0c;是设计师和开发者们制作矢量图形的理想工具。 以下是关于 Boxy SVG 的主要特点和功能&#xff1a…

权限维持Linux---监控功能Strace后门命令自定义Alias后门

免责声明:本文仅做技术交流与学习... 目录 监控功能Strace后门 1、记录 sshd 明文 监控 筛选查看 2、记录sshd私钥 命令自定义Alias后门 1、简单粗鲁实现反弹&#xff1a; 靶机替换命令 攻击机监听上线 2.升级(让命令正常) 将反弹命令进行base64编码 替换alias命令 …