SparkCore

news2024/9/24 3:22:56

Spark是一种快速、通用、可扩展的大数据分析引擎

Spark的特点

  • Speed:快速高效

  • 性能比Hadoop MapReduce快100倍。即便是不将数据cache到内存中,其速度也是MapReduce10 倍以上。

  • Ease of Use:简洁易用

  • Spark支持 Java、Scala、Python和R等编程语言编写应用程序

  • Generality:通用、全栈式数据处理

  • Runs Everywhere:可以运行在各种资源调度框架和读写多种数据源

  • Spark支持的多种部署方案:Standalone是Spark自带的资源调度模式;Spark可以运行在Hadoop的YARN上面;Spark 可以运行在Mesos上(Mesos是一个类似于YARN的资源调度框架);Spark还可以Kubernetes实现容器化的资源调度

  • 丰富的数据源支持。Spark除了可以访问操作系统自身的本地文件系统和HDFS之外,还可以访问 Cassandra、HBase、Hive、Alluxio(Tachyon)以及任何 Hadoop兼容的数据源。这极大地方便了已经 的大数据系统进行顺利迁移到Spark。

面试题:MapReduce和Spark的本质区别:

  1. MR只能做离线计算,如果实现复杂计算逻辑,一个MR搞不定,就需要将多个MR按照先后顺序连成一串,一个MR计算完成后会将计算结果写入到HDFS中,下一个MR将上一个MR的输出作为输入,这样就要频繁读写HDFS,网络IO和磁盘IO会成为性能瓶颈。从而导致效率低下。

  1. spark既可以做离线计算,又可以做实时计算,提供了抽象的数据集(RDD、Dataset、DataFrame、DStream)

有高度封装的API,算子丰富,并且使用了更先进的DAG有向无环图调度思想,可以对执行计划优化后在执行,并且可以数据可以cache到内存中进行复用,shuffle时,数据可以不排序

MR和Spark在Shuffle时数据都落本地磁盘

  1. Spark架构体系

StandAlone模式是spark自带的集群运行模式,不依赖其他的资源调度框架,部署起来简单。

StandAlone模式又分为client模式和cluster模式,本质区别是Driver运行在哪里,如果Driver运行在SparkSubmit进程中就是Client模式,如果Driver运行在集群中就是Cluster模式

  1. Spark中重要角色

  • Master :是一个Java进程,接收Worker的注册信息和心跳、移除异常超时的Worker、接收客户端提交的任务、负责资源调度、命令Worker启动Executor。

  • Worker :是一个Java进程,负责管理当前节点的资源管理,向Master注册并定期发送心跳,负责启动Executor、并监控Executor的状态。

  • SparkSubmit :是一个Java进程,负责向Master提交任务。

  • Driver :是很多类的统称,可以认为SparkContext就是Driver,client模式Driver运行在SparkSubmit进程中,cluster模式单独运行在一个进程中,负责将用户编写的代码转成Tasks,然后调度到Executor中执行,并监控Task的状态和执行进度。

  • Executor :是一个Java进程,负责执行Driver端生成的Task,将Task放入线程中运行。

  • 启动Spark Shell程序

  • 什么是Spark Shell

  1. spark shell是spark中的交互式命令行客户端,可以在spark shell中使用scala编写spark程序,启动后默认已经创建了SparkContext,别名为sc

/opt/apps/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--master spark://node-1.51doit.cn:7077 --executor-memory 1g \
--total-executor-cores 3

 如果Master配置了HA高可用,需要指定两个Master(因为这两个Master任意一个都可能是Active状态)

/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--master spark://node-1.51doit.cn:7077,node-2.51doit.cn:7077 \
--executor-memory 1g \
--total-executor-cores 3

参数说明:

--master 指定masterd地址和端口,协议为spark://,端口是RPC的通信端口

--executor-memory 指定每一个executor的使用的内存大小

--total-executor-cores指定整个application总共使用了cores

  • 在shell中编写第一个spark程序

sc.textFile("hdfs://node-1.51doit.cn:9000/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://node-1.51doit.cn:9000/out")
  1. Scala编写Spark的WorkCount

  1. 创建一个Maven项目

  1. 在pom.xml中添加依赖和插件

<!-- 定义的一些常量 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <spark.version>3.2.3</spark.version>
    <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
    <!-- scala的依赖 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

</dependencies>

<!-- 配置Maven的镜像库 -->
<!-- 依赖下载国内镜像库 -->
<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <layout>default</layout>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </repository>
</repositories>

<!-- maven插件下载国内镜像库 -->
<pluginRepositories>
    <pluginRepository>
        <id>ali-plugin</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </pluginRepository>
</pluginRepositories>

<build>
    <pluginManagement>
        <plugins>
            <!-- 编译scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- 编译java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!-- 打jar插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  1. 编写Spark程序

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 1.创建SparkContext
  * 2.创建RDD
  * 3.调用RDD的Transformation(s)方法
  * 4.调用Action
  * 5.释放资源
  */
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    //创建SparkContext,使用SparkContext来创建RDD
    val sc: SparkContext = new SparkContext(conf)
    //spark写Spark程序,就是对抽象的神奇的大集合【RDD】编程,调用它高度封装的API
    //使用SparkContext创建RDD
    val lines: RDD[String] = sc.textFile(args(0))

    //Transformation 开始 //
    //切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //将单词和一组合放在元组中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    //分组聚合,reduceByKey可以先局部聚合再全局聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    //Transformation 结束 //

    //调用Action将计算结果保存到HDFS中
    sorted.saveAsTextFile(args(1))
    //释放资源
    sc.stop()
  }
}
  1. Java编写Spark的WordCount

  2. import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class JavaWordCount {
    
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
            //创建JavaSparkContext
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            //使用JavaSparkContext创建RDD
            JavaRDD<String> lines = jsc.textFile(args[0]);
            //调用Transformation(s)
            //切分压平
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            //将单词和一组合在一起
            JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return Tuple2.apply(word, 1);
                        }
            });
            //分组聚合
            JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            //排序,先调换KV的顺序VK
            JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
                    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                    return tp.swap();
                }
            });
            //再排序
            JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
            //再调换顺序
            JavaPairRDD<String, Integer> result = sorted.mapToPair(
                    new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                    return tp.swap();
                }
            });
            //触发Action,将数据保存到HDFS
            result.saveAsTextFile(args[1]);
            //释放资源
            jsc.stop();
        }
    }
     

  3. 使用Lambda表达式方式

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        //创建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //创建RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        //切分压平
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        //将单词和一组合
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        //分组聚合
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        //调换顺序
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        //排序
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        //调换顺序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        //将数据保存到HDFS
        result.saveAsTextFile(args[1]);
        //释放资源
        jsc.stop();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/713366.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ctfshow—红包4

0x00 前言 CTF 加解密合集&#xff1a;CTF 加解密合集 0x01 题目 from secrets import randbelow from nationalsecret import p, r, k, flagg 2 y pow(g, k, p)def gogogo():print("Another chance:")t int(input(t ))c randbelow(p)print("Here is my…

关于anaconda的python虚拟环境

1.查看anaconda的虚拟环境 在cmd中输入 conda env list //查看conda中的虚拟环境和 activate 环境名称 //激活环境 pip list //查看该环境的包 python // 可以查看该环境python的版本 deactivate //退出环境2.使用anaconda创建新的…

Java Web HTMLCSS(2)23.6.30

2&#xff0c;CSS 2.1 概述 CSS 是一门语言&#xff0c;用于控制网页表现。我们之前介绍过W3C标准。W3C标准规定了网页是由以下组成&#xff1a; 结构&#xff1a;HTML表现&#xff1a;CSS行为&#xff1a;JavaScript CSS也有一个专业的名字&#xff1a;Cascading Style Sh…

如何在 macOS 上同时使用 Flutter2 和 Flutter3 进行 ios 开发

如何在 macOS 上同时使用 Flutter2 和 Flutter3 进行 ios 开发 前言 猫哥主打系统环境是: macos flutter 3.7.12 ruby 3.2.2 cocoapods 1.12.1 xcode 14.3.1 这套配置运行最新的项目没问题&#xff0c;但是最近需要维护 flutter 2.10.5 这种老项目&#xff0c;虽然用了 fvm 进…

武汉理工大学第四届ACM校赛(部分补题与写题)

开裂 目录 k-雇佣农民 题目描述 输入描述: 输出描述: 输入 输出 备注: 小e的苹果树 不降序列 k-雇佣农民 题目描述 Ly很喜欢星际争霸二这款游戏&#xff0c;但是他现在玩不到了。所以Ly现在只能做一个关于农民的题消磨时光。 开始时Ly没有任何农民&#xff0c;第i天白…

三、QPushButton的使用,信号和槽

QT从入门到实战学习笔记 一、QPushButton的创建二、中文要设置成UTF-8格式才不会乱码三、对象树1、验证被释放掉 四、QT窗口坐标系九、信号和槽---点击按钮关闭窗口1、查询signal信号的定义&#xff08;帮助文档&#xff09;2、搜索QWidget查找槽函数&#xff08;slot是槽的意思…

华为freebuds 5无线充电充不上电怎么办?

相信很多人都会遇到跟我一样的问题&#xff0c;华为FreeBuds 5无线充电充不进电是怎么回事&#xff1f;为此我专门整理了以下的经验&#xff0c;相信对大家有所帮助。 1. 充电时要把耳机盒保护套拆下来&#xff0c;耳机盒与充电底座之间不要有东西挡着。这样耳机盒充电时可以更…

jenkins的环境搭建

jenkins 环境 安装 我之前使用war安装、安装比较简单、就是jenkins的 对应的插件不能下载下来、后来发现是版本的问题、使用docker-compose 安装、jenkins安装 插件很容易安装下来 1、安装jdk 解压jdk 配置环境变量 #set java environment JAVA_HOME/usr/local/jdk1.8.0_281…

基于单片机和GSM短信模块的家庭防盗火灾安全报警系统

wx供重浩&#xff1a;创享日记 对话框发送&#xff1a;627短信 获取完整论文报告&#xff08;含无水印图片和代码&#xff09; 本系统主要由单片机和GSM短信模块组成&#xff0c;借助最可靠、最成熟的GSM移动网络&#xff0c;以最直观的中文短消息或电话形式&#xff0c;直接把…

392.04亿元?台积电公布下一代工艺发展路线图,2纳米2025 年投产

台积电近日在日本举办了一次研讨会&#xff0c;详细介绍了N3E工艺节点的最新进展和引人注目的性能提升。此外&#xff0c;台积电还公布了令人期待的下一代N2工艺的发展路线图。 台积电副总裁Kevin Zhang透露&#xff0c;公司正以迅猛速度发展&#xff0c;预计2022年的投资金额将…

360测试开发技术面试题目

目录 一、java方面 二、Linux方面 三、数据库方面 四、性能测试方面 五、HTTP协议方面 六、其他 总结&#xff1a; 最近面试了360测试开发的职位&#xff0c;将面试题整理出来分享~ 一、java方面 1、java重载和重写的区别 重载overloading 多个方法、相同的名字&#x…

CF578A(直线方程 + 数学) 1700

CF578A(直线方程 数学) 1700 有一条折线 &#xff0c; 这个折线经过这样一组点&#xff1a; (0,0) - (x,x) - (2x,0) - (3x,x) - (4x,0) - ....现给出折线上一点 &#xff0c; 求 x 的最小值 思路&#xff1a;我们不妨用解方程的思想 &#xff0c;先写出折线的方程&#xf…

AIGC-stable-diffusion系列1- stable-diffusion-webui

安装方法1&#xff0c;源码安装 参考 repo参考地址&#xff1a;https://github.com/AUTOMATIC1111/stable-diffusion-webui python下载地址&#xff1a;https://www.python.org/downloads/release/python-3106/ git下载地址&#xff1a;https://git-scm.com/download/win 官…

逻辑回归模型

目录 引言 逻辑回归的理论基础 逻辑回归的实践 实战案例&#xff1a;银行营销预测 超越逻辑回归 引言 我们在上一篇文章中讨论了线性回归模型&#xff0c;探讨了如何利用它来解决连续变量预测的问题。今天&#xff0c;我们将转向一种新的模型——逻辑回归&#xff0c;它用…

TIDB v7.1 reource control资源管控特性体验贴

作者&#xff1a; bert 原文来源&#xff1a; https://tidb.net/blog/60c87e38 TIDB v7.1 reource control资源管控特性体验贴 1. 使用场景&#xff1a; 定义&#xff1a;TIDB的资源管控 (Resource Control) &#xff0c;使用资源管控特性&#xff0c;将用户绑定到某个资源…

摇骰子设计与实现(uni-app微信小程序)

文章目录 摇骰子设计与实现准备工作实现步骤以及思路第一步&#xff1a;实现准备状态第二步&#xff1a;实现晃动中状态第三步&#xff1a;等待开起状态第四步&#xff1a;开启后状态部分优化 总代码 摇骰子设计与实现 手机摇一摇可以摇骰子&#xff0c;上划可查看结果&#x…

桥梁健康监测:时刻感知桥梁“脉搏”

随着交通量的不断增加&#xff0c;桥梁作为一种重要的交通基础设施&#xff0c;其安全性和可靠性面临着日益严峻的挑战。桥梁健康监测是保障桥梁安全和预防桥梁事故的重要手段&#xff0c;本文将介绍桥梁健康监测的意义、技术手段和应用案例。 一、桥梁健康监测的意义 保障交通…

解决React18+ts项目导入模块的声明报错

路径配置 项目路径别名的配置 ts对指向src的目录提示是不支持的 所以需要手动配置符号指向 在vite.config.ts import path from path export default defineConfig({plugins:[react()],resolve:{alias:{"":path.resolve(__dirname, ./src)}} })但这时path模块引入会…

阿里30K测试开发岗位面试过程

面试总结 a.测开岗考察内容与软开岗类似&#xff0c;难度相对较小 b.阿里是一面技术面试官协调推进面试流程&#xff0c;HR参与较少 c.遇到的面试官都很nice 一面 自我介绍项目C基础 C底层如何进行内存分配 C是面向对象的编程&#xff0c;类中默认的拷贝构造函数是浅复制…

使用 ANTMAN 工具替换 OceanBase 云平台节点

OceanBase 环境基本都会先安装 OCP 来部署、监控、运维数据库集群。但如果有机器过保等问题&#xff0c;就需要有平稳的 OCP 节点的替换方案。 作者&#xff1a;张瑞远 上海某公司 DBA&#xff0c;曾经从事银行、证券数仓设计、开发、优化类工作&#xff0c;现主要从事电信级 I…